Compare commits
47 Commits
branch-6.2
...
branch-0.1
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
8d8932cb15 | ||
|
|
891476dfc6 | ||
|
|
346c729531 | ||
|
|
2c06609fc1 | ||
|
|
0f22c3ebe9 | ||
|
|
ee1ce3c6b4 | ||
|
|
b2f07c0d44 | ||
|
|
6c6447c131 | ||
|
|
ade185e518 | ||
|
|
d7001cad04 | ||
|
|
14504bdb25 | ||
|
|
22682636ae | ||
|
|
c409e3508e | ||
|
|
f137536c1c | ||
|
|
d738863ed6 | ||
|
|
7426cf980e | ||
|
|
8b88789dfb | ||
|
|
7dbcd5f2ca | ||
|
|
ffbf02deb5 | ||
|
|
65aa036c75 | ||
|
|
6ab1b2d453 | ||
|
|
11679b28f5 | ||
|
|
ffb5e6f01e | ||
|
|
6aea747275 | ||
|
|
2a840788fd | ||
|
|
6b443db4d9 | ||
|
|
9919211c25 | ||
|
|
c4f73f4e12 | ||
|
|
dcd62cc0be | ||
|
|
dcd2b85e02 | ||
|
|
1d1416f841 | ||
|
|
be552139ce | ||
|
|
1b45b5d649 | ||
|
|
7c1268765c | ||
|
|
9ef84d1f01 | ||
|
|
4e3b98f281 | ||
|
|
124489e8d8 | ||
|
|
7a2c57d6bd | ||
|
|
10543bf81e | ||
|
|
579a220162 | ||
|
|
8c5ffb84ce | ||
|
|
d05cdb0f6e | ||
|
|
df02fb7a3e | ||
|
|
559a8b41f2 | ||
|
|
8b1f18ee1a | ||
|
|
cbbd18a249 | ||
|
|
4db985e505 |
@@ -1,6 +1,6 @@
|
||||
#!/bin/sh
|
||||
|
||||
VERSION=666.development
|
||||
VERSION=0.19
|
||||
|
||||
if test -f version
|
||||
then
|
||||
|
||||
@@ -423,7 +423,6 @@ scylla_core = (['database.cc',
|
||||
'service/client_state.cc',
|
||||
'service/migration_task.cc',
|
||||
'service/storage_service.cc',
|
||||
'service/pending_range_calculator_service.cc',
|
||||
'service/load_broadcaster.cc',
|
||||
'service/pager/paging_state.cc',
|
||||
'service/pager/query_pagers.cc',
|
||||
|
||||
@@ -338,13 +338,8 @@ lists::do_append(shared_ptr<term> t,
|
||||
if (!value) {
|
||||
m.set_cell(prefix, column, params.make_dead_cell());
|
||||
} else {
|
||||
auto&& to_add = list_value->_elements;
|
||||
auto deref = [] (const bytes_opt& v) { return *v; };
|
||||
auto&& newv = collection_mutation{list_type_impl::pack(
|
||||
boost::make_transform_iterator(to_add.begin(), deref),
|
||||
boost::make_transform_iterator(to_add.end(), deref),
|
||||
to_add.size(), cql_serialization_format::internal())};
|
||||
m.set_cell(prefix, column, atomic_cell_or_collection::from_collection_mutation(std::move(newv)));
|
||||
auto newv = list_value->get_with_protocol_version(cql_serialization_format::internal());
|
||||
m.set_cell(prefix, column, params.make_cell(std::move(newv)));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
17
database.cc
17
database.cc
@@ -588,9 +588,7 @@ column_family::seal_active_memtable() {
|
||||
|
||||
future<stop_iteration>
|
||||
column_family::try_flush_memtable_to_sstable(lw_shared_ptr<memtable> old) {
|
||||
// FIXME: better way of ensuring we don't attempt to
|
||||
// overwrite an existing table.
|
||||
auto gen = _sstable_generation++ * smp::count + engine().cpu_id();
|
||||
auto gen = calculate_generation_for_new_table();
|
||||
|
||||
auto newtab = make_lw_shared<sstables::sstable>(_schema->ks_name(), _schema->cf_name(),
|
||||
_config.datadir, gen,
|
||||
@@ -1017,6 +1015,9 @@ future<> column_family::populate(sstring sstdir) {
|
||||
return make_ready_future<>();
|
||||
});
|
||||
});
|
||||
}).then([this] {
|
||||
// Make sure this is called even if CF is empty
|
||||
mark_ready_for_writes();
|
||||
});
|
||||
}
|
||||
|
||||
@@ -1201,6 +1202,14 @@ database::init_system_keyspace() {
|
||||
return populate_keyspace(_cfg->data_file_directories()[0], db::system_keyspace::NAME).then([this]() {
|
||||
return init_commitlog();
|
||||
});
|
||||
}).then([this] {
|
||||
auto& ks = find_keyspace(db::system_keyspace::NAME);
|
||||
return parallel_for_each(ks.metadata()->cf_meta_data(), [this] (auto& pair) {
|
||||
auto cfm = pair.second;
|
||||
auto& cf = this->find_column_family(cfm);
|
||||
cf.mark_ready_for_writes();
|
||||
return make_ready_future<>();
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
@@ -1718,11 +1727,9 @@ column_family::apply(const frozen_mutation& m, const schema_ptr& m_schema, const
|
||||
|
||||
void
|
||||
column_family::seal_on_overflow() {
|
||||
++_mutation_count;
|
||||
if (active_memtable().occupancy().total_space() >= _config.max_memtable_size) {
|
||||
// FIXME: if sparse, do some in-memory compaction first
|
||||
// FIXME: maybe merge with other in-memory memtables
|
||||
_mutation_count = 0;
|
||||
seal_active_memtable();
|
||||
}
|
||||
}
|
||||
|
||||
40
database.hh
40
database.hh
@@ -159,8 +159,8 @@ private:
|
||||
// the read lock, and the ones that wish to stop that process will take the write lock.
|
||||
rwlock _sstables_lock;
|
||||
mutable row_cache _cache; // Cache covers only sstables.
|
||||
int64_t _sstable_generation = 1;
|
||||
unsigned _mutation_count = 0;
|
||||
std::experimental::optional<int64_t> _sstable_generation = {};
|
||||
|
||||
db::replay_position _highest_flushed_rp;
|
||||
// Provided by the database that owns this commitlog
|
||||
db::commitlog* _commitlog;
|
||||
@@ -182,11 +182,17 @@ private:
|
||||
|
||||
// update the sstable generation, making sure that new new sstables don't overwrite this one.
|
||||
void update_sstables_known_generation(unsigned generation) {
|
||||
_sstable_generation = std::max<uint64_t>(_sstable_generation, generation / smp::count + 1);
|
||||
if (!_sstable_generation) {
|
||||
_sstable_generation = 1;
|
||||
}
|
||||
_sstable_generation = std::max<uint64_t>(*_sstable_generation, generation / smp::count + 1);
|
||||
}
|
||||
|
||||
uint64_t calculate_generation_for_new_table() {
|
||||
return _sstable_generation++ * smp::count + engine().cpu_id();
|
||||
assert(_sstable_generation);
|
||||
// FIXME: better way of ensuring we don't attempt to
|
||||
// overwrite an existing table.
|
||||
return (*_sstable_generation)++ * smp::count + engine().cpu_id();
|
||||
}
|
||||
|
||||
// Rebuild existing _sstables with new_sstables added to it and sstables_to_remove removed from it.
|
||||
@@ -205,6 +211,27 @@ private:
|
||||
std::chrono::steady_clock::time_point _sstable_writes_disabled_at;
|
||||
void do_trigger_compaction();
|
||||
public:
|
||||
|
||||
// This function should be called when this column family is ready for writes, IOW,
|
||||
// to produce SSTables. Extensive details about why this is important can be found
|
||||
// in Scylla's Github Issue #1014
|
||||
//
|
||||
// Nothing should be writing to SSTables before we have the chance to populate the
|
||||
// existing SSTables and calculate what should the next generation number be.
|
||||
//
|
||||
// However, if that happens, we want to protect against it in a way that does not
|
||||
// involve overwriting existing tables. This is one of the ways to do it: every
|
||||
// column family starts in an unwriteable state, and when it can finally be written
|
||||
// to, we mark it as writeable.
|
||||
//
|
||||
// Note that this *cannot* be a part of add_column_family. That adds a column family
|
||||
// to a db in memory only, and if anybody is about to write to a CF, that was most
|
||||
// likely already called. We need to call this explicitly when we are sure we're ready
|
||||
// to issue disk operations safely.
|
||||
void mark_ready_for_writes() {
|
||||
update_sstables_known_generation(0);
|
||||
}
|
||||
|
||||
// Creates a mutation reader which covers all data sources for this column family.
|
||||
// Caller needs to ensure that column_family remains live (FIXME: relax this).
|
||||
// Note: for data queries use query() instead.
|
||||
@@ -377,16 +404,11 @@ private:
|
||||
// But it is possible to synchronously wait for the seal to complete by
|
||||
// waiting on this future. This is useful in situations where we want to
|
||||
// synchronously flush data to disk.
|
||||
//
|
||||
// FIXME: A better interface would guarantee that all writes before this
|
||||
// one are also complete
|
||||
future<> seal_active_memtable();
|
||||
|
||||
// filter manifest.json files out
|
||||
static bool manifest_json_filter(const sstring& fname);
|
||||
|
||||
seastar::gate _in_flight_seals;
|
||||
|
||||
// Iterate over all partitions. Protocol is the same as std::all_of(),
|
||||
// so that iteration can be stopped by returning false.
|
||||
// Func signature: bool (const decorated_key& dk, const mutation_partition& mp)
|
||||
|
||||
@@ -607,10 +607,10 @@ future<> do_merge_schema(distributed<service::storage_proxy>& proxy, std::vector
|
||||
#endif
|
||||
proxy.local().get_db().invoke_on_all([keyspaces_to_drop = std::move(keyspaces_to_drop)] (database& db) {
|
||||
// it is safe to drop a keyspace only when all nested ColumnFamilies where deleted
|
||||
for (auto&& keyspace_to_drop : keyspaces_to_drop) {
|
||||
return do_for_each(keyspaces_to_drop, [&db] (auto keyspace_to_drop) {
|
||||
db.drop_keyspace(keyspace_to_drop);
|
||||
service::get_local_migration_manager().notify_drop_keyspace(keyspace_to_drop);
|
||||
}
|
||||
return service::get_local_migration_manager().notify_drop_keyspace(keyspace_to_drop);
|
||||
});
|
||||
}).get0();
|
||||
});
|
||||
}
|
||||
@@ -650,7 +650,7 @@ future<std::set<sstring>> merge_keyspaces(distributed<service::storage_proxy>& p
|
||||
return do_for_each(created, [&db](auto&& val) {
|
||||
auto ksm = create_keyspace_from_schema_partition(val);
|
||||
return db.create_keyspace(ksm).then([ksm] {
|
||||
service::get_local_migration_manager().notify_create_keyspace(ksm);
|
||||
return service::get_local_migration_manager().notify_create_keyspace(ksm);
|
||||
});
|
||||
}).then([&altered, &db] () mutable {
|
||||
for (auto&& name : altered) {
|
||||
@@ -710,6 +710,8 @@ static void merge_tables(distributed<service::storage_proxy>& proxy,
|
||||
auto& ks = db.find_keyspace(s->ks_name());
|
||||
auto cfg = ks.make_column_family_config(*s);
|
||||
db.add_column_family(s, cfg);
|
||||
auto& cf = db.find_column_family(s);
|
||||
cf.mark_ready_for_writes();
|
||||
ks.make_directory_for_column_family(s->cf_name(), s->id()).get();
|
||||
service::get_local_migration_manager().notify_create_column_family(s);
|
||||
}
|
||||
|
||||
2
dist/ami/files/scylla-ami
vendored
2
dist/ami/files/scylla-ami
vendored
Submodule dist/ami/files/scylla-ami updated: d4a0e18637...84bcd0df6d
23
dist/common/scripts/scylla_io_setup
vendored
23
dist/common/scripts/scylla_io_setup
vendored
@@ -1,4 +1,4 @@
|
||||
#!/bin/sh -e
|
||||
#!/bin/sh
|
||||
|
||||
is_ami() {
|
||||
if [ "`dmidecode --string system-version | grep \.amazon`" != "" ] && \
|
||||
@@ -18,20 +18,21 @@ is_supported_instance_type() {
|
||||
}
|
||||
|
||||
is_developer_mode() {
|
||||
. /etc/os-release
|
||||
if [ "$NAME" = "Ubuntu" ]; then
|
||||
. /etc/default/scylla-server
|
||||
else
|
||||
. /etc/sysconfig/scylla-server
|
||||
fi
|
||||
echo $SCYLLA_ARGS|egrep -c "\-\-developer-mode(\s+|=)1"
|
||||
}
|
||||
|
||||
if [ ! -f /etc/scylla/io_configured ] && [ `is_developer_mode` -eq 0 ]; then
|
||||
if [ `is_ami` -eq 1 ]; then
|
||||
SMP=`echo $SCYLLA_ARGS|sed -e "s/^.*smp\(\s\+\|=\)\([0-9]*\).*$/\2/"`
|
||||
CPUSET=`echo $SCYLLA_ARGS|sed -e "s/^.*\(--cpuset\(\s\+\|=\)[0-9\-]*\).*$/\1/"`
|
||||
fi
|
||||
if [ `is_ami` -eq 1 ] && [ `is_supported_instance_type` -eq 1 ]; then
|
||||
NR_CPU=`cat /proc/cpuinfo |grep processor|wc -l`
|
||||
NR_DISKS=`curl http://169.254.169.254/latest/meta-data/block-device-mapping/|grep ephemeral|wc -l`
|
||||
|
||||
if [ "$SMP" != "" ]; then
|
||||
NR_CPU=$SMP
|
||||
fi
|
||||
NR_SHARDS=$NR_CPU
|
||||
if [ $NR_CPU -ge 8 ] && [ "$SET_NIC" = "no" ]; then
|
||||
NR_SHARDS=$((NR_CPU - 1))
|
||||
@@ -49,7 +50,13 @@ if [ ! -f /etc/scylla/io_configured ] && [ `is_developer_mode` -eq 0 ]; then
|
||||
|
||||
echo "SCYLLA_IO=\"--num-io-queues $NR_IO_QUEUES --max-io-requests $NR_REQS\"" > /etc/scylla.d/io.conf
|
||||
else
|
||||
iotune --evaluation-directory /var/lib/scylla --format envfile --options-file /etc/scylla.d/io.conf
|
||||
iotune --evaluation-directory /var/lib/scylla --format envfile --options-file /etc/scylla.d/io.conf $CPUSET
|
||||
if [ $? -ne 0 ]; then
|
||||
logger -p user.err "/var/lib/scylla did not pass validation tests, it may not be on XFS and/or has limited disk space."
|
||||
logger -p user.err "This is a non-supported setup, and performance is expected to be very bad."
|
||||
logger -p user.err "For better performance, placing your data on XFS-formatted directories is required."
|
||||
logger -p user.err " To override this error, see the developer_mode configuration option."
|
||||
fi
|
||||
fi
|
||||
touch /etc/scylla/io_configured
|
||||
fi
|
||||
|
||||
2
dist/common/scripts/scylla_prepare
vendored
2
dist/common/scripts/scylla_prepare
vendored
@@ -1,6 +1,6 @@
|
||||
#!/bin/sh -e
|
||||
|
||||
if [ -f /etc/scylla/ami_disabled ]; then
|
||||
if [ "$AMI" = "yes" ] && [ -f /etc/scylla/ami_disabled ]; then
|
||||
rm /etc/scylla/ami_disabled
|
||||
exit 1
|
||||
fi
|
||||
|
||||
8
dist/common/scripts/scylla_sysconfig_setup
vendored
8
dist/common/scripts/scylla_sysconfig_setup
vendored
@@ -12,6 +12,7 @@ print_usage() {
|
||||
echo " --homedir scylla home directory"
|
||||
echo " --confdir scylla config directory"
|
||||
echo " --setup-nic setup NIC's interrupts, RPS, XPS"
|
||||
echo " --ami AMI instance mode"
|
||||
exit 1
|
||||
}
|
||||
|
||||
@@ -60,6 +61,10 @@ while [ $# -gt 0 ]; do
|
||||
SETUP_NIC=1
|
||||
shift 1
|
||||
;;
|
||||
"--ami")
|
||||
AMI=yes
|
||||
shift 1
|
||||
;;
|
||||
*)
|
||||
print_usage
|
||||
;;
|
||||
@@ -71,7 +76,7 @@ echo Setting parameters on $SYSCONFIG/scylla-server
|
||||
ETHDRV=`/usr/lib/scylla/dpdk_nic_bind.py --status | grep if=$NIC | sed -e "s/^.*drv=//" -e "s/ .*$//"`
|
||||
ETHPCIID=`/usr/lib/scylla/dpdk_nic_bind.py --status | grep if=$NIC | awk '{print $1}'`
|
||||
NR_CPU=`cat /proc/cpuinfo |grep processor|wc -l`
|
||||
if [ $NR_CPU -ge 8 ] && [ "$SET_NIC" = "no" ]; then
|
||||
if [ "$AMI" = "yes" ] && [ $NR_CPU -ge 8 ] && [ "$SET_NIC" = "no" ]; then
|
||||
NR=$((NR_CPU - 1))
|
||||
SET_NIC="yes"
|
||||
SCYLLA_ARGS="$SCYLLA_ARGS --cpuset 1-$NR --smp $NR"
|
||||
@@ -86,5 +91,6 @@ sed -e s#^NETWORK_MODE=.*#NETWORK_MODE=$NETWORK_MODE# \
|
||||
-e s#^SCYLLA_CONF=.*#SCYLLA_CONF=$SCYLLA_CONF# \
|
||||
-e s#^SET_NIC=.*#SET_NIC=$SET_NIC# \
|
||||
-e "s#^SCYLLA_ARGS=.*#SCYLLA_ARGS=\"$SCYLLA_ARGS\"#" \
|
||||
-e s#^AMI=.*#AMI=$AMI# \
|
||||
$SYSCONFIG/scylla-server > /tmp/scylla-server
|
||||
mv /tmp/scylla-server $SYSCONFIG/scylla-server
|
||||
|
||||
4
dist/common/sysconfig/scylla-server
vendored
4
dist/common/sysconfig/scylla-server
vendored
@@ -40,5 +40,5 @@ SCYLLA_ARGS="--log-to-syslog 1 --log-to-stdout 0 --default-log-level info --coll
|
||||
## scylla arguments (for dpdk mode)
|
||||
#SCYLLA_ARGS="--log-to-syslog 1 --log-to-stdout 0 --default-log-level info --collectd-address=127.0.0.1:25826 --collectd=1 --collectd-poll-period 3000 --network-stack native --dpdk-pmd"
|
||||
|
||||
# scylla IO parameters (max-io-requests and max-io-queues) are automatically
|
||||
# configured, saved at /etc/scylla.d/io.conf
|
||||
# setup as AMI instance
|
||||
AMI=no
|
||||
|
||||
1
dist/redhat/systemd/scylla-io-setup.service
vendored
1
dist/redhat/systemd/scylla-io-setup.service
vendored
@@ -4,6 +4,7 @@ After=network.target
|
||||
|
||||
[Service]
|
||||
Type=oneshot
|
||||
EnvironmentFile=/etc/sysconfig/scylla-server
|
||||
ExecStart=/usr/lib/scylla/scylla_io_setup
|
||||
RemainAfterExit=yes
|
||||
TimeoutStartSec=1800
|
||||
|
||||
4
dist/ubuntu/debian/scylla-server.init
vendored
4
dist/ubuntu/debian/scylla-server.init
vendored
@@ -29,8 +29,8 @@ SCRIPTNAME=/etc/init.d/$NAME
|
||||
[ -x "$DAEMON" ] || exit 0
|
||||
|
||||
# Read configuration variable file if it is present
|
||||
[ -r /etc/default/$NAME ] && . /etc/default/$NAME
|
||||
export NETWORK_MODE TAP BRIDGE ETHDRV ETHPCIID NR_HUGEPAGES USER GROUP SCYLLA_HOME SCYLLA_CONF SCYLLA_ARGS
|
||||
eval "`grep -v -e "^\s*#" -e "^$" /etc/default/scylla-server|sed -e 's/^/export /'`"
|
||||
eval "`grep -v -e "^\s*#" -e "^$" /etc/scylla.d/*.conf|sed -e 's/^/export /'`"
|
||||
|
||||
# Define LSB log_* functions.
|
||||
. /lib/lsb/init-functions
|
||||
|
||||
12
dist/ubuntu/debian/scylla-server.upstart
vendored
12
dist/ubuntu/debian/scylla-server.upstart
vendored
@@ -25,20 +25,20 @@ chdir /var/lib/scylla
|
||||
env HOME=/var/lib/scylla
|
||||
|
||||
pre-start script
|
||||
. /etc/default/scylla-server
|
||||
export NETWORK_MODE TAP BRIDGE ETHDRV ETHPCIID NR_HUGEPAGES USER GROUP SCYLLA_HOME SCYLLA_CONF SCYLLA_ARGS SCYLLA_IO
|
||||
eval "`grep -v -e "^\s*#" -e "^$" /etc/default/scylla-server|sed -e 's/^/export /'`"
|
||||
eval "`grep -v -e "^\s*#" -e "^$" /etc/scylla.d/*.conf|sed -e 's/^/export /'`"
|
||||
sudo /usr/lib/scylla/scylla_prepare
|
||||
sudo /usr/lib/scylla/scylla_io_setup
|
||||
end script
|
||||
|
||||
script
|
||||
. /etc/default/scylla-server
|
||||
export NETWORK_MODE TAP BRIDGE ETHDRV ETHPCIID NR_HUGEPAGES USER GROUP SCYLLA_HOME SCYLLA_CONF SCYLLA_ARGS SCYLLA_IO
|
||||
eval "`grep -v -e "^\s*#" -e "^$" /etc/default/scylla-server|sed -e 's/^/export /'`"
|
||||
eval "`grep -v -e "^\s*#" -e "^$" /etc/scylla.d/*.conf|sed -e 's/^/export /'`"
|
||||
exec /usr/bin/scylla $SCYLLA_ARGS $SCYLLA_IO
|
||||
end script
|
||||
|
||||
post-stop script
|
||||
. /etc/default/scylla-server
|
||||
export NETWORK_MODE TAP BRIDGE ETHDRV ETHPCIID NR_HUGEPAGES USER GROUP SCYLLA_HOME SCYLLA_CONF SCYLLA_ARGS SCYLLA_IO
|
||||
eval "`grep -v -e "^\s*#" -e "^$" /etc/default/scylla-server|sed -e 's/^/export /'`"
|
||||
eval "`grep -v -e "^\s*#" -e "^$" /etc/scylla.d/*.conf|sed -e 's/^/export /'`"
|
||||
sudo /usr/lib/scylla/scylla_stop
|
||||
end script
|
||||
|
||||
7
dist/ubuntu/rules.in
vendored
7
dist/ubuntu/rules.in
vendored
@@ -8,6 +8,7 @@ SYSCTL = $(CURDIR)/debian/scylla-server/etc/sysctl.d
|
||||
SUDOERS = $(CURDIR)/debian/scylla-server/etc/sudoers.d
|
||||
LIMITS= $(CURDIR)/debian/scylla-server/etc/security/limits.d
|
||||
COLLECTD= $(CURDIR)/debian/scylla-server/etc/collectd/collectd.conf.d
|
||||
SCYLLAD= $(CURDIR)/debian/scylla-server/etc/scylla.d
|
||||
LIBS = $(CURDIR)/debian/scylla-server/usr/lib
|
||||
CONF = $(CURDIR)/debian/scylla-server/etc/scylla
|
||||
|
||||
@@ -33,6 +34,9 @@ override_dh_auto_install:
|
||||
mkdir -p $(COLLECTD) && \
|
||||
cp $(CURDIR)/dist/common/collectd.d/scylla.conf $(COLLECTD)
|
||||
|
||||
mkdir -p $(SCYLLAD) && \
|
||||
cp $(CURDIR)/dist/common/scylla.d/io.conf $(SCYLLAD)
|
||||
|
||||
mkdir -p $(CONF) && \
|
||||
cp $(CURDIR)/conf/scylla.yaml $(CONF)
|
||||
cp $(CURDIR)/conf/cassandra-rackdc.properties $(CONF)
|
||||
@@ -68,6 +72,9 @@ override_dh_auto_install:
|
||||
mkdir -p $(CURDIR)/debian/scylla-server/var/lib/scylla/commitlog
|
||||
mkdir -p $(CURDIR)/debian/scylla-server/var/lib/scylla/coredump
|
||||
|
||||
override_dh_installinit:
|
||||
dh_installinit --no-start
|
||||
|
||||
override_dh_strip:
|
||||
dh_strip --dbg-package=scylla-server-dbg
|
||||
%:
|
||||
|
||||
@@ -62,7 +62,12 @@ static const std::map<application_state, sstring> application_state_names = {
|
||||
};
|
||||
|
||||
std::ostream& operator<<(std::ostream& os, const application_state& m) {
|
||||
os << application_state_names.at(m);
|
||||
auto it = application_state_names.find(m);
|
||||
if (it != application_state_names.end()) {
|
||||
os << application_state_names.at(m);
|
||||
} else {
|
||||
os << "UNKNOWN";
|
||||
}
|
||||
return os;
|
||||
}
|
||||
|
||||
|
||||
@@ -414,8 +414,14 @@ future<> gossiper::apply_state_locally(const std::map<inet_address, endpoint_sta
|
||||
// Runs inside seastar::async context
|
||||
void gossiper::remove_endpoint(inet_address endpoint) {
|
||||
// do subscribers first so anything in the subscriber that depends on gossiper state won't get confused
|
||||
_subscribers.for_each([endpoint] (auto& subscriber) {
|
||||
subscriber->on_remove(endpoint);
|
||||
// We can not run on_remove callbacks here becasue on_remove in
|
||||
// storage_service might take the gossiper::timer_callback_lock
|
||||
seastar::async([this, endpoint] {
|
||||
_subscribers.for_each([endpoint] (auto& subscriber) {
|
||||
subscriber->on_remove(endpoint);
|
||||
});
|
||||
}).handle_exception([] (auto ep) {
|
||||
logger.warn("Fail to call on_remove callback: {}", ep);
|
||||
});
|
||||
|
||||
if(_seeds.count(endpoint)) {
|
||||
|
||||
10
init.cc
10
init.cc
@@ -24,7 +24,6 @@
|
||||
#include "gms/failure_detector.hh"
|
||||
#include "gms/gossiper.hh"
|
||||
#include "service/storage_service.hh"
|
||||
#include "service/pending_range_calculator_service.hh"
|
||||
#include "to_string.hh"
|
||||
#include "gms/inet_address.hh"
|
||||
|
||||
@@ -34,14 +33,9 @@
|
||||
// until proper shutdown is done.
|
||||
|
||||
future<> init_storage_service(distributed<database>& db) {
|
||||
return service::get_pending_range_calculator_service().start(std::ref(db)).then([] {
|
||||
return service::init_storage_service(db).then([] {
|
||||
// #293 - do not stop anything
|
||||
// engine().at_exit([] { return service::get_pending_range_calculator_service().stop(); });
|
||||
}).then([&db] {
|
||||
return service::init_storage_service(db).then([] {
|
||||
// #293 - do not stop anything
|
||||
//engine().at_exit([] { return service::deinit_storage_service(); });
|
||||
});
|
||||
//engine().at_exit([] { return service::deinit_storage_service(); });
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
4
log.cc
4
log.cc
@@ -100,7 +100,7 @@ logger::really_do_log(log_level level, const char* fmt, stringer** s, size_t n)
|
||||
std::chrono::duration_cast<std::chrono::milliseconds>(now.time_since_epoch()).count() % 1000;
|
||||
auto tm = std::chrono::system_clock::to_time_t(now);
|
||||
char tmp[100];
|
||||
strftime(tmp, sizeof(tmp), "%Y-%m-%d %T", std::localtime(&tm));
|
||||
strftime(tmp, sizeof(tmp), " %Y-%m-%d %T", std::localtime(&tm));
|
||||
out << tmp << sprint(",%03d", residual_millis);
|
||||
syslog_offset += 24;
|
||||
}
|
||||
@@ -258,6 +258,8 @@ std::ostream& operator<<(std::ostream& out, const std::exception_ptr& eptr) {
|
||||
out << " (error " << e.code() << ", " << e.code().message() << ")";
|
||||
} catch(const std::exception& e) {
|
||||
out << " (" << e.what() << ")";
|
||||
} catch(...) {
|
||||
// no extra info
|
||||
}
|
||||
}
|
||||
return out;
|
||||
|
||||
405
main.cc
405
main.cc
@@ -276,7 +276,8 @@ int main(int ac, char** av) {
|
||||
engine().set_strict_dma(false);
|
||||
}
|
||||
|
||||
return read_config(opts, *cfg).then([cfg, &db, &qp, &proxy, &mm, &ctx, &opts, &dirs]() {
|
||||
return seastar::async([cfg, &db, &qp, &proxy, &mm, &ctx, &opts, &dirs] {
|
||||
read_config(opts, *cfg).get();
|
||||
apply_logger_settings(cfg->default_log_level(), cfg->logger_log_level(),
|
||||
cfg->log_to_stdout(), cfg->log_to_syslog());
|
||||
verify_rlimit(cfg->developer_mode());
|
||||
@@ -292,9 +293,19 @@ int main(int ac, char** av) {
|
||||
sstring broadcast_rpc_address = cfg->broadcast_rpc_address();
|
||||
|
||||
if (!broadcast_address.empty()) {
|
||||
utils::fb_utilities::set_broadcast_address(broadcast_address);
|
||||
try {
|
||||
utils::fb_utilities::set_broadcast_address(broadcast_address);
|
||||
} catch (...) {
|
||||
startlog.error("Bad configuration: invalid 'broadcast_address': {}: {}", broadcast_address, std::current_exception());
|
||||
throw bad_configuration_error();
|
||||
}
|
||||
} else if (!listen_address.empty()) {
|
||||
utils::fb_utilities::set_broadcast_address(listen_address);
|
||||
try {
|
||||
utils::fb_utilities::set_broadcast_address(listen_address);
|
||||
} catch (...) {
|
||||
startlog.error("Bad configuration: invalid 'listen_address': {}: {}", listen_address, std::current_exception());
|
||||
throw bad_configuration_error();
|
||||
}
|
||||
} else {
|
||||
startlog.error("Bad configuration: neither listen_address nor broadcast_address are defined\n");
|
||||
throw bad_configuration_error();
|
||||
@@ -332,225 +343,195 @@ int main(int ac, char** av) {
|
||||
using namespace locator;
|
||||
// Re-apply strict-dma after we've read the config file, this time
|
||||
// to all reactors
|
||||
return parallel_for_each(boost::irange(0u, smp::count), [devmode = opts.count("developer-mode")] (unsigned cpu) {
|
||||
smp::invoke_on_all([devmode = opts.count("developer-mode")] {
|
||||
if (devmode) {
|
||||
engine().set_strict_dma(false);
|
||||
}
|
||||
return make_ready_future<>();
|
||||
}).then([cfg] {
|
||||
supervisor_notify("creating snitch");
|
||||
return i_endpoint_snitch::create_snitch(cfg->endpoint_snitch());
|
||||
// #293 - do not stop anything
|
||||
// engine().at_exit([] { return i_endpoint_snitch::stop_snitch(); });
|
||||
}).then([api_address] {
|
||||
supervisor_notify("determining DNS name");
|
||||
return dns::gethostbyname(api_address);
|
||||
}).then([&db, api_address, api_port, &ctx] (dns::hostent e){
|
||||
supervisor_notify("starting API server");
|
||||
auto ip = e.addresses[0].in.s_addr;
|
||||
return ctx.http_server.start().then([api_address, api_port, ip, &ctx] {
|
||||
return api::set_server_init(ctx);
|
||||
}).then([api_address, api_port, ip, &ctx] {
|
||||
return ctx.http_server.listen(ipv4_addr{ip, api_port});
|
||||
}).then([api_address, api_port] {
|
||||
print("Scylla API server listening on %s:%s ...\n", api_address, api_port);
|
||||
}).get();
|
||||
supervisor_notify("creating snitch");
|
||||
i_endpoint_snitch::create_snitch(cfg->endpoint_snitch()).get();
|
||||
// #293 - do not stop anything
|
||||
// engine().at_exit([] { return i_endpoint_snitch::stop_snitch(); });
|
||||
supervisor_notify("determining DNS name");
|
||||
dns::hostent e = dns::gethostbyname(api_address).get0();
|
||||
supervisor_notify("starting API server");
|
||||
auto ip = e.addresses[0].in.s_addr;
|
||||
ctx.http_server.start().get();
|
||||
api::set_server_init(ctx).get();
|
||||
ctx.http_server.listen(ipv4_addr{ip, api_port}).get();
|
||||
print("Scylla API server listening on %s:%s ...\n", api_address, api_port);
|
||||
supervisor_notify("initializing storage service");
|
||||
init_storage_service(db).get();
|
||||
supervisor_notify("starting per-shard database core");
|
||||
// Note: changed from using a move here, because we want the config object intact.
|
||||
db.start(std::ref(*cfg)).get();
|
||||
engine().at_exit([&db] {
|
||||
// #293 - do not stop anything - not even db (for real)
|
||||
//return db.stop();
|
||||
// call stop on each db instance, but leave the shareded<database> pointers alive.
|
||||
return db.invoke_on_all([](auto& db) {
|
||||
return db.stop();
|
||||
}).then([] {
|
||||
return sstables::await_background_jobs_on_all_shards();
|
||||
}).then([] {
|
||||
::_exit(0);
|
||||
});
|
||||
}).then([&db] {
|
||||
supervisor_notify("initializing storage service");
|
||||
return init_storage_service(db);
|
||||
}).then([&ctx] {
|
||||
return api::set_server_storage_service(ctx);
|
||||
}).then([&db, cfg] {
|
||||
});
|
||||
supervisor_notify("creating data directories");
|
||||
dirs.touch_and_lock(db.local().get_config().data_file_directories()).get();
|
||||
supervisor_notify("creating commitlog directory");
|
||||
dirs.touch_and_lock(db.local().get_config().commitlog_directory()).get();
|
||||
supervisor_notify("verifying data and commitlog directories");
|
||||
std::unordered_set<sstring> directories;
|
||||
directories.insert(db.local().get_config().data_file_directories().cbegin(),
|
||||
db.local().get_config().data_file_directories().cend());
|
||||
directories.insert(db.local().get_config().commitlog_directory());
|
||||
parallel_for_each(directories, [&db] (sstring pathname) {
|
||||
return disk_sanity(pathname, db.local().get_config().developer_mode());
|
||||
}).get();
|
||||
|
||||
supervisor_notify("starting per-shard database core");
|
||||
// Note: changed from using a move here, because we want the config object intact.
|
||||
return db.start(std::ref(*cfg)).then([&db] {
|
||||
engine().at_exit([&db] {
|
||||
|
||||
// #293 - do not stop anything - not even db (for real)
|
||||
//return db.stop();
|
||||
// call stop on each db instance, but leave the shareded<database> pointers alive.
|
||||
return db.invoke_on_all([](auto& db) {
|
||||
return db.stop();
|
||||
}).then([] {
|
||||
return sstables::await_background_jobs_on_all_shards();
|
||||
}).then([] {
|
||||
::_exit(0);
|
||||
});
|
||||
});
|
||||
});
|
||||
}).then([cfg, listen_address] {
|
||||
supervisor_notify("starting gossip");
|
||||
// Moved local parameters here, esp since with the
|
||||
// ssl stuff it gets to be a lot.
|
||||
uint16_t storage_port = cfg->storage_port();
|
||||
uint16_t ssl_storage_port = cfg->ssl_storage_port();
|
||||
double phi = cfg->phi_convict_threshold();
|
||||
auto seed_provider= cfg->seed_provider();
|
||||
sstring cluster_name = cfg->cluster_name();
|
||||
|
||||
const auto& ssl_opts = cfg->server_encryption_options();
|
||||
auto encrypt_what = get_or_default(ssl_opts, "internode_encryption", "none");
|
||||
auto trust_store = get_or_default(ssl_opts, "truststore");
|
||||
auto cert = get_or_default(ssl_opts, "certificate", relative_conf_dir("scylla.crt").string());
|
||||
auto key = get_or_default(ssl_opts, "keyfile", relative_conf_dir("scylla.key").string());
|
||||
|
||||
return init_ms_fd_gossiper(listen_address
|
||||
, storage_port
|
||||
, ssl_storage_port
|
||||
, encrypt_what
|
||||
, trust_store
|
||||
, cert
|
||||
, key
|
||||
, seed_provider
|
||||
, cluster_name
|
||||
, phi);
|
||||
}).then([&ctx] {
|
||||
return api::set_server_gossip(ctx);
|
||||
}).then([&db] {
|
||||
supervisor_notify("starting streaming service");
|
||||
return streaming::stream_session::init_streaming_service(db);
|
||||
}).then([&ctx] {
|
||||
return api::set_server_stream_manager(ctx);
|
||||
}).then([&db] {
|
||||
supervisor_notify("starting messaging service");
|
||||
// Start handling REPAIR_CHECKSUM_RANGE messages
|
||||
return net::get_messaging_service().invoke_on_all([&db] (auto& ms) {
|
||||
ms.register_repair_checksum_range([&db] (sstring keyspace, sstring cf, query::range<dht::token> range) {
|
||||
return do_with(std::move(keyspace), std::move(cf), std::move(range),
|
||||
[&db] (auto& keyspace, auto& cf, auto& range) {
|
||||
return checksum_range(db, keyspace, cf, range);
|
||||
});
|
||||
});
|
||||
});
|
||||
}).then([&ctx](){
|
||||
return api::set_server_messaging_service(ctx);
|
||||
}).then([&proxy, &db] {
|
||||
supervisor_notify("starting storage proxy");
|
||||
return proxy.start(std::ref(db)).then([&proxy] {
|
||||
// #293 - do not stop anything
|
||||
// engine().at_exit([&proxy] { return proxy.stop(); });
|
||||
});
|
||||
}).then([&ctx]() {
|
||||
return api::set_server_storage_proxy(ctx);
|
||||
}).then([&mm] {
|
||||
supervisor_notify("starting migration manager");
|
||||
return mm.start().then([&mm] {
|
||||
// #293 - do not stop anything
|
||||
// engine().at_exit([&mm] { return mm.stop(); });
|
||||
});
|
||||
}).then([&db, &proxy, &qp] {
|
||||
supervisor_notify("starting query processor");
|
||||
return qp.start(std::ref(proxy), std::ref(db)).then([&qp] {
|
||||
// #293 - do not stop anything
|
||||
// engine().at_exit([&qp] { return qp.stop(); });
|
||||
});
|
||||
}).then([&qp] {
|
||||
supervisor_notify("initializing batchlog manager");
|
||||
return db::get_batchlog_manager().start(std::ref(qp)).then([] {
|
||||
// #293 - do not stop anything
|
||||
// engine().at_exit([] { return db::get_batchlog_manager().stop(); });
|
||||
});
|
||||
}).then([&db, &dirs] {
|
||||
supervisor_notify("creating data directories");
|
||||
return dirs.touch_and_lock(db.local().get_config().data_file_directories());
|
||||
}).then([&db, &dirs] {
|
||||
supervisor_notify("creating commitlog directory");
|
||||
return dirs.touch_and_lock(db.local().get_config().commitlog_directory());
|
||||
}).then([&db] {
|
||||
supervisor_notify("verifying data and commitlog directories");
|
||||
std::unordered_set<sstring> directories;
|
||||
directories.insert(db.local().get_config().data_file_directories().cbegin(),
|
||||
db.local().get_config().data_file_directories().cend());
|
||||
directories.insert(db.local().get_config().commitlog_directory());
|
||||
return do_with(std::move(directories), [&db] (auto& directories) {
|
||||
return parallel_for_each(directories, [&db] (sstring pathname) {
|
||||
return disk_sanity(pathname, db.local().get_config().developer_mode());
|
||||
});
|
||||
});
|
||||
}).then([&db] {
|
||||
supervisor_notify("loading sstables");
|
||||
return db.invoke_on_all([] (database& db) {
|
||||
return db.init_system_keyspace();
|
||||
}).then([&db] {
|
||||
auto& ks = db.local().find_keyspace(db::system_keyspace::NAME);
|
||||
return parallel_for_each(ks.metadata()->cf_meta_data(), [&ks] (auto& pair) {
|
||||
auto cfm = pair.second;
|
||||
return ks.make_directory_for_column_family(cfm->cf_name(), cfm->id());
|
||||
});
|
||||
});
|
||||
}).then([&db, &proxy] {
|
||||
supervisor_notify("loading sstables");
|
||||
return db.invoke_on_all([&proxy] (database& db) {
|
||||
return db.load_sstables(proxy);
|
||||
});
|
||||
}).then([&ctx] {
|
||||
return api::set_server_load_sstable(ctx);
|
||||
}).then([&db, &qp] {
|
||||
supervisor_notify("setting up system keyspace");
|
||||
return db::system_keyspace::setup(db, qp);
|
||||
}).then([&db, &qp] {
|
||||
supervisor_notify("starting commit log");
|
||||
auto cl = db.local().commitlog();
|
||||
if (cl == nullptr) {
|
||||
// Deletion of previous stale, temporary SSTables is done by Shard0. Therefore,
|
||||
// let's run Shard0 first. Technically, we could just have all shards agree on
|
||||
// the deletion and just delete it later, but that is prone to races.
|
||||
//
|
||||
// Those races are not supposed to happen during normal operation, but if we have
|
||||
// bugs, they can. Scylla's Github Issue #1014 is an example of a situation where
|
||||
// that can happen, making existing problems worse. So running a single shard first
|
||||
// and getting making sure that all temporary tables are deleted provides extra
|
||||
// protection against such situations.
|
||||
db.invoke_on(0, [] (database& db) { return db.init_system_keyspace(); }).get();
|
||||
db.invoke_on_all([] (database& db) {
|
||||
if (engine().cpu_id() == 0) {
|
||||
return make_ready_future<>();
|
||||
}
|
||||
return cl->list_existing_segments().then([&db, &qp](auto paths) {
|
||||
if (paths.empty()) {
|
||||
return make_ready_future<>();
|
||||
}
|
||||
return db.init_system_keyspace();
|
||||
}).get();
|
||||
supervisor_notify("starting gossip");
|
||||
// Moved local parameters here, esp since with the
|
||||
// ssl stuff it gets to be a lot.
|
||||
uint16_t storage_port = cfg->storage_port();
|
||||
uint16_t ssl_storage_port = cfg->ssl_storage_port();
|
||||
double phi = cfg->phi_convict_threshold();
|
||||
auto seed_provider= cfg->seed_provider();
|
||||
sstring cluster_name = cfg->cluster_name();
|
||||
|
||||
const auto& ssl_opts = cfg->server_encryption_options();
|
||||
auto encrypt_what = get_or_default(ssl_opts, "internode_encryption", "none");
|
||||
auto trust_store = get_or_default(ssl_opts, "truststore");
|
||||
auto cert = get_or_default(ssl_opts, "certificate", relative_conf_dir("scylla.crt").string());
|
||||
auto key = get_or_default(ssl_opts, "keyfile", relative_conf_dir("scylla.key").string());
|
||||
|
||||
init_ms_fd_gossiper(listen_address
|
||||
, storage_port
|
||||
, ssl_storage_port
|
||||
, encrypt_what
|
||||
, trust_store
|
||||
, cert
|
||||
, key
|
||||
, seed_provider
|
||||
, cluster_name
|
||||
, phi).get();
|
||||
supervisor_notify("starting messaging service");
|
||||
supervisor_notify("starting storage proxy");
|
||||
proxy.start(std::ref(db)).get();
|
||||
// #293 - do not stop anything
|
||||
// engine().at_exit([&proxy] { return proxy.stop(); });
|
||||
supervisor_notify("starting migration manager");
|
||||
mm.start().get();
|
||||
// #293 - do not stop anything
|
||||
// engine().at_exit([&mm] { return mm.stop(); });
|
||||
supervisor_notify("starting query processor");
|
||||
qp.start(std::ref(proxy), std::ref(db)).get();
|
||||
// #293 - do not stop anything
|
||||
// engine().at_exit([&qp] { return qp.stop(); });
|
||||
supervisor_notify("initializing batchlog manager");
|
||||
db::get_batchlog_manager().start(std::ref(qp)).get();
|
||||
// #293 - do not stop anything
|
||||
// engine().at_exit([] { return db::get_batchlog_manager().stop(); });
|
||||
supervisor_notify("loading sstables");
|
||||
auto& ks = db.local().find_keyspace(db::system_keyspace::NAME);
|
||||
parallel_for_each(ks.metadata()->cf_meta_data(), [&ks] (auto& pair) {
|
||||
auto cfm = pair.second;
|
||||
return ks.make_directory_for_column_family(cfm->cf_name(), cfm->id());
|
||||
}).get();
|
||||
supervisor_notify("loading sstables");
|
||||
// See comment on top of our call to init_system_keyspace as per why we invoke
|
||||
// on Shard0 first. Scylla's Github Issue #1014 for details
|
||||
db.invoke_on(0, [&proxy] (database& db) { return db.load_sstables(proxy); }).get();
|
||||
db.invoke_on_all([&proxy] (database& db) {
|
||||
if (engine().cpu_id() == 0) {
|
||||
return make_ready_future<>();
|
||||
}
|
||||
return db.load_sstables(proxy);
|
||||
}).get();
|
||||
supervisor_notify("setting up system keyspace");
|
||||
db::system_keyspace::setup(db, qp).get();
|
||||
supervisor_notify("starting commit log");
|
||||
auto cl = db.local().commitlog();
|
||||
if (cl != nullptr) {
|
||||
auto paths = cl->list_existing_segments().get0();
|
||||
if (!paths.empty()) {
|
||||
supervisor_notify("replaying commit log");
|
||||
return db::commitlog_replayer::create_replayer(qp).then([paths](auto rp) {
|
||||
return do_with(std::move(rp), [paths = std::move(paths)](auto& rp) {
|
||||
return rp.recover(paths);
|
||||
});
|
||||
}).then([&db] {
|
||||
supervisor_notify("replaying commit log - flushing memtables");
|
||||
return db.invoke_on_all([] (database& db) {
|
||||
return db.flush_all_memtables();
|
||||
});
|
||||
}).then([paths] {
|
||||
supervisor_notify("replaying commit log - removing old commitlog segments");
|
||||
for (auto& path : paths) {
|
||||
::unlink(path.c_str());
|
||||
}
|
||||
auto rp = db::commitlog_replayer::create_replayer(qp).get0();
|
||||
rp.recover(paths).get();
|
||||
supervisor_notify("replaying commit log - flushing memtables");
|
||||
db.invoke_on_all([] (database& db) {
|
||||
return db.flush_all_memtables();
|
||||
}).get();
|
||||
supervisor_notify("replaying commit log - removing old commitlog segments");
|
||||
for (auto& path : paths) {
|
||||
::unlink(path.c_str());
|
||||
}
|
||||
}
|
||||
}
|
||||
api::set_server_storage_service(ctx).get();
|
||||
api::set_server_gossip(ctx).get();
|
||||
api::set_server_messaging_service(ctx).get();
|
||||
api::set_server_storage_proxy(ctx).get();
|
||||
api::set_server_load_sstable(ctx).get();
|
||||
supervisor_notify("initializing migration manager RPC verbs");
|
||||
service::get_migration_manager().invoke_on_all([] (auto& mm) {
|
||||
mm.init_messaging_service();
|
||||
}).get();
|
||||
supervisor_notify("initializing storage proxy RPC verbs");
|
||||
proxy.invoke_on_all([] (service::storage_proxy& p) {
|
||||
p.init_messaging_service();
|
||||
}).get();
|
||||
supervisor_notify("starting streaming service");
|
||||
streaming::stream_session::init_streaming_service(db).get();
|
||||
api::set_server_stream_manager(ctx).get();
|
||||
// Start handling REPAIR_CHECKSUM_RANGE messages
|
||||
net::get_messaging_service().invoke_on_all([&db] (auto& ms) {
|
||||
ms.register_repair_checksum_range([&db] (sstring keyspace, sstring cf, query::range<dht::token> range) {
|
||||
return do_with(std::move(keyspace), std::move(cf), std::move(range),
|
||||
[&db] (auto& keyspace, auto& cf, auto& range) {
|
||||
return checksum_range(db, keyspace, cf, range);
|
||||
});
|
||||
});
|
||||
}).then([] {
|
||||
supervisor_notify("starting storage service", true);
|
||||
auto& ss = service::get_local_storage_service();
|
||||
return ss.init_server();
|
||||
}).then([&ctx] {
|
||||
return api::set_server_storage_service(ctx);
|
||||
}).then([] {
|
||||
supervisor_notify("starting batchlog manager");
|
||||
return db::get_batchlog_manager().invoke_on_all([] (db::batchlog_manager& b) {
|
||||
return b.start();
|
||||
});
|
||||
}).then([&db] {
|
||||
supervisor_notify("starting load broadcaster");
|
||||
// should be unique_ptr, but then lambda passed to at_exit will be non copieable and
|
||||
// casting to std::function<> will fail to compile
|
||||
auto lb = make_shared<service::load_broadcaster>(db, gms::get_local_gossiper());
|
||||
lb->start_broadcasting();
|
||||
service::get_local_storage_service().set_load_broadcaster(lb);
|
||||
engine().at_exit([lb = std::move(lb)] () mutable { return lb->stop_broadcasting(); });
|
||||
}).then([] {
|
||||
return gms::get_local_gossiper().wait_for_gossip_to_settle();
|
||||
}).then([&ctx] {
|
||||
return api::set_server_gossip_settle(ctx);
|
||||
}).then([start_thrift] () {
|
||||
supervisor_notify("starting native transport");
|
||||
return service::get_local_storage_service().start_native_transport().then([start_thrift] () {
|
||||
if (start_thrift) {
|
||||
return service::get_local_storage_service().start_rpc_server();
|
||||
}
|
||||
return make_ready_future<>();
|
||||
});
|
||||
}).then([&ctx] {
|
||||
return api::set_server_done(ctx);
|
||||
});
|
||||
}).then([] {
|
||||
}).get();
|
||||
supervisor_notify("starting storage service", true);
|
||||
auto& ss = service::get_local_storage_service();
|
||||
ss.init_server().get();
|
||||
api::set_server_storage_service(ctx).get();
|
||||
supervisor_notify("starting batchlog manager");
|
||||
db::get_batchlog_manager().invoke_on_all([] (db::batchlog_manager& b) {
|
||||
return b.start();
|
||||
}).get();
|
||||
supervisor_notify("starting load broadcaster");
|
||||
// should be unique_ptr, but then lambda passed to at_exit will be non copieable and
|
||||
// casting to std::function<> will fail to compile
|
||||
auto lb = make_shared<service::load_broadcaster>(db, gms::get_local_gossiper());
|
||||
lb->start_broadcasting();
|
||||
service::get_local_storage_service().set_load_broadcaster(lb);
|
||||
engine().at_exit([lb = std::move(lb)] () mutable { return lb->stop_broadcasting(); });
|
||||
gms::get_local_gossiper().wait_for_gossip_to_settle().get();
|
||||
api::set_server_gossip_settle(ctx).get();
|
||||
supervisor_notify("starting native transport");
|
||||
service::get_local_storage_service().start_native_transport().get();
|
||||
if (start_thrift) {
|
||||
service::get_local_storage_service().start_rpc_server().get();
|
||||
}
|
||||
api::set_server_done(ctx).get();
|
||||
supervisor_notify("serving");
|
||||
// Register at_exit last, so that storage_service::drain_on_shutdown will be called first
|
||||
engine().at_exit([] {
|
||||
|
||||
@@ -113,11 +113,11 @@ struct messaging_service::rpc_protocol_wrapper : public rpc_protocol { using rpc
|
||||
class messaging_service::rpc_protocol_client_wrapper {
|
||||
std::unique_ptr<rpc_protocol::client> _p;
|
||||
public:
|
||||
rpc_protocol_client_wrapper(rpc_protocol& proto, ipv4_addr addr, ipv4_addr local = ipv4_addr())
|
||||
: _p(std::make_unique<rpc_protocol::client>(proto, addr, local)) {
|
||||
rpc_protocol_client_wrapper(rpc_protocol& proto, rpc::client_options opts, ipv4_addr addr, ipv4_addr local = ipv4_addr())
|
||||
: _p(std::make_unique<rpc_protocol::client>(proto, std::move(opts), addr, local)) {
|
||||
}
|
||||
rpc_protocol_client_wrapper(rpc_protocol& proto, ipv4_addr addr, ipv4_addr local, ::shared_ptr<seastar::tls::server_credentials> c)
|
||||
: _p(std::make_unique<rpc_protocol::client>(proto, addr, seastar::tls::connect(c, addr, local)))
|
||||
rpc_protocol_client_wrapper(rpc_protocol& proto, rpc::client_options opts, ipv4_addr addr, ipv4_addr local, ::shared_ptr<seastar::tls::server_credentials> c)
|
||||
: _p(std::make_unique<rpc_protocol::client>(proto, std::move(opts), addr, seastar::tls::connect(c, addr, local)))
|
||||
{}
|
||||
auto get_stats() const { return _p->get_stats(); }
|
||||
future<> stop() { return _p->stop(); }
|
||||
@@ -391,10 +391,14 @@ shared_ptr<messaging_service::rpc_protocol_client_wrapper> messaging_service::ge
|
||||
auto remote_addr = ipv4_addr(get_preferred_ip(id.addr).raw_addr(), must_encrypt ? _ssl_port : _port);
|
||||
auto local_addr = ipv4_addr{_listen_address.raw_addr(), 0};
|
||||
|
||||
rpc::client_options opts;
|
||||
// send keepalive messages each minute if connection is idle, drop connection after 10 failures
|
||||
opts.keepalive = std::experimental::optional<net::tcp_keepalive_params>({60s, 60s, 10});
|
||||
|
||||
auto client = must_encrypt ?
|
||||
::make_shared<rpc_protocol_client_wrapper>(*_rpc,
|
||||
::make_shared<rpc_protocol_client_wrapper>(*_rpc, std::move(opts),
|
||||
remote_addr, local_addr, _credentials) :
|
||||
::make_shared<rpc_protocol_client_wrapper>(*_rpc,
|
||||
::make_shared<rpc_protocol_client_wrapper>(*_rpc, std::move(opts),
|
||||
remote_addr, local_addr);
|
||||
|
||||
it = _clients[idx].emplace(id, shard_info(std::move(client))).first;
|
||||
|
||||
2
seastar
2
seastar
Submodule seastar updated: 906b562a04...0739576bd6
@@ -50,18 +50,21 @@ public:
|
||||
virtual ~migration_listener()
|
||||
{ }
|
||||
|
||||
// The callback runs inside seastar thread
|
||||
virtual void on_create_keyspace(const sstring& ks_name) = 0;
|
||||
virtual void on_create_column_family(const sstring& ks_name, const sstring& cf_name) = 0;
|
||||
virtual void on_create_user_type(const sstring& ks_name, const sstring& type_name) = 0;
|
||||
virtual void on_create_function(const sstring& ks_name, const sstring& function_name) = 0;
|
||||
virtual void on_create_aggregate(const sstring& ks_name, const sstring& aggregate_name) = 0;
|
||||
|
||||
// The callback runs inside seastar thread
|
||||
virtual void on_update_keyspace(const sstring& ks_name) = 0;
|
||||
virtual void on_update_column_family(const sstring& ks_name, const sstring& cf_name, bool columns_changed) = 0;
|
||||
virtual void on_update_user_type(const sstring& ks_name, const sstring& type_name) = 0;
|
||||
virtual void on_update_function(const sstring& ks_name, const sstring& function_name) = 0;
|
||||
virtual void on_update_aggregate(const sstring& ks_name, const sstring& aggregate_name) = 0;
|
||||
|
||||
// The callback runs inside seastar thread
|
||||
virtual void on_drop_keyspace(const sstring& ks_name) = 0;
|
||||
virtual void on_drop_column_family(const sstring& ks_name, const sstring& cf_name) = 0;
|
||||
virtual void on_drop_user_type(const sstring& ks_name, const sstring& type_name) = 0;
|
||||
|
||||
@@ -66,9 +66,43 @@ migration_manager::migration_manager()
|
||||
|
||||
future<> migration_manager::stop()
|
||||
{
|
||||
if (ms_inited) {
|
||||
uninit_messaging_service();
|
||||
}
|
||||
return make_ready_future<>();
|
||||
}
|
||||
|
||||
void migration_manager::init_messaging_service()
|
||||
{
|
||||
ms_inited = true;
|
||||
auto& ms = net::get_local_messaging_service();
|
||||
ms.register_definitions_update([this] (const rpc::client_info& cinfo, std::vector<frozen_mutation> m) {
|
||||
auto src = net::messaging_service::get_source(cinfo);
|
||||
do_with(std::move(m), get_local_shared_storage_proxy(), [src] (const std::vector<frozen_mutation>& mutations, shared_ptr<storage_proxy>& p) {
|
||||
return service::get_local_migration_manager().merge_schema_from(src, mutations);
|
||||
}).then_wrapped([src] (auto&& f) {
|
||||
if (f.failed()) {
|
||||
logger.error("Failed to update definitions from {}: {}", src, f.get_exception());
|
||||
} else {
|
||||
logger.debug("Applied definitions update from {}.", src);
|
||||
}
|
||||
});
|
||||
return net::messaging_service::no_wait();
|
||||
});
|
||||
ms.register_migration_request([this] () {
|
||||
return db::schema_tables::convert_schema_to_mutations(get_storage_proxy()).finally([p = get_local_shared_storage_proxy()] {
|
||||
// keep local proxy alive
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
void migration_manager::uninit_messaging_service()
|
||||
{
|
||||
auto& ms = net::get_local_messaging_service();
|
||||
ms.unregister_migration_request();
|
||||
ms.unregister_definitions_update();
|
||||
}
|
||||
|
||||
void migration_manager::register_listener(migration_listener* listener)
|
||||
{
|
||||
_listeners.emplace_back(listener);
|
||||
@@ -197,8 +231,8 @@ bool migration_manager::should_pull_schema_from(const gms::inet_address& endpoin
|
||||
&& !gms::get_local_gossiper().is_gossip_only_member(endpoint);
|
||||
}
|
||||
|
||||
void migration_manager::notify_create_keyspace(const lw_shared_ptr<keyspace_metadata>& ksm)
|
||||
{
|
||||
future<> migration_manager::notify_create_keyspace(const lw_shared_ptr<keyspace_metadata>& ksm) {
|
||||
return seastar::async([this, ksm] {
|
||||
auto&& name = ksm->name();
|
||||
for (auto&& listener : _listeners) {
|
||||
try {
|
||||
@@ -207,10 +241,11 @@ void migration_manager::notify_create_keyspace(const lw_shared_ptr<keyspace_meta
|
||||
logger.warn("Create keyspace notification failed {}: {}", name, std::current_exception());
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
void migration_manager::notify_create_column_family(const schema_ptr& cfm)
|
||||
{
|
||||
future<> migration_manager::notify_create_column_family(const schema_ptr& cfm) {
|
||||
return seastar::async([this, cfm] {
|
||||
auto&& ks_name = cfm->ks_name();
|
||||
auto&& cf_name = cfm->cf_name();
|
||||
for (auto&& listener : _listeners) {
|
||||
@@ -220,6 +255,7 @@ void migration_manager::notify_create_column_family(const schema_ptr& cfm)
|
||||
logger.warn("Create column family notification failed {}.{}: {}", ks_name, cf_name, std::current_exception());
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
#if 0
|
||||
@@ -242,8 +278,8 @@ public void notifyCreateAggregate(UDAggregate udf)
|
||||
}
|
||||
#endif
|
||||
|
||||
void migration_manager::notify_update_keyspace(const lw_shared_ptr<keyspace_metadata>& ksm)
|
||||
{
|
||||
future<> migration_manager::notify_update_keyspace(const lw_shared_ptr<keyspace_metadata>& ksm) {
|
||||
return seastar::async([this, ksm] {
|
||||
auto&& name = ksm->name();
|
||||
for (auto&& listener : _listeners) {
|
||||
try {
|
||||
@@ -252,10 +288,11 @@ void migration_manager::notify_update_keyspace(const lw_shared_ptr<keyspace_meta
|
||||
logger.warn("Update keyspace notification failed {}: {}", name, std::current_exception());
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
void migration_manager::notify_update_column_family(const schema_ptr& cfm, bool columns_changed)
|
||||
{
|
||||
future<> migration_manager::notify_update_column_family(const schema_ptr& cfm, bool columns_changed) {
|
||||
return seastar::async([this, cfm, columns_changed] {
|
||||
auto&& ks_name = cfm->ks_name();
|
||||
auto&& cf_name = cfm->cf_name();
|
||||
for (auto&& listener : _listeners) {
|
||||
@@ -265,6 +302,7 @@ void migration_manager::notify_update_column_family(const schema_ptr& cfm, bool
|
||||
logger.warn("Update column family notification failed {}.{}: {}", ks_name, cf_name, std::current_exception());
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
#if 0
|
||||
@@ -287,8 +325,8 @@ public void notifyUpdateAggregate(UDAggregate udf)
|
||||
}
|
||||
#endif
|
||||
|
||||
void migration_manager::notify_drop_keyspace(const sstring& ks_name)
|
||||
{
|
||||
future<> migration_manager::notify_drop_keyspace(const sstring& ks_name) {
|
||||
return seastar::async([this, ks_name] {
|
||||
for (auto&& listener : _listeners) {
|
||||
try {
|
||||
listener->on_drop_keyspace(ks_name);
|
||||
@@ -296,10 +334,11 @@ void migration_manager::notify_drop_keyspace(const sstring& ks_name)
|
||||
logger.warn("Drop keyspace notification failed {}: {}", ks_name, std::current_exception());
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
void migration_manager::notify_drop_column_family(const schema_ptr& cfm)
|
||||
{
|
||||
future<> migration_manager::notify_drop_column_family(const schema_ptr& cfm) {
|
||||
return seastar::async([this, cfm] {
|
||||
auto&& cf_name = cfm->cf_name();
|
||||
auto&& ks_name = cfm->ks_name();
|
||||
for (auto&& listener : _listeners) {
|
||||
@@ -309,6 +348,7 @@ void migration_manager::notify_drop_column_family(const schema_ptr& cfm)
|
||||
logger.warn("Drop column family notification failed {}.{}: {}", ks_name, cf_name, std::current_exception());
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
#if 0
|
||||
|
||||
@@ -52,10 +52,12 @@
|
||||
|
||||
namespace service {
|
||||
|
||||
class migration_manager {
|
||||
class migration_manager : public seastar::async_sharded_service<migration_manager> {
|
||||
std::vector<migration_listener*> _listeners;
|
||||
|
||||
static const std::chrono::milliseconds migration_delay;
|
||||
|
||||
bool ms_inited = false;
|
||||
public:
|
||||
migration_manager();
|
||||
|
||||
@@ -79,12 +81,12 @@ public:
|
||||
// Keep mutations alive around whole async operation.
|
||||
future<> merge_schema_from(net::messaging_service::msg_addr src, const std::vector<frozen_mutation>& mutations);
|
||||
|
||||
void notify_create_keyspace(const lw_shared_ptr<keyspace_metadata>& ksm);
|
||||
void notify_create_column_family(const schema_ptr& cfm);
|
||||
void notify_update_keyspace(const lw_shared_ptr<keyspace_metadata>& ksm);
|
||||
void notify_update_column_family(const schema_ptr& cfm, bool columns_changed);
|
||||
void notify_drop_keyspace(const sstring& ks_name);
|
||||
void notify_drop_column_family(const schema_ptr& cfm);
|
||||
future<> notify_create_keyspace(const lw_shared_ptr<keyspace_metadata>& ksm);
|
||||
future<> notify_create_column_family(const schema_ptr& cfm);
|
||||
future<> notify_update_keyspace(const lw_shared_ptr<keyspace_metadata>& ksm);
|
||||
future<> notify_update_column_family(const schema_ptr& cfm, bool columns_changed);
|
||||
future<> notify_drop_keyspace(const sstring& ks_name);
|
||||
future<> notify_drop_column_family(const schema_ptr& cfm);
|
||||
|
||||
bool should_pull_schema_from(const gms::inet_address& endpoint);
|
||||
|
||||
@@ -118,6 +120,10 @@ public:
|
||||
future<> stop();
|
||||
|
||||
bool is_ready_for_bootstrap();
|
||||
|
||||
void init_messaging_service();
|
||||
private:
|
||||
void uninit_messaging_service();
|
||||
};
|
||||
|
||||
extern distributed<migration_manager> _the_migration_manager;
|
||||
|
||||
@@ -1,90 +0,0 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
/*
|
||||
* Modified by ScyllaDB
|
||||
* Copyright 2015 ScyllaDB
|
||||
*/
|
||||
|
||||
/*
|
||||
* This file is part of Scylla.
|
||||
*
|
||||
* Scylla is free software: you can redistribute it and/or modify
|
||||
* it under the terms of the GNU Affero General Public License as published by
|
||||
* the Free Software Foundation, either version 3 of the License, or
|
||||
* (at your option) any later version.
|
||||
*
|
||||
* Scylla is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU General Public License
|
||||
* along with Scylla. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#include "service/pending_range_calculator_service.hh"
|
||||
#include "service/storage_service.hh"
|
||||
#include "database.hh"
|
||||
#include <seastar/core/sleep.hh>
|
||||
|
||||
namespace service {
|
||||
|
||||
distributed<pending_range_calculator_service> _the_pending_range_calculator_service;
|
||||
|
||||
void pending_range_calculator_service::run() {
|
||||
// long start = System.currentTimeMillis();
|
||||
auto keyspaces = _db.local().get_non_system_keyspaces();
|
||||
for (auto& keyspace_name : keyspaces) {
|
||||
auto& ks = _db.local().find_keyspace(keyspace_name);
|
||||
calculate_pending_ranges(ks.get_replication_strategy(), keyspace_name);
|
||||
}
|
||||
_update_jobs--;
|
||||
// logger.debug("finished calculation for {} keyspaces in {}ms", keyspaces.size(), System.currentTimeMillis() - start);
|
||||
}
|
||||
|
||||
void pending_range_calculator_service::calculate_pending_ranges(locator::abstract_replication_strategy& strategy, const sstring& keyspace_name) {
|
||||
get_local_storage_service().get_token_metadata().calculate_pending_ranges(strategy, keyspace_name);
|
||||
}
|
||||
|
||||
future<> pending_range_calculator_service::stop() {
|
||||
return make_ready_future<>();
|
||||
}
|
||||
|
||||
future<> pending_range_calculator_service::update() {
|
||||
return smp::submit_to(0, [] {
|
||||
get_local_pending_range_calculator_service().do_update();
|
||||
});
|
||||
}
|
||||
|
||||
void pending_range_calculator_service::do_update() {
|
||||
assert(engine().cpu_id() == 0);
|
||||
get_local_pending_range_calculator_service()._update_jobs++;
|
||||
get_local_pending_range_calculator_service().run();
|
||||
}
|
||||
|
||||
future<> pending_range_calculator_service::block_until_finished() {
|
||||
// We want to be sure the job we're blocking for is actually finished and we can't trust the TPE's active job count
|
||||
return smp::submit_to(0, [] {
|
||||
return do_until(
|
||||
[] { return !(get_local_pending_range_calculator_service()._update_jobs > 0); },
|
||||
[] { return sleep(std::chrono::milliseconds(100)); });
|
||||
});
|
||||
}
|
||||
|
||||
}
|
||||
@@ -1,70 +0,0 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
/*
|
||||
* Modified by ScyllaDB
|
||||
* Copyright 2015 ScyllaDB
|
||||
*/
|
||||
|
||||
/*
|
||||
* This file is part of Scylla.
|
||||
*
|
||||
* Scylla is free software: you can redistribute it and/or modify
|
||||
* it under the terms of the GNU Affero General Public License as published by
|
||||
* the Free Software Foundation, either version 3 of the License, or
|
||||
* (at your option) any later version.
|
||||
*
|
||||
* Scylla is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU General Public License
|
||||
* along with Scylla. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
#include "locator/abstract_replication_strategy.hh"
|
||||
#include "database.hh"
|
||||
#include <seastar/core/sharded.hh>
|
||||
|
||||
namespace service {
|
||||
|
||||
class pending_range_calculator_service {
|
||||
private:
|
||||
int _update_jobs{0};
|
||||
distributed<database>& _db;
|
||||
void calculate_pending_ranges(locator::abstract_replication_strategy& strategy, const sstring& keyspace_name);
|
||||
void run();
|
||||
public:
|
||||
pending_range_calculator_service(distributed<database>& db) : _db(db) {}
|
||||
void do_update();
|
||||
future<> update();
|
||||
future<> block_until_finished();
|
||||
future<> stop();
|
||||
};
|
||||
|
||||
extern distributed<pending_range_calculator_service> _the_pending_range_calculator_service;
|
||||
|
||||
inline distributed<pending_range_calculator_service>& get_pending_range_calculator_service() {
|
||||
return _the_pending_range_calculator_service;
|
||||
}
|
||||
|
||||
inline pending_range_calculator_service& get_local_pending_range_calculator_service() {
|
||||
return _the_pending_range_calculator_service.local();
|
||||
}
|
||||
|
||||
} // service
|
||||
@@ -205,12 +205,20 @@ class datacenter_write_response_handler : public abstract_write_response_handler
|
||||
}
|
||||
}
|
||||
public:
|
||||
using abstract_write_response_handler::abstract_write_response_handler;
|
||||
datacenter_write_response_handler(shared_ptr<storage_proxy> p, keyspace& ks, db::consistency_level cl, db::write_type type,
|
||||
schema_ptr s, lw_shared_ptr<const frozen_mutation> mutation, std::unordered_set<gms::inet_address> targets,
|
||||
std::vector<gms::inet_address> pending_endpoints, std::vector<gms::inet_address> dead_endpoints) :
|
||||
abstract_write_response_handler(std::move(p), ks, cl, type, std::move(s), std::move(mutation),
|
||||
std::move(targets), boost::range::count_if(pending_endpoints, db::is_local), std::move(dead_endpoints)) {}
|
||||
};
|
||||
|
||||
class write_response_handler : public abstract_write_response_handler {
|
||||
public:
|
||||
using abstract_write_response_handler::abstract_write_response_handler;
|
||||
write_response_handler(shared_ptr<storage_proxy> p, keyspace& ks, db::consistency_level cl, db::write_type type,
|
||||
schema_ptr s, lw_shared_ptr<const frozen_mutation> mutation, std::unordered_set<gms::inet_address> targets,
|
||||
std::vector<gms::inet_address> pending_endpoints, std::vector<gms::inet_address> dead_endpoints) :
|
||||
abstract_write_response_handler(std::move(p), ks, cl, type, std::move(s), std::move(mutation),
|
||||
std::move(targets), pending_endpoints.size(), std::move(dead_endpoints)) {}
|
||||
};
|
||||
|
||||
class datacenter_sync_write_response_handler : public abstract_write_response_handler {
|
||||
@@ -228,16 +236,20 @@ class datacenter_sync_write_response_handler : public abstract_write_response_ha
|
||||
public:
|
||||
datacenter_sync_write_response_handler(shared_ptr<storage_proxy> p, keyspace& ks, db::consistency_level cl, db::write_type type,
|
||||
schema_ptr s,
|
||||
lw_shared_ptr<const frozen_mutation> mutation, std::unordered_set<gms::inet_address> targets, size_t pending_endpoints,
|
||||
lw_shared_ptr<const frozen_mutation> mutation, std::unordered_set<gms::inet_address> targets, std::vector<gms::inet_address> pending_endpoints,
|
||||
std::vector<gms::inet_address> dead_endpoints) :
|
||||
abstract_write_response_handler(std::move(p), ks, cl, type, std::move(s), std::move(mutation), targets, pending_endpoints, dead_endpoints) {
|
||||
abstract_write_response_handler(std::move(p), ks, cl, type, std::move(s), std::move(mutation), targets, 0, dead_endpoints) {
|
||||
auto& snitch_ptr = locator::i_endpoint_snitch::get_local_snitch_ptr();
|
||||
|
||||
for (auto& target : targets) {
|
||||
auto dc = snitch_ptr->get_datacenter(target);
|
||||
|
||||
if (_dc_responses.find(dc) == _dc_responses.end()) {
|
||||
_dc_responses.emplace(dc, db::local_quorum_for(ks, dc));
|
||||
auto pending_for_dc = boost::range::count_if(pending_endpoints, [&snitch_ptr, &dc] (gms::inet_address& ep){
|
||||
return snitch_ptr->get_datacenter(ep) == dc;
|
||||
});
|
||||
_dc_responses.emplace(dc, db::local_quorum_for(ks, dc) + pending_for_dc).first;
|
||||
_pending_endpoints += pending_for_dc;
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -316,24 +328,21 @@ storage_proxy::response_id_type storage_proxy::create_write_response_handler(sch
|
||||
{
|
||||
std::unique_ptr<abstract_write_response_handler> h;
|
||||
auto& rs = ks.get_replication_strategy();
|
||||
size_t pending_count = pending_endpoints.size();
|
||||
|
||||
auto m = make_lw_shared<const frozen_mutation>(std::move(mutation));
|
||||
|
||||
if (db::is_datacenter_local(cl)) {
|
||||
pending_count = std::count_if(pending_endpoints.begin(), pending_endpoints.end(), db::is_local);
|
||||
h = std::make_unique<datacenter_write_response_handler>(shared_from_this(), ks, cl, type, std::move(s), std::move(m), std::move(targets), pending_count, std::move(dead_endpoints));
|
||||
h = std::make_unique<datacenter_write_response_handler>(shared_from_this(), ks, cl, type, std::move(s), std::move(m), std::move(targets), std::move(pending_endpoints), std::move(dead_endpoints));
|
||||
} else if (cl == db::consistency_level::EACH_QUORUM && rs.get_type() == locator::replication_strategy_type::network_topology){
|
||||
h = std::make_unique<datacenter_sync_write_response_handler>(shared_from_this(), ks, cl, type, std::move(s), std::move(m), std::move(targets), pending_count, std::move(dead_endpoints));
|
||||
h = std::make_unique<datacenter_sync_write_response_handler>(shared_from_this(), ks, cl, type, std::move(s), std::move(m), std::move(targets), std::move(pending_endpoints), std::move(dead_endpoints));
|
||||
} else {
|
||||
h = std::make_unique<write_response_handler>(shared_from_this(), ks, cl, type, std::move(s), std::move(m), std::move(targets), pending_count, std::move(dead_endpoints));
|
||||
h = std::make_unique<write_response_handler>(shared_from_this(), ks, cl, type, std::move(s), std::move(m), std::move(targets), std::move(pending_endpoints), std::move(dead_endpoints));
|
||||
}
|
||||
return register_response_handler(std::move(h));
|
||||
}
|
||||
|
||||
storage_proxy::~storage_proxy() {}
|
||||
storage_proxy::storage_proxy(distributed<database>& db) : _db(db) {
|
||||
init_messaging_service();
|
||||
_collectd_registrations = std::make_unique<scollectd::registrations>(scollectd::registrations({
|
||||
scollectd::add_polled_metric(scollectd::type_instance_id("storage_proxy"
|
||||
, scollectd::per_cpu_plugin_instance
|
||||
@@ -2686,24 +2695,6 @@ future<> storage_proxy::truncate_blocking(sstring keyspace, sstring cfname) {
|
||||
|
||||
void storage_proxy::init_messaging_service() {
|
||||
auto& ms = net::get_local_messaging_service();
|
||||
ms.register_definitions_update([] (const rpc::client_info& cinfo, std::vector<frozen_mutation> m) {
|
||||
auto src = net::messaging_service::get_source(cinfo);
|
||||
do_with(std::move(m), get_local_shared_storage_proxy(), [src] (const std::vector<frozen_mutation>& mutations, shared_ptr<storage_proxy>& p) {
|
||||
return service::get_local_migration_manager().merge_schema_from(src, mutations);
|
||||
}).then_wrapped([src] (auto&& f) {
|
||||
if (f.failed()) {
|
||||
logger.error("Failed to update definitions from {}: {}", src, f.get_exception());
|
||||
} else {
|
||||
logger.debug("Applied definitions update from {}.", src);
|
||||
}
|
||||
});
|
||||
return net::messaging_service::no_wait();
|
||||
});
|
||||
ms.register_migration_request([] () {
|
||||
return db::schema_tables::convert_schema_to_mutations(get_storage_proxy()).finally([p = get_local_shared_storage_proxy()] {
|
||||
// keep local proxy alive
|
||||
});
|
||||
});
|
||||
ms.register_mutation([] (const rpc::client_info& cinfo, frozen_mutation in, std::vector<gms::inet_address> forward, gms::inet_address reply_to, unsigned shard, storage_proxy::response_id_type response_id) {
|
||||
return do_with(std::move(in), get_local_shared_storage_proxy(), [&cinfo, forward = std::move(forward), reply_to, shard, response_id] (const frozen_mutation& m, shared_ptr<storage_proxy>& p) {
|
||||
return when_all(
|
||||
@@ -2791,8 +2782,6 @@ void storage_proxy::init_messaging_service() {
|
||||
|
||||
void storage_proxy::uninit_messaging_service() {
|
||||
auto& ms = net::get_local_messaging_service();
|
||||
ms.unregister_definitions_update();
|
||||
ms.unregister_migration_request();
|
||||
ms.unregister_mutation();
|
||||
ms.unregister_mutation_done();
|
||||
ms.unregister_read_data();
|
||||
|
||||
@@ -121,7 +121,6 @@ private:
|
||||
std::uniform_real_distribution<> _read_repair_chance = std::uniform_real_distribution<>(0,1);
|
||||
std::unique_ptr<scollectd::registrations> _collectd_registrations;
|
||||
private:
|
||||
void init_messaging_service();
|
||||
void uninit_messaging_service();
|
||||
future<foreign_ptr<lw_shared_ptr<query::result>>> query_singular(lw_shared_ptr<query::read_command> cmd, std::vector<query::partition_range>&& partition_ranges, db::consistency_level cl);
|
||||
response_id_type register_response_handler(std::unique_ptr<abstract_write_response_handler>&& h);
|
||||
@@ -174,6 +173,8 @@ public:
|
||||
return _db;
|
||||
}
|
||||
|
||||
void init_messaging_service();
|
||||
|
||||
future<> mutate_locally(const mutation& m);
|
||||
future<> mutate_locally(const schema_ptr&, const frozen_mutation& m);
|
||||
future<> mutate_locally(std::vector<mutation> mutations);
|
||||
|
||||
@@ -54,7 +54,6 @@
|
||||
#include "locator/local_strategy.hh"
|
||||
#include "version.hh"
|
||||
#include "unimplemented.hh"
|
||||
#include "service/pending_range_calculator_service.hh"
|
||||
#include "streaming/stream_plan.hh"
|
||||
#include "streaming/stream_state.hh"
|
||||
#include "dht/range_streamer.hh"
|
||||
@@ -266,7 +265,7 @@ void storage_service::join_token_ring(int delay) {
|
||||
}
|
||||
set_mode(mode::JOINING, "schema complete, ready to bootstrap", true);
|
||||
set_mode(mode::JOINING, "waiting for pending range calculation", true);
|
||||
get_local_pending_range_calculator_service().block_until_finished().get();
|
||||
block_until_update_pending_ranges_finished().get();
|
||||
set_mode(mode::JOINING, "calculation complete, ready to bootstrap", true);
|
||||
logger.debug("... got ring + schema info");
|
||||
|
||||
@@ -293,7 +292,7 @@ void storage_service::join_token_ring(int delay) {
|
||||
set_mode(mode::JOINING, "waiting for schema information to complete", true);
|
||||
sleep(std::chrono::seconds(1)).get();
|
||||
}
|
||||
get_local_pending_range_calculator_service().block_until_finished().get();
|
||||
block_until_update_pending_ranges_finished().get();
|
||||
}
|
||||
logger.info("Checking bootstrapping/leaving/moving nodes: ok");
|
||||
|
||||
@@ -477,7 +476,7 @@ void storage_service::handle_state_bootstrap(inet_address endpoint) {
|
||||
}
|
||||
|
||||
_token_metadata.add_bootstrap_tokens(tokens, endpoint);
|
||||
get_local_pending_range_calculator_service().update().get();
|
||||
update_pending_ranges().get();
|
||||
|
||||
auto& gossiper = gms::get_local_gossiper();
|
||||
if (gossiper.uses_host_id(endpoint)) {
|
||||
@@ -575,7 +574,7 @@ void storage_service::handle_state_normal(inet_address endpoint) {
|
||||
// a race where natural endpoint was updated to contain node A, but A was
|
||||
// not yet removed from pending endpoints
|
||||
_token_metadata.update_normal_tokens(tokens_to_update_in_metadata, endpoint);
|
||||
get_local_pending_range_calculator_service().do_update();
|
||||
do_update_pending_ranges();
|
||||
|
||||
for (auto ep : endpoints_to_remove) {
|
||||
remove_endpoint(ep);
|
||||
@@ -622,7 +621,7 @@ void storage_service::handle_state_normal(inet_address endpoint) {
|
||||
}).get();
|
||||
}
|
||||
|
||||
get_local_pending_range_calculator_service().update().get();
|
||||
update_pending_ranges().get();
|
||||
if (logger.is_enabled(logging::log_level::debug)) {
|
||||
auto ver = _token_metadata.get_ring_version();
|
||||
for (auto& x : _token_metadata.get_token_to_endpoint()) {
|
||||
@@ -657,7 +656,7 @@ void storage_service::handle_state_leaving(inet_address endpoint) {
|
||||
// at this point the endpoint is certainly a member with this token, so let's proceed
|
||||
// normally
|
||||
_token_metadata.add_leaving_endpoint(endpoint);
|
||||
get_local_pending_range_calculator_service().update().get();
|
||||
update_pending_ranges().get();
|
||||
}
|
||||
|
||||
void storage_service::handle_state_left(inet_address endpoint, std::vector<sstring> pieces) {
|
||||
@@ -680,7 +679,7 @@ void storage_service::handle_state_moving(inet_address endpoint, std::vector<sst
|
||||
auto token = dht::global_partitioner().from_sstring(pieces[1]);
|
||||
logger.debug("Node {} state moving, new token {}", endpoint, token);
|
||||
_token_metadata.add_moving_endpoint(token, endpoint);
|
||||
get_local_pending_range_calculator_service().update().get();
|
||||
update_pending_ranges().get();
|
||||
}
|
||||
|
||||
void storage_service::handle_state_removing(inet_address endpoint, std::vector<sstring> pieces) {
|
||||
@@ -710,7 +709,7 @@ void storage_service::handle_state_removing(inet_address endpoint, std::vector<s
|
||||
logger.debug("Tokens {} removed manually (endpoint was {})", remove_tokens, endpoint);
|
||||
// Note that the endpoint is being removed
|
||||
_token_metadata.add_leaving_endpoint(endpoint);
|
||||
get_local_pending_range_calculator_service().update().get();
|
||||
update_pending_ranges().get();
|
||||
// find the endpoint coordinating this removal that we need to notify when we're done
|
||||
auto state = gossiper.get_endpoint_state_for_endpoint(endpoint);
|
||||
if (!state) {
|
||||
@@ -831,7 +830,7 @@ void storage_service::on_change(inet_address endpoint, application_state state,
|
||||
void storage_service::on_remove(gms::inet_address endpoint) {
|
||||
logger.debug("endpoint={} on_remove", endpoint);
|
||||
_token_metadata.remove_endpoint(endpoint);
|
||||
get_local_pending_range_calculator_service().update().get();
|
||||
update_pending_ranges().get();
|
||||
}
|
||||
|
||||
void storage_service::on_dead(gms::inet_address endpoint, gms::endpoint_state state) {
|
||||
@@ -980,9 +979,7 @@ future<> storage_service::drain_on_shutdown() {
|
||||
ss.shutdown_client_servers().get();
|
||||
logger.info("Drain on shutdown: shutdown rpc and cql server done");
|
||||
|
||||
net::get_messaging_service().invoke_on_all([] (auto& ms) {
|
||||
return ms.stop();
|
||||
}).get();
|
||||
ss.do_stop_ms().get();
|
||||
logger.info("Drain on shutdown: shutdown messaging_service done");
|
||||
|
||||
auth::auth::shutdown().get();
|
||||
@@ -995,6 +992,14 @@ future<> storage_service::drain_on_shutdown() {
|
||||
return db.commitlog()->shutdown();
|
||||
}).get();
|
||||
logger.info("Drain on shutdown: shutdown commitlog done");
|
||||
|
||||
// NOTE: We currently don't destory migration_manager nor
|
||||
// storage_service in scylla, so when we reach here
|
||||
// migration_manager should to be still alive. Be careful, when
|
||||
// scylla starts to destroy migration_manager in the shutdown
|
||||
// process.
|
||||
service::get_local_migration_manager().unregister_listener(&ss);
|
||||
|
||||
logger.info("Drain on shutdown: done");
|
||||
});
|
||||
});
|
||||
@@ -1066,6 +1071,10 @@ future<> storage_service::init_server(int delay) {
|
||||
logger.info("CQL supported versions: {} (default: {})", StringUtils.join(ClientState.getCQLSupportedVersion(), ","), ClientState.DEFAULT_CQL_VERSION);
|
||||
#endif
|
||||
_initialized = true;
|
||||
|
||||
// Register storage_service to migration_manager so we can update
|
||||
// pending ranges when keyspace is chagned
|
||||
service::get_local_migration_manager().register_listener(this);
|
||||
#if 0
|
||||
try
|
||||
{
|
||||
@@ -1470,6 +1479,18 @@ future<> storage_service::stop_gossiping() {
|
||||
});
|
||||
}
|
||||
|
||||
future<> storage_service::do_stop_ms() {
|
||||
if (_ms_stopped) {
|
||||
return make_ready_future<>();
|
||||
}
|
||||
_ms_stopped = true;
|
||||
return net::get_messaging_service().invoke_on_all([] (auto& ms) {
|
||||
return ms.stop();
|
||||
}).then([] {
|
||||
logger.info("messaging_service stopped");
|
||||
});
|
||||
}
|
||||
|
||||
future<> check_snapshot_not_exist(database& db, sstring ks_name, sstring name) {
|
||||
auto& ks = db.find_keyspace(ks_name);
|
||||
return parallel_for_each(ks.metadata()->cf_meta_data(), [&db, ks_name = std::move(ks_name), name = std::move(name)] (auto& pair) {
|
||||
@@ -1763,7 +1784,7 @@ future<> storage_service::decommission() {
|
||||
throw std::runtime_error(sprint("Node in %s state; wait for status to become normal or restart", ss._operation_mode));
|
||||
}
|
||||
|
||||
get_local_pending_range_calculator_service().block_until_finished().get();
|
||||
ss.update_pending_ranges().get();
|
||||
|
||||
auto non_system_keyspaces = db.get_non_system_keyspaces();
|
||||
for (const auto& keyspace_name : non_system_keyspaces) {
|
||||
@@ -1788,9 +1809,7 @@ future<> storage_service::decommission() {
|
||||
logger.debug("DECOMMISSIONING: shutdown rpc and cql server done");
|
||||
gms::get_local_gossiper().stop_gossiping().get();
|
||||
logger.debug("DECOMMISSIONING: stop_gossiping done");
|
||||
net::get_messaging_service().invoke_on_all([] (auto& ms) {
|
||||
return ms.stop();
|
||||
}).get();
|
||||
ss.do_stop_ms().get();
|
||||
// StageManager.shutdownNow();
|
||||
db::system_keyspace::set_bootstrap_state(db::system_keyspace::bootstrap_state::DECOMMISSIONED).get();
|
||||
logger.debug("DECOMMISSIONING: set_bootstrap_state done");
|
||||
@@ -1862,7 +1881,7 @@ future<> storage_service::remove_node(sstring host_id_string) {
|
||||
}
|
||||
ss._removing_node = endpoint;
|
||||
tm.add_leaving_endpoint(endpoint);
|
||||
get_local_pending_range_calculator_service().update().get();
|
||||
ss.update_pending_ranges().get();
|
||||
|
||||
// the gossiper will handle spoofing this node's state to REMOVING_TOKEN for us
|
||||
// we add our own token so other nodes to let us know when they're done
|
||||
@@ -1942,10 +1961,8 @@ future<> storage_service::drain() {
|
||||
ss.shutdown_client_servers().get();
|
||||
gms::get_local_gossiper().stop_gossiping().get();
|
||||
|
||||
ss.set_mode(mode::DRAINING, "shutting down MessageService", false);
|
||||
net::get_messaging_service().invoke_on_all([] (auto& ms) {
|
||||
return ms.stop();
|
||||
}).get();
|
||||
ss.set_mode(mode::DRAINING, "shutting down messaging_service", false);
|
||||
ss.do_stop_ms().get();
|
||||
|
||||
#if 0
|
||||
StorageProxy.instance.verifyNoHintsInProgress();
|
||||
@@ -2236,7 +2253,7 @@ void storage_service::excise(std::unordered_set<token> tokens, inet_address endp
|
||||
}
|
||||
}).get();
|
||||
|
||||
get_local_pending_range_calculator_service().update().get();
|
||||
update_pending_ranges().get();
|
||||
}
|
||||
|
||||
void storage_service::excise(std::unordered_set<token> tokens, inet_address endpoint, int64_t expire_time) {
|
||||
@@ -2285,7 +2302,7 @@ future<> storage_service::confirm_replication(inet_address node) {
|
||||
void storage_service::leave_ring() {
|
||||
db::system_keyspace::set_bootstrap_state(db::system_keyspace::bootstrap_state::NEEDS_BOOTSTRAP).get();
|
||||
_token_metadata.remove_endpoint(get_broadcast_address());
|
||||
get_local_pending_range_calculator_service().update().get();
|
||||
update_pending_ranges().get();
|
||||
|
||||
auto& gossiper = gms::get_local_gossiper();
|
||||
auto expire_time = gossiper.compute_expire_time().time_since_epoch().count();
|
||||
@@ -2384,7 +2401,7 @@ future<> storage_service::start_leaving() {
|
||||
auto& gossiper = gms::get_local_gossiper();
|
||||
return gossiper.add_local_application_state(application_state::STATUS, value_factory.leaving(get_local_tokens())).then([this] {
|
||||
_token_metadata.add_leaving_endpoint(get_broadcast_address());
|
||||
return get_local_pending_range_calculator_service().update();
|
||||
return update_pending_ranges();
|
||||
});
|
||||
}
|
||||
|
||||
@@ -2741,7 +2758,7 @@ future<> storage_service::move(token new_token) {
|
||||
|
||||
auto keyspaces_to_process = ss._db.local().get_non_system_keyspaces();
|
||||
|
||||
get_local_pending_range_calculator_service().block_until_finished().get();
|
||||
ss.block_until_update_pending_ranges_finished().get();
|
||||
|
||||
// checking if data is moving to this node
|
||||
for (auto keyspace_name : keyspaces_to_process) {
|
||||
@@ -2786,5 +2803,46 @@ std::chrono::milliseconds storage_service::get_ring_delay() {
|
||||
return std::chrono::milliseconds(ring_delay);
|
||||
}
|
||||
|
||||
void storage_service::do_update_pending_ranges() {
|
||||
if (engine().cpu_id() != 0) {
|
||||
throw std::runtime_error("do_update_pending_ranges should be called on cpu zero");
|
||||
}
|
||||
// long start = System.currentTimeMillis();
|
||||
auto keyspaces = _db.local().get_non_system_keyspaces();
|
||||
for (auto& keyspace_name : keyspaces) {
|
||||
auto& ks = _db.local().find_keyspace(keyspace_name);
|
||||
auto& strategy = ks.get_replication_strategy();
|
||||
get_local_storage_service().get_token_metadata().calculate_pending_ranges(strategy, keyspace_name);
|
||||
}
|
||||
// logger.debug("finished calculation for {} keyspaces in {}ms", keyspaces.size(), System.currentTimeMillis() - start);
|
||||
}
|
||||
|
||||
future<> storage_service::update_pending_ranges() {
|
||||
return get_storage_service().invoke_on(0, [] (auto& ss){
|
||||
ss._update_jobs++;
|
||||
ss.do_update_pending_ranges();
|
||||
// calculate_pending_ranges will modify token_metadata, we need to repliate to other cores
|
||||
return ss.replicate_to_all_cores().finally([&ss, ss0 = ss.shared_from_this()] {
|
||||
ss._update_jobs--;
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
future<> storage_service::block_until_update_pending_ranges_finished() {
|
||||
// We want to be sure the job we're blocking for is actually finished and we can't trust the TPE's active job count
|
||||
return smp::submit_to(0, [] {
|
||||
return do_until(
|
||||
[] { return !(get_local_storage_service()._update_jobs > 0); },
|
||||
[] { return sleep(std::chrono::milliseconds(100)); });
|
||||
});
|
||||
}
|
||||
|
||||
future<> storage_service::keyspace_changed(const sstring& ks_name) {
|
||||
// Update pending ranges since keyspace can be changed after we calculate pending ranges.
|
||||
return update_pending_ranges().handle_exception([ks_name] (auto ep) {
|
||||
logger.warn("Failed to update pending ranges for ks = {}: {}", ks_name, ep);
|
||||
});
|
||||
}
|
||||
|
||||
} // namespace service
|
||||
|
||||
|
||||
@@ -84,7 +84,7 @@ int get_generation_number();
|
||||
* This class will also maintain histograms of the load information
|
||||
* of other nodes in the cluster.
|
||||
*/
|
||||
class storage_service : public gms::i_endpoint_state_change_subscriber, public seastar::async_sharded_service<storage_service> {
|
||||
class storage_service : public service::migration_listener, public gms::i_endpoint_state_change_subscriber, public seastar::async_sharded_service<storage_service> {
|
||||
public:
|
||||
struct snapshot_details {
|
||||
int64_t live;
|
||||
@@ -108,6 +108,7 @@ private:
|
||||
private final AtomicLong notificationSerialNumber = new AtomicLong();
|
||||
#endif
|
||||
distributed<database>& _db;
|
||||
int _update_jobs{0};
|
||||
// Note that this is obviously only valid for the current shard. Users of
|
||||
// this facility should elect a shard to be the coordinator based on any
|
||||
// given objective criteria
|
||||
@@ -119,6 +120,7 @@ private:
|
||||
shared_ptr<distributed<transport::cql_server>> _cql_server;
|
||||
shared_ptr<distributed<thrift_server>> _thrift_server;
|
||||
sstring _operation_in_progress;
|
||||
bool _ms_stopped = false;
|
||||
public:
|
||||
storage_service(distributed<database>& db)
|
||||
: _db(db) {
|
||||
@@ -127,6 +129,11 @@ public:
|
||||
// Needed by distributed<>
|
||||
future<> stop();
|
||||
|
||||
future<> keyspace_changed(const sstring& ks_name);
|
||||
void do_update_pending_ranges();
|
||||
future<> update_pending_ranges();
|
||||
future<> block_until_update_pending_ranges_finished();
|
||||
|
||||
const locator::token_metadata& get_token_metadata() const {
|
||||
return _token_metadata;
|
||||
}
|
||||
@@ -285,6 +292,7 @@ public:
|
||||
private:
|
||||
future<> do_stop_rpc_server();
|
||||
future<> do_stop_native_transport();
|
||||
future<> do_stop_ms();
|
||||
#if 0
|
||||
public void stopTransports()
|
||||
{
|
||||
@@ -696,6 +704,26 @@ public:
|
||||
virtual void on_dead(gms::inet_address endpoint, gms::endpoint_state state) override;
|
||||
virtual void on_remove(gms::inet_address endpoint) override;
|
||||
virtual void on_restart(gms::inet_address endpoint, gms::endpoint_state state) override;
|
||||
|
||||
public:
|
||||
// For migration_listener
|
||||
virtual void on_create_keyspace(const sstring& ks_name) override { keyspace_changed(ks_name).get(); }
|
||||
virtual void on_create_column_family(const sstring& ks_name, const sstring& cf_name) override {}
|
||||
virtual void on_create_user_type(const sstring& ks_name, const sstring& type_name) override {}
|
||||
virtual void on_create_function(const sstring& ks_name, const sstring& function_name) override {}
|
||||
virtual void on_create_aggregate(const sstring& ks_name, const sstring& aggregate_name) override {}
|
||||
|
||||
virtual void on_update_keyspace(const sstring& ks_name) override { keyspace_changed(ks_name).get(); }
|
||||
virtual void on_update_column_family(const sstring& ks_name, const sstring& cf_name, bool) override {}
|
||||
virtual void on_update_user_type(const sstring& ks_name, const sstring& type_name) override {}
|
||||
virtual void on_update_function(const sstring& ks_name, const sstring& function_name) override {}
|
||||
virtual void on_update_aggregate(const sstring& ks_name, const sstring& aggregate_name) override {}
|
||||
|
||||
virtual void on_drop_keyspace(const sstring& ks_name) override { keyspace_changed(ks_name).get(); }
|
||||
virtual void on_drop_column_family(const sstring& ks_name, const sstring& cf_name) override {}
|
||||
virtual void on_drop_user_type(const sstring& ks_name, const sstring& type_name) override {}
|
||||
virtual void on_drop_function(const sstring& ks_name, const sstring& function_name) override {}
|
||||
virtual void on_drop_aggregate(const sstring& ks_name, const sstring& aggregate_name) override {}
|
||||
private:
|
||||
void update_peer_info(inet_address endpoint);
|
||||
void do_update_system_peers_table(gms::inet_address endpoint, const application_state& state, const versioned_value& value);
|
||||
|
||||
@@ -495,52 +495,59 @@ class mutation_reader::impl {
|
||||
private:
|
||||
mp_row_consumer _consumer;
|
||||
std::experimental::optional<data_consume_context> _context;
|
||||
std::experimental::optional<future<data_consume_context>> _context_future;
|
||||
std::function<future<data_consume_context> ()> _get_context;
|
||||
public:
|
||||
impl(sstable& sst, schema_ptr schema, uint64_t start, uint64_t end,
|
||||
const io_priority_class &pc)
|
||||
: _consumer(schema, pc)
|
||||
, _context(sst.data_consume_rows(_consumer, start, end)) { }
|
||||
, _get_context([&sst, this, start, end] {
|
||||
return make_ready_future<data_consume_context>(sst.data_consume_rows(_consumer, start, end));
|
||||
}) { }
|
||||
impl(sstable& sst, schema_ptr schema,
|
||||
const io_priority_class &pc)
|
||||
: _consumer(schema, pc)
|
||||
, _context(sst.data_consume_rows(_consumer)) { }
|
||||
impl(sstable& sst, schema_ptr schema, future<uint64_t> start, future<uint64_t> end, const io_priority_class& pc)
|
||||
, _get_context([this, &sst] {
|
||||
return make_ready_future<data_consume_context>(sst.data_consume_rows(_consumer));
|
||||
}) { }
|
||||
impl(sstable& sst, schema_ptr schema, std::function<future<uint64_t>()> start, std::function<future<uint64_t>()> end, const io_priority_class& pc)
|
||||
: _consumer(schema, pc)
|
||||
, _context_future(start.then([this, &sst, end = std::move(end)] (uint64_t start) mutable {
|
||||
return end.then([this, &sst, start] (uint64_t end) mutable {
|
||||
return sst.data_consume_rows(_consumer, start, end);
|
||||
});
|
||||
})) { }
|
||||
impl() : _consumer() { }
|
||||
, _get_context([this, &sst, start = std::move(start), end = std::move(end)] () {
|
||||
return start().then([this, &sst, end = std::move(end)] (uint64_t start) {
|
||||
return end().then([this, &sst, start] (uint64_t end) {
|
||||
return make_ready_future<data_consume_context>(sst.data_consume_rows(_consumer, start, end));
|
||||
});
|
||||
});
|
||||
}) { }
|
||||
impl() : _consumer(), _get_context() { }
|
||||
|
||||
// Reference to _consumer is passed to data_consume_rows() in the constructor so we must not allow move/copy
|
||||
impl(impl&&) = delete;
|
||||
impl(const impl&) = delete;
|
||||
|
||||
future<mutation_opt> read() {
|
||||
if (_context) {
|
||||
return _context->read().then([this] {
|
||||
// We want after returning a mutation that _consumer.mut()
|
||||
// will be left in unengaged state (so on EOF we return an
|
||||
// unengaged optional). Moving _consumer.mut is *not* enough.
|
||||
auto ret = std::move(_consumer.mut);
|
||||
_consumer.mut = {};
|
||||
return std::move(ret);
|
||||
});
|
||||
} else if (_context_future) {
|
||||
return _context_future->then([this] (auto context) {
|
||||
_context = std::move(context);
|
||||
return _context->read().then([this] {
|
||||
auto ret = std::move(_consumer.mut);
|
||||
_consumer.mut = {};
|
||||
return std::move(ret);
|
||||
});
|
||||
});
|
||||
} else {
|
||||
if (!_get_context) {
|
||||
// empty mutation reader returns EOF immediately
|
||||
return make_ready_future<mutation_opt>();
|
||||
}
|
||||
|
||||
if (_context) {
|
||||
return do_read();
|
||||
}
|
||||
return (_get_context)().then([this] (data_consume_context context) {
|
||||
_context = std::move(context);
|
||||
return do_read();
|
||||
});
|
||||
}
|
||||
private:
|
||||
future<mutation_opt> do_read() {
|
||||
return _context->read().then([this] {
|
||||
// We want after returning a mutation that _consumer.mut()
|
||||
// will be left in unengaged state (so on EOF we return an
|
||||
// unengaged optional). Moving _consumer.mut is *not* enough.
|
||||
auto ret = std::move(_consumer.mut);
|
||||
_consumer.mut = {};
|
||||
return std::move(ret);
|
||||
});
|
||||
}
|
||||
};
|
||||
|
||||
@@ -649,17 +656,19 @@ sstable::read_range_rows(schema_ptr schema, const query::partition_range& range,
|
||||
fail(unimplemented::cause::WRAP_AROUND);
|
||||
}
|
||||
|
||||
future<uint64_t> start = range.start()
|
||||
? (range.start()->is_inclusive()
|
||||
auto start = [this, range, schema, &pc] {
|
||||
return range.start() ? (range.start()->is_inclusive()
|
||||
? lower_bound(schema, range.start()->value(), pc)
|
||||
: upper_bound(schema, range.start()->value(), pc))
|
||||
: make_ready_future<uint64_t>(0);
|
||||
};
|
||||
|
||||
future<uint64_t> end = range.end()
|
||||
? (range.end()->is_inclusive()
|
||||
auto end = [this, range, schema, &pc] {
|
||||
return range.end() ? (range.end()->is_inclusive()
|
||||
? upper_bound(schema, range.end()->value(), pc)
|
||||
: lower_bound(schema, range.end()->value(), pc))
|
||||
: make_ready_future<uint64_t>(data_size());
|
||||
};
|
||||
|
||||
return std::make_unique<mutation_reader::impl>(
|
||||
*this, std::move(schema), std::move(start), std::move(end), pc);
|
||||
|
||||
@@ -52,7 +52,14 @@ namespace sstables {
|
||||
|
||||
logging::logger sstlog("sstable");
|
||||
|
||||
thread_local std::unordered_map<sstring, unsigned> sstable::_shards_agreeing_to_remove_sstable;
|
||||
future<file> new_sstable_component_file(sstring name, open_flags flags) {
|
||||
return open_file_dma(name, flags).handle_exception([name] (auto ep) {
|
||||
sstlog.error("Could not create SSTable component {}. Found exception: {}", name, ep);
|
||||
return make_exception_future<file>(ep);
|
||||
});
|
||||
}
|
||||
|
||||
thread_local std::unordered_map<sstring, std::unordered_set<unsigned>> sstable::_shards_agreeing_to_remove_sstable;
|
||||
|
||||
static utils::phased_barrier& background_jobs() {
|
||||
static thread_local utils::phased_barrier gate;
|
||||
@@ -749,7 +756,19 @@ void sstable::write_toc(const io_priority_class& pc) {
|
||||
sstlog.debug("Writing TOC file {} ", file_path);
|
||||
|
||||
// Writing TOC content to temporary file.
|
||||
file f = open_file_dma(file_path, open_flags::wo | open_flags::create | open_flags::truncate).get0();
|
||||
// If creation of temporary TOC failed, it implies that that boot failed to
|
||||
// delete a sstable with temporary for this column family, or there is a
|
||||
// sstable being created in parallel with the same generation.
|
||||
file f = new_sstable_component_file(file_path, open_flags::wo | open_flags::create | open_flags::exclusive).get0();
|
||||
|
||||
bool toc_exists = file_exists(filename(sstable::component_type::TOC)).get0();
|
||||
if (toc_exists) {
|
||||
// TOC will exist at this point if write_components() was called with
|
||||
// the generation of a sstable that exists.
|
||||
f.close().get();
|
||||
remove_file(file_path).get();
|
||||
throw std::runtime_error(sprint("SSTable write failed due to existence of TOC file for generation %ld of %s.%s", _generation, _ks, _cf));
|
||||
}
|
||||
|
||||
file_output_stream_options options;
|
||||
options.buffer_size = 4096;
|
||||
@@ -792,7 +811,7 @@ void write_crc(const sstring file_path, checksum& c) {
|
||||
sstlog.debug("Writing CRC file {} ", file_path);
|
||||
|
||||
auto oflags = open_flags::wo | open_flags::create | open_flags::exclusive;
|
||||
file f = open_file_dma(file_path, oflags).get0();
|
||||
file f = new_sstable_component_file(file_path, oflags).get0();
|
||||
|
||||
file_output_stream_options options;
|
||||
options.buffer_size = 4096;
|
||||
@@ -806,7 +825,7 @@ void write_digest(const sstring file_path, uint32_t full_checksum) {
|
||||
sstlog.debug("Writing Digest file {} ", file_path);
|
||||
|
||||
auto oflags = open_flags::wo | open_flags::create | open_flags::exclusive;
|
||||
auto f = open_file_dma(file_path, oflags).get0();
|
||||
auto f = new_sstable_component_file(file_path, oflags).get0();
|
||||
|
||||
file_output_stream_options options;
|
||||
options.buffer_size = 4096;
|
||||
@@ -877,7 +896,7 @@ template <sstable::component_type Type, typename T>
|
||||
void sstable::write_simple(T& component, const io_priority_class& pc) {
|
||||
auto file_path = filename(Type);
|
||||
sstlog.debug(("Writing " + _component_map[Type] + " file {} ").c_str(), file_path);
|
||||
file f = open_file_dma(file_path, open_flags::wo | open_flags::create | open_flags::truncate).get0();
|
||||
file f = new_sstable_component_file(file_path, open_flags::wo | open_flags::create | open_flags::exclusive).get0();
|
||||
|
||||
file_output_stream_options options;
|
||||
options.buffer_size = sstable_buffer_size;
|
||||
@@ -946,8 +965,8 @@ future<> sstable::open_data() {
|
||||
|
||||
future<> sstable::create_data() {
|
||||
auto oflags = open_flags::wo | open_flags::create | open_flags::exclusive;
|
||||
return when_all(open_file_dma(filename(component_type::Index), oflags),
|
||||
open_file_dma(filename(component_type::Data), oflags)).then([this] (auto files) {
|
||||
return when_all(new_sstable_component_file(filename(component_type::Index), oflags),
|
||||
new_sstable_component_file(filename(component_type::Data), oflags)).then([this] (auto files) {
|
||||
// FIXME: If both files could not be created, the first get below will
|
||||
// throw an exception, and second get() will not be attempted, and
|
||||
// we'll get a warning about the second future being destructed
|
||||
@@ -1723,9 +1742,11 @@ sstable::shared_remove_by_toc_name(sstring toc_name, bool shared) {
|
||||
return remove_by_toc_name(toc_name);
|
||||
} else {
|
||||
auto shard = std::hash<sstring>()(toc_name) % smp::count;
|
||||
return smp::submit_to(shard, [toc_name] {
|
||||
auto& counter = _shards_agreeing_to_remove_sstable[toc_name];
|
||||
if (++counter == smp::count) {
|
||||
return smp::submit_to(shard, [toc_name, src_shard = engine().cpu_id()] {
|
||||
auto& remove_set = _shards_agreeing_to_remove_sstable[toc_name];
|
||||
remove_set.insert(src_shard);
|
||||
auto counter = remove_set.size();
|
||||
if (counter == smp::count) {
|
||||
_shards_agreeing_to_remove_sstable.erase(toc_name);
|
||||
return remove_by_toc_name(toc_name);
|
||||
} else {
|
||||
|
||||
@@ -343,7 +343,7 @@ private:
|
||||
static std::unordered_map<version_types, sstring, enum_hash<version_types>> _version_string;
|
||||
static std::unordered_map<format_types, sstring, enum_hash<format_types>> _format_string;
|
||||
static std::unordered_map<component_type, sstring, enum_hash<component_type>> _component_map;
|
||||
static thread_local std::unordered_map<sstring, unsigned> _shards_agreeing_to_remove_sstable;
|
||||
static thread_local std::unordered_map<sstring, std::unordered_set<unsigned>> _shards_agreeing_to_remove_sstable;
|
||||
|
||||
std::unordered_set<component_type, enum_hash<component_type>> _components;
|
||||
|
||||
|
||||
@@ -41,7 +41,6 @@
|
||||
#include "gms/failure_detector.hh"
|
||||
#include "gms/gossiper.hh"
|
||||
#include "service/storage_service.hh"
|
||||
#include "service/pending_range_calculator_service.hh"
|
||||
#include "auth/auth.hh"
|
||||
|
||||
// TODO : remove once shutdown is ok.
|
||||
@@ -49,12 +48,8 @@
|
||||
// Simpler to copy the code from init.cc than trying to do clever parameterization
|
||||
// and whatnot.
|
||||
static future<> tst_init_storage_service(distributed<database>& db) {
|
||||
return service::get_pending_range_calculator_service().start(std::ref(db)).then([] {
|
||||
engine().at_exit([] { return service::get_pending_range_calculator_service().stop(); });
|
||||
}).then([&db] {
|
||||
return service::init_storage_service(db).then([] {
|
||||
engine().at_exit([] { return service::deinit_storage_service(); });
|
||||
});
|
||||
return service::init_storage_service(db).then([] {
|
||||
engine().at_exit([] { return service::deinit_storage_service(); });
|
||||
});
|
||||
}
|
||||
|
||||
@@ -343,7 +338,6 @@ public:
|
||||
_db->stop().get();
|
||||
|
||||
service::get_storage_service().stop().get();
|
||||
service::get_pending_range_calculator_service().stop().get();
|
||||
|
||||
locator::i_endpoint_snitch::stop_snitch().get();
|
||||
|
||||
|
||||
@@ -29,7 +29,6 @@
|
||||
#include "gms/failure_detector.hh"
|
||||
#include "gms/gossiper.hh"
|
||||
#include "core/reactor.hh"
|
||||
#include "service/pending_range_calculator_service.hh"
|
||||
#include "service/storage_service.hh"
|
||||
#include "core/distributed.hh"
|
||||
#include "database.hh"
|
||||
@@ -39,7 +38,6 @@ SEASTAR_TEST_CASE(test_boot_shutdown){
|
||||
distributed<database> db;
|
||||
utils::fb_utilities::set_broadcast_address(gms::inet_address("127.0.0.1"));
|
||||
locator::i_endpoint_snitch::create_snitch("SimpleSnitch").get();
|
||||
service::get_pending_range_calculator_service().start(std::ref(db));
|
||||
service::get_storage_service().start(std::ref(db)).get();
|
||||
db.start().get();
|
||||
net::get_messaging_service().start(gms::inet_address("127.0.0.1")).get();
|
||||
@@ -51,7 +49,6 @@ SEASTAR_TEST_CASE(test_boot_shutdown){
|
||||
net::get_messaging_service().stop().get();
|
||||
db.stop().get();
|
||||
service::get_storage_service().stop().get();
|
||||
service::get_pending_range_calculator_service().stop().get();
|
||||
locator::i_endpoint_snitch::stop_snitch().get();
|
||||
});
|
||||
}
|
||||
|
||||
@@ -68,6 +68,7 @@ future<>
|
||||
with_column_family(schema_ptr s, column_family::config cfg, Func func) {
|
||||
auto cm = make_lw_shared<compaction_manager>();
|
||||
auto cf = make_lw_shared<column_family>(s, cfg, column_family::no_commitlog(), *cm);
|
||||
cf->mark_ready_for_writes();
|
||||
return func(*cf).then([cf, cm] {
|
||||
return cf->stop();
|
||||
}).finally([cf, cm] {});
|
||||
@@ -404,9 +405,7 @@ SEASTAR_TEST_CASE(test_multiple_memtables_multiple_partitions) {
|
||||
cfg.enable_disk_writes = false;
|
||||
cfg.enable_incremental_backups = false;
|
||||
cfg.cf_stats = &*cf_stats;
|
||||
auto cm = make_lw_shared<compaction_manager>();
|
||||
return do_with(make_lw_shared<column_family>(s, cfg, column_family::no_commitlog(), *cm), [s, cm] (auto& cf_ptr) mutable {
|
||||
column_family& cf = *cf_ptr;
|
||||
return with_column_family(s, cfg, [s] (auto& cf) mutable {
|
||||
std::map<int32_t, std::map<int32_t, int32_t>> shadow, result;
|
||||
|
||||
const column_definition& r1_col = *s->get_column_definition("r1");
|
||||
|
||||
@@ -987,6 +987,7 @@ SEASTAR_TEST_CASE(compaction_manager_test) {
|
||||
cfg.enable_incremental_backups = false;
|
||||
auto cf = make_lw_shared<column_family>(s, cfg, column_family::no_commitlog(), *cm);
|
||||
cf->start();
|
||||
cf->mark_ready_for_writes();
|
||||
cf->set_compaction_strategy(sstables::compaction_strategy_type::size_tiered);
|
||||
|
||||
auto generations = make_lw_shared<std::vector<unsigned long>>({1, 2, 3, 4});
|
||||
@@ -1063,6 +1064,7 @@ SEASTAR_TEST_CASE(compact) {
|
||||
auto s = builder.build();
|
||||
auto cm = make_lw_shared<compaction_manager>();
|
||||
auto cf = make_lw_shared<column_family>(s, column_family::config(), column_family::no_commitlog(), *cm);
|
||||
cf->mark_ready_for_writes();
|
||||
|
||||
return open_sstables("tests/sstables/compaction", {1,2,3}).then([s = std::move(s), cf, cm, generation] (auto sstables) {
|
||||
return test_setup::do_with_test_directory([sstables, s, generation, cf, cm] {
|
||||
@@ -1161,6 +1163,7 @@ static future<std::vector<unsigned long>> compact_sstables(std::vector<unsigned
|
||||
{{"p1", utf8_type}}, {{"c1", utf8_type}}, {{"r1", utf8_type}}, {}, utf8_type));
|
||||
auto cm = make_lw_shared<compaction_manager>();
|
||||
auto cf = make_lw_shared<column_family>(s, column_family::config(), column_family::no_commitlog(), *cm);
|
||||
cf->mark_ready_for_writes();
|
||||
|
||||
auto generations = make_lw_shared<std::vector<unsigned long>>(std::move(generations_to_compact));
|
||||
auto sstables = make_lw_shared<std::vector<sstables::shared_sstable>>();
|
||||
@@ -1670,6 +1673,7 @@ SEASTAR_TEST_CASE(leveled_01) {
|
||||
cfg.enable_disk_writes = false;
|
||||
cfg.enable_commitlog = false;
|
||||
auto cf = make_lw_shared<column_family>(s, cfg, column_family::no_commitlog(), cm);
|
||||
cf->mark_ready_for_writes();
|
||||
|
||||
auto key_and_token_pair = token_generation_for_current_shard(50);
|
||||
auto min_key = key_and_token_pair[0].first;
|
||||
@@ -1714,6 +1718,7 @@ SEASTAR_TEST_CASE(leveled_02) {
|
||||
cfg.enable_disk_writes = false;
|
||||
cfg.enable_commitlog = false;
|
||||
auto cf = make_lw_shared<column_family>(s, cfg, column_family::no_commitlog(), cm);
|
||||
cf->mark_ready_for_writes();
|
||||
|
||||
auto key_and_token_pair = token_generation_for_current_shard(50);
|
||||
auto min_key = key_and_token_pair[0].first;
|
||||
@@ -1768,6 +1773,7 @@ SEASTAR_TEST_CASE(leveled_03) {
|
||||
cfg.enable_disk_writes = false;
|
||||
cfg.enable_commitlog = false;
|
||||
auto cf = make_lw_shared<column_family>(s, cfg, column_family::no_commitlog(), cm);
|
||||
cf->mark_ready_for_writes();
|
||||
|
||||
auto key_and_token_pair = token_generation_for_current_shard(50);
|
||||
auto min_key = key_and_token_pair[0].first;
|
||||
@@ -1826,6 +1832,7 @@ SEASTAR_TEST_CASE(leveled_04) {
|
||||
cfg.enable_disk_writes = false;
|
||||
cfg.enable_commitlog = false;
|
||||
auto cf = make_lw_shared<column_family>(s, cfg, column_family::no_commitlog(), cm);
|
||||
cf->mark_ready_for_writes();
|
||||
|
||||
auto key_and_token_pair = token_generation_for_current_shard(50);
|
||||
auto min_key = key_and_token_pair[0].first;
|
||||
@@ -2159,6 +2166,7 @@ SEASTAR_TEST_CASE(tombstone_purge_test) {
|
||||
}).then([s, tmp, sstables] {
|
||||
auto cm = make_lw_shared<compaction_manager>();
|
||||
auto cf = make_lw_shared<column_family>(s, column_family::config(), column_family::no_commitlog(), *cm);
|
||||
cf->mark_ready_for_writes();
|
||||
auto create = [tmp] {
|
||||
return make_lw_shared<sstable>("ks", "cf", tmp->path, 3, la, big);
|
||||
};
|
||||
@@ -2259,6 +2267,7 @@ SEASTAR_TEST_CASE(sstable_rewrite) {
|
||||
};
|
||||
auto cm = make_lw_shared<compaction_manager>();
|
||||
auto cf = make_lw_shared<column_family>(s, column_family::config(), column_family::no_commitlog(), *cm);
|
||||
cf->mark_ready_for_writes();
|
||||
std::vector<shared_sstable> sstables;
|
||||
sstables.push_back(std::move(sstp));
|
||||
|
||||
|
||||
@@ -36,6 +36,7 @@
|
||||
#include "database.hh"
|
||||
#include <memory>
|
||||
#include "sstable_test.hh"
|
||||
#include "tmpdir.hh"
|
||||
|
||||
using namespace sstables;
|
||||
|
||||
@@ -169,10 +170,11 @@ SEASTAR_TEST_CASE(big_summary_query_32) {
|
||||
return summary_query<32, 0xc4000, 182>("tests/sstables/bigsummary", 76);
|
||||
}
|
||||
|
||||
static future<sstable_ptr> do_write_sst(sstring dir, unsigned long generation) {
|
||||
auto sst = make_lw_shared<sstable>("ks", "cf", dir, generation, la, big);
|
||||
return sst->load().then([sst, generation] {
|
||||
static future<sstable_ptr> do_write_sst(sstring load_dir, sstring write_dir, unsigned long generation) {
|
||||
auto sst = make_lw_shared<sstable>("ks", "cf", load_dir, generation, la, big);
|
||||
return sst->load().then([sst, write_dir, generation] {
|
||||
sstables::test(sst).change_generation_number(generation + 1);
|
||||
sstables::test(sst).change_dir(write_dir);
|
||||
auto fut = sstables::test(sst).store();
|
||||
return std::move(fut).then([sst = std::move(sst)] {
|
||||
return make_ready_future<sstable_ptr>(std::move(sst));
|
||||
@@ -180,8 +182,8 @@ static future<sstable_ptr> do_write_sst(sstring dir, unsigned long generation) {
|
||||
});
|
||||
}
|
||||
|
||||
static future<> write_sst_info(sstring dir, unsigned long generation) {
|
||||
return do_write_sst(dir, generation).then([] (auto ptr) { return make_ready_future<>(); });
|
||||
static future<> write_sst_info(sstring load_dir, sstring write_dir, unsigned long generation) {
|
||||
return do_write_sst(load_dir, write_dir, generation).then([] (auto ptr) { return make_ready_future<>(); });
|
||||
}
|
||||
|
||||
using bufptr_t = std::unique_ptr<char [], free_deleter>;
|
||||
@@ -223,11 +225,12 @@ static future<> compare_files(sstdesc file1, sstdesc file2, sstable::component_t
|
||||
}
|
||||
|
||||
static future<> check_component_integrity(sstable::component_type component) {
|
||||
return write_sst_info("tests/sstables/compressed", 1).then([component] {
|
||||
auto tmp = make_lw_shared<tmpdir>();
|
||||
return write_sst_info("tests/sstables/compressed", tmp->path, 1).then([component, tmp] {
|
||||
return compare_files(sstdesc{"tests/sstables/compressed", 1 },
|
||||
sstdesc{"tests/sstables/compressed", 2 },
|
||||
sstdesc{tmp->path, 2 },
|
||||
component);
|
||||
});
|
||||
}).then([tmp] {});
|
||||
}
|
||||
|
||||
SEASTAR_TEST_CASE(check_compressed_info_func) {
|
||||
@@ -235,8 +238,9 @@ SEASTAR_TEST_CASE(check_compressed_info_func) {
|
||||
}
|
||||
|
||||
SEASTAR_TEST_CASE(check_summary_func) {
|
||||
return do_write_sst("tests/sstables/compressed", 1).then([] (auto sst1) {
|
||||
auto sst2 = make_lw_shared<sstable>("ks", "cf", "tests/sstables/compressed", 2, la, big);
|
||||
auto tmp = make_lw_shared<tmpdir>();
|
||||
return do_write_sst("tests/sstables/compressed", tmp->path, 1).then([tmp] (auto sst1) {
|
||||
auto sst2 = make_lw_shared<sstable>("ks", "cf", tmp->path, 2, la, big);
|
||||
return sstables::test(sst2).read_summary().then([sst1, sst2] {
|
||||
summary& sst1_s = sstables::test(sst1).get_summary();
|
||||
summary& sst2_s = sstables::test(sst2).get_summary();
|
||||
@@ -247,7 +251,7 @@ SEASTAR_TEST_CASE(check_summary_func) {
|
||||
BOOST_REQUIRE(sst1_s.first_key.value == sst2_s.first_key.value);
|
||||
BOOST_REQUIRE(sst1_s.last_key.value == sst2_s.last_key.value);
|
||||
});
|
||||
});
|
||||
}).then([tmp] {});
|
||||
}
|
||||
|
||||
SEASTAR_TEST_CASE(check_filter_func) {
|
||||
@@ -255,8 +259,9 @@ SEASTAR_TEST_CASE(check_filter_func) {
|
||||
}
|
||||
|
||||
SEASTAR_TEST_CASE(check_statistics_func) {
|
||||
return do_write_sst("tests/sstables/compressed", 1).then([] (auto sst1) {
|
||||
auto sst2 = make_lw_shared<sstable>("ks", "cf", "tests/sstables/compressed", 2, la, big);
|
||||
auto tmp = make_lw_shared<tmpdir>();
|
||||
return do_write_sst("tests/sstables/compressed", tmp->path, 1).then([tmp] (auto sst1) {
|
||||
auto sst2 = make_lw_shared<sstable>("ks", "cf", tmp->path, 2, la, big);
|
||||
return sstables::test(sst2).read_statistics().then([sst1, sst2] {
|
||||
statistics& sst1_s = sstables::test(sst1).get_statistics();
|
||||
statistics& sst2_s = sstables::test(sst2).get_statistics();
|
||||
@@ -271,19 +276,20 @@ SEASTAR_TEST_CASE(check_statistics_func) {
|
||||
});
|
||||
// TODO: compare the field contents from both sstables.
|
||||
});
|
||||
});
|
||||
}).then([tmp] {});
|
||||
}
|
||||
|
||||
SEASTAR_TEST_CASE(check_toc_func) {
|
||||
return do_write_sst("tests/sstables/compressed", 1).then([] (auto sst1) {
|
||||
auto sst2 = make_lw_shared<sstable>("ks", "cf", "tests/sstables/compressed", 2, la, big);
|
||||
auto tmp = make_lw_shared<tmpdir>();
|
||||
return do_write_sst("tests/sstables/compressed", tmp->path, 1).then([tmp] (auto sst1) {
|
||||
auto sst2 = make_lw_shared<sstable>("ks", "cf", tmp->path, 2, la, big);
|
||||
return sstables::test(sst2).read_toc().then([sst1, sst2] {
|
||||
auto& sst1_c = sstables::test(sst1).get_components();
|
||||
auto& sst2_c = sstables::test(sst2).get_components();
|
||||
|
||||
BOOST_REQUIRE(sst1_c == sst2_c);
|
||||
});
|
||||
});
|
||||
}).then([tmp] {});
|
||||
}
|
||||
|
||||
SEASTAR_TEST_CASE(uncompressed_random_access_read) {
|
||||
@@ -857,6 +863,7 @@ SEASTAR_TEST_CASE(reshuffle) {
|
||||
cfg.enable_incremental_backups = false;
|
||||
auto cf = make_lw_shared<column_family>(uncompressed_schema(), cfg, column_family::no_commitlog(), *cm);
|
||||
cf->start();
|
||||
cf->mark_ready_for_writes();
|
||||
return cf->reshuffle_sstables(3).then([cm, cf] (std::vector<sstables::entry_descriptor> reshuffled) {
|
||||
BOOST_REQUIRE(reshuffled.size() == 2);
|
||||
BOOST_REQUIRE(reshuffled[0].generation == 3);
|
||||
|
||||
@@ -100,6 +100,10 @@ public:
|
||||
_sst->_generation = generation;
|
||||
}
|
||||
|
||||
void change_dir(sstring dir) {
|
||||
_sst->_dir = dir;
|
||||
}
|
||||
|
||||
future<> store() {
|
||||
_sst->_components.erase(sstable::component_type::Index);
|
||||
_sst->_components.erase(sstable::component_type::Data);
|
||||
|
||||
@@ -135,6 +135,8 @@ void test_timestamp_like_string_conversions(data_type timestamp_type) {
|
||||
BOOST_REQUIRE(timestamp_type->equal(timestamp_type->from_string("2015-07-03T12:30:00+1230"), timestamp_type->decompose(tp)));
|
||||
BOOST_REQUIRE(timestamp_type->equal(timestamp_type->from_string("2015-07-02T23:00-0100"), timestamp_type->decompose(tp)));
|
||||
|
||||
BOOST_REQUIRE_EQUAL(timestamp_type->to_string(timestamp_type->decompose(tp)), "2015-07-03T00:00:00");
|
||||
|
||||
auto now = time(nullptr);
|
||||
auto local_now = *localtime(&now);
|
||||
char buf[100];
|
||||
|
||||
20
types.cc
20
types.cc
@@ -40,6 +40,14 @@
|
||||
#include <boost/multiprecision/cpp_int.hpp>
|
||||
#include "utils/big_decimal.hh"
|
||||
|
||||
template<typename T>
|
||||
sstring time_point_to_string(const T& tp)
|
||||
{
|
||||
auto timestamp = tp.time_since_epoch().count();
|
||||
auto time = boost::posix_time::from_time_t(0) + boost::posix_time::milliseconds(timestamp);
|
||||
return boost::posix_time::to_iso_extended_string(time);
|
||||
}
|
||||
|
||||
static const char* int32_type_name = "org.apache.cassandra.db.marshal.Int32Type";
|
||||
static const char* long_type_name = "org.apache.cassandra.db.marshal.LongType";
|
||||
static const char* ascii_type_name = "org.apache.cassandra.db.marshal.AsciiType";
|
||||
@@ -421,7 +429,11 @@ public:
|
||||
}
|
||||
virtual bytes from_string(sstring_view s) const override;
|
||||
virtual sstring to_string(const bytes& b) const override {
|
||||
throw std::runtime_error(sprint("%s not implemented", __PRETTY_FUNCTION__));
|
||||
auto v = deserialize(b);
|
||||
if (v.is_null()) {
|
||||
return "";
|
||||
}
|
||||
return time_point_to_string(from_value(v).get());
|
||||
}
|
||||
virtual ::shared_ptr<cql3::cql3_type> as_cql3_type() const override {
|
||||
return cql3::cql3_type::timestamp;
|
||||
@@ -684,7 +696,11 @@ public:
|
||||
return b;
|
||||
}
|
||||
virtual sstring to_string(const bytes& b) const override {
|
||||
throw std::runtime_error(sprint("%s not implemented", __PRETTY_FUNCTION__));
|
||||
auto v = deserialize(b);
|
||||
if (v.is_null()) {
|
||||
return "";
|
||||
}
|
||||
return time_point_to_string(from_value(v).get());
|
||||
}
|
||||
virtual ::shared_ptr<cql3::cql3_type> as_cql3_type() const override {
|
||||
return cql3::cql3_type::timestamp;
|
||||
|
||||
@@ -1211,9 +1211,11 @@ public:
|
||||
assert(seg->is_empty());
|
||||
free_segment(seg);
|
||||
}
|
||||
_closed_occupancy = {};
|
||||
if (_active) {
|
||||
assert(_active->is_empty());
|
||||
free_segment(_active);
|
||||
_active = nullptr;
|
||||
}
|
||||
if (_group) {
|
||||
_group->del(this);
|
||||
|
||||
Reference in New Issue
Block a user