mirror of
https://github.com/scylladb/scylladb.git
synced 2026-05-31 12:06:44 +00:00
Merge "cleanups and improvements" from Raphael
This commit is contained in:
128
database.cc
128
database.cc
@@ -763,6 +763,8 @@ column_family::load_new_sstables(std::vector<sstables::entry_descriptor> new_tab
|
||||
auto last = sst->get_last_partition_key(*_schema);
|
||||
if (belongs_to_current_shard(*_schema, first, last)) {
|
||||
this->add_sstable(sst);
|
||||
} else {
|
||||
sst->mark_for_deletion();
|
||||
}
|
||||
return make_ready_future<>();
|
||||
});
|
||||
@@ -855,79 +857,79 @@ future<> column_family::populate(sstring sstdir) {
|
||||
auto verifier = make_lw_shared<std::unordered_map<unsigned long, status>>();
|
||||
auto descriptor = make_lw_shared<sstable_descriptor>();
|
||||
|
||||
return do_with(std::vector<future<>>(), [this, sstdir, verifier, descriptor] (std::vector<future<>>& futures) {
|
||||
return lister::scan_dir(sstdir, { directory_entry_type::regular }, [this, sstdir, verifier, descriptor, &futures] (directory_entry de) {
|
||||
// FIXME: The secondary indexes are in this level, but with a directory type, (starting with ".")
|
||||
auto f = probe_file(sstdir, de.name).then([verifier, descriptor] (auto entry) {
|
||||
if (verifier->count(entry.generation)) {
|
||||
if (verifier->at(entry.generation) == status::has_toc_file) {
|
||||
if (entry.component == sstables::sstable::component_type::TOC) {
|
||||
throw sstables::malformed_sstable_exception("Invalid State encountered. TOC file already processed");
|
||||
return do_with(std::vector<future<>>(), [this, sstdir, verifier, descriptor] (std::vector<future<>>& futures) {
|
||||
return lister::scan_dir(sstdir, { directory_entry_type::regular }, [this, sstdir, verifier, descriptor, &futures] (directory_entry de) {
|
||||
// FIXME: The secondary indexes are in this level, but with a directory type, (starting with ".")
|
||||
auto f = probe_file(sstdir, de.name).then([verifier, descriptor] (auto entry) {
|
||||
if (verifier->count(entry.generation)) {
|
||||
if (verifier->at(entry.generation) == status::has_toc_file) {
|
||||
if (entry.component == sstables::sstable::component_type::TOC) {
|
||||
throw sstables::malformed_sstable_exception("Invalid State encountered. TOC file already processed");
|
||||
} else if (entry.component == sstables::sstable::component_type::TemporaryTOC) {
|
||||
throw sstables::malformed_sstable_exception("Invalid State encountered. Temporary TOC file found after TOC file was processed");
|
||||
}
|
||||
} else if (entry.component == sstables::sstable::component_type::TOC) {
|
||||
verifier->at(entry.generation) = status::has_toc_file;
|
||||
} else if (entry.component == sstables::sstable::component_type::TemporaryTOC) {
|
||||
throw sstables::malformed_sstable_exception("Invalid State encountered. Temporary TOC file found after TOC file was processed");
|
||||
verifier->at(entry.generation) = status::has_temporary_toc_file;
|
||||
}
|
||||
} else if (entry.component == sstables::sstable::component_type::TOC) {
|
||||
verifier->at(entry.generation) = status::has_toc_file;
|
||||
} else if (entry.component == sstables::sstable::component_type::TemporaryTOC) {
|
||||
verifier->at(entry.generation) = status::has_temporary_toc_file;
|
||||
}
|
||||
} else {
|
||||
if (entry.component == sstables::sstable::component_type::TOC) {
|
||||
verifier->emplace(entry.generation, status::has_toc_file);
|
||||
} else if (entry.component == sstables::sstable::component_type::TemporaryTOC) {
|
||||
verifier->emplace(entry.generation, status::has_temporary_toc_file);
|
||||
} else {
|
||||
verifier->emplace(entry.generation, status::has_some_file);
|
||||
if (entry.component == sstables::sstable::component_type::TOC) {
|
||||
verifier->emplace(entry.generation, status::has_toc_file);
|
||||
} else if (entry.component == sstables::sstable::component_type::TemporaryTOC) {
|
||||
verifier->emplace(entry.generation, status::has_temporary_toc_file);
|
||||
} else {
|
||||
verifier->emplace(entry.generation, status::has_some_file);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Retrieve both version and format used for this column family.
|
||||
if (!descriptor->version) {
|
||||
descriptor->version = entry.version;
|
||||
}
|
||||
if (!descriptor->format) {
|
||||
descriptor->format = entry.format;
|
||||
}
|
||||
});
|
||||
|
||||
// push future returned by probe_file into an array of futures,
|
||||
// so that the supplied callback will not block scan_dir() from
|
||||
// reading the next entry in the directory.
|
||||
futures.push_back(std::move(f));
|
||||
|
||||
return make_ready_future<>();
|
||||
}).then([&futures] {
|
||||
return when_all(futures.begin(), futures.end()).then([] (std::vector<future<>> ret) {
|
||||
try {
|
||||
for (auto& f : ret) {
|
||||
f.get();
|
||||
// Retrieve both version and format used for this column family.
|
||||
if (!descriptor->version) {
|
||||
descriptor->version = entry.version;
|
||||
}
|
||||
} catch(...) {
|
||||
throw;
|
||||
}
|
||||
});
|
||||
}).then([verifier, sstdir, descriptor, this] {
|
||||
return parallel_for_each(*verifier, [sstdir = std::move(sstdir), descriptor, this] (auto v) {
|
||||
if (v.second == status::has_temporary_toc_file) {
|
||||
unsigned long gen = v.first;
|
||||
assert(descriptor->version);
|
||||
sstables::sstable::version_types version = descriptor->version.value();
|
||||
assert(descriptor->format);
|
||||
sstables::sstable::format_types format = descriptor->format.value();
|
||||
|
||||
if (engine().cpu_id() != 0) {
|
||||
dblog.info("At directory: {}, partial SSTable with generation {} not relevant for this shard, ignoring", sstdir, v.first);
|
||||
return make_ready_future<>();
|
||||
if (!descriptor->format) {
|
||||
descriptor->format = entry.format;
|
||||
}
|
||||
// shard 0 is the responsible for removing a partial sstable.
|
||||
return sstables::sstable::remove_sstable_with_temp_toc(_schema->ks_name(), _schema->cf_name(), sstdir, gen, version, format);
|
||||
} else if (v.second != status::has_toc_file) {
|
||||
throw sstables::malformed_sstable_exception(sprint("At directory: %s: no TOC found for SSTable with generation %d!. Refusing to boot", sstdir, v.first));
|
||||
}
|
||||
});
|
||||
|
||||
// push future returned by probe_file into an array of futures,
|
||||
// so that the supplied callback will not block scan_dir() from
|
||||
// reading the next entry in the directory.
|
||||
futures.push_back(std::move(f));
|
||||
|
||||
return make_ready_future<>();
|
||||
}).then([&futures] {
|
||||
return when_all(futures.begin(), futures.end()).then([] (std::vector<future<>> ret) {
|
||||
try {
|
||||
for (auto& f : ret) {
|
||||
f.get();
|
||||
}
|
||||
} catch(...) {
|
||||
throw;
|
||||
}
|
||||
});
|
||||
}).then([verifier, sstdir, descriptor, this] {
|
||||
return parallel_for_each(*verifier, [sstdir = std::move(sstdir), descriptor, this] (auto v) {
|
||||
if (v.second == status::has_temporary_toc_file) {
|
||||
unsigned long gen = v.first;
|
||||
assert(descriptor->version);
|
||||
sstables::sstable::version_types version = descriptor->version.value();
|
||||
assert(descriptor->format);
|
||||
sstables::sstable::format_types format = descriptor->format.value();
|
||||
|
||||
if (engine().cpu_id() != 0) {
|
||||
dblog.info("At directory: {}, partial SSTable with generation {} not relevant for this shard, ignoring", sstdir, v.first);
|
||||
return make_ready_future<>();
|
||||
}
|
||||
// shard 0 is the responsible for removing a partial sstable.
|
||||
return sstables::sstable::remove_sstable_with_temp_toc(_schema->ks_name(), _schema->cf_name(), sstdir, gen, version, format);
|
||||
} else if (v.second != status::has_toc_file) {
|
||||
throw sstables::malformed_sstable_exception(sprint("At directory: %s: no TOC found for SSTable with generation %d!. Refusing to boot", sstdir, v.first));
|
||||
}
|
||||
return make_ready_future<>();
|
||||
});
|
||||
});
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
utils::UUID database::empty_version = utils::UUID_gen::get_name_UUID(bytes{});
|
||||
|
||||
@@ -1361,7 +1361,6 @@ future<> sstable::write_components(const memtable& mt) {
|
||||
future<> sstable::write_components(::mutation_reader mr,
|
||||
uint64_t estimated_partitions, schema_ptr schema, uint64_t max_sstable_size) {
|
||||
return seastar::async([this, mr = std::move(mr), estimated_partitions, schema = std::move(schema), max_sstable_size] () mutable {
|
||||
// FIXME: write all components
|
||||
generate_toc(schema->get_compressor_params().get_compressor(), schema->bloom_filter_fp_chance());
|
||||
write_toc();
|
||||
create_data().get();
|
||||
|
||||
@@ -230,8 +230,11 @@ public:
|
||||
_summary.header.min_index_interval;
|
||||
}
|
||||
|
||||
// mark_for_deletion() specifies that the on-disk files for this sstable
|
||||
// should be deleted as soon as the in-memory object is destructed.
|
||||
// mark_for_deletion() specifies that a sstable isn't relevant to the
|
||||
// current shard, and thus can be deleted by the deletion manager, if
|
||||
// all shards sharing it agree. In case the sstable is unshared, it's
|
||||
// guaranteed that all of its on-disk files will be deleted as soon as
|
||||
// the in-memory object is destroyed.
|
||||
void mark_for_deletion() {
|
||||
_marked_for_deletion = true;
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user