diff --git a/db/system_keyspace.cc b/db/system_keyspace.cc index ba53acbd7d..9bd28e1512 100644 --- a/db/system_keyspace.cc +++ b/db/system_keyspace.cc @@ -377,42 +377,6 @@ schema_ptr size_estimates() { return size_estimates; } -#if 0 - - public static KSMetaData definition() - { - Iterable tables = - Iterables.concat(LegacySchemaTables.All, - Arrays.asList(BuiltIndexes, - Hints, - Batchlog, - Paxos, - Local, - Peers, - PeerEvents, - RangeXfers, - CompactionsInProgress, - CompactionHistory, - SSTableActivity)); - return new KSMetaData(NAME, LocalStrategy.class, Collections.emptyMap(), true, tables); - } - - private static volatile Map> truncationRecords; - - public enum BootstrapState - { - NEEDS_BOOTSTRAP, - COMPLETED, - IN_PROGRESS - } - - private static DecoratedKey decorate(ByteBuffer key) - { - return StorageService.getPartitioner().decorateKey(key); - } - -#endif - static future<> setup_version() { sstring req = "INSERT INTO system.%s (key, release_version, cql_version, thrift_version, native_protocol_version, data_center, rack, partitioner, rpc_address, broadcast_address, listen_address) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"; auto& snitch = locator::i_endpoint_snitch::get_local_snitch_ptr(); @@ -491,104 +455,6 @@ future<> setup(distributed& db, distributed& qp }); } -#if 0 - /** - * Write compaction log, except columfamilies under system keyspace. - * - * @param cfs cfs to compact - * @param toCompact sstables to compact - * @return compaction task id or null if cfs is under system keyspace - */ - public static UUID startCompaction(ColumnFamilyStore cfs, Iterable toCompact) - { - if (NAME.equals(cfs.keyspace.getName())) - return null; - - UUID compactionId = UUIDGen.getTimeUUID(); - Iterable generations = Iterables.transform(toCompact, new Function() - { - public Integer apply(SSTableReader sstable) - { - return sstable.descriptor.generation; - } - }); - String req = "INSERT INTO system.%s (id, keyspace_name, columnfamily_name, inputs) VALUES (?, ?, ?, ?)"; - executeInternal(String.format(req, COMPACTIONS_IN_PROGRESS), compactionId, cfs.keyspace.getName(), cfs.name, Sets.newHashSet(generations)); - forceBlockingFlush(COMPACTIONS_IN_PROGRESS); - return compactionId; - } - - /** - * Deletes the entry for this compaction from the set of compactions in progress. The compaction does not need - * to complete successfully for this to be called. - * @param taskId what was returned from {@code startCompaction} - */ - public static void finishCompaction(UUID taskId) - { - assert taskId != null; - - executeInternal(String.format("DELETE FROM system.%s WHERE id = ?", COMPACTIONS_IN_PROGRESS), taskId); - forceBlockingFlush(COMPACTIONS_IN_PROGRESS); - } - - /** - * Returns a Map whose keys are KS.CF pairs and whose values are maps from sstable generation numbers to the - * task ID of the compaction they were participating in. - */ - public static Map, Map> getUnfinishedCompactions() - { - String req = "SELECT * FROM system.%s"; - UntypedResultSet resultSet = executeInternal(String.format(req, COMPACTIONS_IN_PROGRESS)); - - Map, Map> unfinishedCompactions = new HashMap<>(); - for (UntypedResultSet.Row row : resultSet) - { - String keyspace = row.getString("keyspace_name"); - String columnfamily = row.getString("columnfamily_name"); - Set inputs = row.getSet("inputs", Int32Type.instance); - UUID taskID = row.getUUID("id"); - - Pair kscf = Pair.create(keyspace, columnfamily); - Map generationToTaskID = unfinishedCompactions.get(kscf); - if (generationToTaskID == null) - generationToTaskID = new HashMap<>(inputs.size()); - - for (Integer generation : inputs) - generationToTaskID.put(generation, taskID); - - unfinishedCompactions.put(kscf, generationToTaskID); - } - return unfinishedCompactions; - } - - public static void discardCompactionsInProgress() - { - ColumnFamilyStore compactionLog = Keyspace.open(NAME).getColumnFamilyStore(COMPACTIONS_IN_PROGRESS); - compactionLog.truncateBlocking(); - } - - public static void updateCompactionHistory(String ksname, - String cfname, - long compactedAt, - long bytesIn, - long bytesOut, - Map rowsMerged) - { - // don't write anything when the history table itself is compacted, since that would in turn cause new compactions - if (ksname.equals("system") && cfname.equals(COMPACTION_HISTORY)) - return; - String req = "INSERT INTO system.%s (id, keyspace_name, columnfamily_name, compacted_at, bytes_in, bytes_out, rows_merged) VALUES (?, ?, ?, ?, ?, ?, ?)"; - executeInternal(String.format(req, COMPACTION_HISTORY), UUIDGen.getTimeUUID(), ksname, cfname, ByteBufferUtil.bytes(compactedAt), bytesIn, bytesOut, rowsMerged); - } - - public static TabularData getCompactionHistory() throws OpenDataException - { - UntypedResultSet queryResultSet = executeInternal(String.format("SELECT * from system.%s", COMPACTION_HISTORY)); - return CompactionHistoryTabularData.from(queryResultSet); - } -#endif - - typedef std::pair truncation_entry; typedef std::unordered_map truncation_map; static thread_local std::experimental::optional truncation_records; @@ -679,6 +545,21 @@ future<> update_tokens(gms::inet_address ep, std::unordered_set toke }); } +future> update_local_tokens( + const std::unordered_set& add_tokens, + const std::unordered_set& rm_tokens) { + auto tokens = get_saved_tokens(); + for (auto& x : rm_tokens) { + tokens.erase(x); + } + for (auto& x : add_tokens) { + tokens.insert(x); + } + return update_tokens(tokens).then([tokens] { + return tokens; + }); +} + future<> update_preferred_ip(gms::inet_address ep, gms::inet_address preferred_ip) { sstring req = "INSERT INTO system.%s (peer, preferred_ip) VALUES (?, ?)"; return execute_cql(req, PEERS, ep.addr(), preferred_ip).discard_result().then([] { @@ -733,27 +614,6 @@ future<> update_schema_version(utils::UUID version) { return execute_cql(req, LOCAL, sstring(LOCAL), version).discard_result(); } -#if 0 - - private static Set tokensAsSet(Collection tokens) - { - Token.TokenFactory factory = StorageService.getPartitioner().getTokenFactory(); - Set s = new HashSet<>(tokens.size()); - for (Token tk : tokens) - s.add(factory.toString(tk)); - return s; - } - - private static Collection deserializeTokens(Collection tokensStrings) - { - Token.TokenFactory factory = StorageService.getPartitioner().getTokenFactory(); - List tokens = new ArrayList<>(tokensStrings.size()); - for (String tk : tokensStrings) - tokens.add(factory.fromString(tk)); - return tokens; - } - -#endif /** * Remove stored tokens being used by another node */ @@ -780,25 +640,6 @@ future<> update_tokens(std::unordered_set tokens) { }); } -#if 0 - - /** - * Convenience method to update the list of tokens in the local system keyspace. - * - * @param addTokens tokens to add - * @param rmTokens tokens to remove - * @return the collection of persisted tokens - */ - public static synchronized Collection updateLocalTokens(Collection addTokens, Collection rmTokens) - { - Collection tokens = getSavedTokens(); - tokens.removeAll(rmTokens); - tokens.addAll(addTokens); - updateTokens(tokens); - return tokens; - } -#endif - future<> force_blocking_flush(sstring cfname) { if (!qctx) { return make_ready_future<>(); @@ -811,78 +652,6 @@ future<> force_blocking_flush(sstring cfname) { }); } -#if 0 - /** - * Return a map of stored tokens to IP addresses - * - */ - public static SetMultimap loadTokens() - { - SetMultimap tokenMap = HashMultimap.create(); - for (UntypedResultSet.Row row : executeInternal("SELECT peer, tokens FROM system." + PEERS)) - { - InetAddress peer = row.getInetAddress("peer"); - if (row.has("tokens")) - tokenMap.putAll(peer, deserializeTokens(row.getSet("tokens", UTF8Type.instance))); - } - - return tokenMap; - } - - /** - * Return a map of store host_ids to IP addresses - * - */ - public static Map loadHostIds() - { - Map hostIdMap = new HashMap<>(); - for (UntypedResultSet.Row row : executeInternal("SELECT peer, host_id FROM system." + PEERS)) - { - InetAddress peer = row.getInetAddress("peer"); - if (row.has("host_id")) - { - hostIdMap.put(peer, row.getUUID("host_id")); - } - } - return hostIdMap; - } - - /** - * Get preferred IP for given endpoint if it is known. Otherwise this returns given endpoint itself. - * - * @param ep endpoint address to check - * @return Preferred IP for given endpoint if present, otherwise returns given ep - */ - public static InetAddress getPreferredIP(InetAddress ep) - { - String req = "SELECT preferred_ip FROM system.%s WHERE peer=?"; - UntypedResultSet result = executeInternal(String.format(req, PEERS), ep); - if (!result.isEmpty() && result.one().has("preferred_ip")) - return result.one().getInetAddress("preferred_ip"); - return ep; - } - - /** - * Return a map of IP addresses containing a map of dc and rack info - */ - public static Map> loadDcRackInfo() - { - Map> result = new HashMap<>(); - for (UntypedResultSet.Row row : executeInternal("SELECT peer, data_center, rack from system." + PEERS)) - { - InetAddress peer = row.getInetAddress("peer"); - if (row.has("data_center") && row.has("rack")) - { - Map dcRack = new HashMap<>(); - dcRack.put("data_center", row.getString("data_center")); - dcRack.put("rack", row.getString("rack")); - result.put(peer, dcRack); - } - } - return result; - } - -#endif /** * One of three things will happen if you try to read the system keyspace: * 1. files are present and you can read them: great @@ -910,247 +679,47 @@ future<> check_health() { }); } +std::unordered_set get_saved_tokens() { #if 0 - public static Collection getSavedTokens() - { - String req = "SELECT tokens FROM system.%s WHERE key='%s'"; - UntypedResultSet result = executeInternal(String.format(req, LOCAL, LOCAL)); - return result.isEmpty() || !result.one().has("tokens") - ? Collections.emptyList() - : deserializeTokens(result.one().getSet("tokens", UTF8Type.instance)); - } - - public static int incrementAndGetGeneration() - { - String req = "SELECT gossip_generation FROM system.%s WHERE key='%s'"; - UntypedResultSet result = executeInternal(String.format(req, LOCAL, LOCAL)); - - int generation; - if (result.isEmpty() || !result.one().has("gossip_generation")) - { - // seconds-since-epoch isn't a foolproof new generation - // (where foolproof is "guaranteed to be larger than the last one seen at this ip address"), - // but it's as close as sanely possible - generation = (int) (System.currentTimeMillis() / 1000); - } - else - { - // Other nodes will ignore gossip messages about a node that have a lower generation than previously seen. - final int storedGeneration = result.one().getInt("gossip_generation") + 1; - final int now = (int) (System.currentTimeMillis() / 1000); - if (storedGeneration >= now) - { - logger.warn("Using stored Gossip Generation {} as it is greater than current system time {}. See CASSANDRA-3654 if you experience problems", - storedGeneration, now); - generation = storedGeneration; - } - else - { - generation = now; - } - } - - req = "INSERT INTO system.%s (key, gossip_generation) VALUES ('%s', ?)"; - executeInternal(String.format(req, LOCAL, LOCAL), generation); - forceBlockingFlush(LOCAL); - - return generation; - } - - public static BootstrapState getBootstrapState() - { - String req = "SELECT bootstrapped FROM system.%s WHERE key='%s'"; - UntypedResultSet result = executeInternal(String.format(req, LOCAL, LOCAL)); - - if (result.isEmpty() || !result.one().has("bootstrapped")) - return BootstrapState.NEEDS_BOOTSTRAP; - - return BootstrapState.valueOf(result.one().getString("bootstrapped")); - } - - public static boolean bootstrapComplete() - { - return getBootstrapState() == BootstrapState.COMPLETED; - } - - public static boolean bootstrapInProgress() - { - return getBootstrapState() == BootstrapState.IN_PROGRESS; - } + String req = "SELECT tokens FROM system.%s WHERE key='%s'"; + UntypedResultSet result = executeInternal(String.format(req, LOCAL, LOCAL)); + return result.isEmpty() || !result.one().has("tokens") + ? Collections.emptyList() + : deserializeTokens(result.one().getSet("tokens", UTF8Type.instance)); #endif + return std::unordered_set(); +} +bool bootstrap_complete() { + return get_bootstrap_state() == bootstrap_state::COMPLETED; +} + +bool bootstrap_in_progress() { + return get_bootstrap_state() == bootstrap_state::IN_PROGRESS; +} + +bootstrap_state get_bootstrap_state() { #if 0 + String req = "SELECT bootstrapped FROM system.%s WHERE key='%s'"; + UntypedResultSet result = executeInternal(String.format(req, LOCAL, LOCAL)); + + if (result.isEmpty() || !result.one().has("bootstrapped")) + return BootstrapState.NEEDS_BOOTSTRAP; + + return BootstrapState.valueOf(result.one().getString("bootstrapped")); +#endif + return bootstrap_state::NEEDS_BOOTSTRAP; +} + future<> set_bootstrap_state(bootstrap_state state) { +#if 0 sstring req = "INSERT INTO system.%s (key, bootstrapped) VALUES ('%s', '%s')"; return execute_cql(req, LOCAL, LOCAL, state.name()).discard_result().then([] { return force_blocking_flush(LOCAL); }); +#endif + return make_ready_future<>(); } -#endif - -#if 0 - - public static boolean isIndexBuilt(String keyspaceName, String indexName) - { - ColumnFamilyStore cfs = Keyspace.open(NAME).getColumnFamilyStore(BUILT_INDEXES); - QueryFilter filter = QueryFilter.getNamesFilter(decorate(ByteBufferUtil.bytes(keyspaceName)), - BUILT_INDEXES, - FBUtilities.singleton(cfs.getComparator().makeCellName(indexName), cfs.getComparator()), - System.currentTimeMillis()); - return ColumnFamilyStore.removeDeleted(cfs.getColumnFamily(filter), Integer.MAX_VALUE) != null; - } - - public static void setIndexBuilt(String keyspaceName, String indexName) - { - ColumnFamily cf = ArrayBackedSortedColumns.factory.create(NAME, BUILT_INDEXES); - cf.addColumn(new BufferCell(cf.getComparator().makeCellName(indexName), ByteBufferUtil.EMPTY_BYTE_BUFFER, FBUtilities.timestampMicros())); - new Mutation(NAME, ByteBufferUtil.bytes(keyspaceName), cf).apply(); - } - - public static void setIndexRemoved(String keyspaceName, String indexName) - { - Mutation mutation = new Mutation(NAME, ByteBufferUtil.bytes(keyspaceName)); - mutation.delete(BUILT_INDEXES, BuiltIndexes.comparator.makeCellName(indexName), FBUtilities.timestampMicros()); - mutation.apply(); - } - - /** - * Read the host ID from the system keyspace, creating (and storing) one if - * none exists. - */ - public static UUID getLocalHostId() - { - String req = "SELECT host_id FROM system.%s WHERE key='%s'"; - UntypedResultSet result = executeInternal(String.format(req, LOCAL, LOCAL)); - - // Look up the Host UUID (return it if found) - if (!result.isEmpty() && result.one().has("host_id")) - return result.one().getUUID("host_id"); - - // ID not found, generate a new one, persist, and then return it. - UUID hostId = UUID.randomUUID(); - logger.warn("No host ID found, created {} (Note: This should happen exactly once per node).", hostId); - return setLocalHostId(hostId); - } - - /** - * Sets the local host ID explicitly. Should only be called outside of SystemTable when replacing a node. - */ - public static UUID setLocalHostId(UUID hostId) - { - String req = "INSERT INTO system.%s (key, host_id) VALUES ('%s', ?)"; - executeInternal(String.format(req, LOCAL, LOCAL), hostId); - return hostId; - } - - public static PaxosState loadPaxosState(ByteBuffer key, CFMetaData metadata) - { - String req = "SELECT * FROM system.%s WHERE row_key = ? AND cf_id = ?"; - UntypedResultSet results = executeInternal(String.format(req, PAXOS), key, metadata.cfId); - if (results.isEmpty()) - return new PaxosState(key, metadata); - UntypedResultSet.Row row = results.one(); - Commit promised = row.has("in_progress_ballot") - ? new Commit(key, row.getUUID("in_progress_ballot"), ArrayBackedSortedColumns.factory.create(metadata)) - : Commit.emptyCommit(key, metadata); - // either we have both a recently accepted ballot and update or we have neither - Commit accepted = row.has("proposal") - ? new Commit(key, row.getUUID("proposal_ballot"), ColumnFamily.fromBytes(row.getBytes("proposal"))) - : Commit.emptyCommit(key, metadata); - // either most_recent_commit and most_recent_commit_at will both be set, or neither - Commit mostRecent = row.has("most_recent_commit") - ? new Commit(key, row.getUUID("most_recent_commit_at"), ColumnFamily.fromBytes(row.getBytes("most_recent_commit"))) - : Commit.emptyCommit(key, metadata); - return new PaxosState(promised, accepted, mostRecent); - } - - public static void savePaxosPromise(Commit promise) - { - String req = "UPDATE system.%s USING TIMESTAMP ? AND TTL ? SET in_progress_ballot = ? WHERE row_key = ? AND cf_id = ?"; - executeInternal(String.format(req, PAXOS), - UUIDGen.microsTimestamp(promise.ballot), - paxosTtl(promise.update.metadata), - promise.ballot, - promise.key, - promise.update.id()); - } - - public static void savePaxosProposal(Commit proposal) - { - executeInternal(String.format("UPDATE system.%s USING TIMESTAMP ? AND TTL ? SET proposal_ballot = ?, proposal = ? WHERE row_key = ? AND cf_id = ?", PAXOS), - UUIDGen.microsTimestamp(proposal.ballot), - paxosTtl(proposal.update.metadata), - proposal.ballot, - proposal.update.toBytes(), - proposal.key, - proposal.update.id()); - } - - private static int paxosTtl(CFMetaData metadata) - { - // keep paxos state around for at least 3h - return Math.max(3 * 3600, metadata.getGcGraceSeconds()); - } - - public static void savePaxosCommit(Commit commit) - { - // We always erase the last proposal (with the commit timestamp to no erase more recent proposal in case the commit is old) - // even though that's really just an optimization since SP.beginAndRepairPaxos will exclude accepted proposal older than the mrc. - String cql = "UPDATE system.%s USING TIMESTAMP ? AND TTL ? SET proposal_ballot = null, proposal = null, most_recent_commit_at = ?, most_recent_commit = ? WHERE row_key = ? AND cf_id = ?"; - executeInternal(String.format(cql, PAXOS), - UUIDGen.microsTimestamp(commit.ballot), - paxosTtl(commit.update.metadata), - commit.ballot, - commit.update.toBytes(), - commit.key, - commit.update.id()); - } - - /** - * Returns a RestorableMeter tracking the average read rate of a particular SSTable, restoring the last-seen rate - * from values in system.sstable_activity if present. - * @param keyspace the keyspace the sstable belongs to - * @param table the table the sstable belongs to - * @param generation the generation number for the sstable - */ - public static RestorableMeter getSSTableReadMeter(String keyspace, String table, int generation) - { - String cql = "SELECT * FROM system.%s WHERE keyspace_name=? and columnfamily_name=? and generation=?"; - UntypedResultSet results = executeInternal(String.format(cql, SSTABLE_ACTIVITY), keyspace, table, generation); - - if (results.isEmpty()) - return new RestorableMeter(); - - UntypedResultSet.Row row = results.one(); - double m15rate = row.getDouble("rate_15m"); - double m120rate = row.getDouble("rate_120m"); - return new RestorableMeter(m15rate, m120rate); - } - - /** - * Writes the current read rates for a given SSTable to system.sstable_activity - */ - public static void persistSSTableReadMeter(String keyspace, String table, int generation, RestorableMeter meter) - { - // Store values with a one-day TTL to handle corner cases where cleanup might not occur - String cql = "INSERT INTO system.%s (keyspace_name, columnfamily_name, generation, rate_15m, rate_120m) VALUES (?, ?, ?, ?, ?) USING TTL 864000"; - executeInternal(String.format(cql, SSTABLE_ACTIVITY), - keyspace, - table, - generation, - meter.fifteenMinuteRate(), - meter.twoHourRate()); - } - - /** - * Clears persisted read rates from system.sstable_activity for SSTables that have been deleted. - */ - public static void clearSSTableReadMeter(String keyspace, String table, int generation) - { - String cql = "DELETE FROM system.%s WHERE keyspace_name=? AND columnfamily_name=? and generation=?"; - executeInternal(String.format(cql, SSTABLE_ACTIVITY), keyspace, table, generation); - } -#endif std::vector all_tables() { std::vector r; diff --git a/db/system_keyspace.hh b/db/system_keyspace.hh index 3887ea7ce8..c7c01874d2 100644 --- a/db/system_keyspace.hh +++ b/db/system_keyspace.hh @@ -105,16 +105,35 @@ std::unordered_map load_dc_rack_info(); #if 0 - - private static volatile Map> truncationRecords; - - public enum BootstrapState + public static KSMetaData definition() { - NEEDS_BOOTSTRAP, - COMPLETED, - IN_PROGRESS + Iterable tables = + Iterables.concat(LegacySchemaTables.All, + Arrays.asList(BuiltIndexes, + Hints, + Batchlog, + Paxos, + Local, + Peers, + PeerEvents, + RangeXfers, + CompactionsInProgress, + CompactionHistory, + SSTableActivity)); + return new KSMetaData(NAME, LocalStrategy.class, Collections.emptyMap(), true, tables); } + private static volatile Map> truncationRecords; + private static volatile Map> truncationRecords; +#endif + +enum class bootstrap_state { + NEEDS_BOOTSTRAP, + COMPLETED, + IN_PROGRESS +}; + +#if 0 private static DecoratedKey decorate(ByteBuffer key) { return StorageService.getPartitioner().decorateKey(key); @@ -324,6 +343,7 @@ load_dc_rack_info(); executeInternal(String.format(req, LOCAL, LOCAL), tokensAsSet(tokens)); forceBlockingFlush(LOCAL); } +#endif /** * Convenience method to update the list of tokens in the local system keyspace. @@ -332,17 +352,11 @@ load_dc_rack_info(); * @param rmTokens tokens to remove * @return the collection of persisted tokens */ - public static synchronized Collection updateLocalTokens(Collection addTokens, Collection rmTokens) - { - Collection tokens = getSavedTokens(); - tokens.removeAll(rmTokens); - tokens.addAll(addTokens); - updateTokens(tokens); - return tokens; - } -#endif + future> update_local_tokens( + const std::unordered_set& add_tokens, + const std::unordered_set& rm_tokens); -#if o +#if 0 /** * Return a map of stored tokens to IP addresses * @@ -456,14 +470,9 @@ load_dc_rack_info(); throw new ConfigurationException("Saved cluster name " + savedClusterName + " != configured name " + DatabaseDescriptor.getClusterName()); } - public static Collection getSavedTokens() - { - String req = "SELECT tokens FROM system.%s WHERE key='%s'"; - UntypedResultSet result = executeInternal(String.format(req, LOCAL, LOCAL)); - return result.isEmpty() || !result.one().has("tokens") - ? Collections.emptyList() - : deserializeTokens(result.one().getSet("tokens", UTF8Type.instance)); - } +#endif + std::unordered_set get_saved_tokens(); +#if 0 public static int incrementAndGetGeneration() { @@ -501,35 +510,14 @@ load_dc_rack_info(); return generation; } +#endif - public static BootstrapState getBootstrapState() - { - String req = "SELECT bootstrapped FROM system.%s WHERE key='%s'"; - UntypedResultSet result = executeInternal(String.format(req, LOCAL, LOCAL)); - - if (result.isEmpty() || !result.one().has("bootstrapped")) - return BootstrapState.NEEDS_BOOTSTRAP; - - return BootstrapState.valueOf(result.one().getString("bootstrapped")); - } - - public static boolean bootstrapComplete() - { - return getBootstrapState() == BootstrapState.COMPLETED; - } - - public static boolean bootstrapInProgress() - { - return getBootstrapState() == BootstrapState.IN_PROGRESS; - } - - public static void setBootstrapState(BootstrapState state) - { - String req = "INSERT INTO system.%s (key, bootstrapped) VALUES ('%s', ?)"; - executeInternal(String.format(req, LOCAL, LOCAL), state.name()); - forceBlockingFlush(LOCAL); - } +bool bootstrap_complete(); +bool bootstrap_in_progress(); +bootstrap_state get_bootstrap_state(); +future<> set_bootstrap_state(bootstrap_state state); +#if 0 public static boolean isIndexBuilt(String keyspaceName, String indexName) { ColumnFamilyStore cfs = Keyspace.open(NAME).getColumnFamilyStore(BUILT_INDEXES); diff --git a/service/storage_service.cc b/service/storage_service.cc index 487dce6c15..3a9f016feb 100644 --- a/service/storage_service.cc +++ b/service/storage_service.cc @@ -58,6 +58,11 @@ std::experimental::optional get_replace_address() { return {}; } +std::unordered_set get_initial_tokens() { + // FIXME: DatabaseDescriptor.getInitialTokens(); + return std::unordered_set(); +} + bool get_property_join_ring() { // FIXME: Boolean.parseBoolean(System.getProperty("cassandra.join_ring", "true"))) return true; @@ -155,7 +160,6 @@ future<> storage_service::prepare_to_join() { future<> storage_service::join_token_ring(int delay) { return seastar::async([this, delay] { _joined = true; -#if 0 // We bootstrap if we haven't successfully bootstrapped before, as long as we are not a seed. // If we are a seed, or if the user manually sets auto_bootstrap to false, // we'll skip streaming data from other nodes and jump directly into the ring. @@ -165,7 +169,8 @@ future<> storage_service::join_token_ring(int delay) { // // We attempted to replace this with a schema-presence check, but you need a meaningful sleep // to get schema info from gossip which defeats the purpose. See CASSANDRA-4427 for the gory details. - Set current = new HashSet<>(); + std::unordered_set current; +#if 0 logger.debug("Bootstrap variables: {} {} {} {}", DatabaseDescriptor.isAutoBootstrap(), SystemKeyspace.bootstrapInProgress(), @@ -235,12 +240,12 @@ future<> storage_service::join_token_ring(int delay) { for (auto token : _bootstrap_tokens) { auto existing = _token_metadata.get_endpoint(token); if (existing) { -#if 0 - long nanoDelay = delay * 1000000L; - if (Gossiper.instance.getEndpointStateForEndpoint(existing).getUpdateTimestamp() > (System.nanoTime() - nanoDelay)) - throw new UnsupportedOperationException("Cannot replace a live node... "); - current.add(existing); -#endif + auto& gossiper = gms::get_local_gossiper(); + auto eps = gossiper.get_endpoint_state_for_endpoint(*existing); + if (eps && eps->get_update_timestamp() > gms::gossiper::clk::now() - std::chrono::milliseconds(delay)) { + throw std::runtime_error("Cannot replace a live node..."); + } + current.insert(*existing); } else { throw std::runtime_error(sprint("Cannot replace token %s which does not exist!", token)); } @@ -257,87 +262,71 @@ future<> storage_service::join_token_ring(int delay) { // assert(!_is_bootstrap_mode); // bootstrap will block until finished } else { size_t num_tokens = _db.local().get_config().num_tokens(); - _bootstrap_tokens = boot_strapper::get_random_tokens(_token_metadata, num_tokens); - logger.info("Generated random tokens. tokens are {}", _bootstrap_tokens); -#if 0 - _bootstrap_tokens = SystemKeyspace.getSavedTokens(); - if (_bootstrap_tokens.isEmpty()) - { - Collection initialTokens = DatabaseDescriptor.getInitialTokens(); - if (initialTokens.size() < 1) - { - _bootstrap_tokens = BootStrapper.getRandomTokens(_token_metadata, DatabaseDescriptor.getNumTokens()); - if (DatabaseDescriptor.getNumTokens() == 1) + _bootstrap_tokens = db::system_keyspace::get_saved_tokens(); + if (_bootstrap_tokens.empty()) { + auto initial_tokens = get_initial_tokens(); + if (initial_tokens.size() < 1) { + _bootstrap_tokens = boot_strapper::get_random_tokens(_token_metadata, num_tokens); + if (num_tokens == 1) { logger.warn("Generated random token {}. Random tokens will result in an unbalanced ring; see http://wiki.apache.org/cassandra/Operations", _bootstrap_tokens); - else + } else { logger.info("Generated random tokens. tokens are {}", _bootstrap_tokens); - } - else - { - _bootstrap_tokens = new ArrayList(initialTokens.size()); - for (String token : initialTokens) - _bootstrap_tokens.add(getPartitioner().getTokenFactory().fromString(token)); + } + } else { + for (auto token : initial_tokens) { + // FIXME: token from string + // _bootstrap_tokens.insert(getPartitioner().getTokenFactory().fromString(token)); + } logger.info("Saved tokens not found. Using configuration value: {}", _bootstrap_tokens); } - } - else - { - if (_bootstrap_tokens.size() != DatabaseDescriptor.getNumTokens()) - throw new ConfigurationException("Cannot change the number of tokens from " + _bootstrap_tokens.size() + " to " + DatabaseDescriptor.getNumTokens()); - else + } else { + if (_bootstrap_tokens.size() != num_tokens) { + throw std::runtime_error(sprint("Cannot change the number of tokens from %ld to %ld", _bootstrap_tokens.size(), num_tokens)); + } else { logger.info("Using saved tokens {}", _bootstrap_tokens); + } } -#endif } - set_tokens(_bootstrap_tokens).get(); #if 0 // if we don't have system_traces keyspace at this point, then create it manually if (Schema.instance.getKSMetaData(TraceKeyspace.NAME) == null) MigrationManager.announceNewKeyspace(TraceKeyspace.definition(), 0, false); +#endif - if (!_is_survey_mode) - { + if (!_is_survey_mode) { // start participating in the ring. - SystemKeyspace.setBootstrapState(SystemKeyspace.BootstrapState.COMPLETED); - set_tokens(_bootstrap_tokens); + //SystemKeyspace.setBootstrapState(SystemKeyspace.BootstrapState.COMPLETED); + set_tokens(_bootstrap_tokens).get(); // remove the existing info about the replaced node. - if (!current.isEmpty()) - for (InetAddress existing : current) - Gossiper.instance.replacedEndpoint(existing); - assert _token_metadata.sortedTokens().size() > 0; - - Auth.setup(); - } - else - { + if (!current.empty()) { + auto& gossiper = gms::get_local_gossiper(); + for (auto existing : current) { + gossiper.replaced_endpoint(existing); + } + } + assert(_token_metadata.sorted_tokens().size() > 0); + //Auth.setup(); + } else { logger.info("Startup complete, but write survey mode is active, not becoming an active ring member. Use JMX (StorageService->joinRing()) to finalize ring joining."); } -#endif }); } -void storage_service::join_ring() { -#if 0 - if (!joined) { +future<> storage_service::join_ring() { + if (!_joined) { logger.info("Joining ring by operator request"); - try - { - joinTokenRing(0); - } - catch (ConfigurationException e) - { - throw new IOException(e.getMessage()); - } + return join_token_ring(0); } else if (_is_survey_mode) { - set_tokens(SystemKeyspace.getSavedTokens()); - SystemKeyspace.setBootstrapState(SystemKeyspace.BootstrapState.COMPLETED); - _is_survey_mode = false; - logger.info("Leaving write survey mode and joining ring at operator request"); - assert _token_metadata.sortedTokens().size() > 0; - - Auth.setup(); + return set_tokens(db::system_keyspace::get_saved_tokens()).then([this] { + //SystemKeyspace.setBootstrapState(SystemKeyspace.BootstrapState.COMPLETED); + _is_survey_mode = false; + logger.info("Leaving write survey mode and joining ring at operator request"); + assert(_token_metadata.sorted_tokens().size() > 0); + //Auth.setup(); + return make_ready_future<>(); + }); } -#endif + return make_ready_future<>(); } future<> storage_service::bootstrap(std::unordered_set tokens) { @@ -489,10 +478,10 @@ void storage_service::handle_state_normal(inet_address endpoint) { _token_metadata.update_normal_tokens(tokens_to_update_in_metadata, endpoint); for (auto ep : endpoints_to_remove) { remove_endpoint(ep); -#if 0 - if (DatabaseDescriptor.isReplacing() && DatabaseDescriptor.getReplaceAddress().equals(ep)) - Gossiper.instance.replacementQuarantine(ep); // quarantine locally longer than normally; see CASSANDRA-8260 -#endif + auto replace_addr = get_replace_address(); + if (is_replacing() && replace_addr && *replace_addr == ep) { + gossiper.replacement_quarantine(ep); // quarantine locally longer than normally; see CASSANDRA-8260 + } } logger.debug("ep={} tokens_to_update_in_system_keyspace = {}", endpoint, tokens_to_update_in_system_keyspace); if (!tokens_to_update_in_system_keyspace.empty()) { @@ -506,7 +495,7 @@ void storage_service::handle_state_normal(inet_address endpoint) { }).get(); } if (!local_tokens_to_remove.empty()) { - // SystemKeyspace.updateLocalTokens(Collections.emptyList(), local_tokens_to_remove); + db::system_keyspace::update_local_tokens(std::unordered_set(), local_tokens_to_remove).discard_result().get(); } if (is_moving) { @@ -710,8 +699,8 @@ void storage_service::on_change(inet_address endpoint, application_state state, void storage_service::on_remove(gms::inet_address endpoint) { logger.debug("on_remove endpoint={}", endpoint); + _token_metadata.remove_endpoint(endpoint); #if 0 - _token_metadata.removeEndpoint(endpoint); PendingRangeCalculatorService.instance.update(); #endif } diff --git a/service/storage_service.hh b/service/storage_service.hh index fce933d7da..81f3cf38e6 100644 --- a/service/storage_service.hh +++ b/service/storage_service.hh @@ -371,7 +371,7 @@ private: future<> prepare_to_join(); future<> join_token_ring(int delay); public: - void join_ring(); + future<> join_ring(); bool is_joined() { return _joined; }