sstable: basic compaction function

This patch adds the basic compaction function sstables::compact_sstables,
which takes a list of input sstables, and creates several (currently one)
merged sstable. This implementation is pretty simple once we have all
the infrastructure in place (combining reader, writer, and a pipe between
them to reduce context switches).

This is already working compaction, but not quite complete: We'll need
to add compaction strategies (which sstables to compact, and when),
better cardinality estimator, sstable management and renaming, and a lot
of other details, and we'll probably still need to change the API.
But we can already write a test for compacting existing sstables (see
the next patch), and I wanted to get this patch out of the way, so we can
start working on applying compaction in a real use case.

Signed-off-by: Nadav Har'El <nyh@cloudius-systems.com>
This commit is contained in:
Nadav Har'El
2015-06-23 00:36:34 +03:00
committed by Avi Kivity
parent 6063d4502f
commit f26dae3bf9
4 changed files with 99 additions and 0 deletions

View File

@@ -386,6 +386,7 @@ urchin_core = (['database.cc',
'sstables/key.cc',
'sstables/partition.cc',
'sstables/filter.cc',
'sstables/compaction.cc',
'log.cc',
'transport/server.cc',
'cql3/abstract_marker.cc',

73
sstables/compaction.cc Normal file
View File

@@ -0,0 +1,73 @@
/*
* Copyright 2015 Cloudius Systems
*/
#include <vector>
#include "core/future-util.hh"
#include "core/pipe.hh"
#include "sstables.hh"
#include "compaction.hh"
#include "mutation_reader.hh"
namespace sstables {
// compact_sstables compacts the given list of sstables creating one
// (currently) or more (in the future) new sstables. The new sstables
// are created using the "sstable_creator" object passed by the caller.
future<> compact_sstables(std::vector<shared_sstable> sstables,
schema_ptr schema, sstable_creator& creator) {
std::vector<::mutation_reader> readers;
uint64_t estimated_parititions = 0;
for (auto sst : sstables) {
// We also capture the sstable, so we keep it alive while the read isn't done
readers.emplace_back([sst, r = make_lw_shared(sst->read_rows(schema))] () mutable { return r->read(); });
// FIXME: If the sstables have cardinality estimation bitmaps, use that
// for a better estimate for the number of partitions in the merged
// sstable than just adding up the lengths of individual sstables.
estimated_parititions += sst->get_estimated_key_count();
}
auto combined_reader = make_combined_reader(std::move(readers));
// We use a fixed-sized pipe between the producer fiber (which reads the
// individual sstables and merges them) and the consumer fiber (which
// only writes to the sstable). Things would have worked without this
// pipe (the writing fiber would have also performed the reads), but we
// prefer to do less work in the writer (which is a seastar::thread),
// and also want the extra buffer to ensure we do fewer context switches
// to that seastar::thread.
// TODO: better tuning for the size of the pipe. Perhaps should take into
// account the size of the individual mutations?
seastar::pipe<mutation> output{16};
auto output_reader = make_lw_shared<seastar::pipe_reader<mutation>>(std::move(output.reader));
auto output_writer = make_lw_shared<seastar::pipe_writer<mutation>>(std::move(output.writer));
auto done = make_lw_shared<bool>(false);
future<> read_done = do_until([done] { return *done; }, [done, output_writer, combined_reader = std::move(combined_reader)] {
return combined_reader().then([done = std::move(done), output_writer = std::move(output_writer)] (auto mopt) {
if (mopt) {
return output_writer->write(std::move(*mopt));
} else {
*done = true;
return make_ready_future<>();
}
});
});
::mutation_reader mutation_queue_reader = [output_reader] () {
return output_reader->read();
};
auto newtab = creator.new_tmp();
future<> write_done = newtab->write_components(
std::move(mutation_queue_reader), estimated_parititions, schema);
// Wait for both read_done and write_done fibers to finish.
// FIXME: if write_done throws an exception, we get a broken pipe
// exception on read_done, and then we don't handle write_done's
// exception, causing a warning message of "ignored exceptional future".
return read_done.then([write_done = std::move(write_done)] () mutable { return std::move(write_done); });
}
}

23
sstables/compaction.hh Normal file
View File

@@ -0,0 +1,23 @@
/*
* Copyright (C) 2015 Cloudius Systems, Ltd.
*
*/
#pragma once
#include "sstables.hh"
namespace sstables {
class sstable_creator {
public:
// new_tmp() creates a new sstable, which is marked temporary
// until all temporary sstables are finalized with commit().
virtual shared_sstable new_tmp() = 0;
virtual void commit() = 0;
virtual ~sstable_creator() { };
};
future<> compact_sstables(std::vector<shared_sstable> sstables,
schema_ptr schema, sstable_creator& creator);
}

View File

@@ -315,4 +315,6 @@ public:
friend class test;
};
using shared_sstable = lw_shared_ptr<sstable>;
}