From 5cb5050ca1e8b43ba49d6c64d7bd2f887cbd3cfa Mon Sep 17 00:00:00 2001 From: Asias He Date: Tue, 11 Aug 2015 09:49:14 +0800 Subject: [PATCH 01/11] system_keyspace: Stub get_saved_tokens --- db/system_keyspace.cc | 18 ++++++++++-------- db/system_keyspace.hh | 11 +++-------- 2 files changed, 13 insertions(+), 16 deletions(-) diff --git a/db/system_keyspace.cc b/db/system_keyspace.cc index ba53acbd7d..881a53db1a 100644 --- a/db/system_keyspace.cc +++ b/db/system_keyspace.cc @@ -910,15 +910,17 @@ future<> check_health() { }); } +std::unordered_set get_saved_tokens() { +#if 0 + String req = "SELECT tokens FROM system.%s WHERE key='%s'"; + UntypedResultSet result = executeInternal(String.format(req, LOCAL, LOCAL)); + return result.isEmpty() || !result.one().has("tokens") + ? Collections.emptyList() + : deserializeTokens(result.one().getSet("tokens", UTF8Type.instance)); +#endif + return std::unordered_set(); +} #if 0 - public static Collection getSavedTokens() - { - String req = "SELECT tokens FROM system.%s WHERE key='%s'"; - UntypedResultSet result = executeInternal(String.format(req, LOCAL, LOCAL)); - return result.isEmpty() || !result.one().has("tokens") - ? Collections.emptyList() - : deserializeTokens(result.one().getSet("tokens", UTF8Type.instance)); - } public static int incrementAndGetGeneration() { diff --git a/db/system_keyspace.hh b/db/system_keyspace.hh index 3887ea7ce8..b316e1a38d 100644 --- a/db/system_keyspace.hh +++ b/db/system_keyspace.hh @@ -456,14 +456,9 @@ load_dc_rack_info(); throw new ConfigurationException("Saved cluster name " + savedClusterName + " != configured name " + DatabaseDescriptor.getClusterName()); } - public static Collection getSavedTokens() - { - String req = "SELECT tokens FROM system.%s WHERE key='%s'"; - UntypedResultSet result = executeInternal(String.format(req, LOCAL, LOCAL)); - return result.isEmpty() || !result.one().has("tokens") - ? 
Collections.emptyList() - : deserializeTokens(result.one().getSet("tokens", UTF8Type.instance)); - } +#endif + std::unordered_set get_saved_tokens(); +#if 0 public static int incrementAndGetGeneration() { From 3ea91504ba677ce54cfe7bcf9a05f7e8a8f370f7 Mon Sep 17 00:00:00 2001 From: Asias He Date: Tue, 11 Aug 2015 10:41:11 +0800 Subject: [PATCH 02/11] storage_service: Enable get_saved_tokens and get_initial_tokens --- service/storage_service.cc | 48 ++++++++++++++++++-------------------- 1 file changed, 23 insertions(+), 25 deletions(-) diff --git a/service/storage_service.cc b/service/storage_service.cc index 487dce6c15..a458e00368 100644 --- a/service/storage_service.cc +++ b/service/storage_service.cc @@ -58,6 +58,11 @@ std::experimental::optional get_replace_address() { return {}; } +std::unordered_set get_initial_tokens() { + // FIXME: DatabaseDescriptor.getInitialTokens(); + return std::unordered_set(); +} + bool get_property_join_ring() { // FIXME: Boolean.parseBoolean(System.getProperty("cassandra.join_ring", "true"))) return true; @@ -257,37 +262,30 @@ future<> storage_service::join_token_ring(int delay) { // assert(!_is_bootstrap_mode); // bootstrap will block until finished } else { size_t num_tokens = _db.local().get_config().num_tokens(); - _bootstrap_tokens = boot_strapper::get_random_tokens(_token_metadata, num_tokens); - logger.info("Generated random tokens. 
tokens are {}", _bootstrap_tokens); -#if 0 - _bootstrap_tokens = SystemKeyspace.getSavedTokens(); - if (_bootstrap_tokens.isEmpty()) - { - Collection initialTokens = DatabaseDescriptor.getInitialTokens(); - if (initialTokens.size() < 1) - { - _bootstrap_tokens = BootStrapper.getRandomTokens(_token_metadata, DatabaseDescriptor.getNumTokens()); - if (DatabaseDescriptor.getNumTokens() == 1) + _bootstrap_tokens = db::system_keyspace::get_saved_tokens(); + if (_bootstrap_tokens.empty()) { + auto initial_tokens = get_initial_tokens(); + if (initial_tokens.size() < 1) { + _bootstrap_tokens = boot_strapper::get_random_tokens(_token_metadata, num_tokens); + if (num_tokens == 1) { logger.warn("Generated random token {}. Random tokens will result in an unbalanced ring; see http://wiki.apache.org/cassandra/Operations", _bootstrap_tokens); - else + } else { logger.info("Generated random tokens. tokens are {}", _bootstrap_tokens); - } - else - { - _bootstrap_tokens = new ArrayList(initialTokens.size()); - for (String token : initialTokens) - _bootstrap_tokens.add(getPartitioner().getTokenFactory().fromString(token)); + } + } else { + for (auto token : initial_tokens) { + // FIXME: token from string + // _bootstrap_tokens.insert(getPartitioner().getTokenFactory().fromString(token)); + } logger.info("Saved tokens not found. 
Using configuration value: {}", _bootstrap_tokens); } - } - else - { - if (_bootstrap_tokens.size() != DatabaseDescriptor.getNumTokens()) - throw new ConfigurationException("Cannot change the number of tokens from " + _bootstrap_tokens.size() + " to " + DatabaseDescriptor.getNumTokens()); - else + } else { + if (_bootstrap_tokens.size() != num_tokens) { + throw std::runtime_error(sprint("Cannot change the number of tokens from %ld to %ld", _bootstrap_tokens.size(), num_tokens)); + } else { logger.info("Using saved tokens {}", _bootstrap_tokens); + } } -#endif } set_tokens(_bootstrap_tokens).get(); #if 0 From 645700d261cc9afd0f8aa3142fd0e5a27168da7e Mon Sep 17 00:00:00 2001 From: Asias He Date: Tue, 11 Aug 2015 10:51:54 +0800 Subject: [PATCH 03/11] storage_service: Implement join_ring Join the ring by operator request. --- service/storage_service.cc | 31 ++++++++++++------------------- service/storage_service.hh | 2 +- 2 files changed, 13 insertions(+), 20 deletions(-) diff --git a/service/storage_service.cc b/service/storage_service.cc index a458e00368..467fa7c866 100644 --- a/service/storage_service.cc +++ b/service/storage_service.cc @@ -314,28 +314,21 @@ future<> storage_service::join_token_ring(int delay) { }); } -void storage_service::join_ring() { -#if 0 - if (!joined) { +future<> storage_service::join_ring() { + if (!_joined) { logger.info("Joining ring by operator request"); - try - { - joinTokenRing(0); - } - catch (ConfigurationException e) - { - throw new IOException(e.getMessage()); - } + return join_token_ring(0); } else if (_is_survey_mode) { - set_tokens(SystemKeyspace.getSavedTokens()); - SystemKeyspace.setBootstrapState(SystemKeyspace.BootstrapState.COMPLETED); - _is_survey_mode = false; - logger.info("Leaving write survey mode and joining ring at operator request"); - assert _token_metadata.sortedTokens().size() > 0; - - Auth.setup(); + return set_tokens(db::system_keyspace::get_saved_tokens()).then([this] { + 
//SystemKeyspace.setBootstrapState(SystemKeyspace.BootstrapState.COMPLETED); + _is_survey_mode = false; + logger.info("Leaving write survey mode and joining ring at operator request"); + assert(_token_metadata.sorted_tokens().size() > 0); + //Auth.setup(); + return make_ready_future<>(); + }); } -#endif + return make_ready_future<>(); } future<> storage_service::bootstrap(std::unordered_set tokens) { diff --git a/service/storage_service.hh b/service/storage_service.hh index fce933d7da..81f3cf38e6 100644 --- a/service/storage_service.hh +++ b/service/storage_service.hh @@ -371,7 +371,7 @@ private: future<> prepare_to_join(); future<> join_token_ring(int delay); public: - void join_ring(); + future<> join_ring(); bool is_joined() { return _joined; } From 6874663c9de6eb37740c7dcb8d9c3674a91a537d Mon Sep 17 00:00:00 2001 From: Asias He Date: Tue, 11 Aug 2015 11:29:41 +0800 Subject: [PATCH 04/11] storage_service: Enable current in prepare_to_join --- service/storage_service.cc | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/service/storage_service.cc b/service/storage_service.cc index 467fa7c866..5a2d7c5072 100644 --- a/service/storage_service.cc +++ b/service/storage_service.cc @@ -160,7 +160,6 @@ future<> storage_service::prepare_to_join() { future<> storage_service::join_token_ring(int delay) { return seastar::async([this, delay] { _joined = true; -#if 0 // We bootstrap if we haven't successfully bootstrapped before, as long as we are not a seed. // If we are a seed, or if the user manually sets auto_bootstrap to false, // we'll skip streaming data from other nodes and jump directly into the ring. @@ -170,7 +169,8 @@ future<> storage_service::join_token_ring(int delay) { // // We attempted to replace this with a schema-presence check, but you need a meaningful sleep // to get schema info from gossip which defeats the purpose. See CASSANDRA-4427 for the gory details. 
- Set current = new HashSet<>(); + std::unordered_set current; +#if 0 logger.debug("Bootstrap variables: {} {} {} {}", DatabaseDescriptor.isAutoBootstrap(), SystemKeyspace.bootstrapInProgress(), @@ -240,12 +240,12 @@ future<> storage_service::join_token_ring(int delay) { for (auto token : _bootstrap_tokens) { auto existing = _token_metadata.get_endpoint(token); if (existing) { -#if 0 - long nanoDelay = delay * 1000000L; - if (Gossiper.instance.getEndpointStateForEndpoint(existing).getUpdateTimestamp() > (System.nanoTime() - nanoDelay)) - throw new UnsupportedOperationException("Cannot replace a live node... "); - current.add(existing); -#endif + auto& gossiper = gms::get_local_gossiper(); + auto eps = gossiper.get_endpoint_state_for_endpoint(*existing); + if (eps && eps->get_update_timestamp() > gms::gossiper::clk::now() - std::chrono::milliseconds(delay)) { + throw std::runtime_error("Cannot replace a live node..."); + } + current.insert(*existing); } else { throw std::runtime_error(sprint("Cannot replace token %s which does not exist!", token)); } From 951c0d192bafe92db997876f6a91c3a1bf3c88df Mon Sep 17 00:00:00 2001 From: Asias He Date: Tue, 11 Aug 2015 11:30:12 +0800 Subject: [PATCH 05/11] storage_service: Enable _is_survey_mode logic in join_token_ring --- service/storage_service.cc | 28 +++++++++++++--------------- 1 file changed, 13 insertions(+), 15 deletions(-) diff --git a/service/storage_service.cc b/service/storage_service.cc index 5a2d7c5072..f5139456fa 100644 --- a/service/storage_service.cc +++ b/service/storage_service.cc @@ -287,30 +287,28 @@ future<> storage_service::join_token_ring(int delay) { } } } - set_tokens(_bootstrap_tokens).get(); #if 0 // if we don't have system_traces keyspace at this point, then create it manually if (Schema.instance.getKSMetaData(TraceKeyspace.NAME) == null) MigrationManager.announceNewKeyspace(TraceKeyspace.definition(), 0, false); +#endif - if (!_is_survey_mode) - { + if (!_is_survey_mode) { // start participating 
in the ring. - SystemKeyspace.setBootstrapState(SystemKeyspace.BootstrapState.COMPLETED); - set_tokens(_bootstrap_tokens); + //SystemKeyspace.setBootstrapState(SystemKeyspace.BootstrapState.COMPLETED); + set_tokens(_bootstrap_tokens).get(); // remove the existing info about the replaced node. - if (!current.isEmpty()) - for (InetAddress existing : current) - Gossiper.instance.replacedEndpoint(existing); - assert _token_metadata.sortedTokens().size() > 0; - - Auth.setup(); - } - else - { + if (!current.empty()) { + auto& gossiper = gms::get_local_gossiper(); + for (auto existing : current) { + gossiper.replaced_endpoint(existing); + } + } + assert(_token_metadata.sorted_tokens().size() > 0); + //Auth.setup(); + } else { logger.info("Startup complete, but write survey mode is active, not becoming an active ring member. Use JMX (StorageService->joinRing()) to finalize ring joining."); } -#endif }); } From 96fe749141d3148a1b32547e72b96565e413533b Mon Sep 17 00:00:00 2001 From: Asias He Date: Tue, 11 Aug 2015 13:59:14 +0800 Subject: [PATCH 06/11] db/system_keyspace: Stub get_bootstrap_state and friends --- db/system_keyspace.cc | 45 +++++++++++++++++++++-------------------- db/system_keyspace.hh | 47 ++++++++++++------------------------------- 2 files changed, 36 insertions(+), 56 deletions(-) diff --git a/db/system_keyspace.cc b/db/system_keyspace.cc index 881a53db1a..c396d85efb 100644 --- a/db/system_keyspace.cc +++ b/db/system_keyspace.cc @@ -958,37 +958,38 @@ std::unordered_set get_saved_tokens() { return generation; } - - public static BootstrapState getBootstrapState() - { - String req = "SELECT bootstrapped FROM system.%s WHERE key='%s'"; - UntypedResultSet result = executeInternal(String.format(req, LOCAL, LOCAL)); - - if (result.isEmpty() || !result.one().has("bootstrapped")) - return BootstrapState.NEEDS_BOOTSTRAP; - - return BootstrapState.valueOf(result.one().getString("bootstrapped")); - } - - public static boolean bootstrapComplete() - { - return 
getBootstrapState() == BootstrapState.COMPLETED; - } - - public static boolean bootstrapInProgress() - { - return getBootstrapState() == BootstrapState.IN_PROGRESS; - } #endif +bool bootstrap_complete() { + return get_bootstrap_state() == bootstrap_state::COMPLETED; +} + +bool bootstrap_in_progress() { + return get_bootstrap_state() == bootstrap_state::IN_PROGRESS; +} + +bootstrap_state get_bootstrap_state() { #if 0 + String req = "SELECT bootstrapped FROM system.%s WHERE key='%s'"; + UntypedResultSet result = executeInternal(String.format(req, LOCAL, LOCAL)); + + if (result.isEmpty() || !result.one().has("bootstrapped")) + return BootstrapState.NEEDS_BOOTSTRAP; + + return BootstrapState.valueOf(result.one().getString("bootstrapped")); +#endif + return bootstrap_state::NEEDS_BOOTSTRAP; +} + future<> set_bootstrap_state(bootstrap_state state) { +#if 0 sstring req = "INSERT INTO system.%s (key, bootstrapped) VALUES ('%s', '%s')"; return execute_cql(req, LOCAL, LOCAL, state.name()).discard_result().then([] { return force_blocking_flush(LOCAL); }); -} #endif + return make_ready_future<>(); +} #if 0 diff --git a/db/system_keyspace.hh b/db/system_keyspace.hh index b316e1a38d..3f1f6ee32d 100644 --- a/db/system_keyspace.hh +++ b/db/system_keyspace.hh @@ -105,16 +105,16 @@ std::unordered_map load_dc_rack_info(); #if 0 - private static volatile Map> truncationRecords; +#endif - public enum BootstrapState - { - NEEDS_BOOTSTRAP, - COMPLETED, - IN_PROGRESS - } +enum class bootstrap_state { + NEEDS_BOOTSTRAP, + COMPLETED, + IN_PROGRESS +}; +#if 0 private static DecoratedKey decorate(ByteBuffer key) { return StorageService.getPartitioner().decorateKey(key); @@ -496,35 +496,14 @@ load_dc_rack_info(); return generation; } +#endif - public static BootstrapState getBootstrapState() - { - String req = "SELECT bootstrapped FROM system.%s WHERE key='%s'"; - UntypedResultSet result = executeInternal(String.format(req, LOCAL, LOCAL)); - - if (result.isEmpty() || 
!result.one().has("bootstrapped")) - return BootstrapState.NEEDS_BOOTSTRAP; - - return BootstrapState.valueOf(result.one().getString("bootstrapped")); - } - - public static boolean bootstrapComplete() - { - return getBootstrapState() == BootstrapState.COMPLETED; - } - - public static boolean bootstrapInProgress() - { - return getBootstrapState() == BootstrapState.IN_PROGRESS; - } - - public static void setBootstrapState(BootstrapState state) - { - String req = "INSERT INTO system.%s (key, bootstrapped) VALUES ('%s', ?)"; - executeInternal(String.format(req, LOCAL, LOCAL), state.name()); - forceBlockingFlush(LOCAL); - } +bool bootstrap_complete(); +bool bootstrap_in_progress(); +bootstrap_state get_bootstrap_state(); +future<> set_bootstrap_state(bootstrap_state state); +#if 0 public static boolean isIndexBuilt(String keyspaceName, String indexName) { ColumnFamilyStore cfs = Keyspace.open(NAME).getColumnFamilyStore(BUILT_INDEXES); From 95dd3075975d7e3763d29624c0cacafdb2ff7d8c Mon Sep 17 00:00:00 2001 From: Asias He Date: Tue, 11 Aug 2015 14:10:34 +0800 Subject: [PATCH 07/11] db/system_keyspace: Remove duplicated commented out code I'm not sure what happened. We have the same commented-out code in both .hh and .cc. It is very confusing when enabling some of the code. Let's remove the duplicated code in .cc and leave the copy in .hh only. 
--- db/system_keyspace.cc | 449 ------------------------------------------ db/system_keyspace.hh | 21 +- 2 files changed, 20 insertions(+), 450 deletions(-) diff --git a/db/system_keyspace.cc b/db/system_keyspace.cc index c396d85efb..4ed7597dea 100644 --- a/db/system_keyspace.cc +++ b/db/system_keyspace.cc @@ -377,42 +377,6 @@ schema_ptr size_estimates() { return size_estimates; } -#if 0 - - public static KSMetaData definition() - { - Iterable tables = - Iterables.concat(LegacySchemaTables.All, - Arrays.asList(BuiltIndexes, - Hints, - Batchlog, - Paxos, - Local, - Peers, - PeerEvents, - RangeXfers, - CompactionsInProgress, - CompactionHistory, - SSTableActivity)); - return new KSMetaData(NAME, LocalStrategy.class, Collections.emptyMap(), true, tables); - } - - private static volatile Map> truncationRecords; - - public enum BootstrapState - { - NEEDS_BOOTSTRAP, - COMPLETED, - IN_PROGRESS - } - - private static DecoratedKey decorate(ByteBuffer key) - { - return StorageService.getPartitioner().decorateKey(key); - } - -#endif - static future<> setup_version() { sstring req = "INSERT INTO system.%s (key, release_version, cql_version, thrift_version, native_protocol_version, data_center, rack, partitioner, rpc_address, broadcast_address, listen_address) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"; auto& snitch = locator::i_endpoint_snitch::get_local_snitch_ptr(); @@ -491,104 +455,6 @@ future<> setup(distributed& db, distributed& qp }); } -#if 0 - /** - * Write compaction log, except columfamilies under system keyspace. 
- * - * @param cfs cfs to compact - * @param toCompact sstables to compact - * @return compaction task id or null if cfs is under system keyspace - */ - public static UUID startCompaction(ColumnFamilyStore cfs, Iterable toCompact) - { - if (NAME.equals(cfs.keyspace.getName())) - return null; - - UUID compactionId = UUIDGen.getTimeUUID(); - Iterable generations = Iterables.transform(toCompact, new Function() - { - public Integer apply(SSTableReader sstable) - { - return sstable.descriptor.generation; - } - }); - String req = "INSERT INTO system.%s (id, keyspace_name, columnfamily_name, inputs) VALUES (?, ?, ?, ?)"; - executeInternal(String.format(req, COMPACTIONS_IN_PROGRESS), compactionId, cfs.keyspace.getName(), cfs.name, Sets.newHashSet(generations)); - forceBlockingFlush(COMPACTIONS_IN_PROGRESS); - return compactionId; - } - - /** - * Deletes the entry for this compaction from the set of compactions in progress. The compaction does not need - * to complete successfully for this to be called. - * @param taskId what was returned from {@code startCompaction} - */ - public static void finishCompaction(UUID taskId) - { - assert taskId != null; - - executeInternal(String.format("DELETE FROM system.%s WHERE id = ?", COMPACTIONS_IN_PROGRESS), taskId); - forceBlockingFlush(COMPACTIONS_IN_PROGRESS); - } - - /** - * Returns a Map whose keys are KS.CF pairs and whose values are maps from sstable generation numbers to the - * task ID of the compaction they were participating in. 
- */ - public static Map, Map> getUnfinishedCompactions() - { - String req = "SELECT * FROM system.%s"; - UntypedResultSet resultSet = executeInternal(String.format(req, COMPACTIONS_IN_PROGRESS)); - - Map, Map> unfinishedCompactions = new HashMap<>(); - for (UntypedResultSet.Row row : resultSet) - { - String keyspace = row.getString("keyspace_name"); - String columnfamily = row.getString("columnfamily_name"); - Set inputs = row.getSet("inputs", Int32Type.instance); - UUID taskID = row.getUUID("id"); - - Pair kscf = Pair.create(keyspace, columnfamily); - Map generationToTaskID = unfinishedCompactions.get(kscf); - if (generationToTaskID == null) - generationToTaskID = new HashMap<>(inputs.size()); - - for (Integer generation : inputs) - generationToTaskID.put(generation, taskID); - - unfinishedCompactions.put(kscf, generationToTaskID); - } - return unfinishedCompactions; - } - - public static void discardCompactionsInProgress() - { - ColumnFamilyStore compactionLog = Keyspace.open(NAME).getColumnFamilyStore(COMPACTIONS_IN_PROGRESS); - compactionLog.truncateBlocking(); - } - - public static void updateCompactionHistory(String ksname, - String cfname, - long compactedAt, - long bytesIn, - long bytesOut, - Map rowsMerged) - { - // don't write anything when the history table itself is compacted, since that would in turn cause new compactions - if (ksname.equals("system") && cfname.equals(COMPACTION_HISTORY)) - return; - String req = "INSERT INTO system.%s (id, keyspace_name, columnfamily_name, compacted_at, bytes_in, bytes_out, rows_merged) VALUES (?, ?, ?, ?, ?, ?, ?)"; - executeInternal(String.format(req, COMPACTION_HISTORY), UUIDGen.getTimeUUID(), ksname, cfname, ByteBufferUtil.bytes(compactedAt), bytesIn, bytesOut, rowsMerged); - } - - public static TabularData getCompactionHistory() throws OpenDataException - { - UntypedResultSet queryResultSet = executeInternal(String.format("SELECT * from system.%s", COMPACTION_HISTORY)); - return 
CompactionHistoryTabularData.from(queryResultSet); - } -#endif - - typedef std::pair truncation_entry; typedef std::unordered_map truncation_map; static thread_local std::experimental::optional truncation_records; @@ -733,27 +599,6 @@ future<> update_schema_version(utils::UUID version) { return execute_cql(req, LOCAL, sstring(LOCAL), version).discard_result(); } -#if 0 - - private static Set tokensAsSet(Collection tokens) - { - Token.TokenFactory factory = StorageService.getPartitioner().getTokenFactory(); - Set s = new HashSet<>(tokens.size()); - for (Token tk : tokens) - s.add(factory.toString(tk)); - return s; - } - - private static Collection deserializeTokens(Collection tokensStrings) - { - Token.TokenFactory factory = StorageService.getPartitioner().getTokenFactory(); - List tokens = new ArrayList<>(tokensStrings.size()); - for (String tk : tokensStrings) - tokens.add(factory.fromString(tk)); - return tokens; - } - -#endif /** * Remove stored tokens being used by another node */ @@ -780,25 +625,6 @@ future<> update_tokens(std::unordered_set tokens) { }); } -#if 0 - - /** - * Convenience method to update the list of tokens in the local system keyspace. - * - * @param addTokens tokens to add - * @param rmTokens tokens to remove - * @return the collection of persisted tokens - */ - public static synchronized Collection updateLocalTokens(Collection addTokens, Collection rmTokens) - { - Collection tokens = getSavedTokens(); - tokens.removeAll(rmTokens); - tokens.addAll(addTokens); - updateTokens(tokens); - return tokens; - } -#endif - future<> force_blocking_flush(sstring cfname) { if (!qctx) { return make_ready_future<>(); @@ -811,78 +637,6 @@ future<> force_blocking_flush(sstring cfname) { }); } -#if 0 - /** - * Return a map of stored tokens to IP addresses - * - */ - public static SetMultimap loadTokens() - { - SetMultimap tokenMap = HashMultimap.create(); - for (UntypedResultSet.Row row : executeInternal("SELECT peer, tokens FROM system." 
+ PEERS)) - { - InetAddress peer = row.getInetAddress("peer"); - if (row.has("tokens")) - tokenMap.putAll(peer, deserializeTokens(row.getSet("tokens", UTF8Type.instance))); - } - - return tokenMap; - } - - /** - * Return a map of store host_ids to IP addresses - * - */ - public static Map loadHostIds() - { - Map hostIdMap = new HashMap<>(); - for (UntypedResultSet.Row row : executeInternal("SELECT peer, host_id FROM system." + PEERS)) - { - InetAddress peer = row.getInetAddress("peer"); - if (row.has("host_id")) - { - hostIdMap.put(peer, row.getUUID("host_id")); - } - } - return hostIdMap; - } - - /** - * Get preferred IP for given endpoint if it is known. Otherwise this returns given endpoint itself. - * - * @param ep endpoint address to check - * @return Preferred IP for given endpoint if present, otherwise returns given ep - */ - public static InetAddress getPreferredIP(InetAddress ep) - { - String req = "SELECT preferred_ip FROM system.%s WHERE peer=?"; - UntypedResultSet result = executeInternal(String.format(req, PEERS), ep); - if (!result.isEmpty() && result.one().has("preferred_ip")) - return result.one().getInetAddress("preferred_ip"); - return ep; - } - - /** - * Return a map of IP addresses containing a map of dc and rack info - */ - public static Map> loadDcRackInfo() - { - Map> result = new HashMap<>(); - for (UntypedResultSet.Row row : executeInternal("SELECT peer, data_center, rack from system." + PEERS)) - { - InetAddress peer = row.getInetAddress("peer"); - if (row.has("data_center") && row.has("rack")) - { - Map dcRack = new HashMap<>(); - dcRack.put("data_center", row.getString("data_center")); - dcRack.put("rack", row.getString("rack")); - result.put(peer, dcRack); - } - } - return result; - } - -#endif /** * One of three things will happen if you try to read the system keyspace: * 1. 
files are present and you can read them: great @@ -920,45 +674,6 @@ std::unordered_set get_saved_tokens() { #endif return std::unordered_set(); } -#if 0 - - public static int incrementAndGetGeneration() - { - String req = "SELECT gossip_generation FROM system.%s WHERE key='%s'"; - UntypedResultSet result = executeInternal(String.format(req, LOCAL, LOCAL)); - - int generation; - if (result.isEmpty() || !result.one().has("gossip_generation")) - { - // seconds-since-epoch isn't a foolproof new generation - // (where foolproof is "guaranteed to be larger than the last one seen at this ip address"), - // but it's as close as sanely possible - generation = (int) (System.currentTimeMillis() / 1000); - } - else - { - // Other nodes will ignore gossip messages about a node that have a lower generation than previously seen. - final int storedGeneration = result.one().getInt("gossip_generation") + 1; - final int now = (int) (System.currentTimeMillis() / 1000); - if (storedGeneration >= now) - { - logger.warn("Using stored Gossip Generation {} as it is greater than current system time {}. 
See CASSANDRA-3654 if you experience problems", - storedGeneration, now); - generation = storedGeneration; - } - else - { - generation = now; - } - } - - req = "INSERT INTO system.%s (key, gossip_generation) VALUES ('%s', ?)"; - executeInternal(String.format(req, LOCAL, LOCAL), generation); - forceBlockingFlush(LOCAL); - - return generation; - } -#endif bool bootstrap_complete() { return get_bootstrap_state() == bootstrap_state::COMPLETED; @@ -991,170 +706,6 @@ future<> set_bootstrap_state(bootstrap_state state) { return make_ready_future<>(); } -#if 0 - - public static boolean isIndexBuilt(String keyspaceName, String indexName) - { - ColumnFamilyStore cfs = Keyspace.open(NAME).getColumnFamilyStore(BUILT_INDEXES); - QueryFilter filter = QueryFilter.getNamesFilter(decorate(ByteBufferUtil.bytes(keyspaceName)), - BUILT_INDEXES, - FBUtilities.singleton(cfs.getComparator().makeCellName(indexName), cfs.getComparator()), - System.currentTimeMillis()); - return ColumnFamilyStore.removeDeleted(cfs.getColumnFamily(filter), Integer.MAX_VALUE) != null; - } - - public static void setIndexBuilt(String keyspaceName, String indexName) - { - ColumnFamily cf = ArrayBackedSortedColumns.factory.create(NAME, BUILT_INDEXES); - cf.addColumn(new BufferCell(cf.getComparator().makeCellName(indexName), ByteBufferUtil.EMPTY_BYTE_BUFFER, FBUtilities.timestampMicros())); - new Mutation(NAME, ByteBufferUtil.bytes(keyspaceName), cf).apply(); - } - - public static void setIndexRemoved(String keyspaceName, String indexName) - { - Mutation mutation = new Mutation(NAME, ByteBufferUtil.bytes(keyspaceName)); - mutation.delete(BUILT_INDEXES, BuiltIndexes.comparator.makeCellName(indexName), FBUtilities.timestampMicros()); - mutation.apply(); - } - - /** - * Read the host ID from the system keyspace, creating (and storing) one if - * none exists. 
- */ - public static UUID getLocalHostId() - { - String req = "SELECT host_id FROM system.%s WHERE key='%s'"; - UntypedResultSet result = executeInternal(String.format(req, LOCAL, LOCAL)); - - // Look up the Host UUID (return it if found) - if (!result.isEmpty() && result.one().has("host_id")) - return result.one().getUUID("host_id"); - - // ID not found, generate a new one, persist, and then return it. - UUID hostId = UUID.randomUUID(); - logger.warn("No host ID found, created {} (Note: This should happen exactly once per node).", hostId); - return setLocalHostId(hostId); - } - - /** - * Sets the local host ID explicitly. Should only be called outside of SystemTable when replacing a node. - */ - public static UUID setLocalHostId(UUID hostId) - { - String req = "INSERT INTO system.%s (key, host_id) VALUES ('%s', ?)"; - executeInternal(String.format(req, LOCAL, LOCAL), hostId); - return hostId; - } - - public static PaxosState loadPaxosState(ByteBuffer key, CFMetaData metadata) - { - String req = "SELECT * FROM system.%s WHERE row_key = ? AND cf_id = ?"; - UntypedResultSet results = executeInternal(String.format(req, PAXOS), key, metadata.cfId); - if (results.isEmpty()) - return new PaxosState(key, metadata); - UntypedResultSet.Row row = results.one(); - Commit promised = row.has("in_progress_ballot") - ? new Commit(key, row.getUUID("in_progress_ballot"), ArrayBackedSortedColumns.factory.create(metadata)) - : Commit.emptyCommit(key, metadata); - // either we have both a recently accepted ballot and update or we have neither - Commit accepted = row.has("proposal") - ? new Commit(key, row.getUUID("proposal_ballot"), ColumnFamily.fromBytes(row.getBytes("proposal"))) - : Commit.emptyCommit(key, metadata); - // either most_recent_commit and most_recent_commit_at will both be set, or neither - Commit mostRecent = row.has("most_recent_commit") - ? 
new Commit(key, row.getUUID("most_recent_commit_at"), ColumnFamily.fromBytes(row.getBytes("most_recent_commit"))) - : Commit.emptyCommit(key, metadata); - return new PaxosState(promised, accepted, mostRecent); - } - - public static void savePaxosPromise(Commit promise) - { - String req = "UPDATE system.%s USING TIMESTAMP ? AND TTL ? SET in_progress_ballot = ? WHERE row_key = ? AND cf_id = ?"; - executeInternal(String.format(req, PAXOS), - UUIDGen.microsTimestamp(promise.ballot), - paxosTtl(promise.update.metadata), - promise.ballot, - promise.key, - promise.update.id()); - } - - public static void savePaxosProposal(Commit proposal) - { - executeInternal(String.format("UPDATE system.%s USING TIMESTAMP ? AND TTL ? SET proposal_ballot = ?, proposal = ? WHERE row_key = ? AND cf_id = ?", PAXOS), - UUIDGen.microsTimestamp(proposal.ballot), - paxosTtl(proposal.update.metadata), - proposal.ballot, - proposal.update.toBytes(), - proposal.key, - proposal.update.id()); - } - - private static int paxosTtl(CFMetaData metadata) - { - // keep paxos state around for at least 3h - return Math.max(3 * 3600, metadata.getGcGraceSeconds()); - } - - public static void savePaxosCommit(Commit commit) - { - // We always erase the last proposal (with the commit timestamp to no erase more recent proposal in case the commit is old) - // even though that's really just an optimization since SP.beginAndRepairPaxos will exclude accepted proposal older than the mrc. - String cql = "UPDATE system.%s USING TIMESTAMP ? AND TTL ? SET proposal_ballot = null, proposal = null, most_recent_commit_at = ?, most_recent_commit = ? WHERE row_key = ? 
AND cf_id = ?"; - executeInternal(String.format(cql, PAXOS), - UUIDGen.microsTimestamp(commit.ballot), - paxosTtl(commit.update.metadata), - commit.ballot, - commit.update.toBytes(), - commit.key, - commit.update.id()); - } - - /** - * Returns a RestorableMeter tracking the average read rate of a particular SSTable, restoring the last-seen rate - * from values in system.sstable_activity if present. - * @param keyspace the keyspace the sstable belongs to - * @param table the table the sstable belongs to - * @param generation the generation number for the sstable - */ - public static RestorableMeter getSSTableReadMeter(String keyspace, String table, int generation) - { - String cql = "SELECT * FROM system.%s WHERE keyspace_name=? and columnfamily_name=? and generation=?"; - UntypedResultSet results = executeInternal(String.format(cql, SSTABLE_ACTIVITY), keyspace, table, generation); - - if (results.isEmpty()) - return new RestorableMeter(); - - UntypedResultSet.Row row = results.one(); - double m15rate = row.getDouble("rate_15m"); - double m120rate = row.getDouble("rate_120m"); - return new RestorableMeter(m15rate, m120rate); - } - - /** - * Writes the current read rates for a given SSTable to system.sstable_activity - */ - public static void persistSSTableReadMeter(String keyspace, String table, int generation, RestorableMeter meter) - { - // Store values with a one-day TTL to handle corner cases where cleanup might not occur - String cql = "INSERT INTO system.%s (keyspace_name, columnfamily_name, generation, rate_15m, rate_120m) VALUES (?, ?, ?, ?, ?) USING TTL 864000"; - executeInternal(String.format(cql, SSTABLE_ACTIVITY), - keyspace, - table, - generation, - meter.fifteenMinuteRate(), - meter.twoHourRate()); - } - - /** - * Clears persisted read rates from system.sstable_activity for SSTables that have been deleted. 
- */ - public static void clearSSTableReadMeter(String keyspace, String table, int generation) - { - String cql = "DELETE FROM system.%s WHERE keyspace_name=? AND columnfamily_name=? and generation=?"; - executeInternal(String.format(cql, SSTABLE_ACTIVITY), keyspace, table, generation); - } -#endif - std::vector all_tables() { std::vector r; auto legacy_tables = db::schema_tables::all_tables(); diff --git a/db/system_keyspace.hh b/db/system_keyspace.hh index 3f1f6ee32d..2b90a50ae1 100644 --- a/db/system_keyspace.hh +++ b/db/system_keyspace.hh @@ -105,6 +105,25 @@ std::unordered_map load_dc_rack_info(); #if 0 + public static KSMetaData definition() + { + Iterable tables = + Iterables.concat(LegacySchemaTables.All, + Arrays.asList(BuiltIndexes, + Hints, + Batchlog, + Paxos, + Local, + Peers, + PeerEvents, + RangeXfers, + CompactionsInProgress, + CompactionHistory, + SSTableActivity)); + return new KSMetaData(NAME, LocalStrategy.class, Collections.emptyMap(), true, tables); + } + + private static volatile Map> truncationRecords; private static volatile Map> truncationRecords; #endif @@ -342,7 +361,7 @@ enum class bootstrap_state { } #endif -#if o +#if 0 /** * Return a map of stored tokens to IP addresses * From 4d3f333ec0745a09aa37e8a41629fe42e4976d55 Mon Sep 17 00:00:00 2001 From: Asias He Date: Tue, 11 Aug 2015 14:43:12 +0800 Subject: [PATCH 08/11] storage_service: Enable call to remove_endpoint in on_remove --- service/storage_service.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/service/storage_service.cc b/service/storage_service.cc index f5139456fa..5753db09d7 100644 --- a/service/storage_service.cc +++ b/service/storage_service.cc @@ -699,8 +699,8 @@ void storage_service::on_change(inet_address endpoint, application_state state, void storage_service::on_remove(gms::inet_address endpoint) { logger.debug("on_remove endpoint={}", endpoint); + _token_metadata.remove_endpoint(endpoint); #if 0 - _token_metadata.removeEndpoint(endpoint); 
PendingRangeCalculatorService.instance.update(); #endif } From b3f7507e0af03c07683ddee254bfe2df786c556c Mon Sep 17 00:00:00 2001 From: Asias He Date: Tue, 11 Aug 2015 14:47:40 +0800 Subject: [PATCH 09/11] storage_service: Enable gossiper.replacement_quarantine in handle_state_normal --- service/storage_service.cc | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/service/storage_service.cc b/service/storage_service.cc index 5753db09d7..f796054439 100644 --- a/service/storage_service.cc +++ b/service/storage_service.cc @@ -478,10 +478,10 @@ void storage_service::handle_state_normal(inet_address endpoint) { _token_metadata.update_normal_tokens(tokens_to_update_in_metadata, endpoint); for (auto ep : endpoints_to_remove) { remove_endpoint(ep); -#if 0 - if (DatabaseDescriptor.isReplacing() && DatabaseDescriptor.getReplaceAddress().equals(ep)) - Gossiper.instance.replacementQuarantine(ep); // quarantine locally longer than normally; see CASSANDRA-8260 -#endif + auto replace_addr = get_replace_address(); + if (is_replacing() && replace_addr && *replace_addr == ep) { + gossiper.replacement_quarantine(ep); // quarantine locally longer than normally; see CASSANDRA-8260 + } } logger.debug("ep={} tokens_to_update_in_system_keyspace = {}", endpoint, tokens_to_update_in_system_keyspace); if (!tokens_to_update_in_system_keyspace.empty()) { From ce927105d84b555ea1cc6d21bd04967dcc36f07e Mon Sep 17 00:00:00 2001 From: Asias He Date: Tue, 11 Aug 2015 15:06:47 +0800 Subject: [PATCH 10/11] db/system_keyspace: Implement update_local_tokens --- db/system_keyspace.cc | 15 +++++++++++++++ db/system_keyspace.hh | 13 ++++--------- 2 files changed, 19 insertions(+), 9 deletions(-) diff --git a/db/system_keyspace.cc b/db/system_keyspace.cc index 4ed7597dea..9bd28e1512 100644 --- a/db/system_keyspace.cc +++ b/db/system_keyspace.cc @@ -545,6 +545,21 @@ future<> update_tokens(gms::inet_address ep, std::unordered_set toke }); } +future> update_local_tokens( + const 
std::unordered_set& add_tokens, + const std::unordered_set& rm_tokens) { + auto tokens = get_saved_tokens(); + for (auto& x : rm_tokens) { + tokens.erase(x); + } + for (auto& x : add_tokens) { + tokens.insert(x); + } + return update_tokens(tokens).then([tokens] { + return tokens; + }); +} + future<> update_preferred_ip(gms::inet_address ep, gms::inet_address preferred_ip) { sstring req = "INSERT INTO system.%s (peer, preferred_ip) VALUES (?, ?)"; return execute_cql(req, PEERS, ep.addr(), preferred_ip).discard_result().then([] { diff --git a/db/system_keyspace.hh b/db/system_keyspace.hh index 2b90a50ae1..c7c01874d2 100644 --- a/db/system_keyspace.hh +++ b/db/system_keyspace.hh @@ -343,6 +343,7 @@ enum class bootstrap_state { executeInternal(String.format(req, LOCAL, LOCAL), tokensAsSet(tokens)); forceBlockingFlush(LOCAL); } +#endif /** * Convenience method to update the list of tokens in the local system keyspace. @@ -351,15 +352,9 @@ enum class bootstrap_state { * @param rmTokens tokens to remove * @return the collection of persisted tokens */ - public static synchronized Collection updateLocalTokens(Collection addTokens, Collection rmTokens) - { - Collection tokens = getSavedTokens(); - tokens.removeAll(rmTokens); - tokens.addAll(addTokens); - updateTokens(tokens); - return tokens; - } -#endif + future> update_local_tokens( + const std::unordered_set& add_tokens, + const std::unordered_set& rm_tokens); #if 0 /** From dd34f4b0a49e83e8550a674ca811d7c9cb7e29f8 Mon Sep 17 00:00:00 2001 From: Asias He Date: Tue, 11 Aug 2015 15:07:15 +0800 Subject: [PATCH 11/11] storage_service: Enable update_local_tokens --- service/storage_service.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/service/storage_service.cc b/service/storage_service.cc index f796054439..3a9f016feb 100644 --- a/service/storage_service.cc +++ b/service/storage_service.cc @@ -495,7 +495,7 @@ void storage_service::handle_state_normal(inet_address endpoint) { }).get(); } if 
(!local_tokens_to_remove.empty()) { - // SystemKeyspace.updateLocalTokens(Collections.emptyList(), local_tokens_to_remove); + db::system_keyspace::update_local_tokens(std::unordered_set(), local_tokens_to_remove).discard_result().get(); } if (is_moving) {