/* * Copyright (C) 2014 Cloudius Systems, Ltd. */ /* * This file is part of Scylla. * * Scylla is free software: you can redistribute it and/or modify * it under the terms of the GNU Affero General Public License as published by * the Free Software Foundation, either version 3 of the License, or * (at your option) any later version. * * Scylla is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. * * You should have received a copy of the GNU General Public License * along with Scylla. If not, see . */ #include "log.hh" #include "database.hh" #include "unimplemented.hh" #include "core/future-util.hh" #include "db/commitlog/commitlog_entry.hh" #include "db/system_keyspace.hh" #include "db/consistency_level.hh" #include "db/serializer.hh" #include "db/commitlog/commitlog.hh" #include "db/config.hh" #include "to_string.hh" #include "query-result-writer.hh" #include "nway_merger.hh" #include "cql3/column_identifier.hh" #include "core/seastar.hh" #include #include #include #include #include "sstables/sstables.hh" #include "sstables/compaction.hh" #include #include #include "locator/simple_snitch.hh" #include #include #include #include #include "frozen_mutation.hh" #include "mutation_partition_applier.hh" #include "core/do_with.hh" #include "service/migration_manager.hh" #include "service/storage_service.hh" #include "mutation_query.hh" #include "sstable_mutation_readers.hh" #include #include #include "utils/latency.hh" #include "utils/flush_queue.hh" #include "schema_registry.hh" #include "service/priority_manager.hh" using namespace std::chrono_literals; logging::logger dblog("database"); // Slight extension to the flush_queue type. 
// NOTE(review): this file appears to have gone through a lossy conversion that
// stripped every angle-bracket span. Template parameter lists and template
// argument lists are missing throughout (e.g. "template auto run_cf_flush",
// "lw_shared_ptr _sstables", "make_ready_future()", bare "future" return
// types). The code below is preserved token-for-token as found — only
// comments were added. Recover the original generic arguments from version
// control before attempting to build this translation unit.

// Orders memtable flushes by commitlog replay position, so that the
// post-operation (discarding completed commitlog segments — see
// seal_active_memtable()) runs only in replay-position order.
class column_family::memtable_flush_queue : public utils::flush_queue {
public:
    // Queue a flush keyed by `rp`; `func` performs the flush, `post` runs
    // afterwards in replay-position order.
    template
    auto run_cf_flush(db::replay_position rp, Func&& func, Post&& post) {
        // special case: empty rp, yet still data.
        // We generate a few memtables with no valid, "high_rp", yet
        // still containing data -> actual flush.
        // And to make matters worse, we can initiate a flush of N such
        // tables at the same time.
        // Just queue them at the end of the queue and treat them as such.
        if (rp == db::replay_position() && !empty()) {
            rp = highest_key();
        }
        return run_with_ordered_post_op(rp, std::forward(func), std::forward(post));
    }
};

// Constructor used when the column family is backed by a commitlog `cl`.
column_family::column_family(schema_ptr schema, config config, db::commitlog& cl, compaction_manager& compaction_manager)
    : _schema(std::move(schema))
    , _config(std::move(config))
    , _memtables(make_lw_shared(memtable_list{}))
    , _sstables(make_lw_shared())
    , _cache(_schema, sstables_as_mutation_source(), sstables_as_key_source(), global_cache_tracker())
    , _commitlog(&cl)
    , _compaction_manager(compaction_manager)
    , _flush_queue(std::make_unique())
{
    add_memtable();
    if (!_config.enable_disk_writes) {
        // NOTE(review): message reads "no durable"; presumably "not durable" —
        // left unchanged here because it is a runtime string.
        dblog.warn("Writes disabled, column family no durable.");
    }
}

// Constructor used when the column family has no commitlog (the no_commitlog
// tag); _commitlog stays null and seal_active_memtable() skips segment
// discarding.
column_family::column_family(schema_ptr schema, config config, no_commitlog cl, compaction_manager& compaction_manager)
    : _schema(std::move(schema))
    , _config(std::move(config))
    , _memtables(make_lw_shared(memtable_list{}))
    , _sstables(make_lw_shared())
    , _cache(_schema, sstables_as_mutation_source(), sstables_as_key_source(), global_cache_tracker())
    , _commitlog(nullptr)
    , _compaction_manager(compaction_manager)
    , _flush_queue(std::make_unique())
{
    add_memtable();
    if (!_config.enable_disk_writes) {
        dblog.warn("Writes disabled, column family no durable.");
    }
}

// Builds a predicate over partition keys answering whether any sstable in
// `old_sstables` may contain the key, by consulting each sstable's key filter
// (filter_has_key — presumably a bloom filter, so "maybe_exists" can be a
// false positive while "definitely_doesnt_exist" is exact; verify against the
// sstables implementation). Used by update_cache() to decide cache population.
partition_presence_checker column_family::make_partition_presence_checker(lw_shared_ptr old_sstables) {
    return [this, old_sstables = std::move(old_sstables)] (const partition_key& key) {
        for (auto&& s : *old_sstables) {
            if (s.second->filter_has_key(*_schema, key)) {
                return partition_presence_checker_result::maybe_exists;
            }
        }
        return partition_presence_checker_result::definitely_doesnt_exist;
    };
}

// Wraps make_sstable_reader() as a mutation_source, i.e. the on-disk side of
// this column family's data, for use by the cache.
mutation_source column_family::sstables_as_mutation_source() {
    return mutation_source([this] (schema_ptr s, const query::partition_range& r, const io_priority_class& pc) {
        return make_sstable_reader(std::move(s), r, pc);
    });
}

// define in .cc, since sstable is forward-declared in .hh
column_family::~column_family() {
}

// Sums the log-structured-allocator occupancy of every active memtable.
logalloc::occupancy_stats column_family::occupancy() const {
    logalloc::occupancy_stats res;
    // NOTE(review): `auto m` copies each element of the memtable list
    // (a shared pointer) per iteration; a const reference would avoid that.
    for (auto m : *_memtables.get()) {
        res += m->region().occupancy();
    }
    return res;
}

// True iff the mutation's token hashes to the shard (CPU) we are running on.
static bool belongs_to_current_shard(const mutation& m) {
    return dht::shard_of(m.token()) == engine().cpu_id();
}

// Reader over a partition range that combines one reader per sstable.
// Shared sstables (spanning several shards) are filtered so only mutations
// belonging to the current shard are produced.
class range_sstable_reader final : public mutation_reader::impl {
    const query::partition_range& _pr;
    lw_shared_ptr _sstables;
    mutation_reader _reader;
    // Use a pointer instead of copying, so we don't need to regenerate the reader if
    // the priority changes.
    const io_priority_class* _pc;
public:
    range_sstable_reader(schema_ptr s, lw_shared_ptr sstables, const query::partition_range& pr, const io_priority_class& pc)
        : _pr(pr)
        , _sstables(std::move(sstables))
        , _pc(&pc)
    {
        std::vector readers;
        for (const lw_shared_ptr& sst : *_sstables | boost::adaptors::map_values) {
            // FIXME: make sstable::read_range_rows() return ::mutation_reader so that we can drop this wrapper.
            mutation_reader reader = make_mutation_reader(sst, s, pr);
            if (sst->is_shared()) {
                reader = make_filtering_reader(std::move(reader), belongs_to_current_shard);
            }
            readers.emplace_back(std::move(reader));
        }
        _reader = make_combined_reader(std::move(readers));
    }

    range_sstable_reader(range_sstable_reader&&) = delete; // reader takes reference to member fields

    virtual future operator()() override {
        return _reader();
    }
};

// Single-partition reader: queries every sstable for one key in parallel,
// merges the results with apply(), and yields the combined mutation once,
// then an empty optional.
class single_key_sstable_reader final : public mutation_reader::impl {
    schema_ptr _schema;
    sstables::key _key;
    mutation_opt _m;       // accumulated merge of per-sstable results
    bool _done = false;    // set after the single emission
    lw_shared_ptr _sstables;
    // Use a pointer instead of copying, so we don't need to regenerate the reader if
    // the priority changes.
    const io_priority_class* _pc;
public:
    single_key_sstable_reader(schema_ptr schema, lw_shared_ptr sstables, const partition_key& key, const io_priority_class& pc)
        : _schema(std::move(schema))
        , _key(sstables::key::from_partition_key(*_schema, key))
        , _sstables(std::move(sstables))
        , _pc(&pc)
    { }

    virtual future operator()() override {
        if (_done) {
            return make_ready_future();
        }
        return parallel_for_each(*_sstables | boost::adaptors::map_values, [this](const lw_shared_ptr& sstable) {
            return sstable->read_row(_schema, _key).then([this](mutation_opt mo) {
                apply(_m, std::move(mo));
            });
        }).then([this] {
            _done = true;
            return std::move(_m);
        });
    }
};

// Chooses the sstable reading strategy: a singular range with a concrete key
// gets the parallel single-key reader (or an empty reader when the key's
// token belongs to another shard); anything else gets the range reader.
mutation_reader column_family::make_sstable_reader(schema_ptr s, const query::partition_range& pr, const io_priority_class& pc) const {
    if (pr.is_singular() && pr.start()->value().has_key()) {
        const dht::ring_position& pos = pr.start()->value();
        if (dht::shard_of(pos.token()) != engine().cpu_id()) {
            return make_empty_reader(); // range doesn't belong to this shard
        }
        return make_mutation_reader(std::move(s), _sstables, *pos.key(), pc);
    } else {
        // range_sstable_reader is not movable so we need to wrap it
        return make_mutation_reader(std::move(s), _sstables, pr, pc);
    }
}

// Exposes the sstables as a key_source (keys only, no mutation data) for the
// cache; per-sstable key readers are combined, and shared sstables are
// filtered down to keys owned by the current shard.
key_source column_family::sstables_as_key_source() const {
    return key_source([this] (const query::partition_range& range, const io_priority_class& pc) {
        std::vector readers;
        readers.reserve(_sstables->size());
        std::transform(_sstables->begin(), _sstables->end(), std::back_inserter(readers), [&] (auto&& entry) {
            auto& sst = entry.second;
            auto rd = sstables::make_key_reader(_schema, sst, range, pc);
            if (sst->is_shared()) {
                rd = make_filtering_reader(std::move(rd), [] (const dht::decorated_key& dk) {
                    return dht::shard_of(dk.token()) == engine().cpu_id();
                });
            }
            return rd;
        });
        return make_combined_reader(_schema, std::move(readers));
    });
}

// Exposed for testing, not performance critical.
// Reads the full partition for `key` through the regular read path
// (memtables + cache/sstables) and returns a copy of it, or an engaged-empty
// pointer when the partition does not exist.
future column_family::find_partition(schema_ptr s, const dht::decorated_key& key) const {
    return do_with(query::partition_range::make_singular(key), [s = std::move(s), this] (auto& range) {
        return do_with(this->make_reader(s, range), [] (mutation_reader& reader) {
            return reader().then([] (mutation_opt&& mo) -> std::unique_ptr {
                if (!mo) {
                    return {};
                }
                return std::make_unique(std::move(mo->partition()));
            });
        });
    });
}

// Convenience overload: decorates the raw partition key first ("slow"
// presumably because of the extra token computation — testing helper).
future column_family::find_partition_slow(schema_ptr s, const partition_key& key) const {
    return find_partition(s, dht::global_partitioner().decorate_key(*s, key));
}

// Looks up a single clustering row inside a partition; returns a null pointer
// when either the partition or the row is absent.
// NOTE(review): the parameters `partition_key`/`clustering_key` shadow the
// type names of the same spelling.
future column_family::find_row(schema_ptr s, const dht::decorated_key& partition_key, clustering_key clustering_key) const {
    return find_partition(std::move(s), partition_key).then([clustering_key = std::move(clustering_key)] (const_mutation_partition_ptr p) {
        if (!p) {
            return make_ready_future();
        }
        auto r = p->find_row(clustering_key);
        if (r) {
            // FIXME: remove copy if only one data source
            return make_ready_future(std::make_unique(*r));
        } else {
            return make_ready_future();
        }
    });
}

// The main read entry point: combines readers over every active memtable plus
// either the row cache (when enabled) or the sstables directly.
mutation_reader column_family::make_reader(schema_ptr s, const query::partition_range& range, const io_priority_class& pc) const {
    if (query::is_wrap_around(range, *s)) {
        // make_combined_reader() can't handle streams that wrap around yet.
        fail(unimplemented::cause::WRAP_AROUND);
    }
    std::vector readers;
    readers.reserve(_memtables->size() + _sstables->size());
    // We're assuming that cache and memtables are both read atomically
    // for single-key queries, so we don't need to special case memtable
    // undergoing a move to cache. At any given point in time between
    // deferring points the sum of data in memtable and cache is coherent. If
    // single-key queries for each data source were performed across deferring
    // points, it would be possible that partitions which are ahead of the
    // memtable cursor would be placed behind the cache cursor, resulting in
    // those partitions being missing in the combined reader.
    //
    // We need to handle this in range queries though, as they are always
    // deferring. scanning_reader from memtable.cc is falling back to reading
    // the sstable when memtable is flushed. After memtable is moved to cache,
    // new readers will no longer use the old memtable, but until then
    // performance may suffer. We should fix this when we add support for
    // range queries in cache, so that scans can always be satisfied from
    // memtable and cache only, as long as data is not evicted.
    //
    // https://github.com/scylladb/scylla/issues/309
    // https://github.com/scylladb/scylla/issues/185
    for (auto&& mt : *_memtables) {
        readers.emplace_back(mt->make_reader(s, range));
    }
    if (_config.enable_cache) {
        readers.emplace_back(_cache.make_reader(s, range, pc));
    } else {
        readers.emplace_back(make_sstable_reader(s, range, pc));
    }
    return make_combined_reader(std::move(readers));
}

// Not performance critical. Currently used for testing only.
template future column_family::for_all_partitions(schema_ptr s, Func&& func) const { static_assert(std::is_same>::value, "bad Func signature"); struct iteration_state { mutation_reader reader; Func func; bool ok = true; bool empty = false; public: bool done() const { return !ok || empty; } iteration_state(schema_ptr s, const column_family& cf, Func&& func) : reader(cf.make_reader(std::move(s))) , func(std::move(func)) { } }; return do_with(iteration_state(std::move(s), *this, std::move(func)), [] (iteration_state& is) { return do_until([&is] { return is.done(); }, [&is] { return is.reader().then([&is](mutation_opt&& mo) { if (!mo) { is.empty = true; } else { is.ok = is.func(mo->decorated_key(), mo->partition()); } }); }).then([&is] { return is.ok; }); }); } future column_family::for_all_partitions_slow(schema_ptr s, std::function func) const { return for_all_partitions(std::move(s), std::move(func)); } class lister { public: using dir_entry_types = std::unordered_set>; using walker_type = std::function (directory_entry)>; using filter_type = std::function; private: file _f; walker_type _walker; filter_type _filter; dir_entry_types _expected_type; subscription _listing; sstring _dirname; public: lister(file f, dir_entry_types type, walker_type walker, sstring dirname) : _f(std::move(f)) , _walker(std::move(walker)) , _filter([] (const sstring& fname) { return true; }) , _expected_type(type) , _listing(_f.list_directory([this] (directory_entry de) { return _visit(de); })) , _dirname(dirname) { } lister(file f, dir_entry_types type, walker_type walker, filter_type filter, sstring dirname) : lister(std::move(f), type, std::move(walker), dirname) { _filter = std::move(filter); } static future<> scan_dir(sstring name, dir_entry_types type, walker_type walker, filter_type filter = [] (const sstring& fname) { return true; }); protected: future<> _visit(directory_entry de) { return guarantee_type(std::move(de)).then([this] (directory_entry de) { // Hide all synthetic 
directories and hidden files. if ((!_expected_type.count(*(de.type))) || (de.name[0] == '.')) { return make_ready_future<>(); } // apply a filter if (!_filter(_dirname + "/" + de.name)) { return make_ready_future<>(); } return _walker(de); }); } future<> done() { return _listing.done(); } private: future guarantee_type(directory_entry de) { if (de.type) { return make_ready_future(std::move(de)); } else { auto f = engine().file_type(_dirname + "/" + de.name); return f.then([de = std::move(de)] (std::experimental::optional t) mutable { de.type = t; return make_ready_future(std::move(de)); }); } } }; future<> lister::scan_dir(sstring name, lister::dir_entry_types type, walker_type walker, filter_type filter) { return engine().open_directory(name).then([type, walker = std::move(walker), filter = std::move(filter), name] (file f) { auto l = make_lw_shared(std::move(f), type, walker, filter, name); return l->done().then([l] { }); }); } static std::vector parse_fname(sstring filename) { std::vector comps; boost::split(comps , filename ,boost::is_any_of(".-")); return comps; } static bool belongs_to_current_shard(const schema& s, const partition_key& first, const partition_key& last) { auto key_shard = [&s] (const partition_key& pk) { auto token = dht::global_partitioner().get_token(s, pk); return dht::shard_of(token); }; auto s1 = key_shard(first); auto s2 = key_shard(last); auto me = engine().cpu_id(); return (s1 <= me) && (me <= s2); } static bool belongs_to_current_shard(const schema& s, range r) { assert(r.start()); assert(r.end()); return belongs_to_current_shard(s, r.start()->value(), r.end()->value()); } future column_family::probe_file(sstring sstdir, sstring fname) { using namespace sstables; entry_descriptor comps = entry_descriptor::make_descriptor(fname); // Every table will have a TOC. Using a specific file as a criteria, as // opposed to, say verifying _sstables.count() to be zero is more robust // against parallel loading of the directory contents. 
if (comps.component != sstable::component_type::TOC) { return make_ready_future(std::move(comps)); } update_sstables_known_generation(comps.generation); assert(_sstables->count(comps.generation) == 0); auto fut = sstable::get_sstable_key_range(*_schema, _schema->ks_name(), _schema->cf_name(), sstdir, comps.generation, comps.version, comps.format); return std::move(fut).then([this, sstdir = std::move(sstdir), comps] (range r) { // Checks whether or not sstable belongs to current shard. if (!belongs_to_current_shard(*_schema, std::move(r))) { dblog.debug("sstable {} not relevant for this shard, ignoring", sstables::sstable::filename(sstdir, _schema->ks_name(), _schema->cf_name(), comps.version, comps.generation, comps.format, sstables::sstable::component_type::Data)); sstable::mark_sstable_for_deletion(_schema->ks_name(), _schema->cf_name(), sstdir, comps.generation, comps.version, comps.format); return make_ready_future<>(); } auto sst = std::make_unique(_schema->ks_name(), _schema->cf_name(), sstdir, comps.generation, comps.version, comps.format); auto fut = sst->load(); return std::move(fut).then([this, sst = std::move(sst)] () mutable { add_sstable(std::move(*sst)); return make_ready_future<>(); }); }).then_wrapped([fname, comps] (future<> f) { try { f.get(); } catch (malformed_sstable_exception& e) { dblog.error("malformed sstable {}: {}. Refusing to boot", fname, e.what()); throw; } catch(...) { dblog.error("Unrecognized error while processing {}: {}. 
Refusing to boot", fname, std::current_exception()); throw; } return make_ready_future(std::move(comps)); }); } void column_family::update_stats_for_new_sstable(uint64_t new_sstable_data_size) { _stats.live_disk_space_used += new_sstable_data_size; _stats.total_disk_space_used += new_sstable_data_size; _stats.live_sstable_count++; } void column_family::add_sstable(sstables::sstable&& sstable) { add_sstable(make_lw_shared(std::move(sstable))); } void column_family::add_sstable(lw_shared_ptr sstable) { auto generation = sstable->generation(); // allow in-progress reads to continue using old list _sstables = make_lw_shared(*_sstables); update_stats_for_new_sstable(sstable->data_size()); _sstables->emplace(generation, std::move(sstable)); } void column_family::add_memtable() { // allow in-progress reads to continue using old list _memtables = make_lw_shared(memtable_list(*_memtables)); _memtables->emplace_back(make_lw_shared(_schema, _config.dirty_memory_region_group)); } future<> column_family::update_cache(memtable& m, lw_shared_ptr old_sstables) { if (_config.enable_cache) { // be careful to use the old sstable list, since the new one will hit every // mutation in m. 
return _cache.update(m, make_partition_presence_checker(std::move(old_sstables))); } else { return make_ready_future<>(); } } future<> column_family::seal_active_memtable() { auto old = _memtables->back(); dblog.debug("Sealing active memtable, partitions: {}, occupancy: {}", old->partition_count(), old->occupancy()); if (!_config.enable_disk_writes) { return make_ready_future<>(); } if (old->empty()) { dblog.debug("Memtable is empty"); return make_ready_future<>(); } add_memtable(); assert(_highest_flushed_rp < old->replay_position() || (_highest_flushed_rp == db::replay_position() && old->replay_position() == db::replay_position()) ); _highest_flushed_rp = old->replay_position(); return _flush_queue->run_cf_flush(old->replay_position(), [old, this] { return repeat([this, old] { return with_lock(_sstables_lock.for_read(), [this, old] { _flush_queue->check_open_gate(); return try_flush_memtable_to_sstable(old); }); }); }, [old, this] { if (_commitlog) { _commitlog->discard_completed_segments(_schema->id(), old->replay_position()); } }); // FIXME: release commit log // FIXME: provide back-pressure to upper layers } future column_family::try_flush_memtable_to_sstable(lw_shared_ptr old) { // FIXME: better way of ensuring we don't attempt to // overwrite an existing table. auto gen = _sstable_generation++ * smp::count + engine().cpu_id(); auto newtab = make_lw_shared(_schema->ks_name(), _schema->cf_name(), _config.datadir, gen, sstables::sstable::version_types::ka, sstables::sstable::format_types::big); auto memtable_size = old->occupancy().total_space(); _config.cf_stats->pending_memtables_flushes_count++; _config.cf_stats->pending_memtables_flushes_bytes += memtable_size; newtab->set_unshared(); dblog.debug("Flushing to {}", newtab->get_filename()); // Note that due to our sharded architecture, it is possible that // in the face of a value change some shards will backup sstables // while others won't. // // This is, in theory, possible to mitigate through a rwlock. 
// However, this doesn't differ from the situation where all tables // are coming from a single shard and the toggle happens in the // middle of them. // // The code as is guarantees that we'll never partially backup a // single sstable, so that is enough of a guarantee. auto&& priority = service::get_local_memtable_flush_priority(); return newtab->write_components(*old, incremental_backups_enabled(), priority).then([this, newtab, old] { return newtab->open_data(); }).then_wrapped([this, old, newtab, memtable_size] (future<> ret) { _config.cf_stats->pending_memtables_flushes_count--; _config.cf_stats->pending_memtables_flushes_bytes -= memtable_size; dblog.debug("Flushing done"); try { ret.get(); // We must add sstable before we call update_cache(), because // memtable's data after moving to cache can be evicted at any time. auto old_sstables = _sstables; add_sstable(newtab); old->mark_flushed(newtab); trigger_compaction(); return update_cache(*old, std::move(old_sstables)).then_wrapped([this, old] (future<> f) { try { f.get(); } catch(...) { dblog.error("failed to move memtable to cache: {}", std::current_exception()); } _memtables->erase(boost::range::find(*_memtables, old)); dblog.debug("Memtable replaced"); return make_ready_future(stop_iteration::yes); }); } catch (...) { dblog.error("failed to write sstable: {}", std::current_exception()); } return sleep(10s).then([] { return make_ready_future(stop_iteration::no); }); }); } void column_family::start() { // FIXME: add option to disable automatic compaction. start_compaction(); } future<> column_family::stop() { seal_active_memtable(); return _compaction_manager.remove(this).then([this] { return _flush_queue->close(); }); } future> column_family::reshuffle_sstables(int64_t start) { struct work { int64_t current_gen; sstable_list sstables; std::unordered_map descriptors; std::vector reshuffled; work(int64_t start) : current_gen(start ? 
start : 1) {} }; return do_with(work(start), [this] (work& work) { return lister::scan_dir(_config.datadir, { directory_entry_type::regular }, [this, &work] (directory_entry de) { auto comps = sstables::entry_descriptor::make_descriptor(de.name); if (comps.component != sstables::sstable::component_type::TOC) { return make_ready_future<>(); } else if (comps.generation < work.current_gen) { return make_ready_future<>(); } auto sst = make_lw_shared(_schema->ks_name(), _schema->cf_name(), _config.datadir, comps.generation, comps.version, comps.format); work.sstables.emplace(comps.generation, std::move(sst)); work.descriptors.emplace(comps.generation, std::move(comps)); // FIXME: This is the only place in which we actually issue disk activity aside from // directory metadata operations. // // But without the TOC information, we don't know which files we should link. // The alternative to that would be to change create link to try creating a // link for all possible files and handling the failures gracefuly, but that's not // exactly fast either. // // Those SSTables are not known by anyone in the system. So we don't have any kind of // object describing them. There isn't too much of a choice. return work.sstables[comps.generation]->read_toc(); }, &manifest_json_filter).then([&work] { // Note: cannot be parallel because we will be shuffling things around at this stage. Can't race. 
return do_for_each(work.sstables, [&work] (auto& pair) { auto&& comps = std::move(work.descriptors.at(pair.first)); comps.generation = work.current_gen; work.reshuffled.push_back(std::move(comps)); if (pair.first == work.current_gen) { ++work.current_gen; return make_ready_future<>(); } return pair.second->set_generation(work.current_gen++); }); }).then([&work] { return make_ready_future>(std::move(work.reshuffled)); }); }); } void column_family::rebuild_sstable_list(const std::vector& new_sstables, const std::vector& sstables_to_remove) { // Build a new list of _sstables: We remove from the existing list the // tables we compacted (by now, there might be more sstables flushed // later), and we add the new tables generated by the compaction. // We create a new list rather than modifying it in-place, so that // on-going reads can continue to use the old list. auto current_sstables = _sstables; _sstables = make_lw_shared(); // zeroing live_disk_space_used and live_sstable_count because the // sstable list is re-created below. _stats.live_disk_space_used = 0; _stats.live_sstable_count = 0; std::unordered_set s( sstables_to_remove.begin(), sstables_to_remove.end()); for (const auto& oldtab : *current_sstables) { // Checks if oldtab is a sstable not being compacted. if (!s.count(oldtab.second)) { update_stats_for_new_sstable(oldtab.second->data_size()); _sstables->emplace(oldtab.first, oldtab.second); } } for (const auto& newtab : new_sstables) { // FIXME: rename the new sstable(s). Verify a rename doesn't cause // problems for the sstable object. update_stats_for_new_sstable(newtab->data_size()); _sstables->emplace(newtab->generation(), newtab); } for (const auto& oldtab : sstables_to_remove) { oldtab->mark_for_deletion(); } } future<> column_family::compact_sstables(sstables::compaction_descriptor descriptor, bool cleanup) { if (!descriptor.sstables.size()) { // if there is nothing to compact, just return. 
return make_ready_future<>(); } return with_lock(_sstables_lock.for_read(), [this, descriptor = std::move(descriptor), cleanup] { auto sstables_to_compact = make_lw_shared>(std::move(descriptor.sstables)); auto new_tables = make_lw_shared>(); auto create_sstable = [this, new_tables] { auto gen = this->calculate_generation_for_new_table(); // FIXME: use "tmp" marker in names of incomplete sstable auto sst = make_lw_shared(_schema->ks_name(), _schema->cf_name(), _config.datadir, gen, sstables::sstable::version_types::ka, sstables::sstable::format_types::big); sst->set_unshared(); new_tables->emplace_back(sst); return sst; }; return sstables::compact_sstables(*sstables_to_compact, *this, create_sstable, descriptor.max_sstable_bytes, descriptor.level, cleanup).then([this, new_tables, sstables_to_compact] { this->rebuild_sstable_list(*new_tables, *sstables_to_compact); }); }); } static bool needs_cleanup(const lw_shared_ptr& sst, const lw_shared_ptr>>& owned_ranges, schema_ptr s) { auto first = sst->get_first_partition_key(*s); auto last = sst->get_last_partition_key(*s); auto first_token = dht::global_partitioner().get_token(*s, first); auto last_token = dht::global_partitioner().get_token(*s, last); range sst_token_range = range::make(first_token, last_token); // return true iff sst partition range isn't fully contained in any of the owned ranges. 
for (auto& r : *owned_ranges) { if (r.contains(sst_token_range, dht::token_comparator())) { return false; } } return true; } future<> column_family::cleanup_sstables(sstables::compaction_descriptor descriptor) { std::vector> r = service::get_local_storage_service().get_local_ranges(_schema->ks_name()); auto owned_ranges = make_lw_shared>>(std::move(r)); auto sstables_to_cleanup = make_lw_shared>(std::move(descriptor.sstables)); return parallel_for_each(*sstables_to_cleanup, [this, owned_ranges = std::move(owned_ranges), sstables_to_cleanup] (auto& sst) { if (!owned_ranges->empty() && !needs_cleanup(sst, owned_ranges, _schema)) { return make_ready_future<>(); } std::vector sstable_to_compact({ sst }); return this->compact_sstables(sstables::compaction_descriptor(std::move(sstable_to_compact)), true); }); } future<> column_family::load_new_sstables(std::vector new_tables) { return parallel_for_each(new_tables, [this] (auto comps) { auto sst = make_lw_shared(_schema->ks_name(), _schema->cf_name(), _config.datadir, comps.generation, comps.version, comps.format); return sst->load().then([this, sst] { return sst->mutate_sstable_level(0); }).then([this, sst] { auto first = sst->get_first_partition_key(*_schema); auto last = sst->get_last_partition_key(*_schema); if (belongs_to_current_shard(*_schema, first, last)) { this->add_sstable(sst); } else { sst->mark_for_deletion(); } return make_ready_future<>(); }); }); } // FIXME: this is just an example, should be changed to something more general // Note: We assume that the column_family does not get destroyed during compaction. future<> column_family::compact_all_sstables() { std::vector sstables; sstables.reserve(_sstables->size()); for (auto&& entry : *_sstables) { sstables.push_back(entry.second); } // FIXME: check if the lower bound min_compaction_threshold() from schema // should be taken into account before proceeding with compaction. 
return compact_sstables(sstables::compaction_descriptor(std::move(sstables))); } void column_family::start_compaction() { set_compaction_strategy(_schema->compaction_strategy()); } void column_family::trigger_compaction() { // Submitting compaction job to compaction manager. if (!_compaction_disabled) { _stats.pending_compactions++; _compaction_manager.submit(this); } } future<> column_family::run_compaction(sstables::compaction_descriptor descriptor) { return compact_sstables(std::move(descriptor)).then([this] { _stats.pending_compactions--; }); } void column_family::set_compaction_strategy(sstables::compaction_strategy_type strategy) { _compaction_strategy = make_compaction_strategy(strategy, _schema->compaction_strategy_options()); } bool column_family::compaction_manager_queued() const { return _compaction_manager_queued; } void column_family::set_compaction_manager_queued(bool compaction_manager_queued) { _compaction_manager_queued = compaction_manager_queued; } bool column_family::pending_compactions() const { return _stats.pending_compactions > 0; } size_t column_family::sstables_count() { return _sstables->size(); } int64_t column_family::get_unleveled_sstables() const { // TODO: when we support leveled compaction, we should return the number of // SSTables in L0. If leveled compaction is enabled in this column family, // then we should return zero, as we currently do. return 0; } lw_shared_ptr column_family::get_sstables() { return _sstables; } inline bool column_family::manifest_json_filter(const sstring& fname) { using namespace boost::filesystem; path entry_path(fname); if (!is_directory(status(entry_path)) && entry_path.filename() == path("manifest.json")) { return false; } return true; } future<> column_family::populate(sstring sstdir) { // We can catch most errors when we try to load an sstable. But if the TOC // file is the one missing, we won't try to load the sstable at all. 
This // case is still an invalid case, but it is way easier for us to treat it // by waiting for all files to be loaded, and then checking if we saw a // file during scan_dir, without its corresponding TOC. enum class status { has_some_file, has_toc_file, has_temporary_toc_file, }; struct sstable_descriptor { std::experimental::optional version; std::experimental::optional format; }; auto verifier = make_lw_shared>(); auto descriptor = make_lw_shared(); return do_with(std::vector>(), [this, sstdir, verifier, descriptor] (std::vector>& futures) { return lister::scan_dir(sstdir, { directory_entry_type::regular }, [this, sstdir, verifier, descriptor, &futures] (directory_entry de) { // FIXME: The secondary indexes are in this level, but with a directory type, (starting with ".") auto f = probe_file(sstdir, de.name).then([verifier, descriptor] (auto entry) { if (verifier->count(entry.generation)) { if (verifier->at(entry.generation) == status::has_toc_file) { if (entry.component == sstables::sstable::component_type::TOC) { throw sstables::malformed_sstable_exception("Invalid State encountered. TOC file already processed"); } else if (entry.component == sstables::sstable::component_type::TemporaryTOC) { throw sstables::malformed_sstable_exception("Invalid State encountered. 
Temporary TOC file found after TOC file was processed"); } } else if (entry.component == sstables::sstable::component_type::TOC) { verifier->at(entry.generation) = status::has_toc_file; } else if (entry.component == sstables::sstable::component_type::TemporaryTOC) { verifier->at(entry.generation) = status::has_temporary_toc_file; } } else { if (entry.component == sstables::sstable::component_type::TOC) { verifier->emplace(entry.generation, status::has_toc_file); } else if (entry.component == sstables::sstable::component_type::TemporaryTOC) { verifier->emplace(entry.generation, status::has_temporary_toc_file); } else { verifier->emplace(entry.generation, status::has_some_file); } } // Retrieve both version and format used for this column family. if (!descriptor->version) { descriptor->version = entry.version; } if (!descriptor->format) { descriptor->format = entry.format; } }); // push future returned by probe_file into an array of futures, // so that the supplied callback will not block scan_dir() from // reading the next entry in the directory. futures.push_back(std::move(f)); return make_ready_future<>(); }, &manifest_json_filter).then([&futures] { return when_all(futures.begin(), futures.end()).then([] (std::vector> ret) { try { for (auto& f : ret) { f.get(); } } catch(...) { throw; } }); }).then([verifier, sstdir, descriptor, this] { return parallel_for_each(*verifier, [sstdir = std::move(sstdir), descriptor, this] (auto v) { if (v.second == status::has_temporary_toc_file) { unsigned long gen = v.first; assert(descriptor->version); sstables::sstable::version_types version = descriptor->version.value(); assert(descriptor->format); sstables::sstable::format_types format = descriptor->format.value(); if (engine().cpu_id() != 0) { dblog.debug("At directory: {}, partial SSTable with generation {} not relevant for this shard, ignoring", sstdir, v.first); return make_ready_future<>(); } // shard 0 is the responsible for removing a partial sstable. 
return sstables::sstable::remove_sstable_with_temp_toc(_schema->ks_name(), _schema->cf_name(), sstdir, gen, version, format); } else if (v.second != status::has_toc_file) { throw sstables::malformed_sstable_exception(sprint("At directory: %s: no TOC found for SSTable with generation %d!. Refusing to boot", sstdir, v.first)); } return make_ready_future<>(); }); }); }); } utils::UUID database::empty_version = utils::UUID_gen::get_name_UUID(bytes{}); database::database() : database(db::config()) {} database::database(const db::config& cfg) : _cfg(std::make_unique(cfg)) , _version(empty_version) { _memtable_total_space = size_t(_cfg->memtable_total_space_in_mb()) << 20; if (!_memtable_total_space) { _memtable_total_space = memory::stats().total_memory() / 2; } // Start compaction manager with two tasks for handling compaction jobs. _compaction_manager.start(2); setup_collectd(); dblog.info("Row: max_vector_size: {}, internal_count: {}", size_t(row::max_vector_size), size_t(row::internal_count)); } void database::setup_collectd() { _collectd.push_back( scollectd::add_polled_metric(scollectd::type_instance_id("memory" , scollectd::per_cpu_plugin_instance , "bytes", "dirty") , scollectd::make_typed(scollectd::data_type::GAUGE, [this] { return _dirty_memory_region_group.memory_used(); }))); _collectd.push_back( scollectd::add_polled_metric(scollectd::type_instance_id("memtables" , scollectd::per_cpu_plugin_instance , "queue_length", "pending_flushes") , scollectd::make_typed(scollectd::data_type::GAUGE, _cf_stats.pending_memtables_flushes_count) )); _collectd.push_back( scollectd::add_polled_metric(scollectd::type_instance_id("memtables" , scollectd::per_cpu_plugin_instance , "bytes", "pending_flushes") , scollectd::make_typed(scollectd::data_type::GAUGE, _cf_stats.pending_memtables_flushes_bytes) )); } database::~database() { } void database::update_version(const utils::UUID& version) { _version = version; } const utils::UUID& database::get_version() const { return 
_version; } future<> database::populate_keyspace(sstring datadir, sstring ks_name) { auto ksdir = datadir + "/" + ks_name; auto i = _keyspaces.find(ks_name); if (i == _keyspaces.end()) { dblog.warn("Skipping undefined keyspace: {}", ks_name); } else { dblog.info("Populating Keyspace {}", ks_name); return lister::scan_dir(ksdir, { directory_entry_type::directory }, [this, ksdir, ks_name] (directory_entry de) { auto comps = parse_fname(de.name); if (comps.size() < 2) { dblog.error("Keyspace {}: Skipping malformed CF {} ", ksdir, de.name); return make_ready_future<>(); } sstring cfname = comps[0]; auto sstdir = ksdir + "/" + de.name; try { auto& cf = find_column_family(ks_name, cfname); dblog.info("Keyspace {}: Reading CF {} ", ksdir, cfname); // FIXME: Increase parallelism. return cf.populate(sstdir); } catch (no_such_column_family&) { dblog.warn("{}, CF {}: schema not loaded!", ksdir, comps[0]); return make_ready_future<>(); } }); } return make_ready_future<>(); } future<> database::populate(sstring datadir) { return lister::scan_dir(datadir, { directory_entry_type::directory }, [this, datadir] (directory_entry de) { auto& ks_name = de.name; if (ks_name == "system") { return make_ready_future<>(); } return populate_keyspace(datadir, ks_name); }); } template static future<> do_parse_system_tables(distributed& proxy, const sstring& _cf_name, Func&& func) { using namespace db::schema_tables; static_assert(std::is_same, std::result_of_t>::value, "bad Func signature"); auto cf_name = make_lw_shared(_cf_name); return db::system_keyspace::query(proxy, *cf_name).then([] (auto rs) { auto names = std::set(); for (auto& r : rs->rows()) { auto keyspace_name = r.template get_nonnull("keyspace_name"); names.emplace(keyspace_name); } return std::move(names); }).then([&proxy, cf_name, func = std::forward(func)] (std::set&& names) mutable { return parallel_for_each(names.begin(), names.end(), [&proxy, cf_name, func = std::forward(func)] (sstring name) mutable { if (name == "system") 
{ return make_ready_future<>(); } return read_schema_partition_for_keyspace(proxy, *cf_name, name).then([func, cf_name] (auto&& v) mutable { return do_with(std::move(v), [func = std::forward(func), cf_name] (auto& v) { return func(v).then_wrapped([cf_name, &v] (future<> f) { try { f.get(); } catch (std::exception& e) { dblog.error("Skipping: {}. Exception occurred when loading system table {}: {}", v.first, *cf_name, e.what()); } }); }); }); }); }); } future<> database::parse_system_tables(distributed& proxy) { using namespace db::schema_tables; return do_parse_system_tables(proxy, db::schema_tables::KEYSPACES, [this] (schema_result_value_type &v) { auto ksm = create_keyspace_from_schema_partition(v); return create_keyspace(ksm); }).then([&proxy, this] { return do_parse_system_tables(proxy, db::schema_tables::COLUMNFAMILIES, [this, &proxy] (schema_result_value_type &v) { return create_tables_from_tables_partition(proxy, v.second).then([this] (std::map tables) { for (auto& t: tables) { auto s = t.second; auto& ks = this->find_keyspace(s->ks_name()); auto cfg = ks.make_column_family_config(*s); this->add_column_family(std::move(s), std::move(cfg)); } }); }); }); } future<> database::init_system_keyspace() { bool durable = _cfg->data_file_directories().size() > 0; db::system_keyspace::make(*this, durable, _cfg->volatile_system_keyspace_for_testing()); // FIXME support multiple directories return touch_directory(_cfg->data_file_directories()[0] + "/" + db::system_keyspace::NAME).then([this] { return populate_keyspace(_cfg->data_file_directories()[0], db::system_keyspace::NAME).then([this]() { return init_commitlog(); }); }); } future<> database::load_sstables(distributed& proxy) { return parse_system_tables(proxy).then([this] { return populate(_cfg->data_file_directories()[0]); }); } future<> database::init_commitlog() { return db::commitlog::create_commitlog(*_cfg).then([this](db::commitlog&& log) { _commitlog = std::make_unique(std::move(log)); 
_commitlog->add_flush_handler([this](db::cf_id_type id, db::replay_position pos) { if (_column_families.count(id) == 0) { // the CF has been removed. _commitlog->discard_completed_segments(id, pos); return; } _column_families[id]->flush(pos); }).release(); // we have longer life time than CL. Ignore reg anchor }); } unsigned database::shard_of(const dht::token& t) { return dht::shard_of(t); } unsigned database::shard_of(const mutation& m) { return shard_of(m.token()); } unsigned database::shard_of(const frozen_mutation& m) { // FIXME: This lookup wouldn't be necessary if we // sent the partition key in legacy form or together // with token. schema_ptr schema = find_schema(m.column_family_id()); return shard_of(dht::global_partitioner().get_token(*schema, m.key(*schema))); } void database::add_keyspace(sstring name, keyspace k) { if (_keyspaces.count(name) != 0) { throw std::invalid_argument("Keyspace " + name + " already exists"); } _keyspaces.emplace(std::move(name), std::move(k)); } void database::update_keyspace(const sstring& name) { throw std::runtime_error("update keyspace not implemented"); } void database::drop_keyspace(const sstring& name) { _keyspaces.erase(name); } void database::add_column_family(schema_ptr schema, column_family::config cfg) { schema = local_schema_registry().learn(schema); schema->registry_entry()->mark_synced(); auto uuid = schema->id(); lw_shared_ptr cf; if (cfg.enable_commitlog && _commitlog) { cf = make_lw_shared(schema, std::move(cfg), *_commitlog, _compaction_manager); } else { cf = make_lw_shared(schema, std::move(cfg), column_family::no_commitlog(), _compaction_manager); } auto ks = _keyspaces.find(schema->ks_name()); if (ks == _keyspaces.end()) { throw std::invalid_argument("Keyspace " + schema->ks_name() + " not defined"); } if (_column_families.count(uuid) != 0) { throw std::invalid_argument("UUID " + uuid.to_sstring() + " already mapped"); } auto kscf = std::make_pair(schema->ks_name(), schema->cf_name()); if 
(_ks_cf_to_uuid.count(kscf) != 0) { throw std::invalid_argument("Column family " + schema->cf_name() + " exists"); } ks->second.add_column_family(schema); cf->start(); _column_families.emplace(uuid, std::move(cf)); _ks_cf_to_uuid.emplace(std::move(kscf), uuid); } future<> database::drop_column_family(db_clock::time_point dropped_at, const sstring& ks_name, const sstring& cf_name) { auto uuid = find_uuid(ks_name, cf_name); auto& ks = find_keyspace(ks_name); auto cf = _column_families.at(uuid); _column_families.erase(uuid); ks.metadata()->remove_column_family(cf->schema()); _ks_cf_to_uuid.erase(std::make_pair(ks_name, cf_name)); return truncate(dropped_at, ks, *cf).then([this, cf] { return cf->stop(); }).then([this, cf] { return make_ready_future<>(); }); } const utils::UUID& database::find_uuid(const sstring& ks, const sstring& cf) const throw (std::out_of_range) { return _ks_cf_to_uuid.at(std::make_pair(ks, cf)); } const utils::UUID& database::find_uuid(const schema_ptr& schema) const throw (std::out_of_range) { return find_uuid(schema->ks_name(), schema->cf_name()); } keyspace& database::find_keyspace(const sstring& name) throw (no_such_keyspace) { try { return _keyspaces.at(name); } catch (...) { std::throw_with_nested(no_such_keyspace(name)); } } const keyspace& database::find_keyspace(const sstring& name) const throw (no_such_keyspace) { try { return _keyspaces.at(name); } catch (...) { std::throw_with_nested(no_such_keyspace(name)); } } bool database::has_keyspace(const sstring& name) const { return _keyspaces.count(name) != 0; } std::vector database::get_non_system_keyspaces() const { std::vector res; for (auto const &i : _keyspaces) { if (i.first != db::system_keyspace::NAME) { res.push_back(i.first); } } return res; } column_family& database::find_column_family(const sstring& ks_name, const sstring& cf_name) throw (no_such_column_family) { try { return find_column_family(find_uuid(ks_name, cf_name)); } catch (...) 
{ std::throw_with_nested(no_such_column_family(ks_name, cf_name)); } } const column_family& database::find_column_family(const sstring& ks_name, const sstring& cf_name) const throw (no_such_column_family) { try { return find_column_family(find_uuid(ks_name, cf_name)); } catch (...) { std::throw_with_nested(no_such_column_family(ks_name, cf_name)); } } column_family& database::find_column_family(const utils::UUID& uuid) throw (no_such_column_family) { try { return *_column_families.at(uuid); } catch (...) { std::throw_with_nested(no_such_column_family(uuid)); } } const column_family& database::find_column_family(const utils::UUID& uuid) const throw (no_such_column_family) { try { return *_column_families.at(uuid); } catch (...) { std::throw_with_nested(no_such_column_family(uuid)); } } void keyspace::create_replication_strategy(const std::map& options) { using namespace locator; auto& ss = service::get_local_storage_service(); _replication_strategy = abstract_replication_strategy::create_replication_strategy( _metadata->name(), _metadata->strategy_name(), ss.get_token_metadata(), options); } locator::abstract_replication_strategy& keyspace::get_replication_strategy() { return *_replication_strategy; } const locator::abstract_replication_strategy& keyspace::get_replication_strategy() const { return *_replication_strategy; } void keyspace::set_replication_strategy(std::unique_ptr replication_strategy) { _replication_strategy = std::move(replication_strategy); } column_family::config keyspace::make_column_family_config(const schema& s) const { column_family::config cfg; cfg.datadir = column_family_directory(s.cf_name(), s.id()); cfg.enable_disk_reads = _config.enable_disk_reads; cfg.enable_disk_writes = _config.enable_disk_writes; cfg.enable_commitlog = _config.enable_commitlog; cfg.enable_cache = _config.enable_cache; cfg.max_memtable_size = _config.max_memtable_size; cfg.dirty_memory_region_group = _config.dirty_memory_region_group; cfg.cf_stats = _config.cf_stats; 
cfg.enable_incremental_backups = _config.enable_incremental_backups; return cfg; } sstring keyspace::column_family_directory(const sstring& name, utils::UUID uuid) const { auto uuid_sstring = uuid.to_sstring(); boost::erase_all(uuid_sstring, "-"); return sprint("%s/%s-%s", _config.datadir, name, uuid_sstring); } future<> keyspace::make_directory_for_column_family(const sstring& name, utils::UUID uuid) { return touch_directory(column_family_directory(name, uuid)); } no_such_keyspace::no_such_keyspace(const sstring& ks_name) : runtime_error{sprint("Can't find a keyspace %s", ks_name)} { } no_such_column_family::no_such_column_family(const utils::UUID& uuid) : runtime_error{sprint("Can't find a column family with UUID %s", uuid)} { } no_such_column_family::no_such_column_family(const sstring& ks_name, const sstring& cf_name) : runtime_error{sprint("Can't find a column family %s in keyspace %s", cf_name, ks_name)} { } column_family& database::find_column_family(const schema_ptr& schema) throw (no_such_column_family) { return find_column_family(schema->id()); } const column_family& database::find_column_family(const schema_ptr& schema) const throw (no_such_column_family) { return find_column_family(schema->id()); } void keyspace_metadata::validate() const { using namespace locator; auto& ss = service::get_local_storage_service(); abstract_replication_strategy::validate_replication_strategy(name(), strategy_name(), ss.get_token_metadata(), strategy_options()); } schema_ptr database::find_schema(const sstring& ks_name, const sstring& cf_name) const throw (no_such_column_family) { try { return find_schema(find_uuid(ks_name, cf_name)); } catch (std::out_of_range&) { std::throw_with_nested(no_such_column_family(ks_name, cf_name)); } } schema_ptr database::find_schema(const utils::UUID& uuid) const throw (no_such_column_family) { return find_column_family(uuid).schema(); } bool database::has_schema(const sstring& ks_name, const sstring& cf_name) const { return 
_ks_cf_to_uuid.count(std::make_pair(ks_name, cf_name)) > 0; } void database::create_in_memory_keyspace(const lw_shared_ptr& ksm) { keyspace ks(ksm, std::move(make_keyspace_config(*ksm))); ks.create_replication_strategy(ksm->strategy_options()); _keyspaces.emplace(ksm->name(), std::move(ks)); } future<> database::create_keyspace(const lw_shared_ptr& ksm) { auto i = _keyspaces.find(ksm->name()); if (i != _keyspaces.end()) { return make_ready_future<>(); } create_in_memory_keyspace(ksm); auto& datadir = _keyspaces.at(ksm->name()).datadir(); if (datadir != "") { return touch_directory(datadir); } else { return make_ready_future<>(); } } std::set database::existing_index_names(const sstring& cf_to_exclude) const { std::set names; for (auto& p : _column_families) { auto& cf = *p.second; if (!cf_to_exclude.empty() && cf.schema()->cf_name() == cf_to_exclude) { continue; } for (auto& cd : cf.schema()->all_columns_in_select_order()) { if (cd.idx_info.index_name) { names.emplace(*cd.idx_info.index_name); } } } return names; } // Based on: // - org.apache.cassandra.db.AbstractCell#reconcile() // - org.apache.cassandra.db.BufferExpiringCell#reconcile() // - org.apache.cassandra.db.BufferDeletedCell#reconcile() int compare_atomic_cell_for_merge(atomic_cell_view left, atomic_cell_view right) { if (left.timestamp() != right.timestamp()) { return left.timestamp() > right.timestamp() ? 1 : -1; } if (left.is_live() != right.is_live()) { return left.is_live() ? -1 : 1; } if (left.is_live()) { auto c = compare_unsigned(left.value(), right.value()); if (c != 0) { return c; } if (left.is_live_and_has_ttl() && right.is_live_and_has_ttl() && left.expiry() != right.expiry()) { return left.expiry() < right.expiry() ? -1 : 1; } } else { // Both are deleted if (left.deletion_time() != right.deletion_time()) { // Origin compares big-endian serialized deletion time. 
That's because it // delegates to AbstractCell.reconcile() which compares values after // comparing timestamps, which in case of deleted cells will hold // serialized expiry. return (uint32_t) left.deletion_time().time_since_epoch().count() < (uint32_t) right.deletion_time().time_since_epoch().count() ? -1 : 1; } } return 0; } struct query_state { explicit query_state(schema_ptr s, const query::read_command& cmd, const std::vector& ranges) : schema(std::move(s)) , cmd(cmd) , builder(cmd.slice) , limit(cmd.row_limit) , current_partition_range(ranges.begin()) , range_end(ranges.end()){ } schema_ptr schema; const query::read_command& cmd; query::result::builder builder; uint32_t limit; bool range_empty = false; // Avoid ubsan false-positive when moving after construction std::vector::const_iterator current_partition_range; std::vector::const_iterator range_end; mutation_reader reader; bool done() const { return !limit || current_partition_range == range_end; } }; future> column_family::query(schema_ptr s, const query::read_command& cmd, const std::vector& partition_ranges) { utils::latency_counter lc; _stats.reads.set_latency(lc); return do_with(query_state(std::move(s), cmd, partition_ranges), [this] (query_state& qs) { return do_until(std::bind(&query_state::done, &qs), [this, &qs] { auto&& range = *qs.current_partition_range++; qs.reader = make_reader(qs.schema, range, service::get_local_sstable_query_read_priority()); qs.range_empty = false; return do_until([&qs] { return !qs.limit || qs.range_empty; }, [&qs] { return qs.reader().then([&qs](mutation_opt mo) { if (mo) { auto p_builder = qs.builder.add_partition(*mo->schema(), mo->key()); auto is_distinct = qs.cmd.slice.options.contains(query::partition_slice::option::distinct); auto limit = !is_distinct ? 
qs.limit : 1; mo->partition().query(p_builder, *qs.schema, qs.cmd.timestamp, limit); qs.limit -= p_builder.row_count(); } else { qs.range_empty = true; } }); }); }).then([&qs] { return make_ready_future>( make_lw_shared(qs.builder.build())); }); }).finally([lc, this]() mutable { _stats.reads.mark(lc); if (lc.is_start()) { _stats.estimated_read.add(lc.latency(), _stats.reads.count); } }); } mutation_source column_family::as_mutation_source() const { return mutation_source([this] (schema_ptr s, const query::partition_range& range, const io_priority_class& pc) { return this->make_reader(std::move(s), range, pc); }); } future> database::query(schema_ptr s, const query::read_command& cmd, const std::vector& ranges) { column_family& cf = find_column_family(cmd.cf_id); return cf.query(std::move(s), cmd, ranges); } future database::query_mutations(schema_ptr s, const query::read_command& cmd, const query::partition_range& range) { column_family& cf = find_column_family(cmd.cf_id); return mutation_query(std::move(s), cf.as_mutation_source(), range, cmd.slice, cmd.row_limit, cmd.timestamp); } std::unordered_set database::get_initial_tokens() { std::unordered_set tokens; sstring tokens_string = get_config().initial_token(); try { boost::split(tokens, tokens_string, boost::is_any_of(sstring(","))); } catch (...) { throw std::runtime_error(sprint("Unable to parse initial_token=%s", tokens_string)); } tokens.erase(""); return tokens; } std::experimental::optional database::get_replace_address() { auto& cfg = get_config(); sstring replace_address = cfg.replace_address(); sstring replace_address_first_boot = cfg.replace_address_first_boot(); try { if (!replace_address.empty()) { return gms::inet_address(replace_address); } else if (!replace_address_first_boot.empty()) { return gms::inet_address(replace_address_first_boot); } return std::experimental::nullopt; } catch (...) 
{ return std::experimental::nullopt; } } bool database::is_replacing() { sstring replace_address_first_boot = get_config().replace_address_first_boot(); if (!replace_address_first_boot.empty() && db::system_keyspace::bootstrap_complete()) { dblog.info("Replace address on first boot requested; this node is already bootstrapped"); return false; } return bool(get_replace_address()); } std::ostream& operator<<(std::ostream& out, const atomic_cell_or_collection& c) { return out << to_hex(c._data); } std::ostream& operator<<(std::ostream& os, const mutation& m) { const ::schema& s = *m.schema(); fprint(os, "{%s.%s key %s data ", s.ks_name(), s.cf_name(), m.decorated_key()); os << m.partition() << "}"; return os; } std::ostream& operator<<(std::ostream& out, const column_family& cf) { return fprint(out, "{column_family: %s/%s}", cf._schema->ks_name(), cf._schema->cf_name()); } std::ostream& operator<<(std::ostream& out, const database& db) { out << "{\n"; for (auto&& e : db._column_families) { auto&& cf = *e.second; out << "(" << e.first.to_sstring() << ", " << cf.schema()->cf_name() << ", " << cf.schema()->ks_name() << "): " << cf << "\n"; } out << "}"; return out; } void column_family::apply(const mutation& m, const db::replay_position& rp) { utils::latency_counter lc; _stats.writes.set_latency(lc); active_memtable().apply(m, rp); seal_on_overflow(); _stats.writes.mark(lc); if (lc.is_start()) { _stats.estimated_write.add(lc.latency(), _stats.writes.count); } } void column_family::apply(const frozen_mutation& m, const schema_ptr& m_schema, const db::replay_position& rp) { utils::latency_counter lc; _stats.writes.set_latency(lc); check_valid_rp(rp); active_memtable().apply(m, m_schema, rp); seal_on_overflow(); _stats.writes.mark(lc); if (lc.is_start()) { _stats.estimated_write.add(lc.latency(), _stats.writes.count); } } void column_family::seal_on_overflow() { ++_mutation_count; if (active_memtable().occupancy().total_space() >= _config.max_memtable_size) { // FIXME: if 
sparse, do some in-memory compaction first // FIXME: maybe merge with other in-memory memtables _mutation_count = 0; seal_active_memtable(); } } void column_family::check_valid_rp(const db::replay_position& rp) const { if (rp < _highest_flushed_rp) { throw replay_position_reordered_exception(); } } future<> database::apply_in_memory(const frozen_mutation& m, const schema_ptr& m_schema, const db::replay_position& rp) { try { auto& cf = find_column_family(m.column_family_id()); cf.apply(m, m_schema, rp); } catch (no_such_column_family&) { dblog.error("Attempting to mutate non-existent table {}", m.column_family_id()); } return make_ready_future<>(); } future<> database::do_apply(schema_ptr s, const frozen_mutation& m) { // I'm doing a nullcheck here since the init code path for db etc // is a little in flux and commitlog is created only when db is // initied from datadir. auto uuid = m.column_family_id(); auto& cf = find_column_family(uuid); if (!s->is_synced()) { throw std::runtime_error(sprint("attempted to mutate using not synced schema of %s.%s, version=%s", s->ks_name(), s->cf_name(), s->version())); } if (cf.commitlog() != nullptr) { commitlog_entry_writer cew(s, m); return cf.commitlog()->add_entry(uuid, cew).then([&m, this, s](auto rp) { try { return this->apply_in_memory(m, s, rp); } catch (replay_position_reordered_exception&) { // expensive, but we're assuming this is super rare. // if we failed to apply the mutation due to future re-ordering // (which should be the ever only reason for rp mismatch in CF) // let's just try again, add the mutation to the CL once more, // and assume success in inevitable eventually. 
dblog.debug("replay_position reordering detected"); return this->apply(s, m); } }); } return apply_in_memory(m, s, db::replay_position()); } future<> database::throttle() { if (_dirty_memory_region_group.memory_used() < _memtable_total_space && _throttled_requests.empty()) { // All is well, go ahead return make_ready_future<>(); } // We must throttle, wait a bit if (_throttled_requests.empty()) { _throttling_timer.arm_periodic(10ms); } _throttled_requests.emplace_back(); return _throttled_requests.back().get_future(); } void database::unthrottle() { // Release one request per free 1MB we have // FIXME: improve this if (_dirty_memory_region_group.memory_used() >= _memtable_total_space) { return; } size_t avail = (_memtable_total_space - _dirty_memory_region_group.memory_used()) >> 20; avail = std::min(_throttled_requests.size(), avail); for (size_t i = 0; i < avail; ++i) { _throttled_requests.front().set_value(); _throttled_requests.pop_front(); } if (_throttled_requests.empty()) { _throttling_timer.cancel(); } } future<> database::apply(schema_ptr s, const frozen_mutation& m) { if (dblog.is_enabled(logging::log_level::trace)) { dblog.trace("apply {}", m.pretty_printer(s)); } return throttle().then([this, &m, s = std::move(s)] { return do_apply(std::move(s), m); }); } keyspace::config database::make_keyspace_config(const keyspace_metadata& ksm) { // FIXME support multiple directories keyspace::config cfg; if (_cfg->data_file_directories().size() > 0) { cfg.datadir = sprint("%s/%s", _cfg->data_file_directories()[0], ksm.name()); cfg.enable_disk_writes = !_cfg->enable_in_memory_data_store(); cfg.enable_disk_reads = true; // we allways read from disk cfg.enable_commitlog = ksm.durable_writes() && _cfg->enable_commitlog() && !_cfg->enable_in_memory_data_store(); cfg.enable_cache = _cfg->enable_cache(); cfg.max_memtable_size = _memtable_total_space * _cfg->memtable_cleanup_threshold(); } else { cfg.datadir = ""; cfg.enable_disk_writes = false; cfg.enable_disk_reads = 
false; cfg.enable_commitlog = false; cfg.enable_cache = false; cfg.max_memtable_size = std::numeric_limits::max(); } cfg.dirty_memory_region_group = &_dirty_memory_region_group; cfg.cf_stats = &_cf_stats; cfg.enable_incremental_backups = _cfg->incremental_backups(); return cfg; } namespace db { std::ostream& operator<<(std::ostream& os, db::consistency_level cl) { switch (cl) { case db::consistency_level::ANY: return os << "ANY"; case db::consistency_level::ONE: return os << "ONE"; case db::consistency_level::TWO: return os << "TWO"; case db::consistency_level::THREE: return os << "THREE"; case db::consistency_level::QUORUM: return os << "QUORUM"; case db::consistency_level::ALL: return os << "ALL"; case db::consistency_level::LOCAL_QUORUM: return os << "LOCAL_QUORUM"; case db::consistency_level::EACH_QUORUM: return os << "EACH_QUORUM"; case db::consistency_level::SERIAL: return os << "SERIAL"; case db::consistency_level::LOCAL_SERIAL: return os << "LOCAL_SERIAL"; case db::consistency_level::LOCAL_ONE: return os << "LOCAL"; default: abort(); } } } std::ostream& operator<<(std::ostream& os, const exploded_clustering_prefix& ecp) { // Can't pass to_hex() to transformed(), since it is overloaded, so wrap: auto enhex = [] (auto&& x) { return to_hex(x); }; return fprint(os, "prefix{%s}", ::join(":", ecp._v | boost::adaptors::transformed(enhex))); } std::ostream& operator<<(std::ostream& os, const atomic_cell_view& acv) { if (acv.is_live()) { return fprint(os, "atomic_cell{%s;ts=%d;expiry=%d,ttl=%d}", to_hex(acv.value()), acv.timestamp(), acv.is_live_and_has_ttl() ? acv.expiry().time_since_epoch().count() : -1, acv.is_live_and_has_ttl() ? 
acv.ttl().count() : 0); } else { return fprint(os, "atomic_cell{DEAD;ts=%d;deletion_time=%d}", acv.timestamp(), acv.deletion_time().time_since_epoch().count()); } } std::ostream& operator<<(std::ostream& os, const atomic_cell& ac) { return os << atomic_cell_view(ac); } future<> database::stop() { return _compaction_manager.stop().then([this] { // try to ensure that CL has done disk flushing if (_commitlog != nullptr) { return _commitlog->shutdown(); } return make_ready_future<>(); }).then([this] { return parallel_for_each(_column_families, [this] (auto& val_pair) { return val_pair.second->stop(); }); }); } future<> database::flush_all_memtables() { return parallel_for_each(_column_families, [this] (auto& cfp) { return cfp.second->flush(); }); } future<> database::truncate(db_clock::time_point truncated_at, sstring ksname, sstring cfname) { auto& ks = find_keyspace(ksname); auto& cf = find_column_family(ksname, cfname); return truncate(truncated_at, ks, cf); } future<> database::truncate(db_clock::time_point truncated_at, const keyspace& ks, column_family& cf) { const auto durable = ks.metadata()->durable_writes(); const auto auto_snapshot = get_config().auto_snapshot(); future<> f = make_ready_future<>(); if (durable || auto_snapshot) { // TODO: // this is not really a guarantee at all that we've actually // gotten all things to disk. Again, need queue-ish or something. 
f = cf.flush(); } else { cf.clear(); } return cf.run_with_compaction_disabled([truncated_at, f = std::move(f), &cf, auto_snapshot, cfname = cf.schema()->cf_name()]() mutable { return f.then([truncated_at, &cf, auto_snapshot, cfname = std::move(cfname)] { dblog.debug("Discarding sstable data for truncated CF + indexes"); // TODO: notify truncation future<> f = make_ready_future<>(); if (auto_snapshot) { auto name = sprint("%d-%s", truncated_at.time_since_epoch().count(), cfname); f = cf.snapshot(name); } return f.then([&cf, truncated_at] { return cf.discard_sstables(truncated_at).then([&cf, truncated_at](db::replay_position rp) { // TODO: indexes. return db::system_keyspace::save_truncation_record(cf, truncated_at, rp); }); }); }); }); } const sstring& database::get_snitch_name() const { return _cfg->endpoint_snitch(); } // For the filesystem operations, this code will assume that all keyspaces are visible in all shards // (as we have been doing for a lot of the other operations, like the snapshot itself). 
// Clear the snapshots named "tag" for the given keyspaces; an empty
// keyspace_names list means "all local keyspaces". Delegates to each
// column family's clear_snapshot() and runs everything in parallel.
future<> database::clear_snapshot(sstring tag, std::vector keyspace_names) {
    std::vector> keyspaces;

    if (keyspace_names.empty()) {
        // if keyspace names are not given - apply to all existing local keyspaces
        for (auto& ks: _keyspaces) {
            keyspaces.push_back(std::reference_wrapper(ks.second));
        }
    } else {
        for (auto& ksname: keyspace_names) {
            try {
                keyspaces.push_back(std::reference_wrapper(find_keyspace(ksname)));
            } catch (no_such_keyspace& e) {
                // Convert the lookup failure into a failed future instead of
                // letting the exception escape a future-returning function.
                return make_exception_future(std::current_exception());
            }
        }
    }

    return parallel_for_each(keyspaces, [this, tag] (auto& ks) {
        return parallel_for_each(ks.get().metadata()->cf_meta_data(), [this, tag] (auto& pair) {
            auto& cf = this->find_column_family(pair.second);
            return cf.clear_snapshot(tag);
        }).then_wrapped([] (future<> f) {
            // NOTE(review): f is never inspected here, so failures of the
            // per-CF cleanup are dropped - presumably best-effort on purpose,
            // but worth confirming.
            dblog.debug("Cleared out snapshot directories");
        });
    });
}

// Compute the cluster schema digest, install it as the schema version on
// every shard, persist it to the system keyspace, and passively announce
// the new version via the migration manager.
future<> update_schema_version_and_announce(distributed& proxy) {
    return db::schema_tables::calculate_schema_digest(proxy).then([&proxy] (utils::UUID uuid) {
        return proxy.local().get_db().invoke_on_all([uuid] (database& db) {
            db.update_version(uuid);
            return make_ready_future<>();
        }).then([uuid] {
            return db::system_keyspace::update_schema_version(uuid).then([uuid] {
                return service::get_local_migration_manager().passive_announce(uuid);
            });
        });
    });
}

// Snapshots: snapshotting the files themselves is easy: if more than one CF
// happens to link an SSTable twice, all but one will fail, and we will end up
// with one copy.
//
// The problem for us, is that the snapshot procedure is supposed to leave a
// manifest file inside its directory. So if we just call snapshot() from
// multiple shards, only the last one will succeed, writing its own SSTables to
// the manifest leaving all other shards' SSTables unaccounted for.
//
// Moreover, for things like drop table, the operation should only proceed when
// the snapshot is complete. That includes the manifest file being correctly
// written, and for this reason we need to wait for all shards to finish their
// snapshotting before we can move on.
//
// To know which files we must account for in the manifest, we will keep an
// SSTable set. Theoretically, we could just rescan the snapshot directory and
// see what's in there. But we would need to wait for all shards to finish
// before we can do that anyway. That is the hard part, and once that is done
// keeping the files set is not really a big deal.
//
// This code assumes that all shards will be snapshotting at the same time. So
// far this is a safe assumption, but if we ever want to take snapshots from a
// group of shards only, this code will have to be updated to account for that.
struct snapshot_manager {
    // File names that must be listed in the snapshot's manifest.
    std::unordered_set files;
    // Signalled once per shard when it has registered its files.
    semaphore requests;
    // Signalled smp::count times after the manifest has been written.
    semaphore manifest_write;
    snapshot_manager() : requests(0), manifest_write(0) {}
};
// Per-shard registry of in-flight snapshots, keyed by snapshot directory.
static thread_local std::unordered_map> pending_snapshots;

// Write "manifest.json" in jsondir, listing every file registered in the
// pending snapshot_manager for that directory, then sync the directory
// and drop the pending entry (even on failure, via finally).
static future<> seal_snapshot(sstring jsondir) {
    std::ostringstream ss;
    int n = 0;
    ss << "{" << std::endl << "\t\"files\" : [ ";
    for (auto&& rf: pending_snapshots.at(jsondir)->files) {
        if (n++ > 0) {
            ss << ", ";
        }
        ss << "\"" << rf << "\"";
    }
    ss << " ]" << std::endl << "}" << std::endl;

    auto json = ss.str();
    auto jsonfile = jsondir + "/manifest.json";

    dblog.debug("Storing manifest {}", jsonfile);

    return recursive_touch_directory(jsondir).then([jsonfile, json = std::move(json)] {
        return open_file_dma(jsonfile, open_flags::wo | open_flags::create | open_flags::truncate).then([json](file f) {
            return do_with(make_file_output_stream(std::move(f)), [json] (output_stream& out) {
                return out.write(json.c_str(), json.size()).then([&out] {
                    return out.flush();
                }).then([&out] {
                    return out.close();
                });
            });
        });
    }).then([jsondir] {
        return sync_directory(std::move(jsondir));
    }).finally([jsondir] {
        pending_snapshots.erase(jsondir);
        return make_ready_future<>();
    });
}

// Take a snapshot named "name" of this column family: flush, hard-link
// every SSTable into the snapshot directory, then rendezvous all shards
// on one designated shard which writes the manifest once every shard has
// reported the files it contributed.
future<> column_family::snapshot(sstring name) {
    return flush().then([this, name = std::move(name)]() {
        auto tables = boost::copy_range>(*_sstables | boost::adaptors::map_values);
        return do_with(std::move(tables), [this, name](std::vector & tables) {
            auto jsondir = _config.datadir + "/snapshots/" + name;

            return parallel_for_each(tables, [name](sstables::shared_sstable sstable) {
                auto dir = sstable->get_dir() + "/snapshots/" + name;
                return recursive_touch_directory(dir).then([sstable, dir] {
                    return sstable->create_links(dir).then_wrapped([] (future<> f) {
                        // If the SSTables are shared, one of the CPUs will fail here.
                        // That is completely fine, though. We only need one link.
                        try {
                            f.get();
                        } catch (std::system_error& e) {
                            if (e.code() != std::error_code(EEXIST, std::system_category())) {
                                throw;
                            }
                        }
                        return make_ready_future<>();
                    });
                });
            }).then([jsondir, &tables] {
                // This is not just an optimization. If we have no files,
                // jsondir may not have been created, and sync_directory
                // would throw.
                if (tables.size()) {
                    return sync_directory(std::move(jsondir));
                } else {
                    return make_ready_future<>();
                }
            }).finally([this, &tables, jsondir] {
                // Pick a deterministic coordinator shard by hashing the
                // snapshot directory, and send it this shard's file names.
                auto shard = std::hash()(jsondir) % smp::count;
                std::unordered_set table_names;
                for (auto& sst : tables) {
                    auto f = sst->get_filename();
                    // File name relative to the SSTable directory.
                    auto rf = f.substr(sst->get_dir().size() + 1);
                    table_names.insert(std::move(rf));
                }
                // NOTE(review): datadir is captured below but never used in
                // the lambda body.
                return smp::submit_to(shard, [requester = engine().cpu_id(), jsondir = std::move(jsondir), tables = std::move(table_names), datadir = _config.datadir] {
                    if (pending_snapshots.count(jsondir) == 0) {
                        pending_snapshots.emplace(jsondir, make_lw_shared());
                    }
                    auto snapshot = pending_snapshots.at(jsondir);
                    for (auto&& sst: tables) {
                        snapshot->files.insert(std::move(sst));
                    }

                    // Tell the coordinator this shard has checked in.
                    snapshot->requests.signal(1);
                    auto my_work = make_ready_future<>();
                    if (requester == engine().cpu_id()) {
                        // Coordinator: wait for all smp::count shards to
                        // register, seal the manifest, then release every
                        // shard blocked on manifest_write below.
                        my_work = snapshot->requests.wait(smp::count).then([jsondir = std::move(jsondir), snapshot] () mutable {
                            return seal_snapshot(jsondir).then([snapshot] {
                                snapshot->manifest_write.signal(smp::count);
                                return make_ready_future<>();
                            });
                        });
                    }
                    // Every shard (coordinator included) waits until the
                    // manifest has actually been written.
                    return my_work.then([snapshot] {
                        return snapshot->manifest_write.wait(1);
                    }).then([snapshot] {});
                });
            });
        });
    });
}

// Probe the snapshot directory for "tag"; ENOENT maps to false, success
// to true, and any other error is propagated.
future column_family::snapshot_exists(sstring tag) {
    sstring jsondir = _config.datadir + "/snapshots/" + tag;
    return engine().open_directory(std::move(jsondir)).then_wrapped([] (future f) {
        try {
            f.get0();
            return make_ready_future(true);
        } catch (std::system_error& e) {
            if (e.code() != std::error_code(ENOENT, std::system_category())) {
                throw;
            }
            return make_ready_future(false);
        }
    });
}

enum class missing { no, yes };

// Resolve a completed future: success -> missing::no, ENOENT ->
// missing::yes, anything else is rethrown.
static missing file_missing(future<> f) {
    try {
        f.get();
        return missing::no;
    } catch (std::system_error& e) {
        if (e.code() != std::error_code(ENOENT, std::system_category())) {
            throw;
        }
        return missing::yes;
    }
}

// Remove the snapshot directory for "tag" (or every snapshot, when "tag"
// is empty) and all files inside it, then sync the parent directory.
// A directory that never existed is tolerated.
future<> column_family::clear_snapshot(sstring tag) {
    sstring jsondir = _config.datadir + "/snapshots/";
    sstring parent = _config.datadir;
    if (!tag.empty()) {
        jsondir += tag;
        parent += "/snapshots/";
    }

    lister::dir_entry_types dir_and_files = { directory_entry_type::regular, directory_entry_type::directory };
    return lister::scan_dir(jsondir, dir_and_files, [this, curr_dir = jsondir, dir_and_files, tag] (directory_entry de) {
        // FIXME: We really need a better directory walker. This should
        // eventually be part of the seastar infrastructure. It's hard to
        // write this in a fully recursive manner because we need to keep
        // information about the parent directory, so we can remove the file.
        // For now, we'll take advantage of the fact that we will at most
        // visit 2 levels and keep it ugly but simple.
        auto recurse = make_ready_future<>();
        if (de.type == directory_entry_type::directory) {
            // Should only recurse when tag is empty, meaning delete all snapshots
            if (!tag.empty()) {
                throw std::runtime_error(sprint("Unexpected directory %s found at %s! Aborting", de.name, curr_dir));
            }
            auto newdir = curr_dir + "/" + de.name;
            recurse = lister::scan_dir(newdir, dir_and_files, [this, curr_dir = newdir] (directory_entry de) {
                return remove_file(curr_dir + "/" + de.name);
            });
        }
        return recurse.then([fname = curr_dir + "/" + de.name] {
            return remove_file(fname);
        });
    }).then_wrapped([jsondir] (future<> f) {
        // Fine if directory does not exist. If it did, we delete it
        if (file_missing(std::move(f)) == missing::no) {
            return remove_file(jsondir);
        }
        return make_ready_future<>();
    }).then([parent] {
        return sync_directory(parent).then_wrapped([] (future<> f) {
            // Should always exist for empty tags, but may not exist for a
            // single tag if we never took snapshots. We will check this here
            // just to mask out the exception, without silencing unexpected
            // ones.
            file_missing(std::move(f));
            return make_ready_future<>();
        });
    });
}

// Collect, per snapshot name, the size accounted for the snapshot's
// SSTable files ("total") and the portion whose original SSTables are no
// longer present in the main data directory ("live").
future> column_family::get_snapshot_details() {
    std::unordered_map all_snapshots;
    return do_with(std::move(all_snapshots), [this] (auto& all_snapshots) {
        return engine().file_exists(_config.datadir + "/snapshots").then([this, &all_snapshots](bool file_exists) {
            if (!file_exists) {
                return make_ready_future<>();
            }
            return lister::scan_dir(_config.datadir + "/snapshots", { directory_entry_type::directory }, [this, &all_snapshots] (directory_entry de) {
                auto snapshot_name = de.name;
                auto snapshot = _config.datadir + "/snapshots/" + snapshot_name;
                all_snapshots.emplace(snapshot_name, snapshot_details());
                return lister::scan_dir(snapshot, { directory_entry_type::regular }, [this, &all_snapshots, snapshot, snapshot_name] (directory_entry de) {
                    return file_size(snapshot + "/" + de.name).then([this, &all_snapshots, snapshot_name, name = de.name] (auto size) {
                        // The manifest is the only file expected to be in
                        // this directory not belonging to the SSTable. For
                        // it, we account the total size, but zero it for the
                        // true size calculation.
                        //
                        // All the others should just generate an exception:
                        // there is something wrong, so don't blindly add it
                        // to the size.
                        if (name != "manifest.json") {
                            sstables::entry_descriptor::make_descriptor(name);
                            all_snapshots.at(snapshot_name).total += size;
                        } else {
                            size = 0;
                        }
                        return make_ready_future(size);
                    }).then([this, &all_snapshots, snapshot_name, name = de.name] (auto size) {
                        // FIXME: When we support multiple data directories,
                        // the file may not necessarily live in this same
                        // location. May have to test others as well.
                        return file_size(_config.datadir + "/" + name).then_wrapped([&all_snapshots, snapshot_name, size] (auto fut) {
                            try {
                                // File exists in the main SSTable directory.
                                // Snapshots are not contributing to size.
                                fut.get0();
                            } catch (std::system_error& e) {
                                if (e.code() != std::error_code(ENOENT, std::system_category())) {
                                    throw;
                                }
                                // Original file is gone: this snapshot copy
                                // is the only remaining owner of the bytes.
                                all_snapshots.at(snapshot_name).live += size;
                            }
                            return make_ready_future<>();
                        });
                    });
                });
            });
        }).then([&all_snapshots] {
            return std::move(all_snapshots);
        });
    });
}

// Seal and flush the active memtable, maintaining the pending-flush and
// memtable-switch statistics around the operation.
future<> column_family::flush() {
    // FIXME: this will synchronously wait for this write to finish, but
    // doesn't guarantee anything about previous writes.
    _stats.pending_flushes++;
    return seal_active_memtable().finally([this]() mutable {
        _stats.pending_flushes--;
        // In origin memtable_switch_count is incremented inside
        // ColumnFamilyMeetrics Flush.run
        _stats.memtable_switch_count++;
        return make_ready_future<>();
    });
}

// Flush for commitlog replay position "pos"; a no-op when that position
// has already been flushed.
future<> column_family::flush(const db::replay_position& pos) {
    // Technically possible if we've already issued the
    // sstable write, but it is not done yet.
    if (pos < _highest_flushed_rp) {
        return make_ready_future<>();
    }

    // TODO: Origin looks at "secondary" memtables
    // It also consideres "minReplayPosition", which is simply where
    // the CL "started" (the first ever RP in this run).
    // We ignore this for now and just say that if we're asked for
    // a CF and it exists, we pretty much have to have data that needs
    // flushing. Let's do it.
    return seal_active_memtable();
}

// Drop all in-memory state (row cache and memtables) and start over with
// a fresh, empty memtable.
void column_family::clear() {
    _cache.clear();
    _memtables->clear();
    add_memtable();
}

// NOTE: does not need to be futurized, but might eventually, depending on
// if we implement notifications, whatnot.
//
// Discard every SSTable whose newest data is at or before the truncation
// point (after conversion to gc_clock), returning the highest replay
// position among the discarded tables; the row cache is invalidated too.
future column_family::discard_sstables(db_clock::time_point truncated_at) {
    assert(_stats.pending_compactions == 0);

    return with_lock(_sstables_lock.for_read(), [this, truncated_at] {
        db::replay_position rp;
        auto gc_trunc = to_gc_clock(truncated_at);

        auto pruned = make_lw_shared();
        for (auto&p : *_sstables) {
            if (p.second->max_data_age() <= gc_trunc) {
                rp = std::max(p.second->get_stats_metadata().position, rp);
                p.second->mark_for_deletion();
                continue;
            }
            // Keep SSTables with data newer than the truncation point.
            pruned->emplace(p.first, p.second);
        }

        _sstables = std::move(pruned);
        dblog.debug("cleaning out row cache");
        _cache.clear();
        return make_ready_future(rp);
    });
}

// Mirrors the Java class's default toString(): class name plus address.
std::ostream& operator<<(std::ostream& os, const user_types_metadata& m) {
    os << "org.apache.cassandra.config.UTMetaData@" << &m;
    return os;
}

// Human-readable dump of keyspace metadata: name, replication strategy
// with its options, contained column families, durable_writes and user
// types.
std::ostream& operator<<(std::ostream& os, const keyspace_metadata& m) {
    os << "KSMetaData{";
    os << "name=" << m._name;
    os << ", strategyClass=" << m._strategy_name;
    os << ", strategyOptions={";
    int n = 0;
    for (auto& p : m._strategy_options) {
        if (n++ != 0) {
            os << ", ";
        }
        os << p.first << "=" << p.second;
    }
    os << "}";
    os << ", cfMetaData={";
    n = 0;
    for (auto& p : m._cf_meta_data) {
        if (n++ != 0) {
            os << ", ";
        }
        os << p.first << "=" << p.second;
    }
    os << "}";
    os << ", durable_writes=" << m._durable_writes;
    os << ", userTypes=" << m._user_types;
    os << "}";
    return os;
}

// Install a new schema version for this column family, propagating it to
// every memtable and to the row cache before swapping the schema pointer.
void column_family::set_schema(schema_ptr s) {
    dblog.debug("Changing schema version of {}.{} ({}) from {} to {}",
                _schema->ks_name(), _schema->cf_name(), _schema->id(), _schema->version(), s->version());

    for (auto& m : *_memtables) {
        m->set_schema(s);
    }

    _cache.set_schema(s);
    _schema = std::move(s);
}