We won't create our own versions of database and query_processor, so some setup is needed. The current code captures those variables and sets up the structure used to conduct the queries. Later on, it will also carry out the database's basic setup.

Signed-off-by: Glauber Costa <glommer@cloudius-systems.com>
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

/*
 * Modified by Cloudius Systems
 * Copyright 2015 Cloudius Systems
 */

#include <boost/range/algorithm_ext/push_back.hpp>
#include <boost/range/adaptor/transformed.hpp>

#include "system_keyspace.hh"
#include "types.hh"
#include "service/storage_service.hh"
#include "service/storage_proxy.hh"
#include "service/client_state.hh"
#include "service/query_state.hh"
#include "cql3/query_options.hh"
#include "cql3/query_processor.hh"

namespace db {
namespace system_keyspace {

// Currently, the type variables (uuid_type, etc.) are thread-local reference-
// counted shared pointers. This forces us to also make the built-in schemas
// below thread-local as well.
// We return schema_ptr, not schema&, because that's the "tradition" in our
// other code.
// We hide the thread_local variable inside a function, because if we later
// remove the thread_local, we'll start having initialization order problems
// (we need the type variables to be constructed first), and using functions
// will solve this problem. So we use functions right now.

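// A minimal illustration of the hazard just described (sketch only, not
// compiled; make_hints_schema is a hypothetical helper, not a function in
// this file): a namespace-scope definition, thread_local or not, is
// dynamically initialized in an unspecified order relative to the type
// variables defined in other translation units, while the function-local
// form used below is constructed lazily, on first call, after those
// variables already exist.
#if 0
static thread_local schema_ptr hints_schema = make_hints_schema(); // may run before uuid_type is ready
#endif
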
schema_ptr hints() {
    static thread_local auto hints = make_lw_shared(schema(generate_legacy_id(NAME, HINTS), NAME, HINTS,
        // partition key
        {{"target_id", uuid_type}},
        // clustering key
        {{"hint_id", timeuuid_type}, {"message_version", int32_type}},
        // regular columns
        {{"mutation", bytes_type}},
        // static columns
        {},
        // regular column name type
        utf8_type,
        // comment
        "hints awaiting delivery"
        // FIXME: the original Java code also had:
        // in CQL statement creating the table:
        //    "WITH COMPACT STORAGE"
        // operations on resulting CFMetaData:
        //    .compactionStrategyOptions(Collections.singletonMap("enabled", "false"))
        //    .gcGraceSeconds(0);
        ));
    return hints;
}

schema_ptr batchlog() {
    static thread_local auto batchlog = make_lw_shared(schema(generate_legacy_id(NAME, BATCHLOG), NAME, BATCHLOG,
        // partition key
        {{"id", uuid_type}},
        // clustering key
        {},
        // regular columns
        {{"data", bytes_type}, {"version", int32_type}, {"written_at", timestamp_type}},
        // static columns
        {},
        // regular column name type
        utf8_type,
        // comment
        "batches awaiting replay"
        // FIXME: the original Java code also had:
        // operations on resulting CFMetaData:
        //    .compactionStrategyOptions(Collections.singletonMap("min_threshold", "2"))
        //    .gcGraceSeconds(0);
        ));
    return batchlog;
}

/*static*/ schema_ptr paxos() {
    static thread_local auto paxos = make_lw_shared(schema(generate_legacy_id(NAME, PAXOS), NAME, PAXOS,
        // partition key
        {{"row_key", bytes_type}},
        // clustering key
        {{"cf_id", uuid_type}},
        // regular columns
        {{"in_progress_ballot", timeuuid_type}, {"most_recent_commit", bytes_type}, {"most_recent_commit_at", timeuuid_type}, {"proposal", bytes_type}, {"proposal_ballot", timeuuid_type}},
        // static columns
        {},
        // regular column name type
        utf8_type,
        // comment
        "in-progress paxos proposals"
        // FIXME: the original Java code also had:
        // operations on resulting CFMetaData:
        //    .compactionStrategyClass(LeveledCompactionStrategy.class);
        ));
    return paxos;
}

schema_ptr built_indexes() {
    static thread_local auto built_indexes = make_lw_shared(schema(generate_legacy_id(NAME, BUILT_INDEXES), NAME, BUILT_INDEXES,
        // partition key
        {{"table_name", utf8_type}},
        // clustering key
        {{"index_name", utf8_type}},
        // regular columns
        {},
        // static columns
        {},
        // regular column name type
        utf8_type,
        // comment
        "built column indexes"
        // FIXME: the original Java code also had:
        // in CQL statement creating the table:
        //    "WITH COMPACT STORAGE"
        ));
    return built_indexes;
}

/*static*/ schema_ptr local() {
    static thread_local auto local = make_lw_shared(schema(generate_legacy_id(NAME, LOCAL), NAME, LOCAL,
        // partition key
        {{"key", utf8_type}},
        // clustering key
        {},
        // regular columns
        {
            {"bootstrapped", utf8_type},
            {"cluster_name", utf8_type},
            {"cql_version", utf8_type},
            {"data_center", utf8_type},
            {"gossip_generation", int32_type},
            {"host_id", uuid_type},
            {"native_protocol_version", utf8_type},
            {"partitioner", utf8_type},
            {"rack", utf8_type},
            {"release_version", utf8_type},
            {"schema_version", uuid_type},
            {"thrift_version", utf8_type},
            {"tokens", set_type_impl::get_instance(utf8_type, false)},
            {"truncated_at", map_type_impl::get_instance(uuid_type, bytes_type, false)},
        },
        // static columns
        {},
        // regular column name type
        utf8_type,
        // comment
        "information about the local node"
        ));
    return local;
}

/*static*/ schema_ptr peers() {
    static thread_local auto peers = make_lw_shared(schema(generate_legacy_id(NAME, PEERS), NAME, PEERS,
        // partition key
        {{"peer", inet_addr_type}},
        // clustering key
        {},
        // regular columns
        {
            {"data_center", utf8_type},
            {"host_id", utf8_type},
            {"preferred_ip", inet_addr_type},
            {"rack", utf8_type},
            {"release_version", utf8_type},
            {"rpc_address", inet_addr_type},
            {"schema_version", utf8_type},
            {"tokens", set_type_impl::get_instance(utf8_type, false)},
        },
        // static columns
        {},
        // regular column name type
        utf8_type,
        // comment
        "information about known peers in the cluster"
        ));
    return peers;
}

/*static*/ schema_ptr peer_events() {
    static thread_local auto peer_events = make_lw_shared(schema(generate_legacy_id(NAME, PEER_EVENTS), NAME, PEER_EVENTS,
        // partition key
        {{"peer", inet_addr_type}},
        // clustering key
        {},
        // regular columns
        {
            {"hints_dropped", map_type_impl::get_instance(uuid_type, int32_type, false)},
        },
        // static columns
        {},
        // regular column name type
        utf8_type,
        // comment
        "events related to peers"
        ));
    return peer_events;
}

/*static*/ schema_ptr range_xfers() {
    static thread_local auto range_xfers = make_lw_shared(schema(generate_legacy_id(NAME, RANGE_XFERS), NAME, RANGE_XFERS,
        // partition key
        {{"token_bytes", bytes_type}},
        // clustering key
        {},
        // regular columns
        {{"requested_at", timestamp_type}},
        // static columns
        {},
        // regular column name type
        utf8_type,
        // comment
        "ranges requested for transfer"
        ));
    return range_xfers;
}

/*static*/ schema_ptr compactions_in_progress() {
    static thread_local auto compactions_in_progress = make_lw_shared(schema(generate_legacy_id(NAME, COMPACTIONS_IN_PROGRESS), NAME, COMPACTIONS_IN_PROGRESS,
        // partition key
        {{"id", uuid_type}},
        // clustering key
        {},
        // regular columns
        {
            {"columnfamily_name", utf8_type},
            {"inputs", set_type_impl::get_instance(int32_type, false)},
            {"keyspace_name", utf8_type},
        },
        // static columns
        {},
        // regular column name type
        utf8_type,
        // comment
        "unfinished compactions"
        ));
    return compactions_in_progress;
}

/*static*/ schema_ptr compaction_history() {
    static thread_local auto compaction_history = make_lw_shared(schema(generate_legacy_id(NAME, COMPACTION_HISTORY), NAME, COMPACTION_HISTORY,
        // partition key
        {{"id", uuid_type}},
        // clustering key
        {},
        // regular columns
        {
            {"bytes_in", long_type},
            {"bytes_out", long_type},
            {"columnfamily_name", utf8_type},
            {"compacted_at", timestamp_type},
            {"keyspace_name", utf8_type},
            {"rows_merged", map_type_impl::get_instance(int32_type, long_type, false)},
        },
        // static columns
        {},
        // regular column name type
        utf8_type,
        // comment
        "week-long compaction history"
        // FIXME: the original Java code also had:
        //    .defaultTimeToLive((int) TimeUnit.DAYS.toSeconds(7));
        ));
    return compaction_history;
}

/*static*/ schema_ptr sstable_activity() {
    static thread_local auto sstable_activity = make_lw_shared(schema(generate_legacy_id(NAME, SSTABLE_ACTIVITY), NAME, SSTABLE_ACTIVITY,
        // partition key
        {
            {"keyspace_name", utf8_type},
            {"columnfamily_name", utf8_type},
            {"generation", int32_type},
        },
        // clustering key
        {},
        // regular columns
        {
            {"rate_120m", double_type},
            {"rate_15m", double_type},
        },
        // static columns
        {},
        // regular column name type
        utf8_type,
        // comment
        "historic sstable read rates"
        ));
    return sstable_activity;
}

struct query_context {
    distributed<database>& _db;
    distributed<cql3::query_processor>& _qp;
    query_context(distributed<database>& db, distributed<cql3::query_processor>& qp) : _db(db), _qp(qp) {}

    template <typename... Args>
    future<::shared_ptr<cql3::untyped_result_set>> execute_cql(sstring text, sstring cf, Args&&... args) {
        // FIXME: Would be better not to use sprint here.
        sstring req = sprint(text, cf);
        return this->_qp.local().execute_internal(req, { boost::any(std::forward<Args>(args))... });
    }
    database& db() {
        return _db.local();
    }
};

// This does not have to be thread-local, because all cores will share the same context.
static std::unique_ptr<query_context> qctx = {};

future<> setup(distributed<database>& db, distributed<cql3::query_processor>& qp) {
    auto new_ctx = std::make_unique<query_context>(db, qp);
    qctx.swap(new_ctx);
    // After the swap, new_ctx holds the previous context, which must be null:
    // setup() is expected to run only once.
    assert(!new_ctx);

    // Other functions that return futures will be added here later.
    return make_ready_future<>();
}

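// A minimal usage sketch for setup() (illustrative only; the start-up
// sequence shown is an assumption, not part of this file). setup() stores
// references to the distributed services, so both must outlive the context:
#if 0
distributed<database> db;
distributed<cql3::query_processor> qp;
// ... after db.start(...) and qp.start(...) have completed ...
db::system_keyspace::setup(db, qp).then([] {
    // system-table queries can now be routed through qctx.
});
#endif
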
// Sometimes we are not concerned about system tables at all - for instance,
// when we are testing. In those cases, just pretend we executed the query,
// and return an empty result.
template <typename... Args>
static future<::shared_ptr<cql3::untyped_result_set>> execute_cql(sstring text, Args&&... args) {
    if (qctx) {
        return qctx->execute_cql(text, std::forward<Args>(args)...);
    }
    return make_ready_future<shared_ptr<cql3::untyped_result_set>>(::make_shared<cql3::untyped_result_set>(cql3::untyped_result_set::make_empty()));
}

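// A minimal usage sketch for the wrapper above (illustrative only; the
// query text is an assumption): with qctx in place the statement runs
// through the query processor, while in a test without setup() the very
// same call resolves immediately with an empty result set.
#if 0
execute_cql("SELECT * FROM system.%s", LOCAL).then([] (auto results) {
    // results is the empty result set when setup() was never called.
});
#endif
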
#if 0

public static KSMetaData definition()
{
    Iterable<CFMetaData> tables =
        Iterables.concat(LegacySchemaTables.All,
                         Arrays.asList(BuiltIndexes,
                                       Hints,
                                       Batchlog,
                                       Paxos,
                                       Local,
                                       Peers,
                                       PeerEvents,
                                       RangeXfers,
                                       CompactionsInProgress,
                                       CompactionHistory,
                                       SSTableActivity));
    return new KSMetaData(NAME, LocalStrategy.class, Collections.<String, String>emptyMap(), true, tables);
}

private static volatile Map<UUID, Pair<ReplayPosition, Long>> truncationRecords;

public enum BootstrapState
{
    NEEDS_BOOTSTRAP,
    COMPLETED,
    IN_PROGRESS
}

private static DecoratedKey decorate(ByteBuffer key)
{
    return StorageService.getPartitioner().decorateKey(key);
}

public static void finishStartup()
{
    setupVersion();
    LegacySchemaTables.saveSystemKeyspaceSchema();
}

private static void setupVersion()
{
    String req = "INSERT INTO system.%s (key, release_version, cql_version, thrift_version, native_protocol_version, data_center, rack, partitioner) VALUES (?, ?, ?, ?, ?, ?, ?, ?)";
    IEndpointSnitch snitch = DatabaseDescriptor.getEndpointSnitch();
    executeOnceInternal(String.format(req, LOCAL),
                        LOCAL,
                        FBUtilities.getReleaseVersionString(),
                        QueryProcessor.CQL_VERSION.toString(),
                        cassandraConstants.VERSION,
                        String.valueOf(Server.CURRENT_VERSION),
                        snitch.getDatacenter(FBUtilities.getBroadcastAddress()),
                        snitch.getRack(FBUtilities.getBroadcastAddress()),
                        DatabaseDescriptor.getPartitioner().getClass().getName());
}

/**
 * Write compaction log, except columnfamilies under system keyspace.
 *
 * @param cfs cfs to compact
 * @param toCompact sstables to compact
 * @return compaction task id or null if cfs is under system keyspace
 */
public static UUID startCompaction(ColumnFamilyStore cfs, Iterable<SSTableReader> toCompact)
{
    if (NAME.equals(cfs.keyspace.getName()))
        return null;

    UUID compactionId = UUIDGen.getTimeUUID();
    Iterable<Integer> generations = Iterables.transform(toCompact, new Function<SSTableReader, Integer>()
    {
        public Integer apply(SSTableReader sstable)
        {
            return sstable.descriptor.generation;
        }
    });
    String req = "INSERT INTO system.%s (id, keyspace_name, columnfamily_name, inputs) VALUES (?, ?, ?, ?)";
    executeInternal(String.format(req, COMPACTIONS_IN_PROGRESS), compactionId, cfs.keyspace.getName(), cfs.name, Sets.newHashSet(generations));
    forceBlockingFlush(COMPACTIONS_IN_PROGRESS);
    return compactionId;
}

/**
 * Deletes the entry for this compaction from the set of compactions in progress. The compaction does not need
 * to complete successfully for this to be called.
 * @param taskId what was returned from {@code startCompaction}
 */
public static void finishCompaction(UUID taskId)
{
    assert taskId != null;

    executeInternal(String.format("DELETE FROM system.%s WHERE id = ?", COMPACTIONS_IN_PROGRESS), taskId);
    forceBlockingFlush(COMPACTIONS_IN_PROGRESS);
}

/**
 * Returns a Map whose keys are KS.CF pairs and whose values are maps from sstable generation numbers to the
 * task ID of the compaction they were participating in.
 */
public static Map<Pair<String, String>, Map<Integer, UUID>> getUnfinishedCompactions()
{
    String req = "SELECT * FROM system.%s";
    UntypedResultSet resultSet = executeInternal(String.format(req, COMPACTIONS_IN_PROGRESS));

    Map<Pair<String, String>, Map<Integer, UUID>> unfinishedCompactions = new HashMap<>();
    for (UntypedResultSet.Row row : resultSet)
    {
        String keyspace = row.getString("keyspace_name");
        String columnfamily = row.getString("columnfamily_name");
        Set<Integer> inputs = row.getSet("inputs", Int32Type.instance);
        UUID taskID = row.getUUID("id");

        Pair<String, String> kscf = Pair.create(keyspace, columnfamily);
        Map<Integer, UUID> generationToTaskID = unfinishedCompactions.get(kscf);
        if (generationToTaskID == null)
            generationToTaskID = new HashMap<>(inputs.size());

        for (Integer generation : inputs)
            generationToTaskID.put(generation, taskID);

        unfinishedCompactions.put(kscf, generationToTaskID);
    }
    return unfinishedCompactions;
}

public static void discardCompactionsInProgress()
{
    ColumnFamilyStore compactionLog = Keyspace.open(NAME).getColumnFamilyStore(COMPACTIONS_IN_PROGRESS);
    compactionLog.truncateBlocking();
}

public static void updateCompactionHistory(String ksname,
                                           String cfname,
                                           long compactedAt,
                                           long bytesIn,
                                           long bytesOut,
                                           Map<Integer, Long> rowsMerged)
{
    // don't write anything when the history table itself is compacted, since that would in turn cause new compactions
    if (ksname.equals("system") && cfname.equals(COMPACTION_HISTORY))
        return;
    String req = "INSERT INTO system.%s (id, keyspace_name, columnfamily_name, compacted_at, bytes_in, bytes_out, rows_merged) VALUES (?, ?, ?, ?, ?, ?, ?)";
    executeInternal(String.format(req, COMPACTION_HISTORY), UUIDGen.getTimeUUID(), ksname, cfname, ByteBufferUtil.bytes(compactedAt), bytesIn, bytesOut, rowsMerged);
}

public static TabularData getCompactionHistory() throws OpenDataException
{
    UntypedResultSet queryResultSet = executeInternal(String.format("SELECT * from system.%s", COMPACTION_HISTORY));
    return CompactionHistoryTabularData.from(queryResultSet);
}

public static synchronized void saveTruncationRecord(ColumnFamilyStore cfs, long truncatedAt, ReplayPosition position)
{
    String req = "UPDATE system.%s SET truncated_at = truncated_at + ? WHERE key = '%s'";
    executeInternal(String.format(req, LOCAL, LOCAL), truncationAsMapEntry(cfs, truncatedAt, position));
    truncationRecords = null;
    forceBlockingFlush(LOCAL);
}

/**
 * This method is used to remove information about truncation time for the specified column family
 */
public static synchronized void removeTruncationRecord(UUID cfId)
{
    String req = "DELETE truncated_at[?] from system.%s WHERE key = '%s'";
    executeInternal(String.format(req, LOCAL, LOCAL), cfId);
    truncationRecords = null;
    forceBlockingFlush(LOCAL);
}

private static Map<UUID, ByteBuffer> truncationAsMapEntry(ColumnFamilyStore cfs, long truncatedAt, ReplayPosition position)
{
    DataOutputBuffer out = new DataOutputBuffer();
    try
    {
        ReplayPosition.serializer.serialize(position, out);
        out.writeLong(truncatedAt);
    }
    catch (IOException e)
    {
        throw new RuntimeException(e);
    }
    return Collections.singletonMap(cfs.metadata.cfId, ByteBuffer.wrap(out.getData(), 0, out.getLength()));
}

public static ReplayPosition getTruncatedPosition(UUID cfId)
{
    Pair<ReplayPosition, Long> record = getTruncationRecord(cfId);
    return record == null ? null : record.left;
}

public static long getTruncatedAt(UUID cfId)
{
    Pair<ReplayPosition, Long> record = getTruncationRecord(cfId);
    return record == null ? Long.MIN_VALUE : record.right;
}

private static synchronized Pair<ReplayPosition, Long> getTruncationRecord(UUID cfId)
{
    if (truncationRecords == null)
        truncationRecords = readTruncationRecords();
    return truncationRecords.get(cfId);
}

private static Map<UUID, Pair<ReplayPosition, Long>> readTruncationRecords()
{
    UntypedResultSet rows = executeInternal(String.format("SELECT truncated_at FROM system.%s WHERE key = '%s'", LOCAL, LOCAL));

    Map<UUID, Pair<ReplayPosition, Long>> records = new HashMap<>();

    if (!rows.isEmpty() && rows.one().has("truncated_at"))
    {
        Map<UUID, ByteBuffer> map = rows.one().getMap("truncated_at", UUIDType.instance, BytesType.instance);
        for (Map.Entry<UUID, ByteBuffer> entry : map.entrySet())
            records.put(entry.getKey(), truncationRecordFromBlob(entry.getValue()));
    }

    return records;
}

private static Pair<ReplayPosition, Long> truncationRecordFromBlob(ByteBuffer bytes)
{
    try
    {
        DataInputStream in = new DataInputStream(ByteBufferUtil.inputStream(bytes));
        return Pair.create(ReplayPosition.serializer.deserialize(in), in.available() > 0 ? in.readLong() : Long.MIN_VALUE);
    }
    catch (IOException e)
    {
        throw new RuntimeException(e);
    }
}

/**
 * Record tokens being used by another node
 */
public static synchronized void updateTokens(InetAddress ep, Collection<Token> tokens)
{
    if (ep.equals(FBUtilities.getBroadcastAddress()))
    {
        removeEndpoint(ep);
        return;
    }

    String req = "INSERT INTO system.%s (peer, tokens) VALUES (?, ?)";
    executeInternal(String.format(req, PEERS), ep, tokensAsSet(tokens));
}

public static synchronized void updatePreferredIP(InetAddress ep, InetAddress preferred_ip)
{
    String req = "INSERT INTO system.%s (peer, preferred_ip) VALUES (?, ?)";
    executeInternal(String.format(req, PEERS), ep, preferred_ip);
    forceBlockingFlush(PEERS);
}

public static synchronized void updatePeerInfo(InetAddress ep, String columnName, Object value)
{
    if (ep.equals(FBUtilities.getBroadcastAddress()))
        return;

    String req = "INSERT INTO system.%s (peer, %s) VALUES (?, ?)";
    executeInternal(String.format(req, PEERS, columnName), ep, value);
}

public static synchronized void updateHintsDropped(InetAddress ep, UUID timePeriod, int value)
{
    // with 30 day TTL
    String req = "UPDATE system.%s USING TTL 2592000 SET hints_dropped[ ? ] = ? WHERE peer = ?";
    executeInternal(String.format(req, PEER_EVENTS), timePeriod, value, ep);
}

public static synchronized void updateSchemaVersion(UUID version)
{
    String req = "INSERT INTO system.%s (key, schema_version) VALUES ('%s', ?)";
    executeInternal(String.format(req, LOCAL, LOCAL), version);
}

private static Set<String> tokensAsSet(Collection<Token> tokens)
{
    Token.TokenFactory factory = StorageService.getPartitioner().getTokenFactory();
    Set<String> s = new HashSet<>(tokens.size());
    for (Token tk : tokens)
        s.add(factory.toString(tk));
    return s;
}

private static Collection<Token> deserializeTokens(Collection<String> tokensStrings)
{
    Token.TokenFactory factory = StorageService.getPartitioner().getTokenFactory();
    List<Token> tokens = new ArrayList<>(tokensStrings.size());
    for (String tk : tokensStrings)
        tokens.add(factory.fromString(tk));
    return tokens;
}

/**
 * Remove stored tokens being used by another node
 */
public static synchronized void removeEndpoint(InetAddress ep)
{
    String req = "DELETE FROM system.%s WHERE peer = ?";
    executeInternal(String.format(req, PEERS), ep);
}

/**
 * This method is used to update the System Keyspace with the new tokens for this node
 */
public static synchronized void updateTokens(Collection<Token> tokens)
{
    assert !tokens.isEmpty() : "removeEndpoint should be used instead";
    String req = "INSERT INTO system.%s (key, tokens) VALUES ('%s', ?)";
    executeInternal(String.format(req, LOCAL, LOCAL), tokensAsSet(tokens));
    forceBlockingFlush(LOCAL);
}

/**
 * Convenience method to update the list of tokens in the local system keyspace.
 *
 * @param addTokens tokens to add
 * @param rmTokens tokens to remove
 * @return the collection of persisted tokens
 */
public static synchronized Collection<Token> updateLocalTokens(Collection<Token> addTokens, Collection<Token> rmTokens)
{
    Collection<Token> tokens = getSavedTokens();
    tokens.removeAll(rmTokens);
    tokens.addAll(addTokens);
    updateTokens(tokens);
    return tokens;
}

public static void forceBlockingFlush(String cfname)
{
    if (!Boolean.getBoolean("cassandra.unsafesystem"))
        FBUtilities.waitOnFuture(Keyspace.open(NAME).getColumnFamilyStore(cfname).forceFlush());
}

/**
 * Return a map of stored tokens to IP addresses
 */
public static SetMultimap<InetAddress, Token> loadTokens()
{
    SetMultimap<InetAddress, Token> tokenMap = HashMultimap.create();
    for (UntypedResultSet.Row row : executeInternal("SELECT peer, tokens FROM system." + PEERS))
    {
        InetAddress peer = row.getInetAddress("peer");
        if (row.has("tokens"))
            tokenMap.putAll(peer, deserializeTokens(row.getSet("tokens", UTF8Type.instance)));
    }

    return tokenMap;
}

/**
 * Return a map of stored host_ids to IP addresses
 */
public static Map<InetAddress, UUID> loadHostIds()
{
    Map<InetAddress, UUID> hostIdMap = new HashMap<>();
    for (UntypedResultSet.Row row : executeInternal("SELECT peer, host_id FROM system." + PEERS))
    {
        InetAddress peer = row.getInetAddress("peer");
        if (row.has("host_id"))
        {
            hostIdMap.put(peer, row.getUUID("host_id"));
        }
    }
    return hostIdMap;
}

/**
 * Get preferred IP for given endpoint if it is known. Otherwise this returns given endpoint itself.
 *
 * @param ep endpoint address to check
 * @return Preferred IP for given endpoint if present, otherwise returns given ep
 */
public static InetAddress getPreferredIP(InetAddress ep)
{
    String req = "SELECT preferred_ip FROM system.%s WHERE peer=?";
    UntypedResultSet result = executeInternal(String.format(req, PEERS), ep);
    if (!result.isEmpty() && result.one().has("preferred_ip"))
        return result.one().getInetAddress("preferred_ip");
    return ep;
}

/**
 * Return a map of IP addresses containing a map of dc and rack info
 */
public static Map<InetAddress, Map<String, String>> loadDcRackInfo()
{
    Map<InetAddress, Map<String, String>> result = new HashMap<>();
    for (UntypedResultSet.Row row : executeInternal("SELECT peer, data_center, rack from system." + PEERS))
    {
        InetAddress peer = row.getInetAddress("peer");
        if (row.has("data_center") && row.has("rack"))
        {
            Map<String, String> dcRack = new HashMap<>();
            dcRack.put("data_center", row.getString("data_center"));
            dcRack.put("rack", row.getString("rack"));
            result.put(peer, dcRack);
        }
    }
    return result;
}

/**
 * One of three things will happen if you try to read the system keyspace:
 * 1. files are present and you can read them: great
 * 2. no files are there: great (new node is assumed)
 * 3. files are present but you can't read them: bad
 * @throws ConfigurationException
 */
public static void checkHealth() throws ConfigurationException
{
    Keyspace keyspace;
    try
    {
        keyspace = Keyspace.open(NAME);
    }
    catch (AssertionError err)
    {
        // this happens when a user switches from OPP to RP.
        ConfigurationException ex = new ConfigurationException("Could not read system keyspace!");
        ex.initCause(err);
        throw ex;
    }
    ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(LOCAL);

    String req = "SELECT cluster_name FROM system.%s WHERE key='%s'";
    UntypedResultSet result = executeInternal(String.format(req, LOCAL, LOCAL));

    if (result.isEmpty() || !result.one().has("cluster_name"))
    {
        // this is a brand new node
        if (!cfs.getSSTables().isEmpty())
            throw new ConfigurationException("Found system keyspace files, but they couldn't be loaded!");

        // no system files. this is a new node.
        req = "INSERT INTO system.%s (key, cluster_name) VALUES ('%s', ?)";
        executeInternal(String.format(req, LOCAL, LOCAL), DatabaseDescriptor.getClusterName());
        return;
    }

    String savedClusterName = result.one().getString("cluster_name");
    if (!DatabaseDescriptor.getClusterName().equals(savedClusterName))
        throw new ConfigurationException("Saved cluster name " + savedClusterName + " != configured name " + DatabaseDescriptor.getClusterName());
}

public static Collection<Token> getSavedTokens()
{
    String req = "SELECT tokens FROM system.%s WHERE key='%s'";
    UntypedResultSet result = executeInternal(String.format(req, LOCAL, LOCAL));
    return result.isEmpty() || !result.one().has("tokens")
         ? Collections.<Token>emptyList()
         : deserializeTokens(result.one().getSet("tokens", UTF8Type.instance));
}

public static int incrementAndGetGeneration()
{
    String req = "SELECT gossip_generation FROM system.%s WHERE key='%s'";
    UntypedResultSet result = executeInternal(String.format(req, LOCAL, LOCAL));

    int generation;
    if (result.isEmpty() || !result.one().has("gossip_generation"))
    {
        // seconds-since-epoch isn't a foolproof new generation
        // (where foolproof is "guaranteed to be larger than the last one seen at this ip address"),
        // but it's as close as sanely possible
        generation = (int) (System.currentTimeMillis() / 1000);
    }
    else
    {
        // Other nodes will ignore gossip messages about a node that has a lower generation than previously seen.
        final int storedGeneration = result.one().getInt("gossip_generation") + 1;
        final int now = (int) (System.currentTimeMillis() / 1000);
        if (storedGeneration >= now)
        {
            logger.warn("Using stored Gossip Generation {} as it is greater than current system time {}. See CASSANDRA-3654 if you experience problems",
                        storedGeneration, now);
            generation = storedGeneration;
        }
        else
        {
            generation = now;
        }
    }

    req = "INSERT INTO system.%s (key, gossip_generation) VALUES ('%s', ?)";
    executeInternal(String.format(req, LOCAL, LOCAL), generation);
    forceBlockingFlush(LOCAL);

    return generation;
}

public static BootstrapState getBootstrapState()
{
    String req = "SELECT bootstrapped FROM system.%s WHERE key='%s'";
    UntypedResultSet result = executeInternal(String.format(req, LOCAL, LOCAL));

    if (result.isEmpty() || !result.one().has("bootstrapped"))
        return BootstrapState.NEEDS_BOOTSTRAP;

    return BootstrapState.valueOf(result.one().getString("bootstrapped"));
}

public static boolean bootstrapComplete()
{
    return getBootstrapState() == BootstrapState.COMPLETED;
}

public static boolean bootstrapInProgress()
{
    return getBootstrapState() == BootstrapState.IN_PROGRESS;
}

public static void setBootstrapState(BootstrapState state)
{
    String req = "INSERT INTO system.%s (key, bootstrapped) VALUES ('%s', ?)";
    executeInternal(String.format(req, LOCAL, LOCAL), state.name());
    forceBlockingFlush(LOCAL);
}

public static boolean isIndexBuilt(String keyspaceName, String indexName)
{
    ColumnFamilyStore cfs = Keyspace.open(NAME).getColumnFamilyStore(BUILT_INDEXES);
    QueryFilter filter = QueryFilter.getNamesFilter(decorate(ByteBufferUtil.bytes(keyspaceName)),
                                                    BUILT_INDEXES,
                                                    FBUtilities.singleton(cfs.getComparator().makeCellName(indexName), cfs.getComparator()),
                                                    System.currentTimeMillis());
    return ColumnFamilyStore.removeDeleted(cfs.getColumnFamily(filter), Integer.MAX_VALUE) != null;
}

public static void setIndexBuilt(String keyspaceName, String indexName)
{
    ColumnFamily cf = ArrayBackedSortedColumns.factory.create(NAME, BUILT_INDEXES);
    cf.addColumn(new BufferCell(cf.getComparator().makeCellName(indexName), ByteBufferUtil.EMPTY_BYTE_BUFFER, FBUtilities.timestampMicros()));
    new Mutation(NAME, ByteBufferUtil.bytes(keyspaceName), cf).apply();
}

public static void setIndexRemoved(String keyspaceName, String indexName)
{
    Mutation mutation = new Mutation(NAME, ByteBufferUtil.bytes(keyspaceName));
    mutation.delete(BUILT_INDEXES, BuiltIndexes.comparator.makeCellName(indexName), FBUtilities.timestampMicros());
    mutation.apply();
}

/**
 * Read the host ID from the system keyspace, creating (and storing) one if
 * none exists.
 */
public static UUID getLocalHostId()
{
    String req = "SELECT host_id FROM system.%s WHERE key='%s'";
    UntypedResultSet result = executeInternal(String.format(req, LOCAL, LOCAL));

    // Look up the Host UUID (return it if found)
    if (!result.isEmpty() && result.one().has("host_id"))
        return result.one().getUUID("host_id");

    // ID not found, generate a new one, persist, and then return it.
    UUID hostId = UUID.randomUUID();
    logger.warn("No host ID found, created {} (Note: This should happen exactly once per node).", hostId);
    return setLocalHostId(hostId);
}

/**
 * Sets the local host ID explicitly. Should only be called outside of SystemTable when replacing a node.
 */
public static UUID setLocalHostId(UUID hostId)
{
    String req = "INSERT INTO system.%s (key, host_id) VALUES ('%s', ?)";
    executeInternal(String.format(req, LOCAL, LOCAL), hostId);
    return hostId;
}

public static PaxosState loadPaxosState(ByteBuffer key, CFMetaData metadata)
{
    String req = "SELECT * FROM system.%s WHERE row_key = ? AND cf_id = ?";
    UntypedResultSet results = executeInternal(String.format(req, PAXOS), key, metadata.cfId);
    if (results.isEmpty())
        return new PaxosState(key, metadata);
    UntypedResultSet.Row row = results.one();
    Commit promised = row.has("in_progress_ballot")
                    ? new Commit(key, row.getUUID("in_progress_ballot"), ArrayBackedSortedColumns.factory.create(metadata))
                    : Commit.emptyCommit(key, metadata);
    // either we have both a recently accepted ballot and update or we have neither
    Commit accepted = row.has("proposal")
                    ? new Commit(key, row.getUUID("proposal_ballot"), ColumnFamily.fromBytes(row.getBytes("proposal")))
                    : Commit.emptyCommit(key, metadata);
    // either most_recent_commit and most_recent_commit_at will both be set, or neither
    Commit mostRecent = row.has("most_recent_commit")
                      ? new Commit(key, row.getUUID("most_recent_commit_at"), ColumnFamily.fromBytes(row.getBytes("most_recent_commit")))
                      : Commit.emptyCommit(key, metadata);
    return new PaxosState(promised, accepted, mostRecent);
}

public static void savePaxosPromise(Commit promise)
{
    String req = "UPDATE system.%s USING TIMESTAMP ? AND TTL ? SET in_progress_ballot = ? WHERE row_key = ? AND cf_id = ?";
    executeInternal(String.format(req, PAXOS),
                    UUIDGen.microsTimestamp(promise.ballot),
                    paxosTtl(promise.update.metadata),
                    promise.ballot,
                    promise.key,
                    promise.update.id());
}

public static void savePaxosProposal(Commit proposal)
{
    executeInternal(String.format("UPDATE system.%s USING TIMESTAMP ? AND TTL ? SET proposal_ballot = ?, proposal = ? WHERE row_key = ? AND cf_id = ?", PAXOS),
                    UUIDGen.microsTimestamp(proposal.ballot),
                    paxosTtl(proposal.update.metadata),
                    proposal.ballot,
                    proposal.update.toBytes(),
                    proposal.key,
                    proposal.update.id());
}

private static int paxosTtl(CFMetaData metadata)
{
    // keep paxos state around for at least 3h
    return Math.max(3 * 3600, metadata.getGcGraceSeconds());
}

public static void savePaxosCommit(Commit commit)
{
    // We always erase the last proposal (using the commit timestamp, so as not to erase a more recent proposal in case the commit is old),
    // even though that's really just an optimization since SP.beginAndRepairPaxos will exclude accepted proposals older than the mrc.
    String cql = "UPDATE system.%s USING TIMESTAMP ? AND TTL ? SET proposal_ballot = null, proposal = null, most_recent_commit_at = ?, most_recent_commit = ? WHERE row_key = ? AND cf_id = ?";
    executeInternal(String.format(cql, PAXOS),
                    UUIDGen.microsTimestamp(commit.ballot),
                    paxosTtl(commit.update.metadata),
                    commit.ballot,
                    commit.update.toBytes(),
                    commit.key,
                    commit.update.id());
}

/**
 * Returns a RestorableMeter tracking the average read rate of a particular SSTable, restoring the last-seen rate
 * from values in system.sstable_activity if present.
 * @param keyspace the keyspace the sstable belongs to
 * @param table the table the sstable belongs to
 * @param generation the generation number for the sstable
 */
public static RestorableMeter getSSTableReadMeter(String keyspace, String table, int generation)
{
    String cql = "SELECT * FROM system.%s WHERE keyspace_name=? and columnfamily_name=? and generation=?";
    UntypedResultSet results = executeInternal(String.format(cql, SSTABLE_ACTIVITY), keyspace, table, generation);

    if (results.isEmpty())
        return new RestorableMeter();

    UntypedResultSet.Row row = results.one();
    double m15rate = row.getDouble("rate_15m");
    double m120rate = row.getDouble("rate_120m");
    return new RestorableMeter(m15rate, m120rate);
}

/**
 * Writes the current read rates for a given SSTable to system.sstable_activity
 */
public static void persistSSTableReadMeter(String keyspace, String table, int generation, RestorableMeter meter)
{
    // Store values with a ten-day TTL (864000 s) to handle corner cases where cleanup might not occur
    String cql = "INSERT INTO system.%s (keyspace_name, columnfamily_name, generation, rate_15m, rate_120m) VALUES (?, ?, ?, ?, ?) USING TTL 864000";
    executeInternal(String.format(cql, SSTABLE_ACTIVITY),
                    keyspace,
                    table,
                    generation,
                    meter.fifteenMinuteRate(),
                    meter.twoHourRate());
}

/**
 * Clears persisted read rates from system.sstable_activity for SSTables that have been deleted.
 */
public static void clearSSTableReadMeter(String keyspace, String table, int generation)
{
    String cql = "DELETE FROM system.%s WHERE keyspace_name=? AND columnfamily_name=? and generation=?";
    executeInternal(String.format(cql, SSTABLE_ACTIVITY), keyspace, table, generation);
}
#endif

std::vector<schema_ptr> all_tables() {
    std::vector<schema_ptr> r;
    auto legacy_tables = db::legacy_schema_tables::all_tables();
    std::copy(legacy_tables.begin(), legacy_tables.end(), std::back_inserter(r));
    r.push_back(built_indexes());
    r.push_back(hints());
    r.push_back(batchlog());
    r.push_back(paxos());
    r.push_back(local());
    r.push_back(peers());
    r.push_back(peer_events());
    r.push_back(range_xfers());
    r.push_back(compactions_in_progress());
    r.push_back(compaction_history());
    r.push_back(sstable_activity());
    return r;
}

void make(database& db, bool durable) {
    auto ksm = make_lw_shared<keyspace_metadata>(NAME,
            "org.apache.cassandra.locator.LocalStrategy",
            std::map<sstring, sstring>{},
            durable
            );
    auto kscfg = db.make_keyspace_config(*ksm);
    keyspace _ks{ksm, std::move(kscfg)};
    auto rs(locator::abstract_replication_strategy::create_replication_strategy(NAME, "LocalStrategy", service::get_local_storage_service().get_token_metadata(), ksm->strategy_options()));
    _ks.set_replication_strategy(std::move(rs));
    db.add_keyspace(NAME, std::move(_ks));
    auto& ks = db.find_keyspace(NAME);
    for (auto&& table : all_tables()) {
        db.add_column_family(table, ks.make_column_family_config(*table));
    }
}

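// A minimal usage sketch for make() (illustrative only; the call site and
// the durability choice are assumptions, not part of this file):
#if 0
database db;
db::system_keyspace::make(db, true /* durable */);
// The system keyspace and all of its tables are now registered with db.
#endif
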
utils::UUID get_local_host_id() {
#if 0
    String req = "SELECT host_id FROM system.%s WHERE key='%s'";
    UntypedResultSet result = executeInternal(String.format(req, LOCAL, LOCAL));

    // Look up the Host UUID (return it if found)
    if (!result.isEmpty() && result.one().has("host_id"))
        return result.one().getUUID("host_id");
#endif

    // ID not found, generate a new one, persist, and then return it.
    auto host_id = utils::make_random_uuid();
    //logger.warn("No host ID found, created {} (Note: This should happen exactly once per node).", hostId);
    return set_local_host_id(host_id);
}

utils::UUID set_local_host_id(const utils::UUID& host_id) {
    // String req = "INSERT INTO system.%s (key, host_id) VALUES ('%s', ?)";
    // executeInternal(String.format(req, LOCAL, LOCAL), hostId);
    return host_id;
}

std::unordered_map<gms::inet_address, locator::endpoint_dc_rack>
load_dc_rack_info()
{
    std::unordered_map<gms::inet_address, locator::endpoint_dc_rack> result;
#if 0 // TODO
    for (UntypedResultSet.Row row : executeInternal("SELECT peer, data_center, rack from system." + PEERS))
    {
        InetAddress peer = row.getInetAddress("peer");
        if (row.has("data_center") && row.has("rack"))
        {
            Map<String, String> dcRack = new HashMap<>();
            dcRack.put("data_center", row.getString("data_center"));
            dcRack.put("rack", row.getString("rack"));
            result.put(peer, dcRack);
        }
    }
#endif
    return result;
}

future<lw_shared_ptr<query::result_set>>
query(service::storage_proxy& proxy, const sstring& cf_name) {
    database& db = proxy.get_db().local();
    schema_ptr schema = db.find_schema(db::system_keyspace::NAME, cf_name);
    std::vector<column_id> regular_cols;
    boost::range::push_back(regular_cols, schema->regular_columns() | boost::adaptors::transformed(std::mem_fn(&column_definition::id)));
    std::vector<column_id> static_cols;
    boost::range::push_back(static_cols, schema->static_columns() | boost::adaptors::transformed(std::mem_fn(&column_definition::id)));
    auto opts = query::partition_slice::option_set::of<
        query::partition_slice::option::send_partition_key,
        query::partition_slice::option::send_clustering_key>();
    query::partition_slice slice{{query::clustering_range::make_open_ended_both_sides()}, static_cols, regular_cols, opts};
    auto cmd = make_lw_shared<query::read_command>(schema->id(), slice, std::numeric_limits<uint32_t>::max());
    return proxy.query(cmd, {query::full_partition_range}, db::consistency_level::ONE).then([schema, cmd] (auto&& result) {
        return make_lw_shared(query::result_set::from_raw_result(schema, cmd->slice, *result));
    });
}

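// A minimal usage sketch for query() (illustrative only; the table choice
// and the result handling are assumptions, not part of this file):
#if 0
query(proxy, db::system_keyspace::PEERS).then([] (lw_shared_ptr<query::result_set> rs) {
    // rs holds every row of system.peers, read at consistency level ONE.
});
#endif
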
} // namespace system_keyspace
} // namespace db