mirror of
https://github.com/scylladb/scylladb.git
synced 2026-06-09 08:23:29 +00:00
Merge "Fixes for snapshots" from Glauber
This commit is contained in:
60
database.cc
60
database.cc
@@ -1658,42 +1658,44 @@ struct snapshot_manager {
|
||||
static thread_local std::unordered_map<sstring, lw_shared_ptr<snapshot_manager>> pending_snapshots;
|
||||
|
||||
static future<>
|
||||
seal_snapshot(sstring jsondir, std::unordered_set<sstring> tables) {
|
||||
seal_snapshot(sstring jsondir) {
|
||||
std::ostringstream ss;
|
||||
int n = 0;
|
||||
ss << "{" << std::endl << "\t\"files\" : { ";
|
||||
for (auto&& rf: tables) {
|
||||
ss << "{" << std::endl << "\t\"files\" : [ ";
|
||||
for (auto&& rf: pending_snapshots.at(jsondir)->files) {
|
||||
if (n++ > 0) {
|
||||
ss << ", ";
|
||||
}
|
||||
ss << "\"" << rf << "\"";
|
||||
}
|
||||
ss << " }" << std::endl << "}" << std::endl;
|
||||
ss << " ]" << std::endl << "}" << std::endl;
|
||||
|
||||
auto json = ss.str();
|
||||
auto jsonfile = jsondir + "/manifest.json";
|
||||
|
||||
dblog.debug("Storing manifest {}", jsonfile);
|
||||
|
||||
return engine().open_file_dma(jsonfile, open_flags::wo | open_flags::create | open_flags::truncate).then([json](file f) {
|
||||
return do_with(make_file_output_stream(std::move(f)), [json] (output_stream<char>& out) {
|
||||
return out.write(json.c_str(), json.size()).then([&out] {
|
||||
return out.flush();
|
||||
}).then([&out] {
|
||||
return out.close();
|
||||
return recursive_touch_directory(jsondir).then([jsonfile, json = std::move(json)] {
|
||||
return engine().open_file_dma(jsonfile, open_flags::wo | open_flags::create | open_flags::truncate).then([json](file f) {
|
||||
return do_with(make_file_output_stream(std::move(f)), [json] (output_stream<char>& out) {
|
||||
return out.write(json.c_str(), json.size()).then([&out] {
|
||||
return out.flush();
|
||||
}).then([&out] {
|
||||
return out.close();
|
||||
});
|
||||
});
|
||||
});
|
||||
}).then([jsondir = std::move(jsondir)] {
|
||||
}).then([jsondir] {
|
||||
return sync_directory(std::move(jsondir));
|
||||
}).finally([jsondir] {
|
||||
pending_snapshots.erase(jsondir);
|
||||
return make_ready_future<>();
|
||||
});
|
||||
}
|
||||
|
||||
future<> column_family::snapshot(sstring name) {
|
||||
return flush().then([this, name = std::move(name)]() {
|
||||
auto tables = boost::copy_range<std::vector<sstables::shared_sstable>>(*_sstables | boost::adaptors::map_values);
|
||||
if (tables.size() == 0) {
|
||||
return make_ready_future<>();
|
||||
}
|
||||
return do_with(std::move(tables), [this, name](std::vector<sstables::shared_sstable> & tables) {
|
||||
auto jsondir = _config.datadir + "/snapshots/" + name;
|
||||
|
||||
@@ -1702,22 +1704,29 @@ future<> column_family::snapshot(sstring name) {
|
||||
return recursive_touch_directory(dir).then([sstable, dir] {
|
||||
return sstable->create_links(dir);
|
||||
});
|
||||
}).then([jsondir] {
|
||||
return sync_directory(std::move(jsondir));
|
||||
}).then([this, &tables, name, jsondir] {
|
||||
auto shard = std::hash<sstring>()(name) % smp::count;
|
||||
}).then([jsondir, &tables] {
|
||||
// This is not just an optimization. If we have no files, jsondir may not have been created,
|
||||
// and sync_directory would throw.
|
||||
if (tables.size()) {
|
||||
return sync_directory(std::move(jsondir));
|
||||
} else {
|
||||
return make_ready_future<>();
|
||||
}
|
||||
}).then([this, &tables, jsondir] {
|
||||
auto shard = std::hash<sstring>()(jsondir) % smp::count;
|
||||
std::unordered_set<sstring> table_names;
|
||||
for (auto& sst : tables) {
|
||||
auto f = sst->get_filename();
|
||||
auto rf = f.substr(sst->get_dir().size() + 1);
|
||||
table_names.insert(std::move(rf));
|
||||
}
|
||||
return smp::submit_to(shard, [requester = engine().cpu_id(), name = std::move(name),
|
||||
return smp::submit_to(shard, [requester = engine().cpu_id(), jsondir = std::move(jsondir),
|
||||
tables = std::move(table_names), datadir = _config.datadir] {
|
||||
if (pending_snapshots.count(name) == 0) {
|
||||
pending_snapshots.emplace(name, make_lw_shared<snapshot_manager>());
|
||||
|
||||
if (pending_snapshots.count(jsondir) == 0) {
|
||||
pending_snapshots.emplace(jsondir, make_lw_shared<snapshot_manager>());
|
||||
}
|
||||
auto snapshot = pending_snapshots.at(name);
|
||||
auto snapshot = pending_snapshots.at(jsondir);
|
||||
for (auto&& sst: tables) {
|
||||
snapshot->files.insert(std::move(sst));
|
||||
}
|
||||
@@ -1725,13 +1734,10 @@ future<> column_family::snapshot(sstring name) {
|
||||
snapshot->requests.signal(1);
|
||||
auto my_work = make_ready_future<>();
|
||||
if (requester == engine().cpu_id()) {
|
||||
auto jsondir = datadir + "/snapshots/" + name;
|
||||
my_work = snapshot->requests.wait(smp::count).then([jsondir = std::move(jsondir),
|
||||
snapshot,
|
||||
name = std::move(name)] () mutable {
|
||||
return seal_snapshot(jsondir, std::move(snapshot->files)).then([name = std::move(name), snapshot] {
|
||||
snapshot] () mutable {
|
||||
return seal_snapshot(jsondir).then([snapshot] {
|
||||
snapshot->manifest_write.signal(smp::count);
|
||||
pending_snapshots.erase(name);
|
||||
return make_ready_future<>();
|
||||
});
|
||||
});
|
||||
|
||||
Reference in New Issue
Block a user