Compare commits
7 Commits
next
...
scylla-0.1
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
b7d5fbe967 | ||
|
|
149aea32e7 | ||
|
|
97c796b26b | ||
|
|
1545fc505b | ||
|
|
b690eaef38 | ||
|
|
7b5df973fa | ||
|
|
c33815211f |
@@ -1,6 +1,6 @@
|
||||
#!/bin/sh
|
||||
|
||||
VERSION=development
|
||||
VERSION=0.15
|
||||
|
||||
if test -f version
|
||||
then
|
||||
|
||||
47
database.cc
47
database.cc
@@ -358,23 +358,32 @@ column_family::for_all_partitions_slow(std::function<bool (const dht::decorated_
|
||||
class lister {
|
||||
public:
|
||||
using dir_entry_types = std::unordered_set<directory_entry_type, enum_hash<directory_entry_type>>;
|
||||
using walker_type = std::function<future<> (directory_entry)>;
|
||||
using filter_type = std::function<bool (const sstring&)>;
|
||||
private:
|
||||
file _f;
|
||||
std::function<future<> (directory_entry de)> _walker;
|
||||
walker_type _walker;
|
||||
filter_type _filter;
|
||||
dir_entry_types _expected_type;
|
||||
subscription<directory_entry> _listing;
|
||||
sstring _dirname;
|
||||
|
||||
public:
|
||||
lister(file f, dir_entry_types type, std::function<future<> (directory_entry)> walker, sstring dirname)
|
||||
lister(file f, dir_entry_types type, walker_type walker, sstring dirname)
|
||||
: _f(std::move(f))
|
||||
, _walker(std::move(walker))
|
||||
, _filter([] (const sstring& fname) { return true; })
|
||||
, _expected_type(type)
|
||||
, _listing(_f.list_directory([this] (directory_entry de) { return _visit(de); }))
|
||||
, _dirname(dirname) {
|
||||
}
|
||||
|
||||
static future<> scan_dir(sstring name, dir_entry_types type, std::function<future<> (directory_entry)> walker);
|
||||
lister(file f, dir_entry_types type, walker_type walker, filter_type filter, sstring dirname)
|
||||
: lister(std::move(f), type, std::move(walker), dirname) {
|
||||
_filter = std::move(filter);
|
||||
}
|
||||
|
||||
static future<> scan_dir(sstring name, dir_entry_types type, walker_type walker, filter_type filter = [] (const sstring& fname) { return true; });
|
||||
protected:
|
||||
future<> _visit(directory_entry de) {
|
||||
|
||||
@@ -383,6 +392,12 @@ protected:
|
||||
if ((!_expected_type.count(*(de.type))) || (de.name[0] == '.')) {
|
||||
return make_ready_future<>();
|
||||
}
|
||||
|
||||
// apply a filter
|
||||
if (!_filter(_dirname + "/" + de.name)) {
|
||||
return make_ready_future<>();
|
||||
}
|
||||
|
||||
return _walker(de);
|
||||
});
|
||||
|
||||
@@ -403,9 +418,9 @@ private:
|
||||
};
|
||||
|
||||
|
||||
future<> lister::scan_dir(sstring name, lister::dir_entry_types type, std::function<future<> (directory_entry)> walker) {
|
||||
return engine().open_directory(name).then([type, walker = std::move(walker), name] (file f) {
|
||||
auto l = make_lw_shared<lister>(std::move(f), type, walker, name);
|
||||
future<> lister::scan_dir(sstring name, lister::dir_entry_types type, walker_type walker, filter_type filter) {
|
||||
return engine().open_directory(name).then([type, walker = std::move(walker), filter = std::move(filter), name] (file f) {
|
||||
auto l = make_lw_shared<lister>(std::move(f), type, walker, filter, name);
|
||||
return l->done().then([l] { });
|
||||
});
|
||||
}
|
||||
@@ -453,6 +468,9 @@ future<sstables::entry_descriptor> column_family::probe_file(sstring sstdir, sst
|
||||
return std::move(fut).then([this, sstdir = std::move(sstdir), comps] (range<partition_key> r) {
|
||||
// Checks whether or not sstable belongs to current shard.
|
||||
if (!belongs_to_current_shard(*_schema, std::move(r))) {
|
||||
dblog.debug("sstable {} not relevant for this shard, ignoring",
|
||||
sstables::sstable::filename(sstdir, _schema->ks_name(), _schema->cf_name(), comps.version, comps.generation, comps.format,
|
||||
sstables::sstable::component_type::Data));
|
||||
sstable::mark_sstable_for_deletion(_schema->ks_name(), _schema->cf_name(), sstdir, comps.generation, comps.version, comps.format);
|
||||
return make_ready_future<>();
|
||||
}
|
||||
@@ -672,7 +690,7 @@ column_family::reshuffle_sstables(int64_t start) {
|
||||
// Those SSTables are not known by anyone in the system. So we don't have any kind of
|
||||
// object describing them. There isn't too much of a choice.
|
||||
return work.sstables[comps.generation]->read_toc();
|
||||
}).then([&work] {
|
||||
}, &manifest_json_filter).then([&work] {
|
||||
// Note: cannot be parallel because we will be shuffling things around at this stage. Can't race.
|
||||
return do_for_each(work.sstables, [&work] (auto& pair) {
|
||||
auto&& comps = std::move(work.descriptors.at(pair.first));
|
||||
@@ -838,6 +856,17 @@ lw_shared_ptr<sstable_list> column_family::get_sstables() {
|
||||
return _sstables;
|
||||
}
|
||||
|
||||
inline bool column_family::manifest_json_filter(const sstring& fname) {
|
||||
using namespace boost::filesystem;
|
||||
|
||||
path entry_path(fname);
|
||||
if (!is_directory(status(entry_path)) && entry_path.filename() == path("manifest.json")) {
|
||||
return false;
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
future<> column_family::populate(sstring sstdir) {
|
||||
// We can catch most errors when we try to load an sstable. But if the TOC
|
||||
// file is the one missing, we won't try to load the sstable at all. This
|
||||
@@ -899,7 +928,7 @@ future<> column_family::populate(sstring sstdir) {
|
||||
futures.push_back(std::move(f));
|
||||
|
||||
return make_ready_future<>();
|
||||
}).then([&futures] {
|
||||
}, &manifest_json_filter).then([&futures] {
|
||||
return when_all(futures.begin(), futures.end()).then([] (std::vector<future<>> ret) {
|
||||
try {
|
||||
for (auto& f : ret) {
|
||||
@@ -919,7 +948,7 @@ future<> column_family::populate(sstring sstdir) {
|
||||
sstables::sstable::format_types format = descriptor->format.value();
|
||||
|
||||
if (engine().cpu_id() != 0) {
|
||||
dblog.info("At directory: {}, partial SSTable with generation {} not relevant for this shard, ignoring", sstdir, v.first);
|
||||
dblog.debug("At directory: {}, partial SSTable with generation {} not relevant for this shard, ignoring", sstdir, v.first);
|
||||
return make_ready_future<>();
|
||||
}
|
||||
// shard 0 is the responsible for removing a partial sstable.
|
||||
|
||||
@@ -351,6 +351,9 @@ private:
|
||||
// one are also complete
|
||||
future<> seal_active_memtable();
|
||||
|
||||
// filter manifest.json files out
|
||||
static bool manifest_json_filter(const sstring& fname);
|
||||
|
||||
seastar::gate _in_flight_seals;
|
||||
|
||||
// Iterate over all partitions. Protocol is the same as std::all_of(),
|
||||
|
||||
@@ -687,7 +687,7 @@ public:
|
||||
"\tkeyfile: (Default: conf/scylla.key) PEM Key file associated with certificate.\n" \
|
||||
"Related information: Client-to-node encryption" \
|
||||
) \
|
||||
val(ssl_storage_port, uint32_t, 7001, Unused, \
|
||||
val(ssl_storage_port, uint32_t, 7001, Used, \
|
||||
"The SSL port for encrypted communication. Unused unless enabled in encryption_options." \
|
||||
) \
|
||||
val(default_log_level, sstring, "warn", Used, \
|
||||
|
||||
@@ -149,9 +149,10 @@ public:
|
||||
virtual void set_local_private_addr(const sstring& addr_str) {};
|
||||
|
||||
static distributed<snitch_ptr>& snitch_instance() {
|
||||
static distributed<snitch_ptr> snitch_inst;
|
||||
// FIXME: leaked intentionally to avoid shutdown problems, see #293
|
||||
static distributed<snitch_ptr>* snitch_inst = new distributed<snitch_ptr>();
|
||||
|
||||
return snitch_inst;
|
||||
return *snitch_inst;
|
||||
}
|
||||
|
||||
static snitch_ptr& get_local_snitch_ptr() {
|
||||
|
||||
4
main.cc
4
main.cc
@@ -299,10 +299,10 @@ int main(int ac, char** av) {
|
||||
return dns::gethostbyname(api_address);
|
||||
}).then([&db, api_address, api_port, &ctx] (dns::hostent e){
|
||||
auto ip = e.addresses[0].in.s_addr;
|
||||
ctx.http_server.start().then([api_address, api_port, ip, &ctx] {
|
||||
return ctx.http_server.start().then([api_address, api_port, ip, &ctx] {
|
||||
return set_server(ctx);
|
||||
}).then([api_address, api_port, ip, &ctx] {
|
||||
ctx.http_server.listen(ipv4_addr{ip, api_port});
|
||||
return ctx.http_server.listen(ipv4_addr{ip, api_port});
|
||||
}).then([api_address, api_port] {
|
||||
print("Seastar HTTP server listening on %s:%s ...\n", api_address, api_port);
|
||||
});
|
||||
|
||||
@@ -1755,7 +1755,6 @@ sstable::get_sstable_key_range(const schema& s, sstring ks, sstring cf, sstring
|
||||
|
||||
void sstable::mark_sstable_for_deletion(sstring ks, sstring cf, sstring dir, int64_t generation, version_types v, format_types f) {
|
||||
auto sst = sstable(ks, cf, dir, generation, v, f);
|
||||
sstlog.info("sstable {} not relevant for this shard, ignoring", sst.get_filename());
|
||||
sst.mark_for_deletion();
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user