From f26dae3bf9f120041ff43ce944abf9ddda0d9155 Mon Sep 17 00:00:00 2001
From: Nadav Har'El
Date: Tue, 23 Jun 2015 00:36:34 +0300
Subject: [PATCH] sstable: basic compaction function

This patch adds the basic compaction function sstables::compact_sstables,
which takes a list of input sstables, and creates several (currently one)
merged sstable. This implementation is pretty simple once we have all
the infrastructure in place (combining reader, writer, and a pipe between
them to reduce context switches).

This is already working compaction, but not quite complete: We'll need
to add compaction strategies (which sstables to compact, and when),
better cardinality estimator, sstable management and renaming, and a lot
of other details, and we'll probably still need to change the API.
But we can already write a test for compacting existing sstables (see
the next patch), and I wanted to get this patch out of the way, so we can
start working on applying compaction in a real use case.

Signed-off-by: Nadav Har'El
---
 configure.py           |  1 +
 sstables/compaction.cc | 73 ++++++++++++++++++++++++++++++++++++++++++
 sstables/compaction.hh | 23 +++++++++++++
 sstables/sstables.hh   |  2 ++
 4 files changed, 99 insertions(+)
 create mode 100644 sstables/compaction.cc
 create mode 100644 sstables/compaction.hh

diff --git a/configure.py b/configure.py
index e3efe08a1b..13e0967c8f 100755
--- a/configure.py
+++ b/configure.py
@@ -386,6 +386,7 @@ urchin_core = (['database.cc',
                 'sstables/key.cc',
                 'sstables/partition.cc',
                 'sstables/filter.cc',
+                'sstables/compaction.cc',
                 'log.cc',
                 'transport/server.cc',
                 'cql3/abstract_marker.cc',
diff --git a/sstables/compaction.cc b/sstables/compaction.cc
new file mode 100644
index 0000000000..4f4245f31f
--- /dev/null
+++ b/sstables/compaction.cc
@@ -0,0 +1,73 @@
+/*
+ * Copyright 2015 Cloudius Systems
+ */
+
+#include <vector>
+
+#include "core/future-util.hh"
+#include "core/pipe.hh"
+
+#include "sstables.hh"
+#include "compaction.hh"
+#include "mutation_reader.hh"
+
+namespace sstables {
+
+// Merge the mutations of all input "sstables" (read using "schema") into one
+// new sstable from creator.new_tmp(); more outputs may be produced in future.
+// Resolves when reading and writing are both done. Does not call commit().
+future<> compact_sstables(std::vector<shared_sstable> sstables,
+        schema_ptr schema, sstable_creator& creator) {
+    std::vector<::mutation_reader> readers;
+    uint64_t estimated_partitions = 0;
+
+    for (auto sst : sstables) {
+        // We also capture the sstable, so we keep it alive while the read isn't done
+        readers.emplace_back([sst, r = make_lw_shared<sstables::mutation_reader>(sst->read_rows(schema))] () mutable { return r->read(); });
+        // FIXME: If the sstables have cardinality estimation bitmaps, use that
+        // for a better estimate for the number of partitions in the merged
+        // sstable than just adding up the lengths of individual sstables.
+        estimated_partitions += sst->get_estimated_key_count();
+    }
+    auto combined_reader = make_combined_reader(std::move(readers));
+
+    // We use a fixed-sized pipe between the producer fiber (which reads the
+    // individual sstables and merges them) and the consumer fiber (which
+    // only writes to the sstable). Things would have worked without this
+    // pipe (the writing fiber would have also performed the reads), but we
+    // prefer to do less work in the writer (which is a seastar::thread),
+    // and also want the extra buffer to ensure we do fewer context switches
+    // to that seastar::thread.
+    // TODO: better tuning for the size of the pipe. Perhaps should take into
+    // account the size of the individual mutations?
+    seastar::pipe<mutation> output{16};
+    auto output_reader = make_lw_shared<seastar::pipe_reader<mutation>>(std::move(output.reader));
+    auto output_writer = make_lw_shared<seastar::pipe_writer<mutation>>(std::move(output.writer));
+    // The loop body runs repeatedly, so it copies (never moves) its captures.
+    auto done = make_lw_shared<bool>(false);
+    future<> read_done = do_until([done] { return *done; }, [done, output_writer, combined_reader = std::move(combined_reader)] {
+        return combined_reader().then([done, output_writer] (auto mopt) {
+            if (mopt) {
+                return output_writer->write(std::move(*mopt));
+            } else {
+                *done = true;
+                return make_ready_future<>();
+            }
+        });
+    });
+
+    ::mutation_reader mutation_queue_reader = [output_reader] () {
+        return output_reader->read();
+    };
+    auto newtab = creator.new_tmp();
+    future<> write_done = newtab->write_components(
+            std::move(mutation_queue_reader), estimated_partitions, schema);
+
+    // Wait for both read_done and write_done fibers to finish.
+    // FIXME: if write_done throws an exception, we get a broken pipe
+    // exception on read_done, and then we don't handle write_done's
+    // exception, causing a warning message of "ignored exceptional future".
+    return read_done.then([write_done = std::move(write_done)] () mutable { return std::move(write_done); });
+}
+
+}
diff --git a/sstables/compaction.hh b/sstables/compaction.hh
new file mode 100644
index 0000000000..af5886e0e6
--- /dev/null
+++ b/sstables/compaction.hh
@@ -0,0 +1,23 @@
+/*
+ * Copyright (C) 2015 Cloudius Systems, Ltd.
+ *
+ */
+
+#pragma once
+
+#include "sstables.hh"
+
+namespace sstables {
+    class sstable_creator {
+    public:
+        // new_tmp() creates a new sstable, which is marked temporary
+        // until all temporary sstables are finalized with commit().
+        virtual shared_sstable new_tmp() = 0;
+        virtual void commit() = 0;
+        virtual ~sstable_creator() = default;
+    };
+
+    // Merge "sstables" into new sstable(s) made by "creator" (see compaction.cc).
+    future<> compact_sstables(std::vector<shared_sstable> sstables,
+            schema_ptr schema, sstable_creator& creator);
+}
diff --git a/sstables/sstables.hh b/sstables/sstables.hh
index a989a8aa94..552ecdd49a 100644
--- a/sstables/sstables.hh
+++ b/sstables/sstables.hh
@@ -315,4 +315,6 @@ public:
     friend class test;
 };
 
+using shared_sstable = lw_shared_ptr<sstable>;
+
 }