mirror of
https://github.com/scylladb/scylladb.git
synced 2026-04-26 03:20:37 +00:00
Merge "storage_service update" from Asias
"I'm leaving the following functions get_saved_tokens() get_bootstrap_state() set_bootstrap_state() for people more familiar with execute_cql."
This commit is contained in:
@@ -377,42 +377,6 @@ schema_ptr size_estimates() {
|
||||
return size_estimates;
|
||||
}
|
||||
|
||||
#if 0
|
||||
|
||||
public static KSMetaData definition()
|
||||
{
|
||||
Iterable<CFMetaData> tables =
|
||||
Iterables.concat(LegacySchemaTables.All,
|
||||
Arrays.asList(BuiltIndexes,
|
||||
Hints,
|
||||
Batchlog,
|
||||
Paxos,
|
||||
Local,
|
||||
Peers,
|
||||
PeerEvents,
|
||||
RangeXfers,
|
||||
CompactionsInProgress,
|
||||
CompactionHistory,
|
||||
SSTableActivity));
|
||||
return new KSMetaData(NAME, LocalStrategy.class, Collections.<String, String>emptyMap(), true, tables);
|
||||
}
|
||||
|
||||
private static volatile Map<UUID, Pair<ReplayPosition, Long>> truncationRecords;
|
||||
|
||||
public enum BootstrapState
|
||||
{
|
||||
NEEDS_BOOTSTRAP,
|
||||
COMPLETED,
|
||||
IN_PROGRESS
|
||||
}
|
||||
|
||||
private static DecoratedKey decorate(ByteBuffer key)
|
||||
{
|
||||
return StorageService.getPartitioner().decorateKey(key);
|
||||
}
|
||||
|
||||
#endif
|
||||
|
||||
static future<> setup_version() {
|
||||
sstring req = "INSERT INTO system.%s (key, release_version, cql_version, thrift_version, native_protocol_version, data_center, rack, partitioner, rpc_address, broadcast_address, listen_address) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)";
|
||||
auto& snitch = locator::i_endpoint_snitch::get_local_snitch_ptr();
|
||||
@@ -491,104 +455,6 @@ future<> setup(distributed<database>& db, distributed<cql3::query_processor>& qp
|
||||
});
|
||||
}
|
||||
|
||||
#if 0
|
||||
/**
|
||||
* Write compaction log, except for column families under the system keyspace.
|
||||
*
|
||||
* @param cfs cfs to compact
|
||||
* @param toCompact sstables to compact
|
||||
* @return compaction task id or null if cfs is under system keyspace
|
||||
*/
|
||||
public static UUID startCompaction(ColumnFamilyStore cfs, Iterable<SSTableReader> toCompact)
|
||||
{
|
||||
if (NAME.equals(cfs.keyspace.getName()))
|
||||
return null;
|
||||
|
||||
UUID compactionId = UUIDGen.getTimeUUID();
|
||||
Iterable<Integer> generations = Iterables.transform(toCompact, new Function<SSTableReader, Integer>()
|
||||
{
|
||||
public Integer apply(SSTableReader sstable)
|
||||
{
|
||||
return sstable.descriptor.generation;
|
||||
}
|
||||
});
|
||||
String req = "INSERT INTO system.%s (id, keyspace_name, columnfamily_name, inputs) VALUES (?, ?, ?, ?)";
|
||||
executeInternal(String.format(req, COMPACTIONS_IN_PROGRESS), compactionId, cfs.keyspace.getName(), cfs.name, Sets.newHashSet(generations));
|
||||
forceBlockingFlush(COMPACTIONS_IN_PROGRESS);
|
||||
return compactionId;
|
||||
}
|
||||
|
||||
/**
|
||||
* Deletes the entry for this compaction from the set of compactions in progress. The compaction does not need
|
||||
* to complete successfully for this to be called.
|
||||
* @param taskId what was returned from {@code startCompaction}
|
||||
*/
|
||||
public static void finishCompaction(UUID taskId)
|
||||
{
|
||||
assert taskId != null;
|
||||
|
||||
executeInternal(String.format("DELETE FROM system.%s WHERE id = ?", COMPACTIONS_IN_PROGRESS), taskId);
|
||||
forceBlockingFlush(COMPACTIONS_IN_PROGRESS);
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns a Map whose keys are KS.CF pairs and whose values are maps from sstable generation numbers to the
|
||||
* task ID of the compaction they were participating in.
|
||||
*/
|
||||
public static Map<Pair<String, String>, Map<Integer, UUID>> getUnfinishedCompactions()
|
||||
{
|
||||
String req = "SELECT * FROM system.%s";
|
||||
UntypedResultSet resultSet = executeInternal(String.format(req, COMPACTIONS_IN_PROGRESS));
|
||||
|
||||
Map<Pair<String, String>, Map<Integer, UUID>> unfinishedCompactions = new HashMap<>();
|
||||
for (UntypedResultSet.Row row : resultSet)
|
||||
{
|
||||
String keyspace = row.getString("keyspace_name");
|
||||
String columnfamily = row.getString("columnfamily_name");
|
||||
Set<Integer> inputs = row.getSet("inputs", Int32Type.instance);
|
||||
UUID taskID = row.getUUID("id");
|
||||
|
||||
Pair<String, String> kscf = Pair.create(keyspace, columnfamily);
|
||||
Map<Integer, UUID> generationToTaskID = unfinishedCompactions.get(kscf);
|
||||
if (generationToTaskID == null)
|
||||
generationToTaskID = new HashMap<>(inputs.size());
|
||||
|
||||
for (Integer generation : inputs)
|
||||
generationToTaskID.put(generation, taskID);
|
||||
|
||||
unfinishedCompactions.put(kscf, generationToTaskID);
|
||||
}
|
||||
return unfinishedCompactions;
|
||||
}
|
||||
|
||||
public static void discardCompactionsInProgress()
|
||||
{
|
||||
ColumnFamilyStore compactionLog = Keyspace.open(NAME).getColumnFamilyStore(COMPACTIONS_IN_PROGRESS);
|
||||
compactionLog.truncateBlocking();
|
||||
}
|
||||
|
||||
public static void updateCompactionHistory(String ksname,
|
||||
String cfname,
|
||||
long compactedAt,
|
||||
long bytesIn,
|
||||
long bytesOut,
|
||||
Map<Integer, Long> rowsMerged)
|
||||
{
|
||||
// don't write anything when the history table itself is compacted, since that would in turn cause new compactions
|
||||
if (ksname.equals("system") && cfname.equals(COMPACTION_HISTORY))
|
||||
return;
|
||||
String req = "INSERT INTO system.%s (id, keyspace_name, columnfamily_name, compacted_at, bytes_in, bytes_out, rows_merged) VALUES (?, ?, ?, ?, ?, ?, ?)";
|
||||
executeInternal(String.format(req, COMPACTION_HISTORY), UUIDGen.getTimeUUID(), ksname, cfname, ByteBufferUtil.bytes(compactedAt), bytesIn, bytesOut, rowsMerged);
|
||||
}
|
||||
|
||||
public static TabularData getCompactionHistory() throws OpenDataException
|
||||
{
|
||||
UntypedResultSet queryResultSet = executeInternal(String.format("SELECT * from system.%s", COMPACTION_HISTORY));
|
||||
return CompactionHistoryTabularData.from(queryResultSet);
|
||||
}
|
||||
#endif
|
||||
|
||||
|
||||
typedef std::pair<db::replay_position, db_clock::time_point> truncation_entry;
|
||||
typedef std::unordered_map<utils::UUID, truncation_entry> truncation_map;
|
||||
static thread_local std::experimental::optional<truncation_map> truncation_records;
|
||||
@@ -679,6 +545,21 @@ future<> update_tokens(gms::inet_address ep, std::unordered_set<dht::token> toke
|
||||
});
|
||||
}
|
||||
|
||||
/**
 * Applies a delta to the token set returned by get_saved_tokens() and hands
 * the result to update_tokens().
 *
 * Tokens in rm_tokens are erased first and tokens in add_tokens are inserted
 * afterwards, so a token present in both sets ends up in the result.
 *
 * @param add_tokens tokens to add
 * @param rm_tokens tokens to remove
 * @return a future that resolves, after update_tokens() completes, to the
 *         resulting token set
 *
 * NOTE(review): the Java original (updateLocalTokens) was declared
 * synchronized; no equivalent guard is visible here -- confirm whether one
 * is needed.
 */
future<std::unordered_set<dht::token>> update_local_tokens(
|
||||
const std::unordered_set<dht::token>& add_tokens,
|
||||
const std::unordered_set<dht::token>& rm_tokens) {
|
||||
auto tokens = get_saved_tokens();
|
||||
for (auto& x : rm_tokens) {
|
||||
tokens.erase(x);
|
||||
}
|
||||
for (auto& x : add_tokens) {
|
||||
tokens.insert(x);
|
||||
}
|
||||
// The lambda captures its own copy of the set so it can be returned once the update is done.
return update_tokens(tokens).then([tokens] {
|
||||
return tokens;
|
||||
});
|
||||
}
|
||||
|
||||
future<> update_preferred_ip(gms::inet_address ep, gms::inet_address preferred_ip) {
|
||||
sstring req = "INSERT INTO system.%s (peer, preferred_ip) VALUES (?, ?)";
|
||||
return execute_cql(req, PEERS, ep.addr(), preferred_ip).discard_result().then([] {
|
||||
@@ -733,27 +614,6 @@ future<> update_schema_version(utils::UUID version) {
|
||||
return execute_cql(req, LOCAL, sstring(LOCAL), version).discard_result();
|
||||
}
|
||||
|
||||
#if 0
|
||||
|
||||
private static Set<String> tokensAsSet(Collection<Token> tokens)
|
||||
{
|
||||
Token.TokenFactory factory = StorageService.getPartitioner().getTokenFactory();
|
||||
Set<String> s = new HashSet<>(tokens.size());
|
||||
for (Token tk : tokens)
|
||||
s.add(factory.toString(tk));
|
||||
return s;
|
||||
}
|
||||
|
||||
private static Collection<Token> deserializeTokens(Collection<String> tokensStrings)
|
||||
{
|
||||
Token.TokenFactory factory = StorageService.getPartitioner().getTokenFactory();
|
||||
List<Token> tokens = new ArrayList<>(tokensStrings.size());
|
||||
for (String tk : tokensStrings)
|
||||
tokens.add(factory.fromString(tk));
|
||||
return tokens;
|
||||
}
|
||||
|
||||
#endif
|
||||
/**
|
||||
* Remove stored tokens being used by another node
|
||||
*/
|
||||
@@ -780,25 +640,6 @@ future<> update_tokens(std::unordered_set<dht::token> tokens) {
|
||||
});
|
||||
}
|
||||
|
||||
#if 0
|
||||
|
||||
/**
|
||||
* Convenience method to update the list of tokens in the local system keyspace.
|
||||
*
|
||||
* @param addTokens tokens to add
|
||||
* @param rmTokens tokens to remove
|
||||
* @return the collection of persisted tokens
|
||||
*/
|
||||
public static synchronized Collection<Token> updateLocalTokens(Collection<Token> addTokens, Collection<Token> rmTokens)
|
||||
{
|
||||
Collection<Token> tokens = getSavedTokens();
|
||||
tokens.removeAll(rmTokens);
|
||||
tokens.addAll(addTokens);
|
||||
updateTokens(tokens);
|
||||
return tokens;
|
||||
}
|
||||
#endif
|
||||
|
||||
future<> force_blocking_flush(sstring cfname) {
|
||||
if (!qctx) {
|
||||
return make_ready_future<>();
|
||||
@@ -811,78 +652,6 @@ future<> force_blocking_flush(sstring cfname) {
|
||||
});
|
||||
}
|
||||
|
||||
#if 0
|
||||
/**
|
||||
* Return a map of IP addresses to their stored tokens
|
||||
*
|
||||
*/
|
||||
public static SetMultimap<InetAddress, Token> loadTokens()
|
||||
{
|
||||
SetMultimap<InetAddress, Token> tokenMap = HashMultimap.create();
|
||||
for (UntypedResultSet.Row row : executeInternal("SELECT peer, tokens FROM system." + PEERS))
|
||||
{
|
||||
InetAddress peer = row.getInetAddress("peer");
|
||||
if (row.has("tokens"))
|
||||
tokenMap.putAll(peer, deserializeTokens(row.getSet("tokens", UTF8Type.instance)));
|
||||
}
|
||||
|
||||
return tokenMap;
|
||||
}
|
||||
|
||||
/**
|
||||
* Return a map of IP addresses to their stored host_ids
|
||||
*
|
||||
*/
|
||||
public static Map<InetAddress, UUID> loadHostIds()
|
||||
{
|
||||
Map<InetAddress, UUID> hostIdMap = new HashMap<>();
|
||||
for (UntypedResultSet.Row row : executeInternal("SELECT peer, host_id FROM system." + PEERS))
|
||||
{
|
||||
InetAddress peer = row.getInetAddress("peer");
|
||||
if (row.has("host_id"))
|
||||
{
|
||||
hostIdMap.put(peer, row.getUUID("host_id"));
|
||||
}
|
||||
}
|
||||
return hostIdMap;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get preferred IP for given endpoint if it is known. Otherwise this returns given endpoint itself.
|
||||
*
|
||||
* @param ep endpoint address to check
|
||||
* @return Preferred IP for given endpoint if present, otherwise returns given ep
|
||||
*/
|
||||
public static InetAddress getPreferredIP(InetAddress ep)
|
||||
{
|
||||
String req = "SELECT preferred_ip FROM system.%s WHERE peer=?";
|
||||
UntypedResultSet result = executeInternal(String.format(req, PEERS), ep);
|
||||
if (!result.isEmpty() && result.one().has("preferred_ip"))
|
||||
return result.one().getInetAddress("preferred_ip");
|
||||
return ep;
|
||||
}
|
||||
|
||||
/**
|
||||
* Return a map of IP addresses containing a map of dc and rack info
|
||||
*/
|
||||
public static Map<InetAddress, Map<String,String>> loadDcRackInfo()
|
||||
{
|
||||
Map<InetAddress, Map<String, String>> result = new HashMap<>();
|
||||
for (UntypedResultSet.Row row : executeInternal("SELECT peer, data_center, rack from system." + PEERS))
|
||||
{
|
||||
InetAddress peer = row.getInetAddress("peer");
|
||||
if (row.has("data_center") && row.has("rack"))
|
||||
{
|
||||
Map<String, String> dcRack = new HashMap<>();
|
||||
dcRack.put("data_center", row.getString("data_center"));
|
||||
dcRack.put("rack", row.getString("rack"));
|
||||
result.put(peer, dcRack);
|
||||
}
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
#endif
|
||||
/**
|
||||
* One of three things will happen if you try to read the system keyspace:
|
||||
* 1. files are present and you can read them: great
|
||||
@@ -910,247 +679,47 @@ future<> check_health() {
|
||||
});
|
||||
}
|
||||
|
||||
std::unordered_set<dht::token> get_saved_tokens() {
|
||||
#if 0
|
||||
public static Collection<Token> getSavedTokens()
|
||||
{
|
||||
String req = "SELECT tokens FROM system.%s WHERE key='%s'";
|
||||
UntypedResultSet result = executeInternal(String.format(req, LOCAL, LOCAL));
|
||||
return result.isEmpty() || !result.one().has("tokens")
|
||||
? Collections.<Token>emptyList()
|
||||
: deserializeTokens(result.one().getSet("tokens", UTF8Type.instance));
|
||||
}
|
||||
|
||||
public static int incrementAndGetGeneration()
|
||||
{
|
||||
String req = "SELECT gossip_generation FROM system.%s WHERE key='%s'";
|
||||
UntypedResultSet result = executeInternal(String.format(req, LOCAL, LOCAL));
|
||||
|
||||
int generation;
|
||||
if (result.isEmpty() || !result.one().has("gossip_generation"))
|
||||
{
|
||||
// seconds-since-epoch isn't a foolproof new generation
|
||||
// (where foolproof is "guaranteed to be larger than the last one seen at this ip address"),
|
||||
// but it's as close as sanely possible
|
||||
generation = (int) (System.currentTimeMillis() / 1000);
|
||||
}
|
||||
else
|
||||
{
|
||||
// Other nodes will ignore gossip messages about a node that have a lower generation than previously seen.
|
||||
final int storedGeneration = result.one().getInt("gossip_generation") + 1;
|
||||
final int now = (int) (System.currentTimeMillis() / 1000);
|
||||
if (storedGeneration >= now)
|
||||
{
|
||||
logger.warn("Using stored Gossip Generation {} as it is greater than current system time {}. See CASSANDRA-3654 if you experience problems",
|
||||
storedGeneration, now);
|
||||
generation = storedGeneration;
|
||||
}
|
||||
else
|
||||
{
|
||||
generation = now;
|
||||
}
|
||||
}
|
||||
|
||||
req = "INSERT INTO system.%s (key, gossip_generation) VALUES ('%s', ?)";
|
||||
executeInternal(String.format(req, LOCAL, LOCAL), generation);
|
||||
forceBlockingFlush(LOCAL);
|
||||
|
||||
return generation;
|
||||
}
|
||||
|
||||
public static BootstrapState getBootstrapState()
|
||||
{
|
||||
String req = "SELECT bootstrapped FROM system.%s WHERE key='%s'";
|
||||
UntypedResultSet result = executeInternal(String.format(req, LOCAL, LOCAL));
|
||||
|
||||
if (result.isEmpty() || !result.one().has("bootstrapped"))
|
||||
return BootstrapState.NEEDS_BOOTSTRAP;
|
||||
|
||||
return BootstrapState.valueOf(result.one().getString("bootstrapped"));
|
||||
}
|
||||
|
||||
public static boolean bootstrapComplete()
|
||||
{
|
||||
return getBootstrapState() == BootstrapState.COMPLETED;
|
||||
}
|
||||
|
||||
public static boolean bootstrapInProgress()
|
||||
{
|
||||
return getBootstrapState() == BootstrapState.IN_PROGRESS;
|
||||
}
|
||||
String req = "SELECT tokens FROM system.%s WHERE key='%s'";
|
||||
UntypedResultSet result = executeInternal(String.format(req, LOCAL, LOCAL));
|
||||
return result.isEmpty() || !result.one().has("tokens")
|
||||
? Collections.<Token>emptyList()
|
||||
: deserializeTokens(result.one().getSet("tokens", UTF8Type.instance));
|
||||
#endif
|
||||
return std::unordered_set<dht::token>();
|
||||
}
|
||||
|
||||
/** Returns true when get_bootstrap_state() reports bootstrap_state::COMPLETED. */
bool bootstrap_complete() {
|
||||
return get_bootstrap_state() == bootstrap_state::COMPLETED;
|
||||
}
|
||||
|
||||
/** Returns true when get_bootstrap_state() reports bootstrap_state::IN_PROGRESS. */
bool bootstrap_in_progress() {
|
||||
return get_bootstrap_state() == bootstrap_state::IN_PROGRESS;
|
||||
}
|
||||
|
||||
/**
 * Returns the bootstrap state of the local node.
 *
 * Not implemented yet: the lookup of the "bootstrapped" column (Java
 * reference kept under #if 0 below) is compiled out, so this always returns
 * bootstrap_state::NEEDS_BOOTSTRAP.
 */
bootstrap_state get_bootstrap_state() {
|
||||
#if 0
|
||||
String req = "SELECT bootstrapped FROM system.%s WHERE key='%s'";
|
||||
UntypedResultSet result = executeInternal(String.format(req, LOCAL, LOCAL));
|
||||
|
||||
if (result.isEmpty() || !result.one().has("bootstrapped"))
|
||||
return BootstrapState.NEEDS_BOOTSTRAP;
|
||||
|
||||
return BootstrapState.valueOf(result.one().getString("bootstrapped"));
|
||||
#endif
|
||||
return bootstrap_state::NEEDS_BOOTSTRAP;
|
||||
}
|
||||
|
||||
/**
 * Records the given bootstrap state for the local node.
 *
 * Not implemented yet: the INSERT-then-flush sketch under #if 0 below is
 * compiled out, so `state` is ignored and the function just returns
 * make_ready_future<>().
 *
 * @param state the bootstrap state to store (currently unused)
 */
future<> set_bootstrap_state(bootstrap_state state) {
|
||||
#if 0
|
||||
sstring req = "INSERT INTO system.%s (key, bootstrapped) VALUES ('%s', '%s')";
|
||||
return execute_cql(req, LOCAL, LOCAL, state.name()).discard_result().then([] {
|
||||
return force_blocking_flush(LOCAL);
|
||||
});
|
||||
#endif
|
||||
return make_ready_future<>();
|
||||
}
|
||||
#endif
|
||||
|
||||
#if 0
|
||||
|
||||
public static boolean isIndexBuilt(String keyspaceName, String indexName)
|
||||
{
|
||||
ColumnFamilyStore cfs = Keyspace.open(NAME).getColumnFamilyStore(BUILT_INDEXES);
|
||||
QueryFilter filter = QueryFilter.getNamesFilter(decorate(ByteBufferUtil.bytes(keyspaceName)),
|
||||
BUILT_INDEXES,
|
||||
FBUtilities.singleton(cfs.getComparator().makeCellName(indexName), cfs.getComparator()),
|
||||
System.currentTimeMillis());
|
||||
return ColumnFamilyStore.removeDeleted(cfs.getColumnFamily(filter), Integer.MAX_VALUE) != null;
|
||||
}
|
||||
|
||||
public static void setIndexBuilt(String keyspaceName, String indexName)
|
||||
{
|
||||
ColumnFamily cf = ArrayBackedSortedColumns.factory.create(NAME, BUILT_INDEXES);
|
||||
cf.addColumn(new BufferCell(cf.getComparator().makeCellName(indexName), ByteBufferUtil.EMPTY_BYTE_BUFFER, FBUtilities.timestampMicros()));
|
||||
new Mutation(NAME, ByteBufferUtil.bytes(keyspaceName), cf).apply();
|
||||
}
|
||||
|
||||
public static void setIndexRemoved(String keyspaceName, String indexName)
|
||||
{
|
||||
Mutation mutation = new Mutation(NAME, ByteBufferUtil.bytes(keyspaceName));
|
||||
mutation.delete(BUILT_INDEXES, BuiltIndexes.comparator.makeCellName(indexName), FBUtilities.timestampMicros());
|
||||
mutation.apply();
|
||||
}
|
||||
|
||||
/**
|
||||
* Read the host ID from the system keyspace, creating (and storing) one if
|
||||
* none exists.
|
||||
*/
|
||||
public static UUID getLocalHostId()
|
||||
{
|
||||
String req = "SELECT host_id FROM system.%s WHERE key='%s'";
|
||||
UntypedResultSet result = executeInternal(String.format(req, LOCAL, LOCAL));
|
||||
|
||||
// Look up the Host UUID (return it if found)
|
||||
if (!result.isEmpty() && result.one().has("host_id"))
|
||||
return result.one().getUUID("host_id");
|
||||
|
||||
// ID not found, generate a new one, persist, and then return it.
|
||||
UUID hostId = UUID.randomUUID();
|
||||
logger.warn("No host ID found, created {} (Note: This should happen exactly once per node).", hostId);
|
||||
return setLocalHostId(hostId);
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets the local host ID explicitly. Should only be called outside of SystemTable when replacing a node.
|
||||
*/
|
||||
public static UUID setLocalHostId(UUID hostId)
|
||||
{
|
||||
String req = "INSERT INTO system.%s (key, host_id) VALUES ('%s', ?)";
|
||||
executeInternal(String.format(req, LOCAL, LOCAL), hostId);
|
||||
return hostId;
|
||||
}
|
||||
|
||||
public static PaxosState loadPaxosState(ByteBuffer key, CFMetaData metadata)
|
||||
{
|
||||
String req = "SELECT * FROM system.%s WHERE row_key = ? AND cf_id = ?";
|
||||
UntypedResultSet results = executeInternal(String.format(req, PAXOS), key, metadata.cfId);
|
||||
if (results.isEmpty())
|
||||
return new PaxosState(key, metadata);
|
||||
UntypedResultSet.Row row = results.one();
|
||||
Commit promised = row.has("in_progress_ballot")
|
||||
? new Commit(key, row.getUUID("in_progress_ballot"), ArrayBackedSortedColumns.factory.create(metadata))
|
||||
: Commit.emptyCommit(key, metadata);
|
||||
// either we have both a recently accepted ballot and update or we have neither
|
||||
Commit accepted = row.has("proposal")
|
||||
? new Commit(key, row.getUUID("proposal_ballot"), ColumnFamily.fromBytes(row.getBytes("proposal")))
|
||||
: Commit.emptyCommit(key, metadata);
|
||||
// either most_recent_commit and most_recent_commit_at will both be set, or neither
|
||||
Commit mostRecent = row.has("most_recent_commit")
|
||||
? new Commit(key, row.getUUID("most_recent_commit_at"), ColumnFamily.fromBytes(row.getBytes("most_recent_commit")))
|
||||
: Commit.emptyCommit(key, metadata);
|
||||
return new PaxosState(promised, accepted, mostRecent);
|
||||
}
|
||||
|
||||
public static void savePaxosPromise(Commit promise)
|
||||
{
|
||||
String req = "UPDATE system.%s USING TIMESTAMP ? AND TTL ? SET in_progress_ballot = ? WHERE row_key = ? AND cf_id = ?";
|
||||
executeInternal(String.format(req, PAXOS),
|
||||
UUIDGen.microsTimestamp(promise.ballot),
|
||||
paxosTtl(promise.update.metadata),
|
||||
promise.ballot,
|
||||
promise.key,
|
||||
promise.update.id());
|
||||
}
|
||||
|
||||
public static void savePaxosProposal(Commit proposal)
|
||||
{
|
||||
executeInternal(String.format("UPDATE system.%s USING TIMESTAMP ? AND TTL ? SET proposal_ballot = ?, proposal = ? WHERE row_key = ? AND cf_id = ?", PAXOS),
|
||||
UUIDGen.microsTimestamp(proposal.ballot),
|
||||
paxosTtl(proposal.update.metadata),
|
||||
proposal.ballot,
|
||||
proposal.update.toBytes(),
|
||||
proposal.key,
|
||||
proposal.update.id());
|
||||
}
|
||||
|
||||
private static int paxosTtl(CFMetaData metadata)
|
||||
{
|
||||
// keep paxos state around for at least 3h
|
||||
return Math.max(3 * 3600, metadata.getGcGraceSeconds());
|
||||
}
|
||||
|
||||
public static void savePaxosCommit(Commit commit)
|
||||
{
|
||||
// We always erase the last proposal (with the commit timestamp, so as not to erase a more recent proposal in case the commit is old)
|
||||
// even though that's really just an optimization since SP.beginAndRepairPaxos will exclude accepted proposal older than the mrc.
|
||||
String cql = "UPDATE system.%s USING TIMESTAMP ? AND TTL ? SET proposal_ballot = null, proposal = null, most_recent_commit_at = ?, most_recent_commit = ? WHERE row_key = ? AND cf_id = ?";
|
||||
executeInternal(String.format(cql, PAXOS),
|
||||
UUIDGen.microsTimestamp(commit.ballot),
|
||||
paxosTtl(commit.update.metadata),
|
||||
commit.ballot,
|
||||
commit.update.toBytes(),
|
||||
commit.key,
|
||||
commit.update.id());
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns a RestorableMeter tracking the average read rate of a particular SSTable, restoring the last-seen rate
|
||||
* from values in system.sstable_activity if present.
|
||||
* @param keyspace the keyspace the sstable belongs to
|
||||
* @param table the table the sstable belongs to
|
||||
* @param generation the generation number for the sstable
|
||||
*/
|
||||
public static RestorableMeter getSSTableReadMeter(String keyspace, String table, int generation)
|
||||
{
|
||||
String cql = "SELECT * FROM system.%s WHERE keyspace_name=? and columnfamily_name=? and generation=?";
|
||||
UntypedResultSet results = executeInternal(String.format(cql, SSTABLE_ACTIVITY), keyspace, table, generation);
|
||||
|
||||
if (results.isEmpty())
|
||||
return new RestorableMeter();
|
||||
|
||||
UntypedResultSet.Row row = results.one();
|
||||
double m15rate = row.getDouble("rate_15m");
|
||||
double m120rate = row.getDouble("rate_120m");
|
||||
return new RestorableMeter(m15rate, m120rate);
|
||||
}
|
||||
|
||||
/**
|
||||
* Writes the current read rates for a given SSTable to system.sstable_activity
|
||||
*/
|
||||
public static void persistSSTableReadMeter(String keyspace, String table, int generation, RestorableMeter meter)
|
||||
{
|
||||
// Store values with a TTL (864000 seconds, i.e. ten days, per the statement below) to handle corner cases where cleanup might not occur
|
||||
String cql = "INSERT INTO system.%s (keyspace_name, columnfamily_name, generation, rate_15m, rate_120m) VALUES (?, ?, ?, ?, ?) USING TTL 864000";
|
||||
executeInternal(String.format(cql, SSTABLE_ACTIVITY),
|
||||
keyspace,
|
||||
table,
|
||||
generation,
|
||||
meter.fifteenMinuteRate(),
|
||||
meter.twoHourRate());
|
||||
}
|
||||
|
||||
/**
|
||||
* Clears persisted read rates from system.sstable_activity for SSTables that have been deleted.
|
||||
*/
|
||||
public static void clearSSTableReadMeter(String keyspace, String table, int generation)
|
||||
{
|
||||
String cql = "DELETE FROM system.%s WHERE keyspace_name=? AND columnfamily_name=? and generation=?";
|
||||
executeInternal(String.format(cql, SSTABLE_ACTIVITY), keyspace, table, generation);
|
||||
}
|
||||
#endif
|
||||
|
||||
std::vector<schema_ptr> all_tables() {
|
||||
std::vector<schema_ptr> r;
|
||||
|
||||
@@ -105,16 +105,35 @@ std::unordered_map<gms::inet_address, locator::endpoint_dc_rack>
|
||||
load_dc_rack_info();
|
||||
|
||||
#if 0
|
||||
|
||||
private static volatile Map<UUID, Pair<ReplayPosition, Long>> truncationRecords;
|
||||
|
||||
public enum BootstrapState
|
||||
public static KSMetaData definition()
|
||||
{
|
||||
NEEDS_BOOTSTRAP,
|
||||
COMPLETED,
|
||||
IN_PROGRESS
|
||||
Iterable<CFMetaData> tables =
|
||||
Iterables.concat(LegacySchemaTables.All,
|
||||
Arrays.asList(BuiltIndexes,
|
||||
Hints,
|
||||
Batchlog,
|
||||
Paxos,
|
||||
Local,
|
||||
Peers,
|
||||
PeerEvents,
|
||||
RangeXfers,
|
||||
CompactionsInProgress,
|
||||
CompactionHistory,
|
||||
SSTableActivity));
|
||||
return new KSMetaData(NAME, LocalStrategy.class, Collections.<String, String>emptyMap(), true, tables);
|
||||
}
|
||||
|
||||
private static volatile Map<UUID, Pair<ReplayPosition, Long>> truncationRecords;
|
||||
private static volatile Map<UUID, Pair<ReplayPosition, Long>> truncationRecords;
|
||||
#endif
|
||||
|
||||
/**
 * Bootstrap progress of the local node; C++ counterpart of the Java
 * BootstrapState enum.
 *
 * NOTE(review): the Java reference code stores the enumerator name as a
 * string in the "bootstrapped" column and falls back to NEEDS_BOOTSTRAP when
 * nothing is stored -- confirm the C++ port keeps the same names and default.
 */
enum class bootstrap_state {
|
||||
NEEDS_BOOTSTRAP,
|
||||
COMPLETED,
|
||||
IN_PROGRESS
|
||||
};
|
||||
|
||||
#if 0
|
||||
private static DecoratedKey decorate(ByteBuffer key)
|
||||
{
|
||||
return StorageService.getPartitioner().decorateKey(key);
|
||||
@@ -324,6 +343,7 @@ load_dc_rack_info();
|
||||
executeInternal(String.format(req, LOCAL, LOCAL), tokensAsSet(tokens));
|
||||
forceBlockingFlush(LOCAL);
|
||||
}
|
||||
#endif
|
||||
|
||||
/**
|
||||
* Convenience method to update the list of tokens in the local system keyspace.
|
||||
@@ -332,17 +352,11 @@ load_dc_rack_info();
|
||||
* @param rmTokens tokens to remove
|
||||
* @return the collection of persisted tokens
|
||||
*/
|
||||
public static synchronized Collection<Token> updateLocalTokens(Collection<Token> addTokens, Collection<Token> rmTokens)
|
||||
{
|
||||
Collection<Token> tokens = getSavedTokens();
|
||||
tokens.removeAll(rmTokens);
|
||||
tokens.addAll(addTokens);
|
||||
updateTokens(tokens);
|
||||
return tokens;
|
||||
}
|
||||
#endif
|
||||
future<std::unordered_set<dht::token>> update_local_tokens(
|
||||
const std::unordered_set<dht::token>& add_tokens,
|
||||
const std::unordered_set<dht::token>& rm_tokens);
|
||||
|
||||
#if o
|
||||
#if 0
|
||||
/**
|
||||
* Return a map of stored tokens to IP addresses
|
||||
*
|
||||
@@ -456,14 +470,9 @@ load_dc_rack_info();
|
||||
throw new ConfigurationException("Saved cluster name " + savedClusterName + " != configured name " + DatabaseDescriptor.getClusterName());
|
||||
}
|
||||
|
||||
public static Collection<Token> getSavedTokens()
|
||||
{
|
||||
String req = "SELECT tokens FROM system.%s WHERE key='%s'";
|
||||
UntypedResultSet result = executeInternal(String.format(req, LOCAL, LOCAL));
|
||||
return result.isEmpty() || !result.one().has("tokens")
|
||||
? Collections.<Token>emptyList()
|
||||
: deserializeTokens(result.one().getSet("tokens", UTF8Type.instance));
|
||||
}
|
||||
#endif
|
||||
std::unordered_set<dht::token> get_saved_tokens();
|
||||
#if 0
|
||||
|
||||
public static int incrementAndGetGeneration()
|
||||
{
|
||||
@@ -501,35 +510,14 @@ load_dc_rack_info();
|
||||
|
||||
return generation;
|
||||
}
|
||||
#endif
|
||||
|
||||
public static BootstrapState getBootstrapState()
|
||||
{
|
||||
String req = "SELECT bootstrapped FROM system.%s WHERE key='%s'";
|
||||
UntypedResultSet result = executeInternal(String.format(req, LOCAL, LOCAL));
|
||||
|
||||
if (result.isEmpty() || !result.one().has("bootstrapped"))
|
||||
return BootstrapState.NEEDS_BOOTSTRAP;
|
||||
|
||||
return BootstrapState.valueOf(result.one().getString("bootstrapped"));
|
||||
}
|
||||
|
||||
public static boolean bootstrapComplete()
|
||||
{
|
||||
return getBootstrapState() == BootstrapState.COMPLETED;
|
||||
}
|
||||
|
||||
public static boolean bootstrapInProgress()
|
||||
{
|
||||
return getBootstrapState() == BootstrapState.IN_PROGRESS;
|
||||
}
|
||||
|
||||
public static void setBootstrapState(BootstrapState state)
|
||||
{
|
||||
String req = "INSERT INTO system.%s (key, bootstrapped) VALUES ('%s', ?)";
|
||||
executeInternal(String.format(req, LOCAL, LOCAL), state.name());
|
||||
forceBlockingFlush(LOCAL);
|
||||
}
|
||||
bool bootstrap_complete();
|
||||
bool bootstrap_in_progress();
|
||||
bootstrap_state get_bootstrap_state();
|
||||
future<> set_bootstrap_state(bootstrap_state state);
|
||||
|
||||
#if 0
|
||||
public static boolean isIndexBuilt(String keyspaceName, String indexName)
|
||||
{
|
||||
ColumnFamilyStore cfs = Keyspace.open(NAME).getColumnFamilyStore(BUILT_INDEXES);
|
||||
|
||||
@@ -58,6 +58,11 @@ std::experimental::optional<inet_address> get_replace_address() {
|
||||
return {};
|
||||
}
|
||||
|
||||
/**
 * Returns the initial tokens configured for this node, as strings.
 *
 * Placeholder: reading the configured value (DatabaseDescriptor.getInitialTokens()
 * in the Java original) is not wired up yet, so the returned set is always empty.
 */
std::unordered_set<sstring> get_initial_tokens() {
|
||||
// FIXME: DatabaseDescriptor.getInitialTokens();
|
||||
return std::unordered_set<sstring>();
|
||||
}
|
||||
|
||||
bool get_property_join_ring() {
|
||||
// FIXME: Boolean.parseBoolean(System.getProperty("cassandra.join_ring", "true")))
|
||||
return true;
|
||||
@@ -155,7 +160,6 @@ future<> storage_service::prepare_to_join() {
|
||||
future<> storage_service::join_token_ring(int delay) {
|
||||
return seastar::async([this, delay] {
|
||||
_joined = true;
|
||||
#if 0
|
||||
// We bootstrap if we haven't successfully bootstrapped before, as long as we are not a seed.
|
||||
// If we are a seed, or if the user manually sets auto_bootstrap to false,
|
||||
// we'll skip streaming data from other nodes and jump directly into the ring.
|
||||
@@ -165,7 +169,8 @@ future<> storage_service::join_token_ring(int delay) {
|
||||
//
|
||||
// We attempted to replace this with a schema-presence check, but you need a meaningful sleep
|
||||
// to get schema info from gossip which defeats the purpose. See CASSANDRA-4427 for the gory details.
|
||||
Set<InetAddress> current = new HashSet<>();
|
||||
std::unordered_set<inet_address> current;
|
||||
#if 0
|
||||
logger.debug("Bootstrap variables: {} {} {} {}",
|
||||
DatabaseDescriptor.isAutoBootstrap(),
|
||||
SystemKeyspace.bootstrapInProgress(),
|
||||
@@ -235,12 +240,12 @@ future<> storage_service::join_token_ring(int delay) {
|
||||
for (auto token : _bootstrap_tokens) {
|
||||
auto existing = _token_metadata.get_endpoint(token);
|
||||
if (existing) {
|
||||
#if 0
|
||||
long nanoDelay = delay * 1000000L;
|
||||
if (Gossiper.instance.getEndpointStateForEndpoint(existing).getUpdateTimestamp() > (System.nanoTime() - nanoDelay))
|
||||
throw new UnsupportedOperationException("Cannot replace a live node... ");
|
||||
current.add(existing);
|
||||
#endif
|
||||
auto& gossiper = gms::get_local_gossiper();
|
||||
auto eps = gossiper.get_endpoint_state_for_endpoint(*existing);
|
||||
if (eps && eps->get_update_timestamp() > gms::gossiper::clk::now() - std::chrono::milliseconds(delay)) {
|
||||
throw std::runtime_error("Cannot replace a live node...");
|
||||
}
|
||||
current.insert(*existing);
|
||||
} else {
|
||||
throw std::runtime_error(sprint("Cannot replace token %s which does not exist!", token));
|
||||
}
|
||||
@@ -257,87 +262,71 @@ future<> storage_service::join_token_ring(int delay) {
|
||||
// assert(!_is_bootstrap_mode); // bootstrap will block until finished
|
||||
} else {
|
||||
size_t num_tokens = _db.local().get_config().num_tokens();
|
||||
_bootstrap_tokens = boot_strapper::get_random_tokens(_token_metadata, num_tokens);
|
||||
logger.info("Generated random tokens. tokens are {}", _bootstrap_tokens);
|
||||
#if 0
|
||||
_bootstrap_tokens = SystemKeyspace.getSavedTokens();
|
||||
if (_bootstrap_tokens.isEmpty())
|
||||
{
|
||||
Collection<String> initialTokens = DatabaseDescriptor.getInitialTokens();
|
||||
if (initialTokens.size() < 1)
|
||||
{
|
||||
_bootstrap_tokens = BootStrapper.getRandomTokens(_token_metadata, DatabaseDescriptor.getNumTokens());
|
||||
if (DatabaseDescriptor.getNumTokens() == 1)
|
||||
_bootstrap_tokens = db::system_keyspace::get_saved_tokens();
|
||||
if (_bootstrap_tokens.empty()) {
|
||||
auto initial_tokens = get_initial_tokens();
|
||||
if (initial_tokens.size() < 1) {
|
||||
_bootstrap_tokens = boot_strapper::get_random_tokens(_token_metadata, num_tokens);
|
||||
if (num_tokens == 1) {
|
||||
logger.warn("Generated random token {}. Random tokens will result in an unbalanced ring; see http://wiki.apache.org/cassandra/Operations", _bootstrap_tokens);
|
||||
else
|
||||
} else {
|
||||
logger.info("Generated random tokens. tokens are {}", _bootstrap_tokens);
|
||||
}
|
||||
else
|
||||
{
|
||||
_bootstrap_tokens = new ArrayList<Token>(initialTokens.size());
|
||||
for (String token : initialTokens)
|
||||
_bootstrap_tokens.add(getPartitioner().getTokenFactory().fromString(token));
|
||||
}
|
||||
} else {
|
||||
for (auto token : initial_tokens) {
|
||||
// FIXME: token from string
|
||||
// _bootstrap_tokens.insert(getPartitioner().getTokenFactory().fromString(token));
|
||||
}
|
||||
logger.info("Saved tokens not found. Using configuration value: {}", _bootstrap_tokens);
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
if (_bootstrap_tokens.size() != DatabaseDescriptor.getNumTokens())
|
||||
throw new ConfigurationException("Cannot change the number of tokens from " + _bootstrap_tokens.size() + " to " + DatabaseDescriptor.getNumTokens());
|
||||
else
|
||||
} else {
|
||||
if (_bootstrap_tokens.size() != num_tokens) {
|
||||
throw std::runtime_error(sprint("Cannot change the number of tokens from %ld to %ld", _bootstrap_tokens.size(), num_tokens));
|
||||
} else {
|
||||
logger.info("Using saved tokens {}", _bootstrap_tokens);
|
||||
}
|
||||
}
|
||||
#endif
|
||||
}
|
||||
set_tokens(_bootstrap_tokens).get();
|
||||
#if 0
|
||||
// if we don't have system_traces keyspace at this point, then create it manually
|
||||
if (Schema.instance.getKSMetaData(TraceKeyspace.NAME) == null)
|
||||
MigrationManager.announceNewKeyspace(TraceKeyspace.definition(), 0, false);
|
||||
#endif
|
||||
|
||||
if (!_is_survey_mode)
|
||||
{
|
||||
if (!_is_survey_mode) {
|
||||
// start participating in the ring.
|
||||
SystemKeyspace.setBootstrapState(SystemKeyspace.BootstrapState.COMPLETED);
|
||||
set_tokens(_bootstrap_tokens);
|
||||
//SystemKeyspace.setBootstrapState(SystemKeyspace.BootstrapState.COMPLETED);
|
||||
set_tokens(_bootstrap_tokens).get();
|
||||
// remove the existing info about the replaced node.
|
||||
if (!current.isEmpty())
|
||||
for (InetAddress existing : current)
|
||||
Gossiper.instance.replacedEndpoint(existing);
|
||||
assert _token_metadata.sortedTokens().size() > 0;
|
||||
|
||||
Auth.setup();
|
||||
}
|
||||
else
|
||||
{
|
||||
if (!current.empty()) {
|
||||
auto& gossiper = gms::get_local_gossiper();
|
||||
for (auto existing : current) {
|
||||
gossiper.replaced_endpoint(existing);
|
||||
}
|
||||
}
|
||||
assert(_token_metadata.sorted_tokens().size() > 0);
|
||||
//Auth.setup();
|
||||
} else {
|
||||
logger.info("Startup complete, but write survey mode is active, not becoming an active ring member. Use JMX (StorageService->joinRing()) to finalize ring joining.");
|
||||
}
|
||||
#endif
|
||||
});
|
||||
}
|
||||
|
||||
void storage_service::join_ring() {
|
||||
#if 0
|
||||
if (!joined) {
|
||||
future<> storage_service::join_ring() {
|
||||
if (!_joined) {
|
||||
logger.info("Joining ring by operator request");
|
||||
try
|
||||
{
|
||||
joinTokenRing(0);
|
||||
}
|
||||
catch (ConfigurationException e)
|
||||
{
|
||||
throw new IOException(e.getMessage());
|
||||
}
|
||||
return join_token_ring(0);
|
||||
} else if (_is_survey_mode) {
|
||||
set_tokens(SystemKeyspace.getSavedTokens());
|
||||
SystemKeyspace.setBootstrapState(SystemKeyspace.BootstrapState.COMPLETED);
|
||||
_is_survey_mode = false;
|
||||
logger.info("Leaving write survey mode and joining ring at operator request");
|
||||
assert _token_metadata.sortedTokens().size() > 0;
|
||||
|
||||
Auth.setup();
|
||||
return set_tokens(db::system_keyspace::get_saved_tokens()).then([this] {
|
||||
//SystemKeyspace.setBootstrapState(SystemKeyspace.BootstrapState.COMPLETED);
|
||||
_is_survey_mode = false;
|
||||
logger.info("Leaving write survey mode and joining ring at operator request");
|
||||
assert(_token_metadata.sorted_tokens().size() > 0);
|
||||
//Auth.setup();
|
||||
return make_ready_future<>();
|
||||
});
|
||||
}
|
||||
#endif
|
||||
return make_ready_future<>();
|
||||
}
|
||||
|
||||
future<> storage_service::bootstrap(std::unordered_set<token> tokens) {
|
||||
@@ -489,10 +478,10 @@ void storage_service::handle_state_normal(inet_address endpoint) {
|
||||
_token_metadata.update_normal_tokens(tokens_to_update_in_metadata, endpoint);
|
||||
for (auto ep : endpoints_to_remove) {
|
||||
remove_endpoint(ep);
|
||||
#if 0
|
||||
if (DatabaseDescriptor.isReplacing() && DatabaseDescriptor.getReplaceAddress().equals(ep))
|
||||
Gossiper.instance.replacementQuarantine(ep); // quarantine locally longer than normally; see CASSANDRA-8260
|
||||
#endif
|
||||
auto replace_addr = get_replace_address();
|
||||
if (is_replacing() && replace_addr && *replace_addr == ep) {
|
||||
gossiper.replacement_quarantine(ep); // quarantine locally longer than normally; see CASSANDRA-8260
|
||||
}
|
||||
}
|
||||
logger.debug("ep={} tokens_to_update_in_system_keyspace = {}", endpoint, tokens_to_update_in_system_keyspace);
|
||||
if (!tokens_to_update_in_system_keyspace.empty()) {
|
||||
@@ -506,7 +495,7 @@ void storage_service::handle_state_normal(inet_address endpoint) {
|
||||
}).get();
|
||||
}
|
||||
if (!local_tokens_to_remove.empty()) {
|
||||
// SystemKeyspace.updateLocalTokens(Collections.<Token>emptyList(), local_tokens_to_remove);
|
||||
db::system_keyspace::update_local_tokens(std::unordered_set<dht::token>(), local_tokens_to_remove).discard_result().get();
|
||||
}
|
||||
|
||||
if (is_moving) {
|
||||
@@ -710,8 +699,8 @@ void storage_service::on_change(inet_address endpoint, application_state state,
|
||||
|
||||
void storage_service::on_remove(gms::inet_address endpoint) {
|
||||
logger.debug("on_remove endpoint={}", endpoint);
|
||||
_token_metadata.remove_endpoint(endpoint);
|
||||
#if 0
|
||||
_token_metadata.removeEndpoint(endpoint);
|
||||
PendingRangeCalculatorService.instance.update();
|
||||
#endif
|
||||
}
|
||||
|
||||
@@ -371,7 +371,7 @@ private:
|
||||
future<> prepare_to_join();
|
||||
future<> join_token_ring(int delay);
|
||||
public:
|
||||
void join_ring();
|
||||
future<> join_ring();
|
||||
bool is_joined() {
|
||||
return _joined;
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user