Compare commits
28 Commits
scylla-2.2
...
next-2.2
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
d27eb734a7 | ||
|
|
e6aeb490b5 | ||
|
|
2e3b09b593 | ||
|
|
92c74f4e0b | ||
|
|
89d835e9e3 | ||
|
|
263a740084 | ||
|
|
7f24b5319e | ||
|
|
fe16c0e985 | ||
|
|
f85badaaac | ||
|
|
2193d41683 | ||
|
|
1e1f0c29bf | ||
|
|
84d4588b5f | ||
|
|
7b43b26709 | ||
|
|
0ed01acf15 | ||
|
|
7ce160f408 | ||
|
|
5017d9b46a | ||
|
|
50b6ab3552 | ||
|
|
b1652823aa | ||
|
|
02b24aec34 | ||
|
|
22eea4d8cf | ||
|
|
d257f6d57c | ||
|
|
6fca92ac3c | ||
|
|
26e3917046 | ||
|
|
3892594a93 | ||
|
|
4b24439841 | ||
|
|
a02a4592d8 | ||
|
|
b6e1c08451 | ||
|
|
9469afcd27 |
@@ -1,6 +1,6 @@
|
||||
#!/bin/sh
|
||||
|
||||
VERSION=2.2.0
|
||||
VERSION=2.2.2
|
||||
|
||||
if test -f version
|
||||
then
|
||||
|
||||
@@ -2193,11 +2193,11 @@
|
||||
"description":"The column family"
|
||||
},
|
||||
"total":{
|
||||
"type":"int",
|
||||
"type":"long",
|
||||
"description":"The total snapshot size"
|
||||
},
|
||||
"live":{
|
||||
"type":"int",
|
||||
"type":"long",
|
||||
"description":"The live snapshot size"
|
||||
}
|
||||
}
|
||||
|
||||
@@ -149,7 +149,9 @@ static sstring gensalt() {
|
||||
// blowfish 2011 fix, blowfish, sha512, sha256, md5
|
||||
for (sstring pfx : { "$2y$", "$2a$", "$6$", "$5$", "$1$" }) {
|
||||
salt = pfx + input;
|
||||
if (crypt_r("fisk", salt.c_str(), &tlcrypt)) {
|
||||
const char* e = crypt_r("fisk", salt.c_str(), &tlcrypt);
|
||||
|
||||
if (e && (e[0] != '*')) {
|
||||
prefix = pfx;
|
||||
return salt;
|
||||
}
|
||||
|
||||
@@ -60,6 +60,7 @@ class cache_flat_mutation_reader final : public flat_mutation_reader::impl {
|
||||
// - _next_row_in_range = _next.position() < _upper_bound
|
||||
// - _last_row points at a direct predecessor of the next row which is going to be read.
|
||||
// Used for populating continuity.
|
||||
// - _population_range_starts_before_all_rows is set accordingly
|
||||
reading_from_underlying,
|
||||
|
||||
end_of_stream
|
||||
@@ -86,6 +87,13 @@ class cache_flat_mutation_reader final : public flat_mutation_reader::impl {
|
||||
partition_snapshot_row_cursor _next_row;
|
||||
bool _next_row_in_range = false;
|
||||
|
||||
// True iff current population interval, since the previous clustering row, starts before all clustered rows.
|
||||
// We cannot just look at _lower_bound, because emission of range tombstones changes _lower_bound and
|
||||
// because we mark clustering intervals as continuous when consuming a clustering_row, it would prevent
|
||||
// us from marking the interval as continuous.
|
||||
// Valid when _state == reading_from_underlying.
|
||||
bool _population_range_starts_before_all_rows;
|
||||
|
||||
future<> do_fill_buffer(db::timeout_clock::time_point);
|
||||
void copy_from_cache_to_buffer();
|
||||
future<> process_static_row(db::timeout_clock::time_point);
|
||||
@@ -226,6 +234,7 @@ inline
|
||||
future<> cache_flat_mutation_reader::do_fill_buffer(db::timeout_clock::time_point timeout) {
|
||||
if (_state == state::move_to_underlying) {
|
||||
_state = state::reading_from_underlying;
|
||||
_population_range_starts_before_all_rows = _lower_bound.is_before_all_clustered_rows(*_schema);
|
||||
auto end = _next_row_in_range ? position_in_partition(_next_row.position())
|
||||
: position_in_partition(_upper_bound);
|
||||
return _read_context->fast_forward_to(position_range{_lower_bound, std::move(end)}, timeout).then([this, timeout] {
|
||||
@@ -351,7 +360,7 @@ future<> cache_flat_mutation_reader::read_from_underlying(db::timeout_clock::tim
|
||||
|
||||
inline
|
||||
bool cache_flat_mutation_reader::ensure_population_lower_bound() {
|
||||
if (!_ck_ranges_curr->start()) {
|
||||
if (_population_range_starts_before_all_rows) {
|
||||
return true;
|
||||
}
|
||||
if (!_last_row.refresh(*_snp)) {
|
||||
@@ -406,6 +415,7 @@ inline
|
||||
void cache_flat_mutation_reader::maybe_add_to_cache(const clustering_row& cr) {
|
||||
if (!can_populate()) {
|
||||
_last_row = nullptr;
|
||||
_population_range_starts_before_all_rows = false;
|
||||
_read_context->cache().on_mispopulate();
|
||||
return;
|
||||
}
|
||||
@@ -439,6 +449,7 @@ void cache_flat_mutation_reader::maybe_add_to_cache(const clustering_row& cr) {
|
||||
with_allocator(standard_allocator(), [&] {
|
||||
_last_row = partition_snapshot_row_weakref(*_snp, it, true);
|
||||
});
|
||||
_population_range_starts_before_all_rows = false;
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
@@ -67,6 +67,12 @@ class error_collector : public error_listener<RecognizerType, ExceptionBaseType>
|
||||
*/
|
||||
const sstring_view _query;
|
||||
|
||||
/**
|
||||
* An empty bitset to be used as a workaround for AntLR null dereference
|
||||
* bug.
|
||||
*/
|
||||
static typename ExceptionBaseType::BitsetListType _empty_bit_list;
|
||||
|
||||
public:
|
||||
|
||||
/**
|
||||
@@ -144,6 +150,14 @@ private:
|
||||
break;
|
||||
}
|
||||
default:
|
||||
// AntLR Exception class has a bug of dereferencing a null
|
||||
// pointer in the displayRecognitionError. The following
|
||||
// if statement makes sure it will not be null before the
|
||||
// call to that function (displayRecognitionError).
|
||||
// bug reference: https://github.com/antlr/antlr3/issues/191
|
||||
if (!ex->get_expectingSet()) {
|
||||
ex->set_expectingSet(&_empty_bit_list);
|
||||
}
|
||||
ex->displayRecognitionError(token_names, msg);
|
||||
}
|
||||
return msg.str();
|
||||
@@ -345,4 +359,8 @@ private:
|
||||
#endif
|
||||
};
|
||||
|
||||
template<typename RecognizerType, typename TokenType, typename ExceptionBaseType>
|
||||
typename ExceptionBaseType::BitsetListType
|
||||
error_collector<RecognizerType,TokenType,ExceptionBaseType>::_empty_bit_list = typename ExceptionBaseType::BitsetListType();
|
||||
|
||||
}
|
||||
|
||||
@@ -209,19 +209,18 @@ void query_options::prepare(const std::vector<::shared_ptr<column_specification>
|
||||
}
|
||||
|
||||
auto& names = *_names;
|
||||
std::vector<cql3::raw_value> ordered_values;
|
||||
std::vector<cql3::raw_value_view> ordered_values;
|
||||
ordered_values.reserve(specs.size());
|
||||
for (auto&& spec : specs) {
|
||||
auto& spec_name = spec->name->text();
|
||||
for (size_t j = 0; j < names.size(); j++) {
|
||||
if (names[j] == spec_name) {
|
||||
ordered_values.emplace_back(_values[j]);
|
||||
ordered_values.emplace_back(_value_views[j]);
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
_values = std::move(ordered_values);
|
||||
fill_value_views();
|
||||
_value_views = std::move(ordered_values);
|
||||
}
|
||||
|
||||
void query_options::fill_value_views()
|
||||
|
||||
@@ -202,6 +202,14 @@ public:
|
||||
const query_options& options,
|
||||
gc_clock::time_point now) const override;
|
||||
|
||||
virtual std::vector<bytes_opt> values_raw(const query_options& options) const = 0;
|
||||
|
||||
virtual std::vector<bytes_opt> values(const query_options& options) const override {
|
||||
std::vector<bytes_opt> ret = values_raw(options);
|
||||
std::sort(ret.begin(),ret.end());
|
||||
ret.erase(std::unique(ret.begin(),ret.end()),ret.end());
|
||||
return ret;
|
||||
}
|
||||
#if 0
|
||||
@Override
|
||||
protected final boolean isSupportedBy(SecondaryIndex index)
|
||||
@@ -224,7 +232,7 @@ public:
|
||||
return abstract_restriction::term_uses_function(_values, ks_name, function_name);
|
||||
}
|
||||
|
||||
virtual std::vector<bytes_opt> values(const query_options& options) const override {
|
||||
virtual std::vector<bytes_opt> values_raw(const query_options& options) const override {
|
||||
std::vector<bytes_opt> ret;
|
||||
for (auto&& v : _values) {
|
||||
ret.emplace_back(to_bytes_opt(v->bind_and_get(options)));
|
||||
@@ -249,7 +257,7 @@ public:
|
||||
return false;
|
||||
}
|
||||
|
||||
virtual std::vector<bytes_opt> values(const query_options& options) const override {
|
||||
virtual std::vector<bytes_opt> values_raw(const query_options& options) const override {
|
||||
auto&& lval = dynamic_pointer_cast<multi_item_terminal>(_marker->bind(options));
|
||||
if (!lval) {
|
||||
throw exceptions::invalid_request_exception("Invalid null value for IN restriction");
|
||||
|
||||
@@ -105,9 +105,11 @@ public:
|
||||
virtual void reset() = 0;
|
||||
|
||||
virtual assignment_testable::test_result test_assignment(database& db, const sstring& keyspace, ::shared_ptr<column_specification> receiver) override {
|
||||
if (receiver->type == get_type()) {
|
||||
auto t1 = receiver->type->underlying_type();
|
||||
auto t2 = get_type()->underlying_type();
|
||||
if (t1 == t2) {
|
||||
return assignment_testable::test_result::EXACT_MATCH;
|
||||
} else if (receiver->type->is_value_compatible_with(*get_type())) {
|
||||
} else if (t1->is_value_compatible_with(*t2)) {
|
||||
return assignment_testable::test_result::WEAKLY_ASSIGNABLE;
|
||||
} else {
|
||||
return assignment_testable::test_result::NOT_ASSIGNABLE;
|
||||
|
||||
@@ -53,6 +53,9 @@ update_parameters::get_prefetched_list(
|
||||
return {};
|
||||
}
|
||||
|
||||
if (column.is_static()) {
|
||||
ckey = clustering_key_view::make_empty();
|
||||
}
|
||||
auto i = _prefetched->rows.find(std::make_pair(std::move(pkey), std::move(ckey)));
|
||||
if (i == _prefetched->rows.end()) {
|
||||
return {};
|
||||
|
||||
@@ -1637,9 +1637,9 @@ future<> distributed_loader::open_sstable(distributed<database>& db, sstables::e
|
||||
// to distribute evenly the resource usage among all shards.
|
||||
|
||||
return db.invoke_on(column_family::calculate_shard_from_sstable_generation(comps.generation),
|
||||
[&db, comps = std::move(comps), func = std::move(func), pc] (database& local) {
|
||||
[&db, comps = std::move(comps), func = std::move(func), &pc] (database& local) {
|
||||
|
||||
return with_semaphore(local.sstable_load_concurrency_sem(), 1, [&db, &local, comps = std::move(comps), func = std::move(func), pc] {
|
||||
return with_semaphore(local.sstable_load_concurrency_sem(), 1, [&db, &local, comps = std::move(comps), func = std::move(func), &pc] {
|
||||
auto& cf = local.find_column_family(comps.ks, comps.cf);
|
||||
|
||||
auto f = sstables::sstable::load_shared_components(cf.schema(), cf._config.datadir, comps.generation, comps.version, comps.format, pc);
|
||||
|
||||
2
dist/ami/files/.bash_profile
vendored
2
dist/ami/files/.bash_profile
vendored
@@ -120,7 +120,7 @@ else
|
||||
fi
|
||||
fi
|
||||
echo -n " "
|
||||
/usr/lib/scylla/scylla_ec2_check
|
||||
/usr/lib/scylla/scylla_ec2_check --nic eth0
|
||||
if [ $? -eq 0 ]; then
|
||||
echo
|
||||
fi
|
||||
|
||||
30
dist/common/scripts/scylla_ec2_check
vendored
30
dist/common/scripts/scylla_ec2_check
vendored
@@ -2,6 +2,12 @@
|
||||
|
||||
. /usr/lib/scylla/scylla_lib.sh
|
||||
|
||||
print_usage() {
|
||||
echo "scylla_ec2_check --nic eth0"
|
||||
echo " --nic specify NIC"
|
||||
exit 1
|
||||
}
|
||||
|
||||
get_en_interface_type() {
|
||||
TYPE=`curl -s http://169.254.169.254/latest/meta-data/instance-type|cut -d . -f 1`
|
||||
SUBTYPE=`curl -s http://169.254.169.254/latest/meta-data/instance-type|cut -d . -f 2`
|
||||
@@ -18,7 +24,7 @@ get_en_interface_type() {
|
||||
}
|
||||
|
||||
is_vpc_enabled() {
|
||||
MAC=`cat /sys/class/net/eth0/address`
|
||||
MAC=`cat /sys/class/net/$1/address`
|
||||
VPC_AVAIL=`curl -s http://169.254.169.254/latest/meta-data/network/interfaces/macs/$MAC/|grep vpc-id`
|
||||
[ -n "$VPC_AVAIL" ]
|
||||
}
|
||||
@@ -27,9 +33,27 @@ if ! is_ec2; then
|
||||
exit 0
|
||||
fi
|
||||
|
||||
if [ $# -eq 0 ]; then
|
||||
print_usage
|
||||
fi
|
||||
while [ $# -gt 0 ]; do
|
||||
case "$1" in
|
||||
"--nic")
|
||||
verify_args $@
|
||||
NIC="$2"
|
||||
shift 2
|
||||
;;
|
||||
esac
|
||||
done
|
||||
|
||||
if ! is_valid_nic $NIC; then
|
||||
echo "NIC $NIC doesn't exist."
|
||||
exit 1
|
||||
fi
|
||||
|
||||
TYPE=`curl -s http://169.254.169.254/latest/meta-data/instance-type`
|
||||
EN=`get_en_interface_type`
|
||||
DRIVER=`ethtool -i eth0|awk '/^driver:/ {print $2}'`
|
||||
DRIVER=`ethtool -i $NIC|awk '/^driver:/ {print $2}'`
|
||||
if [ "$EN" = "" ]; then
|
||||
tput setaf 1
|
||||
tput bold
|
||||
@@ -39,7 +63,7 @@ if [ "$EN" = "" ]; then
|
||||
echo "More documentation available at: "
|
||||
echo "http://docs.aws.amazon.com/AWSEC2/latest/UserGuide/enhanced-networking.html#enabling_enhanced_networking"
|
||||
exit 1
|
||||
elif ! is_vpc_enabled; then
|
||||
elif ! is_vpc_enabled $NIC; then
|
||||
tput setaf 1
|
||||
tput bold
|
||||
echo "VPC is not enabled!"
|
||||
|
||||
4
dist/common/scripts/scylla_lib.sh
vendored
4
dist/common/scripts/scylla_lib.sh
vendored
@@ -91,6 +91,10 @@ create_perftune_conf() {
|
||||
/usr/lib/scylla/perftune.py --tune net --nic "$nic" $mode --dump-options-file > /etc/scylla.d/perftune.yaml
|
||||
}
|
||||
|
||||
is_valid_nic() {
|
||||
[ -d /sys/class/net/$1 ]
|
||||
}
|
||||
|
||||
. /etc/os-release
|
||||
if is_debian_variant || is_gentoo_variant; then
|
||||
SYSCONFIG=/etc/default
|
||||
|
||||
57
dist/common/scripts/scylla_setup
vendored
57
dist/common/scripts/scylla_setup
vendored
@@ -39,6 +39,27 @@ print_usage() {
|
||||
exit 1
|
||||
}
|
||||
|
||||
interactive_choose_nic() {
|
||||
NICS=$(for i in /sys/class/net/*;do nic=`basename $i`; if [ "$nic" != "lo" ]; then echo $nic; fi; done)
|
||||
NR_NICS=`echo $NICS|wc -w`
|
||||
if [ $NR_NICS -eq 0 ]; then
|
||||
echo "NIC not found."
|
||||
exit 1
|
||||
elif [ $NR_NICS -eq 1 ]; then
|
||||
NIC=$NICS
|
||||
else
|
||||
echo "Please select NIC from following list: "
|
||||
while true; do
|
||||
echo $NICS
|
||||
echo -n "> "
|
||||
read NIC
|
||||
if is_valid_nic $NIC; then
|
||||
break
|
||||
fi
|
||||
done
|
||||
fi
|
||||
}
|
||||
|
||||
interactive_ask_service() {
|
||||
echo $1
|
||||
echo $2
|
||||
@@ -112,14 +133,20 @@ run_setup_script() {
|
||||
name=$1
|
||||
shift 1
|
||||
$* &&:
|
||||
if [ $? -ne 0 ] && [ $INTERACTIVE -eq 1 ]; then
|
||||
printf "${RED}$name setup failed. press any key to continue...${NO_COLOR}\n"
|
||||
read
|
||||
return 1
|
||||
if [ $? -ne 0 ]; then
|
||||
if [ $INTERACTIVE -eq 1 ]; then
|
||||
printf "${RED}$name setup failed. press any key to continue...${NO_COLOR}\n"
|
||||
read
|
||||
return 1
|
||||
else
|
||||
printf "$name setup failed.\n"
|
||||
exit 1
|
||||
fi
|
||||
fi
|
||||
return 0
|
||||
}
|
||||
|
||||
NIC="eth0"
|
||||
AMI=0
|
||||
SET_NIC=0
|
||||
DEV_MODE=0
|
||||
@@ -260,7 +287,8 @@ if is_ec2; then
|
||||
EC2_CHECK=$?
|
||||
fi
|
||||
if [ $EC2_CHECK -eq 1 ]; then
|
||||
/usr/lib/scylla/scylla_ec2_check
|
||||
interactive_choose_nic
|
||||
/usr/lib/scylla/scylla_ec2_check --nic $NIC
|
||||
fi
|
||||
fi
|
||||
|
||||
@@ -447,24 +475,6 @@ if [ $INTERACTIVE -eq 1 ]; then
|
||||
interactive_ask_service "Do you want to setup sysconfig?" "Answer yes to do system wide configuration customized for Scylla. Answer no to do nothing." "yes" &&:
|
||||
SYSCONFIG_SETUP=$?
|
||||
if [ $SYSCONFIG_SETUP -eq 1 ]; then
|
||||
NICS=$(for i in /sys/class/net/*;do nic=`basename $i`; if [ "$nic" != "lo" ]; then echo $nic; fi; done)
|
||||
NR_NICS=`echo $NICS|wc -w`
|
||||
if [ $NR_NICS -eq 0 ]; then
|
||||
echo "NIC not found."
|
||||
exit 1
|
||||
elif [ $NR_NICS -eq 1 ]; then
|
||||
NIC=$NICS
|
||||
else
|
||||
echo "Please select NIC from following list: "
|
||||
while true; do
|
||||
echo $NICS
|
||||
echo -n "> "
|
||||
read NIC
|
||||
if [ -e /sys/class/net/$NIC ]; then
|
||||
break
|
||||
fi
|
||||
done
|
||||
fi
|
||||
interactive_ask_service "Do you want to optimize NIC queue settings?" "Answer yes to enable network card optimization and improve performance. Answer no to skip this optimization." "yes" &&:
|
||||
SET_NIC=$?
|
||||
fi
|
||||
@@ -474,6 +484,7 @@ if [ $SYSCONFIG_SETUP -eq 1 ]; then
|
||||
if [ $SET_NIC -eq 1 ]; then
|
||||
SETUP_ARGS="--setup-nic"
|
||||
fi
|
||||
interactive_choose_nic
|
||||
run_setup_script "NIC queue" /usr/lib/scylla/scylla_sysconfig_setup --nic $NIC $SETUP_ARGS
|
||||
fi
|
||||
|
||||
|
||||
13
dist/debian/build_deb.sh
vendored
13
dist/debian/build_deb.sh
vendored
@@ -254,19 +254,18 @@ if [ "$TARGET" != "trusty" ]; then
|
||||
cp dist/common/systemd/node-exporter.service debian/scylla-server.node-exporter.service
|
||||
fi
|
||||
|
||||
sudo cp ./dist/debian/pbuilderrc ~root/.pbuilderrc
|
||||
if [ $NO_CLEAN -eq 0 ]; then
|
||||
sudo rm -fv /var/cache/pbuilder/scylla-server-$TARGET.tgz
|
||||
sudo -H DIST=$TARGET /usr/sbin/pbuilder clean
|
||||
sudo -H DIST=$TARGET /usr/sbin/pbuilder create --allow-untrusted
|
||||
sudo DIST=$TARGET /usr/sbin/pbuilder clean --configfile ./dist/debian/pbuilderrc
|
||||
sudo DIST=$TARGET /usr/sbin/pbuilder create --configfile ./dist/debian/pbuilderrc --allow-untrusted
|
||||
fi
|
||||
if [ $JOBS -ne 0 ]; then
|
||||
DEB_BUILD_OPTIONS="parallel=$JOBS"
|
||||
fi
|
||||
sudo -H DIST=$TARGET /usr/sbin/pbuilder update --allow-untrusted
|
||||
sudo -H DIST=$TARGET /usr/sbin/pbuilder update --configfile ./dist/debian/pbuilderrc --allow-untrusted
|
||||
if [ "$TARGET" = "trusty" ] || [ "$TARGET" = "xenial" ] || [ "$TARGET" = "yakkety" ] || [ "$TARGET" = "zesty" ] || [ "$TARGET" = "artful" ] || [ "$TARGET" = "bionic" ]; then
|
||||
sudo -H DIST=$TARGET /usr/sbin/pbuilder execute --save-after-exec dist/debian/ubuntu_enable_ppa.sh
|
||||
sudo DIST=$TARGET /usr/sbin/pbuilder execute --configfile ./dist/debian/pbuilderrc --save-after-exec dist/debian/ubuntu_enable_ppa.sh
|
||||
elif [ "$TARGET" = "jessie" ] || [ "$TARGET" = "stretch" ]; then
|
||||
sudo -H DIST=$TARGET /usr/sbin/pbuilder execute --save-after-exec dist/debian/debian_install_gpgkey.sh
|
||||
sudo DIST=$TARGET /usr/sbin/pbuilder execute --configfile ./dist/debian/pbuilderrc --save-after-exec dist/debian/debian_install_gpgkey.sh
|
||||
fi
|
||||
sudo -H DIST=$TARGET DEB_BUILD_OPTIONS=$DEB_BUILD_OPTIONS pdebuild --buildresult build/debs
|
||||
sudo -H DIST=$TARGET DEB_BUILD_OPTIONS=$DEB_BUILD_OPTIONS pdebuild --configfile ./dist/debian/pbuilderrc --buildresult build/debs
|
||||
|
||||
4
keys.hh
4
keys.hh
@@ -721,6 +721,10 @@ public:
|
||||
static const compound& get_compound_type(const schema& s) {
|
||||
return s.clustering_key_prefix_type();
|
||||
}
|
||||
|
||||
static clustering_key_prefix_view make_empty() {
|
||||
return { bytes_view() };
|
||||
}
|
||||
};
|
||||
|
||||
class clustering_key_prefix : public prefix_compound_wrapper<clustering_key_prefix, clustering_key_prefix_view, clustering_key> {
|
||||
|
||||
@@ -119,9 +119,17 @@ insert_token_range_to_sorted_container_while_unwrapping(
|
||||
const dht::token& tok,
|
||||
dht::token_range_vector& ret) {
|
||||
if (prev_tok < tok) {
|
||||
ret.emplace_back(
|
||||
dht::token_range::bound(prev_tok, false),
|
||||
dht::token_range::bound(tok, true));
|
||||
auto pos = ret.end();
|
||||
if (!ret.empty() && !std::prev(pos)->end()) {
|
||||
// We inserted a wrapped range (a, b] previously as
|
||||
// (-inf, b], (a, +inf). So now we insert in the next-to-last
|
||||
// position to keep the last range (a, +inf) at the end.
|
||||
pos = std::prev(pos);
|
||||
}
|
||||
ret.insert(pos,
|
||||
dht::token_range{
|
||||
dht::token_range::bound(prev_tok, false),
|
||||
dht::token_range::bound(tok, true)});
|
||||
} else {
|
||||
ret.emplace_back(
|
||||
dht::token_range::bound(prev_tok, false),
|
||||
|
||||
@@ -1089,7 +1089,7 @@ row::apply_monotonically(const column_definition& column, atomic_cell_or_collect
|
||||
if (_type == storage_type::vector && id < max_vector_size) {
|
||||
if (id >= _storage.vector.v.size()) {
|
||||
_storage.vector.v.resize(id);
|
||||
_storage.vector.v.emplace_back(cell_and_hash{std::move(value), std::move(hash)});
|
||||
_storage.vector.v.emplace_back(std::move(value), std::move(hash));
|
||||
_storage.vector.present.set(id);
|
||||
_size++;
|
||||
} else if (auto& cell_and_hash = _storage.vector.v[id]; !bool(cell_and_hash.cell)) {
|
||||
@@ -1753,9 +1753,10 @@ void mutation_querier::query_static_row(const row& r, tombstone current_tombston
|
||||
} else if (_short_reads_allowed) {
|
||||
seastar::measuring_output_stream stream;
|
||||
ser::qr_partition__static_row__cells<seastar::measuring_output_stream> out(stream, { });
|
||||
auto start = stream.size();
|
||||
get_compacted_row_slice(_schema, slice, column_kind::static_column,
|
||||
r, slice.static_columns, _static_cells_wr);
|
||||
_memory_accounter.update(stream.size());
|
||||
r, slice.static_columns, out);
|
||||
_memory_accounter.update(stream.size() - start);
|
||||
}
|
||||
if (_pw.requested_digest()) {
|
||||
max_timestamp max_ts{_pw.last_modified()};
|
||||
@@ -1816,8 +1817,9 @@ stop_iteration mutation_querier::consume(clustering_row&& cr, row_tombstone curr
|
||||
} else if (_short_reads_allowed) {
|
||||
seastar::measuring_output_stream stream;
|
||||
ser::qr_partition__rows<seastar::measuring_output_stream> out(stream, { });
|
||||
auto start = stream.size();
|
||||
write_row(out);
|
||||
stop = _memory_accounter.update_and_check(stream.size());
|
||||
stop = _memory_accounter.update_and_check(stream.size() - start);
|
||||
}
|
||||
|
||||
_live_clustering_rows++;
|
||||
|
||||
@@ -74,6 +74,17 @@ using cell_hash_opt = seastar::optimized_optional<cell_hash>;
|
||||
struct cell_and_hash {
|
||||
atomic_cell_or_collection cell;
|
||||
mutable cell_hash_opt hash;
|
||||
|
||||
cell_and_hash() = default;
|
||||
cell_and_hash(cell_and_hash&&) noexcept = default;
|
||||
cell_and_hash& operator=(cell_and_hash&&) noexcept = default;
|
||||
cell_and_hash(const cell_and_hash&) = default;
|
||||
cell_and_hash& operator=(const cell_and_hash&) = default;
|
||||
|
||||
cell_and_hash(atomic_cell_or_collection&& cell, cell_hash_opt hash)
|
||||
: cell(std::move(cell))
|
||||
, hash(hash)
|
||||
{ }
|
||||
};
|
||||
|
||||
//
|
||||
|
||||
@@ -273,6 +273,11 @@ public:
|
||||
return is_partition_end() || (_ck && _ck->is_empty(s) && _bound_weight > 0);
|
||||
}
|
||||
|
||||
bool is_before_all_clustered_rows(const schema& s) const {
|
||||
return _type < partition_region::clustered
|
||||
|| (_type == partition_region::clustered && _ck->is_empty(s) && _bound_weight < 0);
|
||||
}
|
||||
|
||||
template<typename Hasher>
|
||||
void feed_hash(Hasher& hasher, const schema& s) const {
|
||||
::feed_hash(hasher, _bound_weight);
|
||||
|
||||
2
seastar
2
seastar
Submodule seastar updated: 6f61d7456e...88cb58cfbf
@@ -144,7 +144,11 @@ future<lowres_clock::duration> cache_hitrate_calculator::recalculate_hitrates()
|
||||
return _db.invoke_on_all([this, rates = std::move(rates), cpuid = engine().cpu_id()] (database& db) {
|
||||
sstring gstate;
|
||||
for (auto& cf : db.get_column_families() | boost::adaptors::filtered(non_system_filter)) {
|
||||
stat s = rates.at(cf.first);
|
||||
auto it = rates.find(cf.first);
|
||||
if (it == rates.end()) { // a table may be added before map/reduce compltes and this code runs
|
||||
continue;
|
||||
}
|
||||
stat s = it->second;
|
||||
float rate = 0;
|
||||
if (s.h) {
|
||||
rate = s.h / (s.h + s.m);
|
||||
|
||||
@@ -3220,9 +3220,22 @@ storage_proxy::query_partition_key_range(lw_shared_ptr<query::read_command> cmd,
|
||||
slogger.debug("Estimated result rows per range: {}; requested rows: {}, ranges.size(): {}; concurrent range requests: {}",
|
||||
result_rows_per_range, cmd->row_limit, ranges.size(), concurrency_factor);
|
||||
|
||||
// The call to `query_partition_key_range_concurrent()` below
|
||||
// updates `cmd` directly when processing the results. Under
|
||||
// some circumstances, when the query executes without deferring,
|
||||
// this updating will happen before the lambda object is constructed
|
||||
// and hence the updates will be visible to the lambda. This will
|
||||
// result in the merger below trimming the results according to the
|
||||
// updated (decremented) limits and causing the paging logic to
|
||||
// declare the query exhausted due to the non-full page. To avoid
|
||||
// this save the original values of the limits here and pass these
|
||||
// to the lambda below.
|
||||
const auto row_limit = cmd->row_limit;
|
||||
const auto partition_limit = cmd->partition_limit;
|
||||
|
||||
return query_partition_key_range_concurrent(timeout, std::move(results), cmd, cl, ranges.begin(), std::move(ranges), concurrency_factor,
|
||||
std::move(trace_state), cmd->row_limit, cmd->partition_limit)
|
||||
.then([row_limit = cmd->row_limit, partition_limit = cmd->partition_limit](std::vector<foreign_ptr<lw_shared_ptr<query::result>>> results) {
|
||||
.then([row_limit, partition_limit](std::vector<foreign_ptr<lw_shared_ptr<query::result>>> results) {
|
||||
query::result_merger merger(row_limit, partition_limit);
|
||||
merger.reserve(results.size());
|
||||
|
||||
@@ -3585,6 +3598,7 @@ future<> storage_proxy::truncate_blocking(sstring keyspace, sstring cfname) {
|
||||
std::rethrow_exception(ep);
|
||||
} catch (rpc::timeout_error& e) {
|
||||
slogger.trace("Truncation of {} timed out: {}", cfname, e.what());
|
||||
throw;
|
||||
} catch (...) {
|
||||
throw;
|
||||
}
|
||||
|
||||
@@ -215,3 +215,22 @@ SEASTAR_TEST_CASE(test_aggregate_count) {
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
SEASTAR_TEST_CASE(test_reverse_type_aggregation) {
|
||||
return do_with_cql_env_thread([&] (auto& e) {
|
||||
e.execute_cql("CREATE TABLE test(p int, c timestamp, v int, primary key (p, c)) with clustering order by (c desc)").get();
|
||||
e.execute_cql("INSERT INTO test(p, c, v) VALUES (1, 1, 1)").get();
|
||||
e.execute_cql("INSERT INTO test(p, c, v) VALUES (1, 2, 1)").get();
|
||||
|
||||
{
|
||||
auto tp = db_clock::from_time_t({ 0 }) + std::chrono::milliseconds(1);
|
||||
auto msg = e.execute_cql("SELECT min(c) FROM test").get0();
|
||||
assert_that(msg).is_rows().with_size(1).with_row({{timestamp_type->decompose(tp)}});
|
||||
}
|
||||
{
|
||||
auto tp = db_clock::from_time_t({ 0 }) + std::chrono::milliseconds(2);
|
||||
auto msg = e.execute_cql("SELECT max(c) FROM test").get0();
|
||||
assert_that(msg).is_rows().with_size(1).with_row({{timestamp_type->decompose(tp)}});
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
@@ -2047,10 +2047,9 @@ SEASTAR_TEST_CASE(test_in_restriction) {
|
||||
assert_that(msg).is_rows().with_size(0);
|
||||
return e.execute_cql("select r1 from tir where p1 in (2, 0, 2, 1);");
|
||||
}).then([&e] (shared_ptr<cql_transport::messages::result_message> msg) {
|
||||
assert_that(msg).is_rows().with_rows({
|
||||
assert_that(msg).is_rows().with_rows_ignore_order({
|
||||
{int32_type->decompose(4)},
|
||||
{int32_type->decompose(0)},
|
||||
{int32_type->decompose(4)},
|
||||
{int32_type->decompose(1)},
|
||||
{int32_type->decompose(2)},
|
||||
{int32_type->decompose(3)},
|
||||
@@ -2072,6 +2071,22 @@ SEASTAR_TEST_CASE(test_in_restriction) {
|
||||
{int32_type->decompose(2)},
|
||||
{int32_type->decompose(1)},
|
||||
});
|
||||
return e.prepare("select r1 from tir where p1 in ?");
|
||||
}).then([&e] (cql3::prepared_cache_key_type prepared_id){
|
||||
auto my_list_type = list_type_impl::get_instance(int32_type, true);
|
||||
std::vector<cql3::raw_value> raw_values;
|
||||
auto in_values_list = my_list_type->decompose(make_list_value(my_list_type,
|
||||
list_type_impl::native_type{{int(2), int(0), int(2), int(1)}}));
|
||||
raw_values.emplace_back(cql3::raw_value::make_value(in_values_list));
|
||||
return e.execute_prepared(prepared_id,raw_values);
|
||||
}).then([&e] (shared_ptr<cql_transport::messages::result_message> msg) {
|
||||
assert_that(msg).is_rows().with_rows_ignore_order({
|
||||
{int32_type->decompose(4)},
|
||||
{int32_type->decompose(0)},
|
||||
{int32_type->decompose(1)},
|
||||
{int32_type->decompose(2)},
|
||||
{int32_type->decompose(3)},
|
||||
});
|
||||
});
|
||||
});
|
||||
}
|
||||
@@ -2622,3 +2637,66 @@ SEASTAR_TEST_CASE(test_empty_partition_range_scan) {
|
||||
assert_that(res).is_rows().is_empty();
|
||||
});
|
||||
}
|
||||
|
||||
SEASTAR_TEST_CASE(test_static_multi_cell_static_lists_with_ckey) {
|
||||
return do_with_cql_env_thread([] (cql_test_env& e) {
|
||||
e.execute_cql("CREATE TABLE t (p int, c int, slist list<int> static, v int, PRIMARY KEY (p, c));").get();
|
||||
e.execute_cql("INSERT INTO t (p, c, slist, v) VALUES (1, 1, [1], 1); ").get();
|
||||
|
||||
{
|
||||
e.execute_cql("UPDATE t SET slist[0] = 3, v = 3 WHERE p = 1 AND c = 1;").get();
|
||||
auto msg = e.execute_cql("SELECT slist, v FROM t WHERE p = 1 AND c = 1;").get0();
|
||||
auto slist_type = list_type_impl::get_instance(int32_type, true);
|
||||
assert_that(msg).is_rows().with_row({
|
||||
{ slist_type->decompose(make_list_value(slist_type, list_type_impl::native_type({{3}}))) },
|
||||
{ int32_type->decompose(3) }
|
||||
});
|
||||
}
|
||||
{
|
||||
e.execute_cql("UPDATE t SET slist = [4], v = 4 WHERE p = 1 AND c = 1;").get();
|
||||
auto msg = e.execute_cql("SELECT slist, v FROM t WHERE p = 1 AND c = 1;").get0();
|
||||
auto slist_type = list_type_impl::get_instance(int32_type, true);
|
||||
assert_that(msg).is_rows().with_row({
|
||||
{ slist_type->decompose(make_list_value(slist_type, list_type_impl::native_type({{4}}))) },
|
||||
{ int32_type->decompose(4) }
|
||||
});
|
||||
}
|
||||
{
|
||||
e.execute_cql("UPDATE t SET slist = [3] + slist , v = 5 WHERE p = 1 AND c = 1;").get();
|
||||
auto msg = e.execute_cql("SELECT slist, v FROM t WHERE p = 1 AND c = 1;").get0();
|
||||
auto slist_type = list_type_impl::get_instance(int32_type, true);
|
||||
assert_that(msg).is_rows().with_row({
|
||||
{ slist_type->decompose(make_list_value(slist_type, list_type_impl::native_type({3, 4}))) },
|
||||
{ int32_type->decompose(5) }
|
||||
});
|
||||
}
|
||||
{
|
||||
e.execute_cql("UPDATE t SET slist = slist + [5] , v = 6 WHERE p = 1 AND c = 1;").get();
|
||||
auto msg = e.execute_cql("SELECT slist, v FROM t WHERE p = 1 AND c = 1;").get0();
|
||||
auto slist_type = list_type_impl::get_instance(int32_type, true);
|
||||
assert_that(msg).is_rows().with_row({
|
||||
{ slist_type->decompose(make_list_value(slist_type, list_type_impl::native_type({3, 4, 5}))) },
|
||||
{ int32_type->decompose(6) }
|
||||
});
|
||||
}
|
||||
{
|
||||
e.execute_cql("DELETE slist[2] from t WHERE p = 1;").get();
|
||||
auto msg = e.execute_cql("SELECT slist, v FROM t WHERE p = 1 AND c = 1;").get0();
|
||||
auto slist_type = list_type_impl::get_instance(int32_type, true);
|
||||
assert_that(msg).is_rows().with_row({
|
||||
{ slist_type->decompose(make_list_value(slist_type, list_type_impl::native_type({3, 4}))) },
|
||||
{ int32_type->decompose(6) }
|
||||
});
|
||||
}
|
||||
{
|
||||
e.execute_cql("UPDATE t SET slist = slist - [4] , v = 7 WHERE p = 1 AND c = 1;").get();
|
||||
auto msg = e.execute_cql("SELECT slist, v FROM t WHERE p = 1 AND c = 1;").get0();
|
||||
auto slist_type = list_type_impl::get_instance(int32_type, true);
|
||||
assert_that(msg).is_rows().with_row({
|
||||
{ slist_type->decompose(make_list_value(slist_type, list_type_impl::native_type({3}))) },
|
||||
{ int32_type->decompose(7) }
|
||||
});
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
@@ -26,11 +26,13 @@
|
||||
|
||||
#include <boost/test/unit_test.hpp>
|
||||
#include <query-result-set.hh>
|
||||
#include <query-result-writer.hh>
|
||||
|
||||
#include "tests/test_services.hh"
|
||||
#include "tests/test-utils.hh"
|
||||
#include "tests/mutation_assertions.hh"
|
||||
#include "tests/result_set_assertions.hh"
|
||||
#include "tests/mutation_source_test.hh"
|
||||
|
||||
#include "mutation_query.hh"
|
||||
#include "core/do_with.hh"
|
||||
@@ -525,3 +527,22 @@ SEASTAR_TEST_CASE(test_partition_limit) {
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
SEASTAR_THREAD_TEST_CASE(test_result_size_calculation) {
|
||||
random_mutation_generator gen(random_mutation_generator::generate_counters::no);
|
||||
std::vector<mutation> mutations = gen(1);
|
||||
schema_ptr s = gen.schema();
|
||||
mutation_source source = make_source(std::move(mutations));
|
||||
query::result_memory_limiter l;
|
||||
query::partition_slice slice = make_full_slice(*s);
|
||||
slice.options.set<query::partition_slice::option::allow_short_read>();
|
||||
|
||||
query::result::builder digest_only_builder(slice, query::result_options{query::result_request::only_digest, query::digest_algorithm::xxHash}, l.new_digest_read(query::result_memory_limiter::maximum_result_size).get0());
|
||||
data_query(s, source, query::full_partition_range, slice, std::numeric_limits<uint32_t>::max(), std::numeric_limits<uint32_t>::max(), gc_clock::now(), digest_only_builder).get0();
|
||||
|
||||
query::result::builder result_and_digest_builder(slice, query::result_options{query::result_request::result_and_digest, query::digest_algorithm::xxHash}, l.new_data_read(query::result_memory_limiter::maximum_result_size).get0());
|
||||
data_query(s, source, query::full_partition_range, slice, std::numeric_limits<uint32_t>::max(), std::numeric_limits<uint32_t>::max(), gc_clock::now(), result_and_digest_builder).get0();
|
||||
|
||||
BOOST_REQUIRE_EQUAL(digest_only_builder.memory_accounter().used_memory(), result_and_digest_builder.memory_accounter().used_memory());
|
||||
}
|
||||
|
||||
|
||||
@@ -30,6 +30,7 @@
|
||||
#include <map>
|
||||
#include <iostream>
|
||||
#include <sstream>
|
||||
#include <boost/range/algorithm/adjacent_find.hpp>
|
||||
|
||||
static logging::logger nlogger("NetworkTopologyStrategyLogger");
|
||||
|
||||
@@ -52,6 +53,26 @@ void print_natural_endpoints(double point, const std::vector<inet_address> v) {
|
||||
nlogger.debug("{}", strm.str());
|
||||
}
|
||||
|
||||
#ifndef DEBUG
|
||||
static void verify_sorted(const dht::token_range_vector& trv) {
|
||||
auto not_strictly_before = [] (const dht::token_range a, const dht::token_range b) {
|
||||
return !b.start()
|
||||
|| !a.end()
|
||||
|| a.end()->value() > b.start()->value()
|
||||
|| (a.end()->value() == b.start()->value() && a.end()->is_inclusive() && b.start()->is_inclusive());
|
||||
};
|
||||
BOOST_CHECK(boost::adjacent_find(trv, not_strictly_before) == trv.end());
|
||||
}
|
||||
#endif
|
||||
|
||||
static void check_ranges_are_sorted(abstract_replication_strategy* ars, gms::inet_address ep) {
|
||||
// Too slow in debug mode
|
||||
#ifndef DEBUG
|
||||
verify_sorted(ars->get_ranges(ep));
|
||||
verify_sorted(ars->get_primary_ranges(ep));
|
||||
#endif
|
||||
}
|
||||
|
||||
void strategy_sanity_check(
|
||||
abstract_replication_strategy* ars_ptr,
|
||||
const std::map<sstring, sstring>& options) {
|
||||
@@ -150,6 +171,7 @@ void full_ring_check(const std::vector<ring_point>& ring_points,
|
||||
auto endpoints2 = ars_ptr->get_natural_endpoints(t2);
|
||||
|
||||
endpoints_check(ars_ptr, endpoints2);
|
||||
check_ranges_are_sorted(ars_ptr, rp.host);
|
||||
BOOST_CHECK(cache_hit_count + 1 == ars_ptr->get_cache_hits_count());
|
||||
BOOST_CHECK(endpoints1 == endpoints2);
|
||||
}
|
||||
|
||||
@@ -775,3 +775,7 @@ FUNC_START(__crc32_vpmsum)
|
||||
|
||||
FUNC_END(__crc32_vpmsum)
|
||||
#endif
|
||||
|
||||
// Mark the stack as non-executable so the final executable won't
|
||||
// have an executable stack
|
||||
.section .note.GNU-stack,"",@progbits
|
||||
|
||||
@@ -68,10 +68,11 @@ public:
|
||||
|
||||
// Starts a new phase and waits for all operations started in any of the earlier phases.
|
||||
// It is fine to start multiple awaits in parallel.
|
||||
// Strong exception guarantees.
|
||||
future<> advance_and_await() {
|
||||
auto new_gate = make_lw_shared<gate>();
|
||||
++_phase;
|
||||
auto old_gate = std::move(_gate);
|
||||
_gate = make_lw_shared<gate>();
|
||||
auto old_gate = std::exchange(_gate, std::move(new_gate));
|
||||
return old_gate->close().then([old_gate, op = start()] {});
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user