system keyspace: implement get_bootstrap_state

To avoid spreading the futures all over, we will resort to a cache with this,
the same way we did for the dc/rack information.

Signed-off-by: Glauber Costa <glommer@cloudius-systems.com>
This commit is contained in:
Glauber Costa
2015-08-13 14:58:04 -05:00
parent 20590db87f
commit 0177c7fed1

View File

@@ -409,6 +409,7 @@ future<> force_blocking_flush(sstring cfname);
// updates are propagated correctly.
struct local_cache {
std::unordered_map<gms::inet_address, locator::endpoint_dc_rack> _cached_dc_rack_info;
bootstrap_state _state;
future<> stop() {
return make_ready_future<>();
}
@@ -435,6 +436,25 @@ static future<> build_dc_rack_info() {
});
}
static future<> build_bootstrap_info() {
sstring req = "SELECT bootstrapped FROM system.%s WHERE key = ? ";
return execute_cql(req, LOCAL, sstring(LOCAL)).then([] (auto msg) {
static auto state_map = std::unordered_map<sstring, bootstrap_state>({
{ "NEEDS_BOOTSTRAP", bootstrap_state::NEEDS_BOOTSTRAP },
{ "COMPLETED", bootstrap_state::COMPLETED },
{ "IN_PROGRESS", bootstrap_state::IN_PROGRESS }
});
bootstrap_state state = bootstrap_state::NEEDS_BOOTSTRAP;
if (!msg->empty() && msg->one().has("bootstrapped")) {
state = state_map.at(msg->one().template get_as<sstring>("bootstrapped"));
}
return _local_cache.invoke_on_all([state] (local_cache& lc) {
lc._state = state;
});
});
}
future<> setup(distributed<database>& db, distributed<cql3::query_processor>& qp) {
auto new_ctx = std::make_unique<query_context>(db, qp);
qctx.swap(new_ctx);
@@ -449,6 +469,8 @@ future<> setup(distributed<database>& db, distributed<cql3::query_processor>& qp
});
}).then([] {
return build_dc_rack_info();
}).then([] {
return build_bootstrap_info();
}).then([] {
return check_health();
}).then([] {
@@ -716,16 +738,7 @@ bool bootstrap_in_progress() {
}
bootstrap_state get_bootstrap_state() {
#if 0
String req = "SELECT bootstrapped FROM system.%s WHERE key='%s'";
UntypedResultSet result = executeInternal(String.format(req, LOCAL, LOCAL));
if (result.isEmpty() || !result.one().has("bootstrapped"))
return BootstrapState.NEEDS_BOOTSTRAP;
return BootstrapState.valueOf(result.one().getString("bootstrapped"));
#endif
return bootstrap_state::NEEDS_BOOTSTRAP;
return _local_cache.local()._state;
}
future<> set_bootstrap_state(bootstrap_state state) {
@@ -736,8 +749,12 @@ future<> set_bootstrap_state(bootstrap_state state) {
}).at(state);
sstring req = "INSERT INTO system.%s (key, bootstrapped) VALUES (?, ?)";
return execute_cql(req, LOCAL, sstring(LOCAL), state_name).discard_result().then([] {
return force_blocking_flush(LOCAL);
return execute_cql(req, LOCAL, sstring(LOCAL), state_name).discard_result().then([state] {
return force_blocking_flush(LOCAL).then([state] {
return _local_cache.invoke_on_all([state] (local_cache& lc) {
lc._state = state;
});
});
});
}