diff --git a/database.cc b/database.cc index 86b401a107..dba966c7ac 100644 --- a/database.cc +++ b/database.cc @@ -763,6 +763,8 @@ column_family::load_new_sstables(std::vector new_tab auto last = sst->get_last_partition_key(*_schema); if (belongs_to_current_shard(*_schema, first, last)) { this->add_sstable(sst); + } else { + sst->mark_for_deletion(); } return make_ready_future<>(); }); @@ -855,79 +857,79 @@ future<> column_family::populate(sstring sstdir) { auto verifier = make_lw_shared>(); auto descriptor = make_lw_shared(); - return do_with(std::vector>(), [this, sstdir, verifier, descriptor] (std::vector>& futures) { - return lister::scan_dir(sstdir, { directory_entry_type::regular }, [this, sstdir, verifier, descriptor, &futures] (directory_entry de) { - // FIXME: The secondary indexes are in this level, but with a directory type, (starting with ".") - auto f = probe_file(sstdir, de.name).then([verifier, descriptor] (auto entry) { - if (verifier->count(entry.generation)) { - if (verifier->at(entry.generation) == status::has_toc_file) { - if (entry.component == sstables::sstable::component_type::TOC) { - throw sstables::malformed_sstable_exception("Invalid State encountered. TOC file already processed"); + return do_with(std::vector>(), [this, sstdir, verifier, descriptor] (std::vector>& futures) { + return lister::scan_dir(sstdir, { directory_entry_type::regular }, [this, sstdir, verifier, descriptor, &futures] (directory_entry de) { + // FIXME: The secondary indexes are in this level, but with a directory type, (starting with ".") + auto f = probe_file(sstdir, de.name).then([verifier, descriptor] (auto entry) { + if (verifier->count(entry.generation)) { + if (verifier->at(entry.generation) == status::has_toc_file) { + if (entry.component == sstables::sstable::component_type::TOC) { + throw sstables::malformed_sstable_exception("Invalid State encountered. TOC file already processed"); + } else if (entry.component == sstables::sstable::component_type::TemporaryTOC) { + throw sstables::malformed_sstable_exception("Invalid State encountered. Temporary TOC file found after TOC file was processed"); + } + } else if (entry.component == sstables::sstable::component_type::TOC) { + verifier->at(entry.generation) = status::has_toc_file; } else if (entry.component == sstables::sstable::component_type::TemporaryTOC) { - throw sstables::malformed_sstable_exception("Invalid State encountered. Temporary TOC file found after TOC file was processed"); + verifier->at(entry.generation) = status::has_temporary_toc_file; } - } else if (entry.component == sstables::sstable::component_type::TOC) { - verifier->at(entry.generation) = status::has_toc_file; - } else if (entry.component == sstables::sstable::component_type::TemporaryTOC) { - verifier->at(entry.generation) = status::has_temporary_toc_file; - } - } else { - if (entry.component == sstables::sstable::component_type::TOC) { - verifier->emplace(entry.generation, status::has_toc_file); - } else if (entry.component == sstables::sstable::component_type::TemporaryTOC) { - verifier->emplace(entry.generation, status::has_temporary_toc_file); } else { - verifier->emplace(entry.generation, status::has_some_file); + if (entry.component == sstables::sstable::component_type::TOC) { + verifier->emplace(entry.generation, status::has_toc_file); + } else if (entry.component == sstables::sstable::component_type::TemporaryTOC) { + verifier->emplace(entry.generation, status::has_temporary_toc_file); + } else { + verifier->emplace(entry.generation, status::has_some_file); + } } - } - // Retrieve both version and format used for this column family. - if (!descriptor->version) { - descriptor->version = entry.version; - } - if (!descriptor->format) { - descriptor->format = entry.format; - } - }); - - // push future returned by probe_file into an array of futures, - // so that the supplied callback will not block scan_dir() from - // reading the next entry in the directory. - futures.push_back(std::move(f)); - - return make_ready_future<>(); - }).then([&futures] { - return when_all(futures.begin(), futures.end()).then([] (std::vector> ret) { - try { - for (auto& f : ret) { - f.get(); + // Retrieve both version and format used for this column family. + if (!descriptor->version) { + descriptor->version = entry.version; } - } catch(...) { - throw; - } - }); - }).then([verifier, sstdir, descriptor, this] { - return parallel_for_each(*verifier, [sstdir = std::move(sstdir), descriptor, this] (auto v) { - if (v.second == status::has_temporary_toc_file) { - unsigned long gen = v.first; - assert(descriptor->version); - sstables::sstable::version_types version = descriptor->version.value(); - assert(descriptor->format); - sstables::sstable::format_types format = descriptor->format.value(); - - if (engine().cpu_id() != 0) { - dblog.info("At directory: {}, partial SSTable with generation {} not relevant for this shard, ignoring", sstdir, v.first); - return make_ready_future<>(); + if (!descriptor->format) { + descriptor->format = entry.format; } - // shard 0 is the responsible for removing a partial sstable. - return sstables::sstable::remove_sstable_with_temp_toc(_schema->ks_name(), _schema->cf_name(), sstdir, gen, version, format); - } else if (v.second != status::has_toc_file) { - throw sstables::malformed_sstable_exception(sprint("At directory: %s: no TOC found for SSTable with generation %d!. Refusing to boot", sstdir, v.first)); - } + }); + + // push future returned by probe_file into an array of futures, + // so that the supplied callback will not block scan_dir() from + // reading the next entry in the directory. + futures.push_back(std::move(f)); + return make_ready_future<>(); + }).then([&futures] { + return when_all(futures.begin(), futures.end()).then([] (std::vector> ret) { + try { + for (auto& f : ret) { + f.get(); + } + } catch(...) { + throw; + } + }); + }).then([verifier, sstdir, descriptor, this] { + return parallel_for_each(*verifier, [sstdir = std::move(sstdir), descriptor, this] (auto v) { + if (v.second == status::has_temporary_toc_file) { + unsigned long gen = v.first; + assert(descriptor->version); + sstables::sstable::version_types version = descriptor->version.value(); + assert(descriptor->format); + sstables::sstable::format_types format = descriptor->format.value(); + + if (engine().cpu_id() != 0) { + dblog.info("At directory: {}, partial SSTable with generation {} not relevant for this shard, ignoring", sstdir, v.first); + return make_ready_future<>(); + } + // shard 0 is the responsible for removing a partial sstable. + return sstables::sstable::remove_sstable_with_temp_toc(_schema->ks_name(), _schema->cf_name(), sstdir, gen, version, format); + } else if (v.second != status::has_toc_file) { + throw sstables::malformed_sstable_exception(sprint("At directory: %s: no TOC found for SSTable with generation %d!. Refusing to boot", sstdir, v.first)); + } + return make_ready_future<>(); + }); }); }); - }); } utils::UUID database::empty_version = utils::UUID_gen::get_name_UUID(bytes{}); diff --git a/sstables/sstables.cc b/sstables/sstables.cc index 3577c7bfd5..9d0d3b22c5 100644 --- a/sstables/sstables.cc +++ b/sstables/sstables.cc @@ -1361,7 +1361,6 @@ future<> sstable::write_components(const memtable& mt) { future<> sstable::write_components(::mutation_reader mr, uint64_t estimated_partitions, schema_ptr schema, uint64_t max_sstable_size) { return seastar::async([this, mr = std::move(mr), estimated_partitions, schema = std::move(schema), max_sstable_size] () mutable { - // FIXME: write all components generate_toc(schema->get_compressor_params().get_compressor(), schema->bloom_filter_fp_chance()); write_toc(); create_data().get(); diff --git a/sstables/sstables.hh b/sstables/sstables.hh index f3cbf86399..4d6bcd1fa6 100644 --- a/sstables/sstables.hh +++ b/sstables/sstables.hh @@ -230,8 +230,11 @@ public: _summary.header.min_index_interval; } - // mark_for_deletion() specifies that the on-disk files for this sstable - // should be deleted as soon as the in-memory object is destructed. + // mark_for_deletion() specifies that a sstable isn't relevant to the + // current shard, and thus can be deleted by the deletion manager, if + // all shards sharing it agree. In case the sstable is unshared, it's + // guaranteed that all of its on-disk files will be deleted as soon as + // the in-memory object is destroyed. void mark_for_deletion() { _marked_for_deletion = true; }