Compare commits
74 Commits
next
...
scylla-1.2
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
906ddc16b2 | ||
|
|
be0b7336d5 | ||
|
|
58d92b304b | ||
|
|
cc8ab6de2e | ||
|
|
5dca11087e | ||
|
|
5971f7f4fa | ||
|
|
b6d2a73c56 | ||
|
|
2db8626dbf | ||
|
|
ba5d52c94e | ||
|
|
ffed8a5603 | ||
|
|
ec51c8e1b8 | ||
|
|
50056a6df6 | ||
|
|
184b62d790 | ||
|
|
f5a1f402f5 | ||
|
|
9f09812733 | ||
|
|
e9b7352adb | ||
|
|
2461a85c0f | ||
|
|
9503145e38 | ||
|
|
9d99dd46cb | ||
|
|
c9dfbf7913 | ||
|
|
4f02a5f4b3 | ||
|
|
7457ed982d | ||
|
|
16a5be622c | ||
|
|
caab57bb24 | ||
|
|
3efa1211ec | ||
|
|
3898216831 | ||
|
|
0af39f2d0c | ||
|
|
e296fef581 | ||
|
|
5ee6a00b0f | ||
|
|
64df5f3f38 | ||
|
|
259b2592d4 | ||
|
|
51eba96c14 | ||
|
|
66e8204c79 | ||
|
|
7f1c63afa3 | ||
|
|
8547f34d60 | ||
|
|
a3078c9b9d | ||
|
|
00692d891e | ||
|
|
94aa879d19 | ||
|
|
8361b01b9d | ||
|
|
67e80fd595 | ||
|
|
b3915e0363 | ||
|
|
985c4ffcc6 | ||
|
|
c56fc99b7f | ||
|
|
85d33e2ee4 | ||
|
|
ffeef2f072 | ||
|
|
d3ffa00eb2 | ||
|
|
ad50d83302 | ||
|
|
c6a9844dfe | ||
|
|
dececbc0b9 | ||
|
|
f2031bf3db | ||
|
|
da77b8885f | ||
|
|
86434378d1 | ||
|
|
e5d24d5940 | ||
|
|
0a2d4204bd | ||
|
|
74b8f63e8f | ||
|
|
9b764b726b | ||
|
|
07ba03ce7b | ||
|
|
de690a6997 | ||
|
|
7b53e969d2 | ||
|
|
c384b23112 | ||
|
|
3688542323 | ||
|
|
7916182cfa | ||
|
|
ec1fd3945f | ||
|
|
653e250d04 | ||
|
|
6255076c20 | ||
|
|
420ebe28fd | ||
|
|
a6179476c5 | ||
|
|
342726a23c | ||
|
|
e9946032f4 | ||
|
|
5e0b113732 | ||
|
|
c70faa4f23 | ||
|
|
15ad4c9033 | ||
|
|
d094329b6e | ||
|
|
dcab915f21 |
2
.gitmodules
vendored
2
.gitmodules
vendored
@@ -1,6 +1,6 @@
|
||||
[submodule "seastar"]
|
||||
path = seastar
|
||||
url = ../seastar
|
||||
url = ../scylla-seastar
|
||||
ignore = dirty
|
||||
[submodule "swagger-ui"]
|
||||
path = swagger-ui
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
#!/bin/sh
|
||||
|
||||
VERSION=666.development
|
||||
VERSION=1.2.5
|
||||
|
||||
if test -f version
|
||||
then
|
||||
|
||||
@@ -219,8 +219,9 @@ static future<json::json_return_type> sum_sstable(http_context& ctx, const sstr
|
||||
auto uuid = get_uuid(name, ctx.db.local());
|
||||
return ctx.db.map_reduce0([uuid, total](database& db) {
|
||||
std::unordered_map<sstring, uint64_t> m;
|
||||
for (auto t :*((total) ? db.find_column_family(uuid).get_sstables_including_compacted_undeleted() :
|
||||
db.find_column_family(uuid).get_sstables()).get()) {
|
||||
auto sstables = (total) ? db.find_column_family(uuid).get_sstables_including_compacted_undeleted() :
|
||||
db.find_column_family(uuid).get_sstables();
|
||||
for (auto t : *sstables) {
|
||||
m[t.second->get_filename()] = t.second->bytes_on_disk();
|
||||
}
|
||||
return m;
|
||||
@@ -234,8 +235,9 @@ static future<json::json_return_type> sum_sstable(http_context& ctx, const sstr
|
||||
static future<json::json_return_type> sum_sstable(http_context& ctx, bool total) {
|
||||
return map_reduce_cf_raw(ctx, std::unordered_map<sstring, uint64_t>(), [total](column_family& cf) {
|
||||
std::unordered_map<sstring, uint64_t> m;
|
||||
for (auto t :*((total) ? cf.get_sstables_including_compacted_undeleted() :
|
||||
cf.get_sstables()).get()) {
|
||||
auto sstables = (total) ? cf.get_sstables_including_compacted_undeleted() :
|
||||
cf.get_sstables();
|
||||
for (auto t : *sstables) {
|
||||
m[t.second->get_filename()] = t.second->bytes_on_disk();
|
||||
}
|
||||
return m;
|
||||
|
||||
@@ -97,7 +97,7 @@ namespace std {
|
||||
template <>
|
||||
struct hash<auth::data_resource> {
|
||||
size_t operator()(const auth::data_resource & v) const {
|
||||
return std::hash<sstring>()(v.name());
|
||||
return v.hash_value();
|
||||
}
|
||||
};
|
||||
|
||||
|
||||
@@ -41,6 +41,7 @@
|
||||
|
||||
#pragma once
|
||||
|
||||
#include "utils/hash.hh"
|
||||
#include <iosfwd>
|
||||
#include <set>
|
||||
#include <seastar/core/sstring.hh>
|
||||
@@ -137,6 +138,10 @@ public:
|
||||
|
||||
bool operator==(const data_resource&) const;
|
||||
bool operator<(const data_resource&) const;
|
||||
|
||||
size_t hash_value() const {
|
||||
return utils::tuple_hash()(_ks, _cf);
|
||||
}
|
||||
};
|
||||
|
||||
/**
|
||||
|
||||
@@ -28,7 +28,11 @@ class checked_file_impl : public file_impl {
|
||||
public:
|
||||
|
||||
checked_file_impl(disk_error_signal_type& s, file f)
|
||||
: _signal(s) , _file(f) {}
|
||||
: _signal(s) , _file(f) {
|
||||
_memory_dma_alignment = f.memory_dma_alignment();
|
||||
_disk_read_dma_alignment = f.disk_read_dma_alignment();
|
||||
_disk_write_dma_alignment = f.disk_write_dma_alignment();
|
||||
}
|
||||
|
||||
virtual future<size_t> write_dma(uint64_t pos, const void* buffer, size_t len, const io_priority_class& pc) override {
|
||||
return do_io_check(_signal, [&] {
|
||||
|
||||
127
clustering_bounds_comparator.hh
Normal file
127
clustering_bounds_comparator.hh
Normal file
@@ -0,0 +1,127 @@
|
||||
|
||||
/*
|
||||
* Copyright (C) 2016 ScyllaDB
|
||||
*/
|
||||
|
||||
/*
|
||||
* This file is part of Scylla.
|
||||
*
|
||||
* Scylla is free software: you can redistribute it and/or modify
|
||||
* it under the terms of the GNU Affero General Public License as published by
|
||||
* the Free Software Foundation, either version 3 of the License, or
|
||||
* (at your option) any later version.
|
||||
*
|
||||
* Scylla is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU General Public License
|
||||
* along with Scylla. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#pragma once
|
||||
|
||||
#include "keys.hh"
|
||||
#include "schema.hh"
|
||||
#include "range.hh"
|
||||
|
||||
/**
|
||||
* Represents the kind of bound in a range tombstone.
|
||||
*/
|
||||
enum class bound_kind : uint8_t {
|
||||
excl_end = 0,
|
||||
incl_start = 1,
|
||||
// values 2 to 5 are reserved for forward Origin compatibility
|
||||
incl_end = 6,
|
||||
excl_start = 7,
|
||||
};
|
||||
|
||||
std::ostream& operator<<(std::ostream& out, const bound_kind k);
|
||||
|
||||
bound_kind invert_kind(bound_kind k);
|
||||
int32_t weight(bound_kind k);
|
||||
|
||||
static inline bound_kind flip_bound_kind(bound_kind bk)
|
||||
{
|
||||
switch (bk) {
|
||||
case bound_kind::excl_end: return bound_kind::excl_start;
|
||||
case bound_kind::incl_end: return bound_kind::incl_start;
|
||||
case bound_kind::excl_start: return bound_kind::excl_end;
|
||||
case bound_kind::incl_start: return bound_kind::incl_end;
|
||||
}
|
||||
abort();
|
||||
}
|
||||
|
||||
class bound_view {
|
||||
const static thread_local clustering_key empty_prefix;
|
||||
public:
|
||||
const clustering_key_prefix& prefix;
|
||||
bound_kind kind;
|
||||
bound_view(const clustering_key_prefix& prefix, bound_kind kind)
|
||||
: prefix(prefix)
|
||||
, kind(kind)
|
||||
{ }
|
||||
struct compare {
|
||||
// To make it assignable and to avoid taking a schema_ptr, we
|
||||
// wrap the schema reference.
|
||||
std::reference_wrapper<const schema> _s;
|
||||
compare(const schema& s) : _s(s)
|
||||
{ }
|
||||
bool operator()(const clustering_key_prefix& p1, int32_t w1, const clustering_key_prefix& p2, int32_t w2) const {
|
||||
auto type = _s.get().clustering_key_prefix_type();
|
||||
auto res = prefix_equality_tri_compare(type->types().begin(),
|
||||
type->begin(p1), type->end(p1),
|
||||
type->begin(p2), type->end(p2),
|
||||
tri_compare);
|
||||
if (res) {
|
||||
return res < 0;
|
||||
}
|
||||
auto d1 = p1.size(_s);
|
||||
auto d2 = p2.size(_s);
|
||||
if (d1 == d2) {
|
||||
return w1 < w2;
|
||||
}
|
||||
return d1 < d2 ? w1 <= 0 : w2 > 0;
|
||||
}
|
||||
bool operator()(const bound_view b, const clustering_key_prefix& p) const {
|
||||
return operator()(b.prefix, weight(b.kind), p, 0);
|
||||
}
|
||||
bool operator()(const clustering_key_prefix& p, const bound_view b) const {
|
||||
return operator()(p, 0, b.prefix, weight(b.kind));
|
||||
}
|
||||
bool operator()(const bound_view b1, const bound_view b2) const {
|
||||
return operator()(b1.prefix, weight(b1.kind), b2.prefix, weight(b2.kind));
|
||||
}
|
||||
};
|
||||
bool equal(const schema& s, const bound_view other) const {
|
||||
return kind == other.kind && prefix.equal(s, other.prefix);
|
||||
}
|
||||
bool adjacent(const schema& s, const bound_view other) const {
|
||||
return invert_kind(other.kind) == kind && prefix.equal(s, other.prefix);
|
||||
}
|
||||
static bound_view bottom(const schema& s) {
|
||||
return {empty_prefix, bound_kind::incl_start};
|
||||
}
|
||||
static bound_view top(const schema& s) {
|
||||
return {empty_prefix, bound_kind::incl_end};
|
||||
}
|
||||
/*
|
||||
template<template<typename> typename T, typename U>
|
||||
concept bool Range() {
|
||||
return requires (T<U> range) {
|
||||
{ range.start() } -> stdx::optional<U>;
|
||||
{ range.end() } -> stdx::optional<U>;
|
||||
};
|
||||
};*/
|
||||
template<template<typename> typename Range>
|
||||
static std::pair<bound_view, bound_view> from_range(const schema& s, const Range<clustering_key_prefix>& range) {
|
||||
return {
|
||||
range.start() ? bound_view(range.start()->value(), range.start()->is_inclusive() ? bound_kind::incl_start : bound_kind::excl_start) : bottom(s),
|
||||
range.end() ? bound_view(range.end()->value(), range.end()->is_inclusive() ? bound_kind::incl_end : bound_kind::excl_end) : top(s),
|
||||
};
|
||||
}
|
||||
friend std::ostream& operator<<(std::ostream& out, const bound_view& b) {
|
||||
return out << "{bound: prefix=" << b.prefix << ", kind=" << b.kind << "}";
|
||||
}
|
||||
};
|
||||
@@ -51,6 +51,9 @@ public:
|
||||
// Return a list of sstables to be compacted after applying the strategy.
|
||||
compaction_descriptor get_sstables_for_compaction(column_family& cfs, std::vector<lw_shared_ptr<sstable>> candidates);
|
||||
|
||||
// Return if parallel compaction is allowed by strategy.
|
||||
bool parallel_compaction() const;
|
||||
|
||||
static sstring name(compaction_strategy_type type) {
|
||||
switch (type) {
|
||||
case compaction_strategy_type::null:
|
||||
|
||||
@@ -35,7 +35,7 @@ class converting_mutation_partition_applier : public mutation_partition_visitor
|
||||
deletable_row* _current_row;
|
||||
private:
|
||||
static bool is_compatible(const column_definition& new_def, const data_type& old_type, column_kind kind) {
|
||||
return new_def.kind == kind && new_def.type->is_value_compatible_with(*old_type);
|
||||
return ::is_compatible(new_def.kind, kind) && new_def.type->is_value_compatible_with(*old_type);
|
||||
}
|
||||
void accept_cell(row& dst, column_kind kind, const column_definition& new_def, const data_type& old_type, atomic_cell_view cell) {
|
||||
if (is_compatible(new_def, old_type, kind) && cell.timestamp() > new_def.dropped_at()) {
|
||||
|
||||
@@ -394,6 +394,9 @@ public:
|
||||
return bounds_range_type::bound(prefix, is_inclusive(b));
|
||||
};
|
||||
auto range = bounds_range_type(read_bound(statements::bound::START), read_bound(statements::bound::END));
|
||||
if (query::is_wrap_around(range, *_schema)) {
|
||||
return {};
|
||||
}
|
||||
return { range };
|
||||
}
|
||||
#if 0
|
||||
|
||||
@@ -352,7 +352,9 @@ single_column_primary_key_restrictions<partition_key>::bounds_ranges(const query
|
||||
template<>
|
||||
std::vector<query::clustering_range>
|
||||
single_column_primary_key_restrictions<clustering_key_prefix>::bounds_ranges(const query_options& options) const {
|
||||
auto bounds = compute_bounds(options);
|
||||
auto wrapping_bounds = compute_bounds(options);
|
||||
auto bounds = boost::copy_range<query::clustering_row_ranges>(wrapping_bounds
|
||||
| boost::adaptors::filtered([&](auto&& r) { return !query::is_wrap_around(r, *_schema); }));
|
||||
auto less_cmp = clustering_key_prefix::less_compare(*_schema);
|
||||
std::sort(bounds.begin(), bounds.end(), [&] (query::clustering_range& x, query::clustering_range& y) {
|
||||
if (!x.start() && !y.start()) {
|
||||
|
||||
220
database.cc
220
database.cc
@@ -251,15 +251,24 @@ column_family::make_sstable_reader(schema_ptr s,
|
||||
const query::partition_range& pr,
|
||||
query::clustering_key_filtering_context ck_filtering,
|
||||
const io_priority_class& pc) const {
|
||||
// restricts a reader's concurrency if the configuration specifies it
|
||||
auto restrict_reader = [&] (mutation_reader&& in) {
|
||||
if (_config.read_concurrency_config.sem) {
|
||||
return make_restricted_reader(_config.read_concurrency_config, 1, std::move(in));
|
||||
} else {
|
||||
return std::move(in);
|
||||
}
|
||||
};
|
||||
|
||||
if (pr.is_singular() && pr.start()->value().has_key()) {
|
||||
const dht::ring_position& pos = pr.start()->value();
|
||||
if (dht::shard_of(pos.token()) != engine().cpu_id()) {
|
||||
return make_empty_reader(); // range doesn't belong to this shard
|
||||
}
|
||||
return make_mutation_reader<single_key_sstable_reader>(std::move(s), _sstables, *pos.key(), ck_filtering, pc);
|
||||
return restrict_reader(make_mutation_reader<single_key_sstable_reader>(std::move(s), _sstables, *pos.key(), ck_filtering, pc));
|
||||
} else {
|
||||
// range_sstable_reader is not movable so we need to wrap it
|
||||
return make_mutation_reader<range_sstable_reader>(std::move(s), _sstables, pr, ck_filtering, pc);
|
||||
return restrict_reader(make_mutation_reader<range_sstable_reader>(std::move(s), _sstables, pr, ck_filtering, pc));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -328,7 +337,7 @@ column_family::make_reader(schema_ptr s,
|
||||
}
|
||||
|
||||
std::vector<mutation_reader> readers;
|
||||
readers.reserve(_memtables->size() + _sstables->size());
|
||||
readers.reserve(_memtables->size() + 1);
|
||||
|
||||
// We're assuming that cache and memtables are both read atomically
|
||||
// for single-key queries, so we don't need to special case memtable
|
||||
@@ -484,12 +493,75 @@ static bool belongs_to_current_shard(const schema& s, const partition_key& first
|
||||
return (s1 <= me) && (me <= s2);
|
||||
}
|
||||
|
||||
static bool belongs_to_other_shard(const schema& s, const partition_key& first, const partition_key& last) {
|
||||
auto key_shard = [&s] (const partition_key& pk) {
|
||||
auto token = dht::global_partitioner().get_token(s, pk);
|
||||
return dht::shard_of(token);
|
||||
};
|
||||
auto s1 = key_shard(first);
|
||||
auto s2 = key_shard(last);
|
||||
auto me = engine().cpu_id();
|
||||
return (s1 != me) || (me != s2);
|
||||
}
|
||||
|
||||
static bool belongs_to_current_shard(const schema& s, range<partition_key> r) {
|
||||
assert(r.start());
|
||||
assert(r.end());
|
||||
return belongs_to_current_shard(s, r.start()->value(), r.end()->value());
|
||||
}
|
||||
|
||||
static bool belongs_to_other_shard(const schema& s, range<partition_key> r) {
|
||||
assert(r.start());
|
||||
assert(r.end());
|
||||
return belongs_to_other_shard(s, r.start()->value(), r.end()->value());
|
||||
}
|
||||
|
||||
future<> column_family::load_sstable(sstables::sstable&& sstab, bool reset_level) {
|
||||
auto sst = make_lw_shared<sstables::sstable>(std::move(sstab));
|
||||
return sst->get_sstable_key_range(*_schema).then([this, sst, reset_level] (range<partition_key> r) mutable {
|
||||
// Checks whether or not sstable belongs to current shard.
|
||||
if (!belongs_to_current_shard(*_schema, r)) {
|
||||
dblog.debug("sstable {} not relevant for this shard, ignoring", sst->get_filename());
|
||||
sst->mark_for_deletion();
|
||||
return make_ready_future<>();
|
||||
}
|
||||
bool in_other_shard = belongs_to_other_shard(*_schema, std::move(r));
|
||||
return sst->load().then([this, sst, in_other_shard, reset_level] () mutable {
|
||||
if (in_other_shard) {
|
||||
// If we're here, this sstable is shared by this and other
|
||||
// shard(s). Shared sstables cannot be deleted until all
|
||||
// shards compacted them, so to reduce disk space usage we
|
||||
// want to start splitting them now.
|
||||
// However, we need to delay this compaction until we read all
|
||||
// the sstables belonging to this CF, because we need all of
|
||||
// them to know which tombstones we can drop, and what
|
||||
// generation number is free.
|
||||
_sstables_need_rewrite.push_back(sst);
|
||||
}
|
||||
if (reset_level) {
|
||||
// When loading a migrated sstable, set level to 0 because
|
||||
// it may overlap with existing tables in levels > 0.
|
||||
// This step is optional, because even if we didn't do this
|
||||
// scylla would detect the overlap, and bring back some of
|
||||
// the sstables to level 0.
|
||||
sst->set_sstable_level(0);
|
||||
}
|
||||
add_sstable(sst);
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
// load_sstable() wants to start rewriting sstables which are shared between
|
||||
// several shards, but we can't start any compaction before all the sstables
|
||||
// of this CF were loaded. So call this function to start rewrites, if any.
|
||||
void column_family::start_rewrite() {
|
||||
for (auto sst : _sstables_need_rewrite) {
|
||||
dblog.info("Splitting {} for shard", sst->get_filename());
|
||||
_compaction_manager.submit_sstable_rewrite(this, sst);
|
||||
}
|
||||
_sstables_need_rewrite.clear();
|
||||
}
|
||||
|
||||
future<sstables::entry_descriptor> column_family::probe_file(sstring sstdir, sstring fname) {
|
||||
|
||||
using namespace sstables;
|
||||
@@ -514,24 +586,9 @@ future<sstables::entry_descriptor> column_family::probe_file(sstring sstdir, sst
|
||||
}
|
||||
}
|
||||
|
||||
auto sst = std::make_unique<sstables::sstable>(_schema->ks_name(), _schema->cf_name(), sstdir, comps.generation, comps.version, comps.format);
|
||||
auto fut = sst->get_sstable_key_range(*_schema);
|
||||
return std::move(fut).then([this, sst = std::move(sst), sstdir = std::move(sstdir), comps] (range<partition_key> r) mutable {
|
||||
// Checks whether or not sstable belongs to current shard.
|
||||
if (!belongs_to_current_shard(*_schema, std::move(r))) {
|
||||
dblog.debug("sstable {} not relevant for this shard, ignoring",
|
||||
sstables::sstable::filename(sstdir, _schema->ks_name(), _schema->cf_name(), comps.version, comps.generation, comps.format,
|
||||
sstables::sstable::component_type::Data));
|
||||
sstable::mark_sstable_for_deletion(_schema->ks_name(), _schema->cf_name(), sstdir, comps.generation, comps.version, comps.format);
|
||||
return make_ready_future<>();
|
||||
}
|
||||
|
||||
auto fut = sst->load();
|
||||
return std::move(fut).then([this, sst = std::move(sst)] () mutable {
|
||||
add_sstable(std::move(*sst));
|
||||
return make_ready_future<>();
|
||||
});
|
||||
}).then_wrapped([fname, comps] (future<> f) {
|
||||
return load_sstable(sstables::sstable(
|
||||
_schema->ks_name(), _schema->cf_name(), sstdir, comps.generation,
|
||||
comps.version, comps.format)).then_wrapped([fname, comps] (future<> f) {
|
||||
try {
|
||||
f.get();
|
||||
} catch (malformed_sstable_exception& e) {
|
||||
@@ -1033,29 +1090,14 @@ future<> column_family::cleanup_sstables(sstables::compaction_descriptor descrip
|
||||
future<>
|
||||
column_family::load_new_sstables(std::vector<sstables::entry_descriptor> new_tables) {
|
||||
return parallel_for_each(new_tables, [this] (auto comps) {
|
||||
auto sst = make_lw_shared<sstables::sstable>(_schema->ks_name(), _schema->cf_name(), _config.datadir, comps.generation, comps.version, comps.format);
|
||||
return sst->load().then([this, sst] {
|
||||
// This sets in-memory level of sstable to 0.
|
||||
// When loading a migrated sstable, it's important to set it to level 0 because
|
||||
// leveled compaction relies on a level > 0 having no overlapping sstables.
|
||||
// If Scylla reboots before migrated sstable gets compacted, leveled strategy
|
||||
// is smart enough to detect a sstable that overlaps and set its in-memory
|
||||
// level to 0.
|
||||
return sst->set_sstable_level(0);
|
||||
}).then([this, sst] {
|
||||
auto first = sst->get_first_partition_key(*_schema);
|
||||
auto last = sst->get_last_partition_key(*_schema);
|
||||
if (belongs_to_current_shard(*_schema, first, last)) {
|
||||
this->add_sstable(sst);
|
||||
} else {
|
||||
sst->mark_for_deletion();
|
||||
}
|
||||
return make_ready_future<>();
|
||||
});
|
||||
return this->load_sstable(sstables::sstable(
|
||||
_schema->ks_name(), _schema->cf_name(), _config.datadir,
|
||||
comps.generation, comps.version, comps.format), true);
|
||||
}).then([this] {
|
||||
start_rewrite();
|
||||
// Drop entire cache for this column family because it may be populated
|
||||
// with stale data.
|
||||
get_row_cache().clear();
|
||||
return get_row_cache().clear();
|
||||
});
|
||||
}
|
||||
|
||||
@@ -1338,6 +1380,38 @@ database::setup_collectd() {
|
||||
, "total_operations", "total_reads")
|
||||
, scollectd::make_typed(scollectd::data_type::DERIVE, _stats->total_reads)
|
||||
));
|
||||
|
||||
_collectd.push_back(
|
||||
scollectd::add_polled_metric(scollectd::type_instance_id("database"
|
||||
, scollectd::per_cpu_plugin_instance
|
||||
, "total_operations", "sstable_read_queue_overloads")
|
||||
, scollectd::make_typed(scollectd::data_type::COUNTER, _stats->sstable_read_queue_overloaded)
|
||||
));
|
||||
|
||||
_collectd.push_back(
|
||||
scollectd::add_polled_metric(scollectd::type_instance_id("database"
|
||||
, scollectd::per_cpu_plugin_instance
|
||||
, "queue_length", "active_reads")
|
||||
, scollectd::make_typed(scollectd::data_type::GAUGE, [this] { return max_concurrent_reads() - _read_concurrency_sem.current(); })
|
||||
));
|
||||
_collectd.push_back(
|
||||
scollectd::add_polled_metric(scollectd::type_instance_id("database"
|
||||
, scollectd::per_cpu_plugin_instance
|
||||
, "queue_length", "queued_reads")
|
||||
, scollectd::make_typed(scollectd::data_type::GAUGE, [this] { return _read_concurrency_sem.waiters(); })
|
||||
));
|
||||
_collectd.push_back(
|
||||
scollectd::add_polled_metric(scollectd::type_instance_id("database"
|
||||
, scollectd::per_cpu_plugin_instance
|
||||
, "queue_length", "active_reads_system_keyspace")
|
||||
, scollectd::make_typed(scollectd::data_type::GAUGE, [this] { return max_system_concurrent_reads() - _system_read_concurrency_sem.current(); })
|
||||
));
|
||||
_collectd.push_back(
|
||||
scollectd::add_polled_metric(scollectd::type_instance_id("database"
|
||||
, scollectd::per_cpu_plugin_instance
|
||||
, "queue_length", "queued_reads_system_keyspace")
|
||||
, scollectd::make_typed(scollectd::data_type::GAUGE, [this] { return _system_read_concurrency_sem.waiters(); })
|
||||
));
|
||||
}
|
||||
|
||||
database::~database() {
|
||||
@@ -1360,10 +1434,10 @@ future<> database::populate_keyspace(sstring datadir, sstring ks_name) {
|
||||
} else {
|
||||
dblog.info("Populating Keyspace {}", ks_name);
|
||||
auto& ks = i->second;
|
||||
return parallel_for_each(std::cbegin(_column_families), std::cend(_column_families),
|
||||
[ks_name, &ks] (const std::pair<utils::UUID, lw_shared_ptr<column_family>>& e) {
|
||||
utils::UUID uuid = e.first;
|
||||
lw_shared_ptr<column_family> cf = e.second;
|
||||
return parallel_for_each(ks.metadata()->cf_meta_data() | boost::adaptors::map_values,
|
||||
[ks_name, &ks, this] (schema_ptr s) {
|
||||
utils::UUID uuid = s->id();
|
||||
lw_shared_ptr<column_family> cf = _column_families[uuid];
|
||||
sstring cfname = cf->schema()->cf_name();
|
||||
auto sstdir = ks.column_family_directory(cfname, uuid);
|
||||
dblog.info("Keyspace {}: Reading CF {} ", ks_name, cfname);
|
||||
@@ -1707,6 +1781,7 @@ keyspace::make_column_family_config(const schema& s) const {
|
||||
cfg.max_streaming_memtable_size = _config.max_streaming_memtable_size;
|
||||
cfg.dirty_memory_region_group = _config.dirty_memory_region_group;
|
||||
cfg.streaming_dirty_memory_region_group = _config.streaming_dirty_memory_region_group;
|
||||
cfg.read_concurrency_config = _config.read_concurrency_config;
|
||||
cfg.cf_stats = _config.cf_stats;
|
||||
cfg.enable_incremental_backups = _config.enable_incremental_backups;
|
||||
|
||||
@@ -1889,6 +1964,7 @@ column_family::query(schema_ptr s, const query::read_command& cmd, query::result
|
||||
auto add_partition = [&qs] (uint32_t live_rows, mutation&& m) {
|
||||
auto pb = qs.builder.add_partition(*qs.schema, m.key());
|
||||
m.partition().query_compacted(pb, *qs.schema, live_rows);
|
||||
qs.limit -= live_rows;
|
||||
};
|
||||
return do_with(querying_reader(qs.schema, as_mutation_source(), range, qs.cmd.slice, qs.limit, qs.cmd.timestamp, add_partition),
|
||||
[] (auto&& rd) { return rd.read(); });
|
||||
@@ -1896,10 +1972,10 @@ column_family::query(schema_ptr s, const query::read_command& cmd, query::result
|
||||
return make_ready_future<lw_shared_ptr<query::result>>(
|
||||
make_lw_shared<query::result>(qs.builder.build()));
|
||||
}).finally([lc, this]() mutable {
|
||||
_stats.reads.mark(lc);
|
||||
if (lc.is_start()) {
|
||||
_stats.estimated_read.add(lc.latency(), _stats.reads.hist.count);
|
||||
}
|
||||
_stats.reads.mark(lc);
|
||||
if (lc.is_start()) {
|
||||
_stats.estimated_read.add(lc.latency(), _stats.reads.hist.count);
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
@@ -1936,7 +2012,7 @@ std::unordered_set<sstring> database::get_initial_tokens() {
|
||||
std::unordered_set<sstring> tokens;
|
||||
sstring tokens_string = get_config().initial_token();
|
||||
try {
|
||||
boost::split(tokens, tokens_string, boost::is_any_of(sstring(",")));
|
||||
boost::split(tokens, tokens_string, boost::is_any_of(sstring(", ")));
|
||||
} catch (...) {
|
||||
throw std::runtime_error(sprint("Unable to parse initial_token=%s", tokens_string));
|
||||
}
|
||||
@@ -2166,6 +2242,14 @@ database::make_keyspace_config(const keyspace_metadata& ksm) {
|
||||
}
|
||||
cfg.dirty_memory_region_group = &_dirty_memory_region_group;
|
||||
cfg.streaming_dirty_memory_region_group = &_streaming_dirty_memory_region_group;
|
||||
cfg.read_concurrency_config.sem = &_read_concurrency_sem;
|
||||
cfg.read_concurrency_config.timeout = _cfg->read_request_timeout_in_ms() * 1ms;
|
||||
// Assume a queued read takes up 10kB of memory, and allow 2% of memory to be filled up with such reads.
|
||||
cfg.read_concurrency_config.max_queue_length = memory::stats().total_memory() * 0.02 / 10000;
|
||||
cfg.read_concurrency_config.raise_queue_overloaded_exception = [this] {
|
||||
++_stats->sstable_read_queue_overloaded;
|
||||
throw std::runtime_error("sstable inactive read queue overloaded");
|
||||
};
|
||||
cfg.cf_stats = &_cf_stats;
|
||||
cfg.enable_incremental_backups = _enable_incremental_backups;
|
||||
return cfg;
|
||||
@@ -2257,7 +2341,7 @@ future<> database::truncate(const keyspace& ks, column_family& cf, timestamp_fun
|
||||
// gotten all things to disk. Again, need queue-ish or something.
|
||||
f = cf.flush();
|
||||
} else {
|
||||
cf.clear();
|
||||
f = cf.clear();
|
||||
}
|
||||
|
||||
return cf.run_with_compaction_disabled([f = std::move(f), &cf, auto_snapshot, tsf = std::move(tsf)]() mutable {
|
||||
@@ -2633,21 +2717,29 @@ future<> column_family::flush_streaming_mutations(std::vector<query::partition_r
|
||||
// temporary counter measure.
|
||||
return with_gate(_streaming_flush_gate, [this, ranges = std::move(ranges)] {
|
||||
return _streaming_memtables->seal_active_memtable().finally([this, ranges = std::move(ranges)] {
|
||||
if (_config.enable_cache) {
|
||||
for (auto& range : ranges) {
|
||||
_cache.invalidate(range);
|
||||
}
|
||||
if (!_config.enable_cache) {
|
||||
return make_ready_future<>();
|
||||
}
|
||||
return do_with(std::move(ranges), [this] (auto& ranges) {
|
||||
return parallel_for_each(ranges, [this](auto&& range) {
|
||||
return _cache.invalidate(range);
|
||||
});
|
||||
});
|
||||
return do_with(std::move(ranges), [this] (auto& ranges) {
|
||||
return parallel_for_each(ranges, [this](auto&& range) {
|
||||
return _cache.invalidate(range);
|
||||
});
|
||||
});
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
void column_family::clear() {
|
||||
_cache.clear();
|
||||
future<> column_family::clear() {
|
||||
_memtables->clear();
|
||||
_memtables->add_memtable();
|
||||
_streaming_memtables->clear();
|
||||
_streaming_memtables->add_memtable();
|
||||
return _cache.clear();
|
||||
}
|
||||
|
||||
// NOTE: does not need to be futurized, but might eventually, depending on
|
||||
@@ -2673,13 +2765,13 @@ future<db::replay_position> column_family::discard_sstables(db_clock::time_point
|
||||
|
||||
_sstables = std::move(pruned);
|
||||
dblog.debug("cleaning out row cache");
|
||||
_cache.clear();
|
||||
|
||||
return parallel_for_each(remove, [](sstables::shared_sstable s) {
|
||||
return sstables::delete_atomically({s});
|
||||
}).then([rp] {
|
||||
return make_ready_future<db::replay_position>(rp);
|
||||
}).finally([remove] {}); // keep the objects alive until here.
|
||||
return _cache.clear().then([rp, remove = std::move(remove)] () mutable {
|
||||
return parallel_for_each(remove, [](sstables::shared_sstable s) {
|
||||
return sstables::delete_atomically({s});
|
||||
}).then([rp] {
|
||||
return make_ready_future<db::replay_position>(rp);
|
||||
}).finally([remove] {}); // keep the objects alive until here.
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
21
database.hh
21
database.hh
@@ -249,6 +249,7 @@ public:
|
||||
size_t max_streaming_memtable_size = 5'000'000;
|
||||
logalloc::region_group* dirty_memory_region_group = nullptr;
|
||||
logalloc::region_group* streaming_dirty_memory_region_group = nullptr;
|
||||
restricted_mutation_reader_config read_concurrency_config;
|
||||
::cf_stats* cf_stats = nullptr;
|
||||
};
|
||||
struct no_commitlog {};
|
||||
@@ -310,6 +311,11 @@ private:
|
||||
// have not been deleted yet, so must not GC any tombstones in other sstables
|
||||
// that may delete data in these sstables:
|
||||
std::vector<sstables::shared_sstable> _sstables_compacted_but_not_deleted;
|
||||
// sstables that are shared between several shards so we want to rewrite
|
||||
// them (split the data belonging to this shard to a separate sstable),
|
||||
// but for correct compaction we need to start the compaction only after
|
||||
// reading all sstables.
|
||||
std::vector<sstables::shared_sstable> _sstables_need_rewrite;
|
||||
// Control background fibers waiting for sstables to be deleted
|
||||
seastar::gate _sstable_deletion_gate;
|
||||
// There are situations in which we need to stop writing sstables. Flushers will take
|
||||
@@ -338,6 +344,7 @@ private:
|
||||
void update_stats_for_new_sstable(uint64_t disk_space_used_by_sstable);
|
||||
void add_sstable(sstables::sstable&& sstable);
|
||||
void add_sstable(lw_shared_ptr<sstables::sstable> sstable);
|
||||
future<> load_sstable(sstables::sstable&& sstab, bool reset_level = false);
|
||||
lw_shared_ptr<memtable> new_memtable();
|
||||
lw_shared_ptr<memtable> new_streaming_memtable();
|
||||
future<stop_iteration> try_flush_memtable_to_sstable(lw_shared_ptr<memtable> memt);
|
||||
@@ -463,7 +470,7 @@ public:
|
||||
future<> flush();
|
||||
future<> flush(const db::replay_position&);
|
||||
future<> flush_streaming_mutations(std::vector<query::partition_range> ranges = std::vector<query::partition_range>{});
|
||||
void clear(); // discards memtable(s) without flushing them to disk.
|
||||
future<> clear(); // discards memtable(s) without flushing them to disk.
|
||||
future<db::replay_position> discard_sstables(db_clock::time_point);
|
||||
|
||||
// Important warning: disabling writes will only have an effect in the current shard.
|
||||
@@ -634,6 +641,7 @@ private:
|
||||
future<sstables::entry_descriptor> probe_file(sstring sstdir, sstring fname);
|
||||
void check_valid_rp(const db::replay_position&) const;
|
||||
public:
|
||||
void start_rewrite();
|
||||
// Iterate over all partitions. Protocol is the same as std::all_of(),
|
||||
// so that iteration can be stopped by returning false.
|
||||
future<bool> for_all_partitions_slow(schema_ptr, std::function<bool (const dht::decorated_key&, const mutation_partition&)> func) const;
|
||||
@@ -743,6 +751,7 @@ public:
|
||||
size_t max_streaming_memtable_size = 5'000'000;
|
||||
logalloc::region_group* dirty_memory_region_group = nullptr;
|
||||
logalloc::region_group* streaming_dirty_memory_region_group = nullptr;
|
||||
restricted_mutation_reader_config read_concurrency_config;
|
||||
::cf_stats* cf_stats = nullptr;
|
||||
};
|
||||
private:
|
||||
@@ -822,9 +831,12 @@ public:
|
||||
|
||||
class database {
|
||||
::cf_stats _cf_stats;
|
||||
static constexpr size_t max_concurrent_reads() { return 100; }
|
||||
static constexpr size_t max_system_concurrent_reads() { return 10; }
|
||||
struct db_stats {
|
||||
uint64_t total_writes = 0;
|
||||
uint64_t total_reads = 0;
|
||||
uint64_t sstable_read_queue_overloaded = 0;
|
||||
};
|
||||
|
||||
lw_shared_ptr<db_stats> _stats;
|
||||
@@ -834,6 +846,10 @@ class database {
|
||||
size_t _streaming_memtable_total_space = 500 << 20;
|
||||
logalloc::region_group _dirty_memory_region_group;
|
||||
logalloc::region_group _streaming_dirty_memory_region_group;
|
||||
semaphore _read_concurrency_sem{max_concurrent_reads()};
|
||||
restricted_mutation_reader_config _read_concurrency_config;
|
||||
semaphore _system_read_concurrency_sem{max_system_concurrent_reads()};
|
||||
restricted_mutation_reader_config _system_read_concurrency_config;
|
||||
|
||||
std::unordered_map<sstring, keyspace> _keyspaces;
|
||||
std::unordered_map<utils::UUID, lw_shared_ptr<column_family>> _column_families;
|
||||
@@ -979,6 +995,9 @@ public:
|
||||
std::unordered_set<sstring> get_initial_tokens();
|
||||
std::experimental::optional<gms::inet_address> get_replace_address();
|
||||
bool is_replacing();
|
||||
semaphore& system_keyspace_read_concurrency_sem() {
|
||||
return _system_read_concurrency_sem;
|
||||
}
|
||||
};
|
||||
|
||||
// FIXME: stub
|
||||
|
||||
@@ -711,15 +711,21 @@ static void merge_tables(distributed<service::storage_proxy>& proxy,
|
||||
std::map<qualified_name, schema_mutations>&& before,
|
||||
std::map<qualified_name, schema_mutations>&& after)
|
||||
{
|
||||
struct dropped_table {
|
||||
global_schema_ptr schema;
|
||||
utils::joinpoint<db_clock::time_point> jp{[] {
|
||||
return make_ready_future<db_clock::time_point>(db_clock::now());
|
||||
}};
|
||||
};
|
||||
std::vector<global_schema_ptr> created;
|
||||
std::vector<global_schema_ptr> altered;
|
||||
std::vector<global_schema_ptr> dropped;
|
||||
std::vector<dropped_table> dropped;
|
||||
|
||||
auto diff = difference(before, after);
|
||||
for (auto&& key : diff.entries_only_on_left) {
|
||||
auto&& s = proxy.local().get_db().local().find_schema(key.keyspace_name, key.table_name);
|
||||
logger.info("Dropping {}.{} id={} version={}", s->ks_name(), s->cf_name(), s->id(), s->version());
|
||||
dropped.emplace_back(s);
|
||||
dropped.emplace_back(dropped_table{s});
|
||||
}
|
||||
for (auto&& key : diff.entries_only_on_right) {
|
||||
auto s = create_table_from_mutations(after.at(key));
|
||||
@@ -732,9 +738,7 @@ static void merge_tables(distributed<service::storage_proxy>& proxy,
|
||||
altered.emplace_back(s);
|
||||
}
|
||||
|
||||
do_with(utils::make_joinpoint([] { return db_clock::now();})
|
||||
, [&created, &dropped, &altered, &proxy](auto& tsf) {
|
||||
return proxy.local().get_db().invoke_on_all([&created, &dropped, &altered, &tsf] (database& db) {
|
||||
proxy.local().get_db().invoke_on_all([&created, &dropped, &altered] (database& db) {
|
||||
return seastar::async([&] {
|
||||
for (auto&& gs : created) {
|
||||
schema_ptr s = gs.get();
|
||||
@@ -749,14 +753,13 @@ static void merge_tables(distributed<service::storage_proxy>& proxy,
|
||||
for (auto&& gs : altered) {
|
||||
update_column_family(db, gs.get()).get();
|
||||
}
|
||||
parallel_for_each(dropped.begin(), dropped.end(), [&db, &tsf](auto&& gs) {
|
||||
schema_ptr s = gs.get();
|
||||
return db.drop_column_family(s->ks_name(), s->cf_name(), [&tsf] { return tsf.value(); }).then([s] {
|
||||
parallel_for_each(dropped.begin(), dropped.end(), [&db](dropped_table& dt) {
|
||||
schema_ptr s = dt.schema.get();
|
||||
return db.drop_column_family(s->ks_name(), s->cf_name(), [&dt] { return dt.jp.value(); }).then([s] {
|
||||
return service::get_local_migration_manager().notify_drop_column_family(s);
|
||||
});
|
||||
}).get();
|
||||
});
|
||||
});
|
||||
}).get();
|
||||
}
|
||||
|
||||
|
||||
@@ -1022,6 +1022,10 @@ void make(database& db, bool durable, bool volatile_testing_only) {
|
||||
kscfg.enable_disk_writes = !volatile_testing_only;
|
||||
kscfg.enable_commitlog = !volatile_testing_only;
|
||||
kscfg.enable_cache = true;
|
||||
// don't make system keyspace reads wait for user reads
|
||||
kscfg.read_concurrency_config.sem = &db.system_keyspace_read_concurrency_sem();
|
||||
kscfg.read_concurrency_config.timeout = {};
|
||||
kscfg.read_concurrency_config.max_queue_length = std::numeric_limits<size_t>::max();
|
||||
keyspace _ks{ksm, std::move(kscfg)};
|
||||
auto rs(locator::abstract_replication_strategy::create_replication_strategy(NAME, "LocalStrategy", service::get_local_storage_service().get_token_metadata(), ksm->strategy_options()));
|
||||
_ks.set_replication_strategy(std::move(rs));
|
||||
|
||||
@@ -36,6 +36,8 @@ extern thread_local disk_error_signal_type sstable_read_error;
|
||||
extern thread_local disk_error_signal_type sstable_write_error;
|
||||
extern thread_local disk_error_signal_type general_disk_error;
|
||||
|
||||
bool should_stop_on_system_error(const std::system_error& e);
|
||||
|
||||
template<typename Func, typename... Args>
|
||||
std::enable_if_t<!is_future<std::result_of_t<Func(Args&&...)>>::value,
|
||||
std::result_of_t<Func(Args&&...)>>
|
||||
@@ -44,7 +46,7 @@ do_io_check(disk_error_signal_type& signal, Func&& func, Args&&... args) {
|
||||
// calling function
|
||||
return func(std::forward<Args>(args)...);
|
||||
} catch (std::system_error& e) {
|
||||
if (is_system_error_errno(EIO)) {
|
||||
if (should_stop_on_system_error(e)) {
|
||||
signal();
|
||||
throw storage_io_error(e);
|
||||
}
|
||||
@@ -62,7 +64,7 @@ auto do_io_check(disk_error_signal_type& signal, Func&& func, Args&&... args) {
|
||||
try {
|
||||
std::rethrow_exception(ep);
|
||||
} catch (std::system_error& sys_err) {
|
||||
if (is_system_error_errno(EIO)) {
|
||||
if (should_stop_on_system_error(sys_err)) {
|
||||
signal();
|
||||
throw storage_io_error(sys_err);
|
||||
}
|
||||
@@ -70,7 +72,7 @@ auto do_io_check(disk_error_signal_type& signal, Func&& func, Args&&... args) {
|
||||
return futurize<std::result_of_t<Func(Args&&...)>>::make_exception_future(ep);
|
||||
});
|
||||
} catch (std::system_error& e) {
|
||||
if (is_system_error_errno(EIO)) {
|
||||
if (should_stop_on_system_error(e)) {
|
||||
signal();
|
||||
throw storage_io_error(e);
|
||||
}
|
||||
|
||||
2
dist/ami/files/scylla-ami
vendored
2
dist/ami/files/scylla-ami
vendored
Submodule dist/ami/files/scylla-ami updated: 72ae2580c1...863cc4598a
4
dist/common/scripts/scylla_io_setup
vendored
4
dist/common/scripts/scylla_io_setup
vendored
@@ -44,8 +44,8 @@ output_to_user()
|
||||
}
|
||||
|
||||
if [ `is_developer_mode` -eq 0 ]; then
|
||||
SMP=`echo $SCYLLA_ARGS|grep smp|sed -e "s/^.*smp\(\s\+\|=\)\([0-9]*\).*$/\2/"`
|
||||
CPUSET=`echo $SCYLLA_ARGS|grep cpuset|sed -e "s/^.*\(--cpuset\(\s\+\|=\)[0-9\-]*\).*$/\1/"`
|
||||
SMP=`echo $CPUSET|grep smp|sed -e "s/^.*smp\(\s\+\|=\)\([0-9]*\).*$/\2/"`
|
||||
CPUSET=`echo $CPUSET|grep cpuset|sed -e "s/^.*\(--cpuset\(\s\+\|=\)[0-9\-]*\).*$/\1/"`
|
||||
if [ $AMI_OPT -eq 1 ]; then
|
||||
NR_CPU=`cat /proc/cpuinfo |grep processor|wc -l`
|
||||
NR_DISKS=`lsblk --list --nodeps --noheadings | grep -v xvda | grep xvd | wc -l`
|
||||
|
||||
5
dist/common/systemd/scylla-server.service.in
vendored
5
dist/common/systemd/scylla-server.service.in
vendored
@@ -2,6 +2,7 @@
|
||||
Description=Scylla Server
|
||||
|
||||
[Service]
|
||||
PermissionsStartOnly=true
|
||||
Type=notify
|
||||
LimitMEMLOCK=infinity
|
||||
LimitNOFILE=200000
|
||||
@@ -10,9 +11,9 @@ LimitNPROC=8096
|
||||
EnvironmentFile=@@SYSCONFDIR@@/scylla-server
|
||||
EnvironmentFile=/etc/scylla.d/*.conf
|
||||
WorkingDirectory=$SCYLLA_HOME
|
||||
ExecStartPre=/usr/bin/sudo /usr/lib/scylla/scylla_prepare
|
||||
ExecStartPre=/usr/lib/scylla/scylla_prepare
|
||||
ExecStart=/usr/bin/scylla $SCYLLA_ARGS $SEASTAR_IO $DEV_MODE $CPUSET
|
||||
ExecStopPost=/usr/bin/sudo /usr/lib/scylla/scylla_stop
|
||||
ExecStopPost=/usr/lib/scylla/scylla_stop
|
||||
TimeoutStartSec=900
|
||||
KillMode=process
|
||||
Restart=on-abnormal
|
||||
|
||||
2
dist/docker/redhat/Dockerfile
vendored
2
dist/docker/redhat/Dockerfile
vendored
@@ -2,7 +2,7 @@ FROM centos:7
|
||||
|
||||
MAINTAINER Avi Kivity <avi@cloudius-systems.com>
|
||||
|
||||
RUN curl http://downloads.scylladb.com/rpm/unstable/centos/master/latest/scylla.repo -o /etc/yum.repos.d/scylla.repo
|
||||
RUN curl http://downloads.scylladb.com/rpm/centos/scylla-1.2.repo -o /etc/yum.repos.d/scylla.repo
|
||||
RUN yum -y install epel-release
|
||||
RUN yum -y clean expire-cache
|
||||
RUN yum -y update
|
||||
|
||||
117
dist/redhat/centos_dep/gcc.diff
vendored
117
dist/redhat/centos_dep/gcc.diff
vendored
@@ -1,5 +1,5 @@
|
||||
--- gcc.spec.orig 2015-12-08 16:03:46.000000000 +0000
|
||||
+++ gcc.spec 2016-01-21 08:47:49.160667342 +0000
|
||||
+++ gcc.spec 2016-07-10 06:07:27.612453480 +0000
|
||||
@@ -1,6 +1,7 @@
|
||||
%global DATE 20151207
|
||||
%global SVNREV 231358
|
||||
@@ -8,7 +8,24 @@
|
||||
# Note, gcc_release must be integer, if you want to add suffixes to
|
||||
# %{release}, append them after %{gcc_release} on Release: line.
|
||||
%global gcc_release 2
|
||||
@@ -84,7 +85,8 @@
|
||||
@@ -9,16 +10,8 @@
|
||||
# Hardening slows the compiler way too much.
|
||||
%undefine _hardened_build
|
||||
%global multilib_64_archs sparc64 ppc64 ppc64p7 s390x x86_64
|
||||
-%ifarch %{ix86} x86_64 ia64 ppc ppc64 ppc64p7 alpha %{arm} aarch64
|
||||
-%global build_ada 1
|
||||
-%else
|
||||
%global build_ada 0
|
||||
-%endif
|
||||
-%ifarch %{ix86} x86_64 ppc ppc64 ppc64le ppc64p7 s390 s390x %{arm} aarch64
|
||||
-%global build_go 1
|
||||
-%else
|
||||
%global build_go 0
|
||||
-%endif
|
||||
%ifarch %{ix86} x86_64 ia64
|
||||
%global build_libquadmath 1
|
||||
%else
|
||||
@@ -84,7 +77,8 @@
|
||||
%global multilib_32_arch i686
|
||||
%endif
|
||||
Summary: Various compilers (C, C++, Objective-C, Java, ...)
|
||||
@@ -18,7 +35,7 @@
|
||||
Version: %{gcc_version}
|
||||
Release: %{gcc_release}%{?dist}
|
||||
# libgcc, libgfortran, libgomp, libstdc++ and crtstuff have
|
||||
@@ -99,6 +101,7 @@
|
||||
@@ -99,6 +93,7 @@
|
||||
%global isl_version 0.14
|
||||
URL: http://gcc.gnu.org
|
||||
BuildRoot: %{_tmppath}/%{name}-%{version}-%{release}-root-%(%{__id_u} -n)
|
||||
@@ -26,7 +43,7 @@
|
||||
# Need binutils with -pie support >= 2.14.90.0.4-4
|
||||
# Need binutils which can omit dot symbols and overlap .opd on ppc64 >= 2.15.91.0.2-4
|
||||
# Need binutils which handle -msecure-plt on ppc >= 2.16.91.0.2-2
|
||||
@@ -110,7 +113,7 @@
|
||||
@@ -110,7 +105,7 @@
|
||||
# Need binutils which support .cfi_sections >= 2.19.51.0.14-33
|
||||
# Need binutils which support --no-add-needed >= 2.20.51.0.2-12
|
||||
# Need binutils which support -plugin
|
||||
@@ -35,7 +52,7 @@
|
||||
# While gcc doesn't include statically linked binaries, during testing
|
||||
# -static is used several times.
|
||||
BuildRequires: glibc-static
|
||||
@@ -145,15 +148,15 @@
|
||||
@@ -145,15 +140,15 @@
|
||||
BuildRequires: libunwind >= 0.98
|
||||
%endif
|
||||
%if %{build_isl}
|
||||
@@ -55,7 +72,7 @@
|
||||
# Need .eh_frame ld optimizations
|
||||
# Need proper visibility support
|
||||
# Need -pie support
|
||||
@@ -168,7 +171,7 @@
|
||||
@@ -168,7 +163,7 @@
|
||||
# Need binutils that support .cfi_sections
|
||||
# Need binutils that support --no-add-needed
|
||||
# Need binutils that support -plugin
|
||||
@@ -64,7 +81,7 @@
|
||||
# Make sure gdb will understand DW_FORM_strp
|
||||
Conflicts: gdb < 5.1-2
|
||||
Requires: glibc-devel >= 2.2.90-12
|
||||
@@ -176,17 +179,15 @@
|
||||
@@ -176,17 +171,15 @@
|
||||
# Make sure glibc supports TFmode long double
|
||||
Requires: glibc >= 2.3.90-35
|
||||
%endif
|
||||
@@ -86,7 +103,7 @@
|
||||
Requires(post): /sbin/install-info
|
||||
Requires(preun): /sbin/install-info
|
||||
AutoReq: true
|
||||
@@ -228,12 +229,12 @@
|
||||
@@ -228,12 +221,12 @@
|
||||
The gcc package contains the GNU Compiler Collection version 5.
|
||||
You'll need this package in order to compile C code.
|
||||
|
||||
@@ -101,7 +118,7 @@
|
||||
%endif
|
||||
Obsoletes: libmudflap
|
||||
Obsoletes: libmudflap-devel
|
||||
@@ -241,17 +242,19 @@
|
||||
@@ -241,17 +234,19 @@
|
||||
Obsoletes: libgcj < %{version}-%{release}
|
||||
Obsoletes: libgcj-devel < %{version}-%{release}
|
||||
Obsoletes: libgcj-src < %{version}-%{release}
|
||||
@@ -125,7 +142,7 @@
|
||||
Autoreq: true
|
||||
|
||||
%description c++
|
||||
@@ -259,50 +262,55 @@
|
||||
@@ -259,50 +254,55 @@
|
||||
It includes support for most of the current C++ specification,
|
||||
including templates and exception handling.
|
||||
|
||||
@@ -193,7 +210,7 @@
|
||||
Autoreq: true
|
||||
|
||||
%description objc
|
||||
@@ -313,29 +321,32 @@
|
||||
@@ -313,29 +313,32 @@
|
||||
%package objc++
|
||||
Summary: Objective-C++ support for GCC
|
||||
Group: Development/Languages
|
||||
@@ -233,7 +250,7 @@
|
||||
%endif
|
||||
Requires(post): /sbin/install-info
|
||||
Requires(preun): /sbin/install-info
|
||||
@@ -345,260 +356,286 @@
|
||||
@@ -345,260 +348,286 @@
|
||||
The gcc-gfortran package provides support for compiling Fortran
|
||||
programs with the GNU Compiler Collection.
|
||||
|
||||
@@ -592,7 +609,7 @@
|
||||
Cpp is the GNU C-Compatible Compiler Preprocessor.
|
||||
Cpp is a macro processor which is used automatically
|
||||
by the C compiler to transform your program before actual
|
||||
@@ -623,8 +660,9 @@
|
||||
@@ -623,8 +652,9 @@
|
||||
%package gnat
|
||||
Summary: Ada 83, 95, 2005 and 2012 support for GCC
|
||||
Group: Development/Languages
|
||||
@@ -604,7 +621,7 @@
|
||||
Requires(post): /sbin/install-info
|
||||
Requires(preun): /sbin/install-info
|
||||
Autoreq: true
|
||||
@@ -633,82 +671,90 @@
|
||||
@@ -633,82 +663,90 @@
|
||||
GNAT is a GNU Ada 83, 95, 2005 and 2012 front-end to GCC. This package includes
|
||||
development tools, the documents and Ada compiler.
|
||||
|
||||
@@ -717,7 +734,7 @@
|
||||
Requires: gmp-devel >= 4.1.2-8, mpfr-devel >= 2.2.1, libmpc-devel >= 0.8.1
|
||||
|
||||
%description plugin-devel
|
||||
@@ -728,7 +774,8 @@
|
||||
@@ -728,7 +766,8 @@
|
||||
Summary: Debug information for package %{name}
|
||||
Group: Development/Debug
|
||||
AutoReqProv: 0
|
||||
@@ -727,7 +744,7 @@
|
||||
|
||||
%description debuginfo
|
||||
This package provides debug information for package %{name}.
|
||||
@@ -958,11 +1005,11 @@
|
||||
@@ -958,11 +997,11 @@
|
||||
--enable-gnu-unique-object --enable-linker-build-id --with-linker-hash-style=gnu \
|
||||
--enable-plugin --enable-initfini-array \
|
||||
--disable-libgcj \
|
||||
@@ -741,7 +758,7 @@
|
||||
%else
|
||||
--without-isl \
|
||||
%endif
|
||||
@@ -971,11 +1018,9 @@
|
||||
@@ -971,11 +1010,9 @@
|
||||
%else
|
||||
--disable-libmpx \
|
||||
%endif
|
||||
@@ -753,7 +770,7 @@
|
||||
%ifarch %{arm}
|
||||
--disable-sjlj-exceptions \
|
||||
%endif
|
||||
@@ -1006,9 +1051,6 @@
|
||||
@@ -1006,9 +1043,6 @@
|
||||
%if 0%{?rhel} >= 7
|
||||
--with-cpu-32=power8 --with-tune-32=power8 --with-cpu-64=power8 --with-tune-64=power8 \
|
||||
%endif
|
||||
@@ -763,7 +780,7 @@
|
||||
%endif
|
||||
%ifarch ppc
|
||||
--build=%{gcc_target_platform} --target=%{gcc_target_platform} --with-cpu=default32
|
||||
@@ -1270,16 +1312,15 @@
|
||||
@@ -1270,16 +1304,15 @@
|
||||
mv %{buildroot}%{_prefix}/%{_lib}/libmpx.spec $FULLPATH/
|
||||
%endif
|
||||
|
||||
@@ -786,7 +803,7 @@
|
||||
%endif
|
||||
%ifarch ppc
|
||||
rm -f $FULLPATH/libgcc_s.so
|
||||
@@ -1819,7 +1860,7 @@
|
||||
@@ -1819,7 +1852,7 @@
|
||||
chmod 755 %{buildroot}%{_prefix}/bin/c?9
|
||||
|
||||
cd ..
|
||||
@@ -795,7 +812,7 @@
|
||||
%find_lang cpplib
|
||||
|
||||
# Remove binaries we will not be including, so that they don't end up in
|
||||
@@ -1869,11 +1910,7 @@
|
||||
@@ -1869,11 +1902,7 @@
|
||||
|
||||
# run the tests.
|
||||
make %{?_smp_mflags} -k check ALT_CC_UNDER_TEST=gcc ALT_CXX_UNDER_TEST=g++ \
|
||||
@@ -807,7 +824,7 @@
|
||||
echo ====================TESTING=========================
|
||||
( LC_ALL=C ../contrib/test_summary || : ) 2>&1 | sed -n '/^cat.*EOF/,/^EOF/{/^cat.*EOF/d;/^EOF/d;/^LAST_UPDATED:/d;p;}'
|
||||
echo ====================TESTING END=====================
|
||||
@@ -1900,13 +1937,13 @@
|
||||
@@ -1900,13 +1929,13 @@
|
||||
--info-dir=%{_infodir} %{_infodir}/gcc.info.gz || :
|
||||
fi
|
||||
|
||||
@@ -823,7 +840,7 @@
|
||||
if [ $1 = 0 -a -f %{_infodir}/cpp.info.gz ]; then
|
||||
/sbin/install-info --delete \
|
||||
--info-dir=%{_infodir} %{_infodir}/cpp.info.gz || :
|
||||
@@ -1945,19 +1982,19 @@
|
||||
@@ -1945,19 +1974,19 @@
|
||||
fi
|
||||
|
||||
%post go
|
||||
@@ -846,7 +863,7 @@
|
||||
if posix.access ("/sbin/ldconfig", "x") then
|
||||
local pid = posix.fork ()
|
||||
if pid == 0 then
|
||||
@@ -1967,7 +2004,7 @@
|
||||
@@ -1967,7 +1996,7 @@
|
||||
end
|
||||
end
|
||||
|
||||
@@ -855,7 +872,7 @@
|
||||
if posix.access ("/sbin/ldconfig", "x") then
|
||||
local pid = posix.fork ()
|
||||
if pid == 0 then
|
||||
@@ -1977,120 +2014,120 @@
|
||||
@@ -1977,120 +2006,120 @@
|
||||
end
|
||||
end
|
||||
|
||||
@@ -1014,7 +1031,7 @@
|
||||
%defattr(-,root,root,-)
|
||||
%{_prefix}/bin/cc
|
||||
%{_prefix}/bin/c89
|
||||
@@ -2414,7 +2451,7 @@
|
||||
@@ -2414,7 +2443,7 @@
|
||||
%{!?_licensedir:%global license %%doc}
|
||||
%license gcc/COPYING* COPYING.RUNTIME
|
||||
|
||||
@@ -1023,7 +1040,7 @@
|
||||
%defattr(-,root,root,-)
|
||||
%{_prefix}/lib/cpp
|
||||
%{_prefix}/bin/cpp
|
||||
@@ -2425,10 +2462,10 @@
|
||||
@@ -2425,10 +2454,10 @@
|
||||
%dir %{_prefix}/libexec/gcc/%{gcc_target_platform}/%{gcc_version}
|
||||
%{_prefix}/libexec/gcc/%{gcc_target_platform}/%{gcc_version}/cc1
|
||||
|
||||
@@ -1037,7 +1054,7 @@
|
||||
%{!?_licensedir:%global license %%doc}
|
||||
%license gcc/COPYING* COPYING.RUNTIME
|
||||
|
||||
@@ -2469,7 +2506,7 @@
|
||||
@@ -2469,7 +2498,7 @@
|
||||
%endif
|
||||
%doc rpm.doc/changelogs/gcc/cp/ChangeLog*
|
||||
|
||||
@@ -1046,7 +1063,7 @@
|
||||
%defattr(-,root,root,-)
|
||||
%{_prefix}/%{_lib}/libstdc++.so.6*
|
||||
%dir %{_datadir}/gdb
|
||||
@@ -2481,7 +2518,7 @@
|
||||
@@ -2481,7 +2510,7 @@
|
||||
%dir %{_prefix}/share/gcc-%{gcc_version}/python
|
||||
%{_prefix}/share/gcc-%{gcc_version}/python/libstdcxx
|
||||
|
||||
@@ -1055,7 +1072,7 @@
|
||||
%defattr(-,root,root,-)
|
||||
%dir %{_prefix}/include/c++
|
||||
%dir %{_prefix}/include/c++/%{gcc_version}
|
||||
@@ -2507,7 +2544,7 @@
|
||||
@@ -2507,7 +2536,7 @@
|
||||
%endif
|
||||
%doc rpm.doc/changelogs/libstdc++-v3/ChangeLog* libstdc++-v3/README*
|
||||
|
||||
@@ -1064,7 +1081,7 @@
|
||||
%defattr(-,root,root,-)
|
||||
%dir %{_prefix}/lib/gcc
|
||||
%dir %{_prefix}/lib/gcc/%{gcc_target_platform}
|
||||
@@ -2528,7 +2565,7 @@
|
||||
@@ -2528,7 +2557,7 @@
|
||||
%endif
|
||||
|
||||
%if %{build_libstdcxx_docs}
|
||||
@@ -1073,7 +1090,7 @@
|
||||
%defattr(-,root,root)
|
||||
%{_mandir}/man3/*
|
||||
%doc rpm.doc/libstdc++-v3/html
|
||||
@@ -2567,7 +2604,7 @@
|
||||
@@ -2567,7 +2596,7 @@
|
||||
%dir %{_prefix}/libexec/gcc/%{gcc_target_platform}/%{gcc_version}
|
||||
%{_prefix}/libexec/gcc/%{gcc_target_platform}/%{gcc_version}/cc1objplus
|
||||
|
||||
@@ -1082,7 +1099,7 @@
|
||||
%defattr(-,root,root,-)
|
||||
%{_prefix}/%{_lib}/libobjc.so.4*
|
||||
|
||||
@@ -2621,11 +2658,11 @@
|
||||
@@ -2621,11 +2650,11 @@
|
||||
%endif
|
||||
%doc rpm.doc/gfortran/*
|
||||
|
||||
@@ -1096,7 +1113,7 @@
|
||||
%defattr(-,root,root,-)
|
||||
%dir %{_prefix}/lib/gcc
|
||||
%dir %{_prefix}/lib/gcc/%{gcc_target_platform}
|
||||
@@ -2671,12 +2708,12 @@
|
||||
@@ -2671,12 +2700,12 @@
|
||||
%{_prefix}/libexec/gcc/%{gcc_target_platform}/%{gcc_version}/gnat1
|
||||
%doc rpm.doc/changelogs/gcc/ada/ChangeLog*
|
||||
|
||||
@@ -1111,7 +1128,7 @@
|
||||
%defattr(-,root,root,-)
|
||||
%dir %{_prefix}/lib/gcc
|
||||
%dir %{_prefix}/lib/gcc/%{gcc_target_platform}
|
||||
@@ -2702,7 +2739,7 @@
|
||||
@@ -2702,7 +2731,7 @@
|
||||
%exclude %{_prefix}/lib/gcc/%{gcc_target_platform}/%{gcc_version}/adalib/libgnarl.a
|
||||
%endif
|
||||
|
||||
@@ -1120,7 +1137,7 @@
|
||||
%defattr(-,root,root,-)
|
||||
%dir %{_prefix}/lib/gcc
|
||||
%dir %{_prefix}/lib/gcc/%{gcc_target_platform}
|
||||
@@ -2726,7 +2763,7 @@
|
||||
@@ -2726,7 +2755,7 @@
|
||||
%endif
|
||||
%endif
|
||||
|
||||
@@ -1129,7 +1146,7 @@
|
||||
%defattr(-,root,root,-)
|
||||
%{_prefix}/%{_lib}/libgomp.so.1*
|
||||
%{_prefix}/%{_lib}/libgomp-plugin-host_nonshm.so.1*
|
||||
@@ -2734,14 +2771,14 @@
|
||||
@@ -2734,14 +2763,14 @@
|
||||
%doc rpm.doc/changelogs/libgomp/ChangeLog*
|
||||
|
||||
%if %{build_libquadmath}
|
||||
@@ -1146,7 +1163,7 @@
|
||||
%defattr(-,root,root,-)
|
||||
%dir %{_prefix}/lib/gcc
|
||||
%dir %{_prefix}/lib/gcc/%{gcc_target_platform}
|
||||
@@ -2754,7 +2791,7 @@
|
||||
@@ -2754,7 +2783,7 @@
|
||||
%endif
|
||||
%doc rpm.doc/libquadmath/ChangeLog*
|
||||
|
||||
@@ -1155,7 +1172,7 @@
|
||||
%defattr(-,root,root,-)
|
||||
%dir %{_prefix}/lib/gcc
|
||||
%dir %{_prefix}/lib/gcc/%{gcc_target_platform}
|
||||
@@ -2773,12 +2810,12 @@
|
||||
@@ -2773,12 +2802,12 @@
|
||||
%endif
|
||||
|
||||
%if %{build_libitm}
|
||||
@@ -1170,7 +1187,7 @@
|
||||
%defattr(-,root,root,-)
|
||||
%dir %{_prefix}/lib/gcc
|
||||
%dir %{_prefix}/lib/gcc/%{gcc_target_platform}
|
||||
@@ -2791,7 +2828,7 @@
|
||||
@@ -2791,7 +2820,7 @@
|
||||
%endif
|
||||
%doc rpm.doc/libitm/ChangeLog*
|
||||
|
||||
@@ -1179,7 +1196,7 @@
|
||||
%defattr(-,root,root,-)
|
||||
%dir %{_prefix}/lib/gcc
|
||||
%dir %{_prefix}/lib/gcc/%{gcc_target_platform}
|
||||
@@ -2810,11 +2847,11 @@
|
||||
@@ -2810,11 +2839,11 @@
|
||||
%endif
|
||||
|
||||
%if %{build_libatomic}
|
||||
@@ -1193,7 +1210,7 @@
|
||||
%defattr(-,root,root,-)
|
||||
%dir %{_prefix}/lib/gcc
|
||||
%dir %{_prefix}/lib/gcc/%{gcc_target_platform}
|
||||
@@ -2834,11 +2871,11 @@
|
||||
@@ -2834,11 +2863,11 @@
|
||||
%endif
|
||||
|
||||
%if %{build_libasan}
|
||||
@@ -1207,7 +1224,7 @@
|
||||
%defattr(-,root,root,-)
|
||||
%dir %{_prefix}/lib/gcc
|
||||
%dir %{_prefix}/lib/gcc/%{gcc_target_platform}
|
||||
@@ -2860,11 +2897,11 @@
|
||||
@@ -2860,11 +2889,11 @@
|
||||
%endif
|
||||
|
||||
%if %{build_libubsan}
|
||||
@@ -1221,7 +1238,7 @@
|
||||
%defattr(-,root,root,-)
|
||||
%dir %{_prefix}/lib/gcc
|
||||
%dir %{_prefix}/lib/gcc/%{gcc_target_platform}
|
||||
@@ -2886,11 +2923,11 @@
|
||||
@@ -2886,11 +2915,11 @@
|
||||
%endif
|
||||
|
||||
%if %{build_libtsan}
|
||||
@@ -1235,7 +1252,7 @@
|
||||
%defattr(-,root,root,-)
|
||||
%dir %{_prefix}/lib/gcc
|
||||
%dir %{_prefix}/lib/gcc/%{gcc_target_platform}
|
||||
@@ -2902,11 +2939,11 @@
|
||||
@@ -2902,11 +2931,11 @@
|
||||
%endif
|
||||
|
||||
%if %{build_liblsan}
|
||||
@@ -1249,7 +1266,7 @@
|
||||
%defattr(-,root,root,-)
|
||||
%dir %{_prefix}/lib/gcc
|
||||
%dir %{_prefix}/lib/gcc/%{gcc_target_platform}
|
||||
@@ -2918,11 +2955,11 @@
|
||||
@@ -2918,11 +2947,11 @@
|
||||
%endif
|
||||
|
||||
%if %{build_libcilkrts}
|
||||
@@ -1263,7 +1280,7 @@
|
||||
%defattr(-,root,root,-)
|
||||
%dir %{_prefix}/lib/gcc
|
||||
%dir %{_prefix}/lib/gcc/%{gcc_target_platform}
|
||||
@@ -2942,12 +2979,12 @@
|
||||
@@ -2942,12 +2971,12 @@
|
||||
%endif
|
||||
|
||||
%if %{build_libmpx}
|
||||
@@ -1278,7 +1295,7 @@
|
||||
%defattr(-,root,root,-)
|
||||
%dir %{_prefix}/lib/gcc
|
||||
%dir %{_prefix}/lib/gcc/%{gcc_target_platform}
|
||||
@@ -3009,12 +3046,12 @@
|
||||
@@ -3009,12 +3038,12 @@
|
||||
%endif
|
||||
%doc rpm.doc/go/*
|
||||
|
||||
@@ -1293,7 +1310,7 @@
|
||||
%defattr(-,root,root,-)
|
||||
%dir %{_prefix}/lib/gcc
|
||||
%dir %{_prefix}/lib/gcc/%{gcc_target_platform}
|
||||
@@ -3042,7 +3079,7 @@
|
||||
@@ -3042,7 +3071,7 @@
|
||||
%{_prefix}/lib/gcc/%{gcc_target_platform}/%{gcc_version}/libgo.so
|
||||
%endif
|
||||
|
||||
@@ -1302,7 +1319,7 @@
|
||||
%defattr(-,root,root,-)
|
||||
%dir %{_prefix}/lib/gcc
|
||||
%dir %{_prefix}/lib/gcc/%{gcc_target_platform}
|
||||
@@ -3060,12 +3097,12 @@
|
||||
@@ -3060,12 +3089,12 @@
|
||||
%endif
|
||||
%endif
|
||||
|
||||
|
||||
9
dist/redhat/scylla.spec.in
vendored
9
dist/redhat/scylla.spec.in
vendored
@@ -104,11 +104,6 @@ cp -P dist/common/sbin/* $RPM_BUILD_ROOT%{_sbindir}/
|
||||
%pre server
|
||||
/usr/sbin/groupadd scylla 2> /dev/null || :
|
||||
/usr/sbin/useradd -g scylla -s /sbin/nologin -r -d %{_sharedstatedir}/scylla scylla 2> /dev/null || :
|
||||
%if 0%{?rhel}
|
||||
sed -e "s/Defaults requiretty/#Defaults requiretty/" /etc/sudoers > /tmp/sudoers
|
||||
cp /tmp/sudoers /etc/sudoers
|
||||
rm /tmp/sudoers
|
||||
%endif
|
||||
|
||||
%post server
|
||||
# Upgrade coredump settings
|
||||
@@ -214,7 +209,9 @@ This package contains Linux kernel configuration changes for the Scylla database
|
||||
if Scylla is the main application on your server and you wish to optimize its latency and throughput.
|
||||
|
||||
%post kernel-conf
|
||||
%sysctl_apply 99-scylla-sched.conf
|
||||
# We cannot use the sysctl_apply rpm macro because it is not present in 7.0
|
||||
# following is a "manual" expansion
|
||||
/usr/lib/systemd/systemd-sysctl 99-scylla-sched.conf >/dev/null 2>&1 || :
|
||||
|
||||
%files kernel-conf
|
||||
%defattr(-,root,root)
|
||||
|
||||
41
keys.cc
41
keys.cc
@@ -23,6 +23,7 @@
|
||||
|
||||
#include "keys.hh"
|
||||
#include "dht/i_partitioner.hh"
|
||||
#include "clustering_bounds_comparator.hh"
|
||||
|
||||
std::ostream& operator<<(std::ostream& out, const partition_key& pk) {
|
||||
return out << "pk{" << to_hex(pk) << "}";
|
||||
@@ -52,3 +53,43 @@ partition_key_view::ring_order_tri_compare(const schema& s, partition_key_view k
|
||||
}
|
||||
return legacy_tri_compare(s, k2);
|
||||
}
|
||||
|
||||
std::ostream& operator<<(std::ostream& out, const bound_kind k) {
|
||||
switch(k) {
|
||||
case bound_kind::excl_end:
|
||||
return out << "excl end";
|
||||
case bound_kind::incl_start:
|
||||
return out << "incl start";
|
||||
case bound_kind::incl_end:
|
||||
return out << "incl end";
|
||||
case bound_kind::excl_start:
|
||||
return out << "excl start";
|
||||
}
|
||||
abort();
|
||||
}
|
||||
|
||||
bound_kind invert_kind(bound_kind k) {
|
||||
switch(k) {
|
||||
case bound_kind::excl_start: return bound_kind::incl_end;
|
||||
case bound_kind::incl_start: return bound_kind::excl_end;
|
||||
case bound_kind::excl_end: return bound_kind::incl_start;
|
||||
case bound_kind::incl_end: return bound_kind::excl_start;
|
||||
}
|
||||
abort();
|
||||
}
|
||||
|
||||
int32_t weight(bound_kind k) {
|
||||
switch(k) {
|
||||
case bound_kind::excl_end:
|
||||
return -2;
|
||||
case bound_kind::incl_start:
|
||||
return -1;
|
||||
case bound_kind::incl_end:
|
||||
return 1;
|
||||
case bound_kind::excl_start:
|
||||
return 2;
|
||||
}
|
||||
abort();
|
||||
}
|
||||
|
||||
const thread_local clustering_key_prefix bound_view::empty_prefix = clustering_key::make_empty();
|
||||
|
||||
11
keys.hh
11
keys.hh
@@ -158,8 +158,12 @@ protected:
|
||||
return TopLevel::get_compound_type(s);
|
||||
}
|
||||
public:
|
||||
static TopLevel make_empty() {
|
||||
return from_exploded(std::vector<bytes>());
|
||||
}
|
||||
|
||||
static TopLevel make_empty(const schema& s) {
|
||||
return from_exploded(s, {});
|
||||
return make_empty();
|
||||
}
|
||||
|
||||
template<typename RangeOfSerializedComponents>
|
||||
@@ -299,6 +303,11 @@ public:
|
||||
void feed_hash(Hasher& h, const schema& s) const {
|
||||
view().feed_hash(h, s);
|
||||
}
|
||||
|
||||
// Returns the number of components of this compound.
|
||||
size_t size(const schema& s) const {
|
||||
return std::distance(begin(s), end(s));
|
||||
}
|
||||
};
|
||||
|
||||
template <typename TopLevel, typename PrefixTopLevel>
|
||||
|
||||
22
main.cc
22
main.cc
@@ -277,6 +277,7 @@ verify_seastar_io_scheduler(bool has_max_io_requests, bool developer_mode) {
|
||||
}
|
||||
|
||||
int main(int ac, char** av) {
|
||||
try {
|
||||
// early check to avoid triggering
|
||||
if (!cpu_sanity()) {
|
||||
_exit(71);
|
||||
@@ -516,6 +517,18 @@ int main(int ac, char** av) {
|
||||
}
|
||||
return db.load_sstables(proxy);
|
||||
}).get();
|
||||
// If the same sstable is shared by several shards, it cannot be
|
||||
// deleted until all shards decide to compact it. So we want to
|
||||
// start these compactions now. Note we start compacting only after
|
||||
// all sstables in this CF were loaded on all shards - otherwise
|
||||
// we will have races between the compaction and loading processes
|
||||
db.invoke_on_all([&proxy] (database& db) {
|
||||
for (auto& x : db.get_column_families()) {
|
||||
column_family& cf = *(x.second);
|
||||
// We start the rewrite, but do not wait for it.
|
||||
cf.start_rewrite();
|
||||
}
|
||||
}).get();
|
||||
supervisor_notify("setting up system keyspace");
|
||||
db::system_keyspace::setup(db, qp).get();
|
||||
supervisor_notify("starting commit log");
|
||||
@@ -595,10 +608,10 @@ int main(int ac, char** av) {
|
||||
supervisor_notify("serving");
|
||||
// Register at_exit last, so that storage_service::drain_on_shutdown will be called first
|
||||
engine().at_exit([] {
|
||||
return service::get_local_storage_service().drain_on_shutdown();
|
||||
return repair_shutdown(service::get_local_storage_service().db());
|
||||
});
|
||||
engine().at_exit([] {
|
||||
return repair_shutdown(service::get_local_storage_service().db());
|
||||
return service::get_local_storage_service().drain_on_shutdown();
|
||||
});
|
||||
engine().at_exit([&db] {
|
||||
return db.invoke_on_all([](auto& db) {
|
||||
@@ -607,6 +620,11 @@ int main(int ac, char** av) {
|
||||
});
|
||||
}).or_terminate();
|
||||
});
|
||||
} catch (...) {
|
||||
// reactor may not have been initialized, so can't use logger
|
||||
fprint(std::cerr, "FATAL: Exception during startup, aborting: %s\n", std::current_exception());
|
||||
return 7; // 1 has a special meaning for upstart
|
||||
}
|
||||
}
|
||||
|
||||
namespace debug {
|
||||
|
||||
@@ -53,6 +53,14 @@ struct reversal_traits<false> {
|
||||
return c.erase_and_dispose(begin, end, std::move(disposer));
|
||||
}
|
||||
|
||||
template<typename Container, typename Disposer>
|
||||
static typename Container::iterator erase_dispose_and_update_end(Container& c,
|
||||
typename Container::iterator it, Disposer&& disposer,
|
||||
typename Container::iterator&)
|
||||
{
|
||||
return c.erase_and_dispose(it, std::forward<Disposer>(disposer));
|
||||
}
|
||||
|
||||
template <typename Container>
|
||||
static boost::iterator_range<typename Container::iterator> maybe_reverse(
|
||||
Container& c, boost::iterator_range<typename Container::iterator> r)
|
||||
@@ -89,6 +97,24 @@ struct reversal_traits<true> {
|
||||
);
|
||||
}
|
||||
|
||||
// Erases element pointed to by it and makes sure that iterator end is not
|
||||
// invalidated.
|
||||
template<typename Container, typename Disposer>
|
||||
static typename Container::reverse_iterator erase_dispose_and_update_end(Container& c,
|
||||
typename Container::reverse_iterator it, Disposer&& disposer,
|
||||
typename Container::reverse_iterator& end)
|
||||
{
|
||||
auto to_erase = std::next(it).base();
|
||||
bool update_end = end.base() == to_erase;
|
||||
auto ret = typename Container::reverse_iterator(
|
||||
c.erase_and_dispose(to_erase, std::forward<Disposer>(disposer))
|
||||
);
|
||||
if (update_end) {
|
||||
end = ret;
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
template <typename Container>
|
||||
static boost::iterator_range<typename Container::reverse_iterator> maybe_reverse(
|
||||
Container& c, boost::iterator_range<typename Container::iterator> r)
|
||||
@@ -1120,7 +1146,7 @@ void mutation_partition::trim_rows(const schema& s,
|
||||
}
|
||||
|
||||
if (e.empty()) {
|
||||
last = reversal_traits<reversed>::erase_and_dispose(_rows, last, std::next(last, 1), deleter);
|
||||
last = reversal_traits<reversed>::erase_dispose_and_update_end(_rows, last, deleter, end);
|
||||
} else {
|
||||
++last;
|
||||
}
|
||||
|
||||
@@ -218,3 +218,42 @@ public:
|
||||
mutation_reader make_empty_reader() {
|
||||
return make_mutation_reader<empty_reader>();
|
||||
}
|
||||
|
||||
|
||||
class restricting_mutation_reader : public mutation_reader::impl {
|
||||
const restricted_mutation_reader_config& _config;
|
||||
unsigned _weight = 0;
|
||||
bool _waited = false;
|
||||
mutation_reader _base;
|
||||
public:
|
||||
restricting_mutation_reader(const restricted_mutation_reader_config& config, unsigned weight, mutation_reader&& base)
|
||||
: _config(config), _weight(weight), _base(std::move(base)) {
|
||||
if (_config.sem->waiters() >= _config.max_queue_length) {
|
||||
_config.raise_queue_overloaded_exception();
|
||||
}
|
||||
}
|
||||
~restricting_mutation_reader() {
|
||||
if (_waited) {
|
||||
_config.sem->signal(_weight);
|
||||
}
|
||||
}
|
||||
future<mutation_opt> operator()() override {
|
||||
// FIXME: we should defer freeing until the mutation is freed, perhaps,
|
||||
// rather than just returned
|
||||
if (_waited) {
|
||||
return _base();
|
||||
}
|
||||
auto waited = _config.timeout.count() != 0
|
||||
? _config.sem->wait(_config.timeout, _weight)
|
||||
: _config.sem->wait(_weight);
|
||||
return waited.then([this] {
|
||||
_waited = true;
|
||||
return _base();
|
||||
});
|
||||
}
|
||||
};
|
||||
|
||||
mutation_reader
|
||||
make_restricted_reader(const restricted_mutation_reader_config& config, unsigned weight, mutation_reader&& base) {
|
||||
return make_mutation_reader<restricting_mutation_reader>(config, weight, std::move(base));
|
||||
}
|
||||
|
||||
@@ -85,6 +85,23 @@ mutation_reader make_empty_reader();
|
||||
// when creating the reader involves disk I/O or a shard call
|
||||
mutation_reader make_lazy_reader(std::function<mutation_reader ()> make_reader);
|
||||
|
||||
struct restricted_mutation_reader_config {
|
||||
semaphore* sem = nullptr;
|
||||
std::chrono::nanoseconds timeout = {};
|
||||
size_t max_queue_length = std::numeric_limits<size_t>::max();
|
||||
std::function<void ()> raise_queue_overloaded_exception = default_raise_queue_overloaded_exception;
|
||||
|
||||
static void default_raise_queue_overloaded_exception() {
|
||||
throw std::runtime_error("restricted mutation reader queue overload");
|
||||
}
|
||||
};
|
||||
|
||||
// Restricts a given `mutation_reader` to a concurrency limited according to settings in
|
||||
// a restricted_mutation_reader_config. These settings include a semaphore for limiting the number
|
||||
// of active concurrent readers, a timeout for inactive readers, and a maximum queue size for
|
||||
// inactive readers.
|
||||
mutation_reader make_restricted_reader(const restricted_mutation_reader_config& config, unsigned weight, mutation_reader&& base);
|
||||
|
||||
template <typename MutationFilter>
|
||||
class filtering_reader : public mutation_reader::impl {
|
||||
mutation_reader _rd;
|
||||
|
||||
@@ -44,6 +44,8 @@ extern const partition_range full_partition_range;
|
||||
// FIXME: Move this to i_partitioner.hh after query::range<> is moved to utils/range.hh
|
||||
query::partition_range to_partition_range(query::range<dht::token>);
|
||||
|
||||
bool is_wrap_around(const query::clustering_range& r, const schema& s);
|
||||
|
||||
inline
|
||||
bool is_wrap_around(const query::partition_range& range, const schema& s) {
|
||||
return range.is_wrap_around(dht::ring_position_comparator(s));
|
||||
|
||||
6
query.cc
6
query.cc
@@ -29,6 +29,7 @@
|
||||
#include "mutation_partition_serializer.hh"
|
||||
#include "query-result-reader.hh"
|
||||
#include "query_result_merger.hh"
|
||||
#include "clustering_bounds_comparator.hh"
|
||||
|
||||
namespace query {
|
||||
|
||||
@@ -235,4 +236,9 @@ foreign_ptr<lw_shared_ptr<query::result>> result_merger::get() {
|
||||
return make_foreign(make_lw_shared<query::result>(std::move(w), row_count));
|
||||
}
|
||||
|
||||
bool is_wrap_around(const query::clustering_range& r, const schema& s) {
|
||||
auto bounds = bound_view::from_range(s, r);
|
||||
return bound_view::compare(s)(bounds.second, bounds.first);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -397,7 +397,7 @@ static future<> sync_range(seastar::sharded<database>& db,
|
||||
return sp_in.execute().discard_result().then([&sp_out] {
|
||||
return sp_out.execute().discard_result();
|
||||
}).handle_exception([] (auto ep) {
|
||||
logger.error("repair's stream failed: {}", ep);
|
||||
logger.warn("repair's stream failed: {}", ep);
|
||||
return make_exception_future(ep);
|
||||
});
|
||||
});
|
||||
|
||||
54
row_cache.cc
54
row_cache.cc
@@ -443,7 +443,16 @@ row_cache::make_reader(schema_ptr s,
|
||||
}
|
||||
|
||||
row_cache::~row_cache() {
|
||||
clear();
|
||||
clear_now();
|
||||
}
|
||||
|
||||
void row_cache::clear_now() noexcept {
|
||||
with_allocator(_tracker.allocator(), [this] {
|
||||
_partitions.clear_and_dispose([this, deleter = current_deleter<cache_entry>()] (auto&& p) mutable {
|
||||
_tracker.on_erase();
|
||||
deleter(p);
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
void row_cache::populate(const mutation& m) {
|
||||
@@ -467,16 +476,8 @@ void row_cache::populate(const mutation& m) {
|
||||
});
|
||||
}
|
||||
|
||||
void row_cache::clear() {
|
||||
with_allocator(_tracker.allocator(), [this] {
|
||||
// We depend on clear_and_dispose() below not looking up any keys.
|
||||
// Using with_linearized_managed_bytes() is no helps, because we don't
|
||||
// want to propagate an exception from here.
|
||||
_partitions.clear_and_dispose([this, deleter = current_deleter<cache_entry>()] (auto&& p) mutable {
|
||||
_tracker.on_erase();
|
||||
deleter(p);
|
||||
});
|
||||
});
|
||||
future<> row_cache::clear() {
|
||||
return invalidate(query::full_partition_range);
|
||||
}
|
||||
|
||||
future<> row_cache::update(memtable& m, partition_presence_checker presence_checker) {
|
||||
@@ -502,8 +503,8 @@ future<> row_cache::update(memtable& m, partition_presence_checker presence_chec
|
||||
});
|
||||
if (blow_cache) {
|
||||
// We failed to invalidate the key, presumably due to with_linearized_managed_bytes()
|
||||
// running out of memory. Recover using clear(), which doesn't throw.
|
||||
clear();
|
||||
// running out of memory. Recover using clear_now(), which doesn't throw.
|
||||
clear_now();
|
||||
}
|
||||
});
|
||||
});
|
||||
@@ -577,7 +578,8 @@ void row_cache::invalidate_locked(const dht::decorated_key& dk) {
|
||||
});
|
||||
}
|
||||
|
||||
void row_cache::invalidate(const dht::decorated_key& dk) {
|
||||
future<> row_cache::invalidate(const dht::decorated_key& dk) {
|
||||
return _populate_phaser.advance_and_await().then([this, &dk] {
|
||||
_read_section(_tracker.region(), [&] {
|
||||
with_allocator(_tracker.allocator(), [this, &dk] {
|
||||
with_linearized_managed_bytes([&] {
|
||||
@@ -585,17 +587,24 @@ void row_cache::invalidate(const dht::decorated_key& dk) {
|
||||
});
|
||||
});
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
void row_cache::invalidate(const query::partition_range& range) {
|
||||
with_linearized_managed_bytes([&] {
|
||||
if (range.is_wrap_around(dht::ring_position_comparator(*_schema))) {
|
||||
auto unwrapped = range.unwrap();
|
||||
invalidate(unwrapped.first);
|
||||
invalidate(unwrapped.second);
|
||||
return;
|
||||
}
|
||||
future<> row_cache::invalidate(const query::partition_range& range) {
|
||||
return _populate_phaser.advance_and_await().then([this, &range] {
|
||||
with_linearized_managed_bytes([&] {
|
||||
if (range.is_wrap_around(dht::ring_position_comparator(*_schema))) {
|
||||
auto unwrapped = range.unwrap();
|
||||
invalidate_unwrapped(unwrapped.first);
|
||||
invalidate_unwrapped(unwrapped.second);
|
||||
} else {
|
||||
invalidate_unwrapped(range);
|
||||
}
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
void row_cache::invalidate_unwrapped(const query::partition_range& range) {
|
||||
logalloc::reclaim_lock _(_tracker.region());
|
||||
|
||||
auto cmp = cache_entry::compare(_schema);
|
||||
@@ -621,7 +630,6 @@ void row_cache::invalidate(const query::partition_range& range) {
|
||||
deleter(p);
|
||||
});
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
row_cache::row_cache(schema_ptr s, mutation_source fallback_factory, key_source underlying_keys,
|
||||
|
||||
24
row_cache.hh
24
row_cache.hh
@@ -184,13 +184,13 @@ private:
|
||||
mutation_source _underlying;
|
||||
key_source _underlying_keys;
|
||||
|
||||
// Synchronizes populating reads with update() to ensure that cache
|
||||
// Synchronizes populating reads with updates of underlying data source to ensure that cache
|
||||
// remains consistent across flushes with the underlying data source.
|
||||
// Readers obtained from the underlying data source in earlier than
|
||||
// current phases must not be used to populate the cache, unless they hold
|
||||
// phaser::operation created in the reader's phase of origin. Readers
|
||||
// should hold to a phase only briefly because this inhibits progress of
|
||||
// update(). Phase changes occur only in update(), which can be assumed to
|
||||
// updates. Phase changes occur in update()/clear(), which can be assumed to
|
||||
// be asynchronous wrt invoking of the underlying data source.
|
||||
utils::phased_barrier _populate_phaser;
|
||||
|
||||
@@ -204,6 +204,8 @@ private:
|
||||
void on_miss();
|
||||
void upgrade_entry(cache_entry&);
|
||||
void invalidate_locked(const dht::decorated_key&);
|
||||
void invalidate_unwrapped(const query::partition_range&);
|
||||
void clear_now() noexcept;
|
||||
static thread_local seastar::thread_scheduling_group _update_thread_scheduling_group;
|
||||
public:
|
||||
~row_cache();
|
||||
@@ -228,7 +230,9 @@ public:
|
||||
void populate(const mutation& m);
|
||||
|
||||
// Clears the cache.
|
||||
void clear();
|
||||
// Guarantees that cache will not be populated using readers created
|
||||
// before this method was invoked.
|
||||
future<> clear();
|
||||
|
||||
// Synchronizes cache with the underlying data source from a memtable which
|
||||
// has just been flushed to the underlying data source.
|
||||
@@ -240,11 +244,21 @@ public:
|
||||
void touch(const dht::decorated_key&);
|
||||
|
||||
// Removes given partition from cache.
|
||||
void invalidate(const dht::decorated_key&);
|
||||
//
|
||||
// Guarantees that cache will not be populated with given key
|
||||
// using readers created before this method was invoked.
|
||||
//
|
||||
// The key must be kept alive until method resolves.
|
||||
future<> invalidate(const dht::decorated_key& key);
|
||||
|
||||
// Removes given range of partitions from cache.
|
||||
// The range can be a wrap around.
|
||||
void invalidate(const query::partition_range&);
|
||||
//
|
||||
// Guarantees that cache will not be populated with partitions from that range
|
||||
// using readers created before this method was invoked.
|
||||
//
|
||||
// The range must be kept alive until method resolves.
|
||||
future<> invalidate(const query::partition_range&);
|
||||
|
||||
auto num_entries() const {
|
||||
return _partitions.size();
|
||||
|
||||
95
schema.cc
95
schema.cc
@@ -56,6 +56,14 @@ sstring to_sstring(index_type t) {
|
||||
throw std::invalid_argument("unknown index type");
|
||||
}
|
||||
|
||||
bool is_regular(column_kind k) {
|
||||
return k == column_kind::regular_column || k == column_kind::compact_column;
|
||||
}
|
||||
|
||||
bool is_compatible(column_kind k1, column_kind k2) {
|
||||
return k1 == k2 || (is_regular(k1) && is_regular(k2));
|
||||
}
|
||||
|
||||
column_mapping_entry::column_mapping_entry(bytes name, sstring type_name)
|
||||
: _name(std::move(name))
|
||||
, _type(db::marshal::type_parser::parse(type_name))
|
||||
@@ -629,51 +637,60 @@ schema_builder& schema_builder::with_version(table_schema_version v) {
|
||||
return *this;
|
||||
}
|
||||
|
||||
schema_ptr schema_builder::build() {
|
||||
if (_version) {
|
||||
_raw._version = *_version;
|
||||
} else {
|
||||
_raw._version = utils::UUID_gen::get_time_UUID();
|
||||
}
|
||||
void schema_builder::prepare_dense_schema(schema::raw_schema& raw) {
|
||||
if (raw._is_dense) {
|
||||
auto regular_cols = boost::copy_range<std::vector<column_definition*>>(
|
||||
raw._columns | boost::adaptors::filtered([](auto&& col) { return col.is_regular(); })
|
||||
| boost::adaptors::transformed([](auto&& col) { return &col; }));
|
||||
|
||||
if (!_compact_storage) {
|
||||
return make_lw_shared<schema>(schema(_raw));
|
||||
}
|
||||
|
||||
schema s(_raw);
|
||||
|
||||
// Dense means that no part of the comparator stores a CQL column name. This means
|
||||
// COMPACT STORAGE with at least one columnAliases (otherwise it's a thrift "static" CF).
|
||||
s._raw._is_dense = (*_compact_storage == compact_storage::yes) && (s.clustering_key_size() > 0);
|
||||
|
||||
if (s.clustering_key_size() == 0) {
|
||||
if (*_compact_storage == compact_storage::yes) {
|
||||
s._raw._is_compound = false;
|
||||
} else {
|
||||
s._raw._is_compound = true;
|
||||
}
|
||||
} else {
|
||||
if ((*_compact_storage == compact_storage::yes) && s.clustering_key_size() == 1) {
|
||||
s._raw._is_compound = false;
|
||||
} else {
|
||||
s._raw._is_compound = true;
|
||||
}
|
||||
}
|
||||
|
||||
if (s._raw._is_dense) {
|
||||
// In Origin, dense CFs always have at least one regular column
|
||||
if (s.regular_columns_count() == 0) {
|
||||
s._raw._columns.emplace_back(bytes(""), s.regular_column_name_type(), column_kind::regular_column, 0, index_info());
|
||||
if (regular_cols.empty()) {
|
||||
raw._columns.emplace_back(bytes(""), raw._regular_column_name_type, column_kind::compact_column, 0, index_info());
|
||||
return;
|
||||
}
|
||||
|
||||
if (s.regular_columns_count() != 1) {
|
||||
throw exceptions::configuration_exception(sprint("Expecting exactly one regular column. Found %d", s.regular_columns_count()));
|
||||
if (regular_cols.size() != 1) {
|
||||
throw exceptions::configuration_exception(sprint("Expecting exactly one regular column. Found %d", regular_cols.size()));
|
||||
}
|
||||
s._raw._columns.at(s.column_offset(column_kind::regular_column)).kind = column_kind::compact_column;
|
||||
|
||||
regular_cols[0]->kind = column_kind::compact_column;
|
||||
}
|
||||
// We need to rebuild the schema in case we added some column. This is way simpler than trying to factor out the relevant code
|
||||
// from the constructor
|
||||
return make_lw_shared<schema>(schema(s._raw));
|
||||
}
|
||||
|
||||
schema_ptr schema_builder::build() {
|
||||
schema::raw_schema new_raw = _raw; // Copy so that build() remains idempotent.
|
||||
|
||||
if (_version) {
|
||||
new_raw._version = *_version;
|
||||
} else {
|
||||
new_raw._version = utils::UUID_gen::get_time_UUID();
|
||||
}
|
||||
|
||||
if (_compact_storage) {
|
||||
// Dense means that no part of the comparator stores a CQL column name. This means
|
||||
// COMPACT STORAGE with at least one columnAliases (otherwise it's a thrift "static" CF).
|
||||
auto clustering_key_size = std::count_if(new_raw._columns.begin(), new_raw._columns.end(), [](auto&& col) {
|
||||
return col.kind == column_kind::clustering_key;
|
||||
});
|
||||
new_raw._is_dense = (*_compact_storage == compact_storage::yes) && (clustering_key_size > 0);
|
||||
|
||||
if (clustering_key_size == 0) {
|
||||
if (*_compact_storage == compact_storage::yes) {
|
||||
new_raw._is_compound = false;
|
||||
} else {
|
||||
new_raw._is_compound = true;
|
||||
}
|
||||
} else {
|
||||
if ((*_compact_storage == compact_storage::yes) && clustering_key_size == 1) {
|
||||
new_raw._is_compound = false;
|
||||
} else {
|
||||
new_raw._is_compound = true;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
prepare_dense_schema(new_raw);
|
||||
return make_lw_shared<schema>(schema(new_raw));
|
||||
}
|
||||
|
||||
schema_ptr schema_builder::build(compact_storage cp) {
|
||||
|
||||
@@ -72,6 +72,8 @@ void read_collections(schema_builder& builder, sstring comparator);
|
||||
enum class column_kind { partition_key, clustering_key, static_column, regular_column, compact_column };
|
||||
|
||||
sstring to_sstring(column_kind k);
|
||||
bool is_regular(column_kind k);
|
||||
bool is_compatible(column_kind k1, column_kind k2);
|
||||
|
||||
// CMH this is also manually defined in thrift gen file.
|
||||
enum class index_type {
|
||||
@@ -224,7 +226,7 @@ public:
|
||||
index_info idx_info;
|
||||
|
||||
bool is_static() const { return kind == column_kind::static_column; }
|
||||
bool is_regular() const { return kind == column_kind::regular_column || kind == column_kind::compact_column; }
|
||||
bool is_regular() const { return ::is_regular(kind); }
|
||||
bool is_partition_key() const { return kind == column_kind::partition_key; }
|
||||
bool is_clustering_key() const { return kind == column_kind::clustering_key; }
|
||||
bool is_primary_key() const { return kind == column_kind::partition_key || kind == column_kind::clustering_key; }
|
||||
|
||||
@@ -220,4 +220,6 @@ public:
|
||||
schema_ptr build(compact_storage cp);
|
||||
|
||||
schema_ptr build();
|
||||
private:
|
||||
void prepare_dense_schema(schema::raw_schema& raw);
|
||||
};
|
||||
|
||||
2
seastar
2
seastar
Submodule seastar updated: 0bcdd282c5...d6ccc19f9b
@@ -166,7 +166,8 @@ private:
|
||||
);
|
||||
|
||||
auto ranges = _ranges;
|
||||
return get_local_storage_proxy().query(_schema, _cmd, std::move(ranges),
|
||||
auto command = ::make_lw_shared<query::read_command>(*_cmd);
|
||||
return get_local_storage_proxy().query(_schema, std::move(command), std::move(ranges),
|
||||
_options.get_consistency()).then(
|
||||
[this, &builder, page_size, now](foreign_ptr<lw_shared_ptr<query::result>> results) {
|
||||
handle_result(builder, std::move(results), page_size, now);
|
||||
|
||||
@@ -2058,7 +2058,6 @@ public:
|
||||
auto write_timeout = exec->_proxy->_db.local().get_config().write_request_timeout_in_ms() * 1000;
|
||||
auto delta = __int128_t(digest_resolver->last_modified()) - __int128_t(exec->_cmd->read_timestamp);
|
||||
if (std::abs(delta) <= write_timeout) {
|
||||
print("HERE %d\n", int64_t(delta));
|
||||
exec->_proxy->_stats.global_read_repairs_canceled_due_to_concurrent_write++;
|
||||
// if CL is local and non-matching data is modified less than write_timeout ms ago do only local repair
|
||||
auto i = boost::range::remove_if(exec->_targets, std::not1(std::cref(db::is_local)));
|
||||
|
||||
@@ -289,6 +289,9 @@ public:
|
||||
*
|
||||
* Partitions for each range will be ordered according to decorated_key ordering. Results for
|
||||
* each range from "partition_ranges" may appear in any order.
|
||||
*
|
||||
* IMPORTANT: Not all fibers started by this method have to be done by the time it returns so no
|
||||
* parameter can be changed after being passed to this method.
|
||||
*/
|
||||
future<foreign_ptr<lw_shared_ptr<query::result>>> query(schema_ptr,
|
||||
lw_shared_ptr<query::read_command> cmd,
|
||||
|
||||
@@ -972,6 +972,28 @@ void storage_service::unregister_subscriber(endpoint_lifecycle_subscriber* subsc
|
||||
|
||||
static stdx::optional<future<>> drain_in_progress;
|
||||
|
||||
future<> storage_service::stop_transport() {
|
||||
return run_with_no_api_lock([] (storage_service& ss) {
|
||||
return seastar::async([&ss] {
|
||||
logger.info("Stop transport: starts");
|
||||
|
||||
gms::get_local_gossiper().stop_gossiping().get();
|
||||
logger.info("Stop transport: stop_gossiping done");
|
||||
|
||||
ss.shutdown_client_servers().get();
|
||||
logger.info("Stop transport: shutdown rpc and cql server done");
|
||||
|
||||
ss.do_stop_ms().get();
|
||||
logger.info("Stop transport: shutdown messaging_service done");
|
||||
|
||||
auth::auth::shutdown().get();
|
||||
logger.info("Stop transport: auth shutdown");
|
||||
|
||||
logger.info("Stop transport: done");
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
future<> storage_service::drain_on_shutdown() {
|
||||
return run_with_no_api_lock([] (storage_service& ss) {
|
||||
if (drain_in_progress) {
|
||||
@@ -980,17 +1002,8 @@ future<> storage_service::drain_on_shutdown() {
|
||||
return seastar::async([&ss] {
|
||||
logger.info("Drain on shutdown: starts");
|
||||
|
||||
gms::get_local_gossiper().stop_gossiping().get();
|
||||
logger.info("Drain on shutdown: stop_gossiping done");
|
||||
|
||||
ss.shutdown_client_servers().get();
|
||||
logger.info("Drain on shutdown: shutdown rpc and cql server done");
|
||||
|
||||
ss.do_stop_ms().get();
|
||||
logger.info("Drain on shutdown: shutdown messaging_service done");
|
||||
|
||||
auth::auth::shutdown().get();
|
||||
logger.info("Drain on shutdown: auth shutdown");
|
||||
ss.stop_transport().get();
|
||||
logger.info("Drain on shutdown: stop_transport done");
|
||||
|
||||
ss.flush_column_families();
|
||||
logger.info("Drain on shutdown: flush column_families done");
|
||||
@@ -3007,7 +3020,7 @@ void storage_service::do_isolate_on_error(disk_error type)
|
||||
if (must_isolate && !isolated.exchange(true)) {
|
||||
logger.warn("Shutting down communications due to I/O errors until operator intervention");
|
||||
// isolated protect us against multiple stops
|
||||
service::get_storage_service().invoke_on_all([] (service::storage_service& s) { s.stop_native_transport(); });
|
||||
service::get_local_storage_service().stop_transport();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -382,6 +382,8 @@ public:
|
||||
|
||||
future<> drain_on_shutdown();
|
||||
|
||||
future<> stop_transport();
|
||||
|
||||
void flush_column_families();
|
||||
#if 0
|
||||
/**
|
||||
@@ -553,9 +555,9 @@ public:
|
||||
auto orig_map = get_range_to_address_map(keyspace, get_tokens_in_local_dc());
|
||||
std::unordered_map<range<token>, std::vector<inet_address>> filtered_map;
|
||||
for (auto entry : orig_map) {
|
||||
filtered_map[entry.first].reserve(entry.second.size());
|
||||
std::remove_copy_if(entry.second.begin(), entry.second.end(),
|
||||
filtered_map[entry.first].begin(), filter);
|
||||
auto& addresses = filtered_map[entry.first];
|
||||
addresses.reserve(entry.second.size());
|
||||
std::copy_if(entry.second.begin(), entry.second.end(), std::back_inserter(addresses), filter);
|
||||
}
|
||||
|
||||
return filtered_map;
|
||||
|
||||
@@ -83,13 +83,17 @@ int compaction_manager::trim_to_compact(column_family* cf, sstables::compaction_
|
||||
return weight;
|
||||
}
|
||||
|
||||
bool compaction_manager::try_to_register_weight(column_family* cf, int weight) {
|
||||
bool compaction_manager::try_to_register_weight(column_family* cf, int weight, bool parallel_compaction) {
|
||||
auto it = _weight_tracker.find(cf);
|
||||
if (it == _weight_tracker.end()) {
|
||||
_weight_tracker.insert({cf, {weight}});
|
||||
return true;
|
||||
}
|
||||
std::unordered_set<int>& s = it->second;
|
||||
// Only one weight is allowed if parallel compaction is disabled.
|
||||
if (!parallel_compaction && !s.empty()) {
|
||||
return false;
|
||||
}
|
||||
// TODO: Maybe allow only *smaller* compactions to start? That can be done
|
||||
// by returning true only if weight is not in the set and is lower than any
|
||||
// entry in the set.
|
||||
@@ -164,8 +168,7 @@ lw_shared_ptr<compaction_manager::task> compaction_manager::task_start(column_fa
|
||||
sstables::compaction_strategy cs = cf.get_compaction_strategy();
|
||||
descriptor = cs.get_sstables_for_compaction(cf, std::move(candidates));
|
||||
weight = trim_to_compact(&cf, descriptor);
|
||||
if (!try_to_register_weight(&cf, weight)) {
|
||||
// Refusing compaction job because of an ongoing compaction with same weight.
|
||||
if (!try_to_register_weight(&cf, weight, cs.parallel_compaction())) {
|
||||
task->stopping = true;
|
||||
_stats.pending_tasks--;
|
||||
cmlog.debug("Refused compaction job ({} sstable(s)) of weight {} for {}.{}",
|
||||
@@ -248,6 +251,51 @@ lw_shared_ptr<compaction_manager::task> compaction_manager::task_start(column_fa
|
||||
return task;
|
||||
}
|
||||
|
||||
// submit_sstable_rewrite() starts a compaction task, much like submit(),
|
||||
// but rather than asking a compaction policy what to compact, this function
|
||||
// compacts just a single sstable, and writes one new sstable. This operation
|
||||
// is useful to split an sstable containing data belonging to multiple shards
|
||||
// into a separate sstable on each shard.
|
||||
void compaction_manager::submit_sstable_rewrite(column_family* cf, sstables::shared_sstable sst) {
|
||||
// The semaphore ensures that the sstable rewrite operations submitted by
|
||||
// submit_sstable_rewrite are run in sequence, and not all of them in
|
||||
// parallel. Note that unlike general compaction which currently allows
|
||||
// different cfs to compact in parallel, here we don't have a semaphore
|
||||
// per cf, so we only get one rewrite at a time on each shard.
|
||||
static thread_local semaphore sem(1);
|
||||
// We cannot, and don't need to, compact an sstable which is already
|
||||
// being compacted anyway.
|
||||
if (_stopped || _compacting_sstables.count(sst)) {
|
||||
return;
|
||||
}
|
||||
// Conversely, we don't want another compaction job to compact the
|
||||
// sstable we are planning to work on:
|
||||
_compacting_sstables.insert(sst);
|
||||
auto task = make_lw_shared<compaction_manager::task>();
|
||||
_tasks.push_back(task);
|
||||
_stats.active_tasks++;
|
||||
task->compaction_done = with_semaphore(sem, 1, [cf, sst] {
|
||||
return cf->compact_sstables(sstables::compaction_descriptor(
|
||||
std::vector<sstables::shared_sstable>{sst},
|
||||
sst->get_sstable_level(),
|
||||
std::numeric_limits<uint64_t>::max()), false);
|
||||
}).then_wrapped([this, sst, task] (future<> f) {
|
||||
_compacting_sstables.erase(sst);
|
||||
_stats.active_tasks--;
|
||||
_tasks.remove(task);
|
||||
try {
|
||||
f.get();
|
||||
_stats.completed_tasks++;
|
||||
} catch (sstables::compaction_stop_exception& e) {
|
||||
cmlog.info("compaction info: {}", e.what());
|
||||
_stats.errors++;
|
||||
} catch (...) {
|
||||
cmlog.error("compaction failed: {}", std::current_exception());
|
||||
_stats.errors++;
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
future<> compaction_manager::task_stop(lw_shared_ptr<compaction_manager::task> task) {
|
||||
task->stopping = true;
|
||||
return task->compaction_gate.close().then([task] {
|
||||
|
||||
@@ -81,9 +81,9 @@ private:
|
||||
// It will not accept new requests in case the manager was stopped.
|
||||
bool can_submit();
|
||||
|
||||
// If weight is not taken for the column family, weight is registered and
|
||||
// true is returned. Return false otherwise.
|
||||
bool try_to_register_weight(column_family* cf, int weight);
|
||||
// Return true if weight is not registered. If parallel_compaction is not
|
||||
// true, only one weight is allowed to be registered.
|
||||
bool try_to_register_weight(column_family* cf, int weight, bool parallel_compaction);
|
||||
// Deregister weight for a column family.
|
||||
void deregister_weight(column_family* cf, int weight);
|
||||
|
||||
@@ -109,6 +109,13 @@ public:
|
||||
// Submit a column family to be cleaned up and wait for its termination.
|
||||
future<> perform_cleanup(column_family* cf);
|
||||
|
||||
// Submit a specific sstable to be rewritten, while dropping data which
|
||||
// does not belong to this shard. Meant to be used on startup when an
|
||||
// sstable is shared by multiple shards, and we want to split it to a
|
||||
// separate sstable for each shard.
|
||||
void submit_sstable_rewrite(column_family* cf,
|
||||
sstables::shared_sstable s);
|
||||
|
||||
// Remove a column family from the compaction manager.
|
||||
// Cancel requests on cf and wait for a possible ongoing compaction on cf.
|
||||
future<> remove(column_family* cf);
|
||||
|
||||
@@ -56,6 +56,9 @@ public:
|
||||
virtual ~compaction_strategy_impl() {}
|
||||
virtual compaction_descriptor get_sstables_for_compaction(column_family& cfs, std::vector<sstables::shared_sstable> candidates) = 0;
|
||||
virtual compaction_strategy_type type() const = 0;
|
||||
virtual bool parallel_compaction() const {
|
||||
return true;
|
||||
}
|
||||
};
|
||||
|
||||
//
|
||||
@@ -402,6 +405,10 @@ public:
|
||||
|
||||
virtual compaction_descriptor get_sstables_for_compaction(column_family& cfs, std::vector<sstables::shared_sstable> candidates) override;
|
||||
|
||||
virtual bool parallel_compaction() const override {
|
||||
return false;
|
||||
}
|
||||
|
||||
virtual compaction_strategy_type type() const {
|
||||
return compaction_strategy_type::leveled;
|
||||
}
|
||||
@@ -439,6 +446,9 @@ compaction_strategy_type compaction_strategy::type() const {
|
||||
compaction_descriptor compaction_strategy::get_sstables_for_compaction(column_family& cfs, std::vector<sstables::shared_sstable> candidates) {
|
||||
return _compaction_strategy_impl->get_sstables_for_compaction(cfs, std::move(candidates));
|
||||
}
|
||||
bool compaction_strategy::parallel_compaction() const {
|
||||
return _compaction_strategy_impl->parallel_compaction();
|
||||
}
|
||||
|
||||
compaction_strategy make_compaction_strategy(compaction_strategy_type strategy, const std::map<sstring, sstring>& options) {
|
||||
::shared_ptr<compaction_strategy_impl> impl;
|
||||
|
||||
@@ -175,10 +175,8 @@ public:
|
||||
|
||||
if (previous != nullptr && current_first.tri_compare(s, previous->get_last_decorated_key(s)) <= 0) {
|
||||
|
||||
logger.warn("At level {}, {} [{}, {}] overlaps {} [{}, {}]. This could be caused by a bug in Cassandra 1.1.0 .. 1.1.3 " \
|
||||
"or due to the fact that you have dropped sstables from another node into the data directory. " \
|
||||
"Sending back to L0. If you didn't drop in sstables, and have not yet run scrub, you should do so since you may also " \
|
||||
"have rows out-of-order within an sstable",
|
||||
logger.warn("At level {}, {} [{}, {}] overlaps {} [{}, {}]. This could be caused by the fact that you have dropped " \
|
||||
"sstables from another node into the data directory. Sending back to L0.",
|
||||
level, previous->get_filename(), previous->get_first_partition_key(s), previous->get_last_partition_key(s),
|
||||
current->get_filename(), current->get_first_partition_key(s), current->get_last_partition_key(s));
|
||||
|
||||
|
||||
@@ -228,7 +228,7 @@ future<> stream_session::on_initialization_complete() {
|
||||
}
|
||||
_stream_result->handle_session_prepared(this->shared_from_this());
|
||||
} catch (...) {
|
||||
sslog.error("[Stream #{}] Fail to send PREPARE_MESSAGE to {}, {}", this->plan_id(), id, std::current_exception());
|
||||
sslog.warn("[Stream #{}] Fail to send PREPARE_MESSAGE to {}, {}", this->plan_id(), id, std::current_exception());
|
||||
throw;
|
||||
}
|
||||
return make_ready_future<>();
|
||||
@@ -238,7 +238,7 @@ future<> stream_session::on_initialization_complete() {
|
||||
return ms().send_prepare_done_message(id, plan_id, this->dst_cpu_id).then([this] {
|
||||
sslog.debug("[Stream #{}] GOT PREPARE_DONE_MESSAGE Reply from {}", this->plan_id(), this->peer);
|
||||
}).handle_exception([id, plan_id] (auto ep) {
|
||||
sslog.error("[Stream #{}] Fail to send PREPARE_DONE_MESSAGE to {}, {}", plan_id, id, ep);
|
||||
sslog.warn("[Stream #{}] Fail to send PREPARE_DONE_MESSAGE to {}, {}", plan_id, id, ep);
|
||||
std::rethrow_exception(ep);
|
||||
});
|
||||
}).then([this] {
|
||||
@@ -248,7 +248,7 @@ future<> stream_session::on_initialization_complete() {
|
||||
}
|
||||
|
||||
void stream_session::on_error() {
|
||||
sslog.error("[Stream #{}] Streaming error occurred", plan_id());
|
||||
sslog.warn("[Stream #{}] Streaming error occurred", plan_id());
|
||||
// fail session
|
||||
close_session(stream_session_state::FAILED);
|
||||
}
|
||||
@@ -270,7 +270,7 @@ future<prepare_message> stream_session::prepare(std::vector<stream_request> requ
|
||||
db.find_column_family(ks, cf);
|
||||
} catch (no_such_column_family) {
|
||||
auto err = sprint("[Stream #{}] prepare requested ks={} cf={} does not exist", ks, cf);
|
||||
sslog.error(err.c_str());
|
||||
sslog.warn(err.c_str());
|
||||
throw std::runtime_error(err);
|
||||
}
|
||||
}
|
||||
@@ -284,7 +284,7 @@ future<prepare_message> stream_session::prepare(std::vector<stream_request> requ
|
||||
db.find_column_family(cf_id);
|
||||
} catch (no_such_column_family) {
|
||||
auto err = sprint("[Stream #{}] prepare cf_id=%s does not exist", plan_id, cf_id);
|
||||
sslog.error(err.c_str());
|
||||
sslog.warn(err.c_str());
|
||||
throw std::runtime_error(err);
|
||||
}
|
||||
prepare_receiving(summary);
|
||||
|
||||
@@ -85,41 +85,41 @@ struct send_info {
|
||||
};
|
||||
|
||||
future<stop_iteration> do_send_mutations(auto si, auto fm) {
|
||||
return get_local_stream_manager().mutation_send_limiter().wait().then([si, fm = std::move(fm)] () mutable {
|
||||
sslog.debug("[Stream #{}] SEND STREAM_MUTATION to {}, cf_id={}", si->plan_id, si->id, si->cf_id);
|
||||
auto fm_size = fm.representation().size();
|
||||
net::get_local_messaging_service().send_stream_mutation(si->id, si->plan_id, std::move(fm), si->dst_cpu_id).then([si, fm_size] {
|
||||
sslog.debug("[Stream #{}] GOT STREAM_MUTATION Reply from {}", si->plan_id, si->id.addr);
|
||||
get_local_stream_manager().update_progress(si->plan_id, si->id.addr, progress_info::direction::OUT, fm_size);
|
||||
si->mutations_done.signal();
|
||||
}).handle_exception([si] (auto ep) {
|
||||
// There might be larger number of STREAM_MUTATION inflight.
|
||||
// Log one error per column_family per range
|
||||
if (!si->error_logged) {
|
||||
si->error_logged = true;
|
||||
sslog.error("[Stream #{}] stream_transfer_task: Fail to send STREAM_MUTATION to {}: {}", si->plan_id, si->id, ep);
|
||||
}
|
||||
si->mutations_done.broken();
|
||||
}).finally([] {
|
||||
get_local_stream_manager().mutation_send_limiter().signal();
|
||||
});
|
||||
return stop_iteration::no;
|
||||
sslog.debug("[Stream #{}] SEND STREAM_MUTATION to {}, cf_id={}", si->plan_id, si->id, si->cf_id);
|
||||
auto fm_size = fm.representation().size();
|
||||
net::get_local_messaging_service().send_stream_mutation(si->id, si->plan_id, std::move(fm), si->dst_cpu_id).then([si, fm_size] {
|
||||
sslog.debug("[Stream #{}] GOT STREAM_MUTATION Reply from {}", si->plan_id, si->id.addr);
|
||||
get_local_stream_manager().update_progress(si->plan_id, si->id.addr, progress_info::direction::OUT, fm_size);
|
||||
si->mutations_done.signal();
|
||||
}).handle_exception([si] (auto ep) {
|
||||
// There might be larger number of STREAM_MUTATION inflight.
|
||||
// Log one error per column_family per range
|
||||
if (!si->error_logged) {
|
||||
si->error_logged = true;
|
||||
sslog.warn("[Stream #{}] stream_transfer_task: Fail to send STREAM_MUTATION to {}: {}", si->plan_id, si->id, ep);
|
||||
}
|
||||
si->mutations_done.broken();
|
||||
});
|
||||
return make_ready_future<stop_iteration>(stop_iteration::no);
|
||||
}
|
||||
|
||||
future<> send_mutations(auto si) {
|
||||
auto& cf = si->db.find_column_family(si->cf_id);
|
||||
auto& priority = service::get_local_streaming_read_priority();
|
||||
return do_with(cf.make_reader(cf.schema(), si->pr, query::no_clustering_key_filtering, priority), [si] (auto& reader) {
|
||||
return repeat([si, &reader] () {
|
||||
return reader().then([si] (auto mopt) {
|
||||
if (mopt && si->db.column_family_exists(si->cf_id)) {
|
||||
si->mutations_nr++;
|
||||
auto fm = frozen_mutation(*mopt);
|
||||
return do_send_mutations(si, std::move(fm));
|
||||
} else {
|
||||
return make_ready_future<stop_iteration>(stop_iteration::yes);
|
||||
}
|
||||
return repeat([si, &reader] {
|
||||
return get_local_stream_manager().mutation_send_limiter().wait().then([si, &reader] {
|
||||
return reader().then([si] (auto mopt) {
|
||||
if (mopt && si->db.column_family_exists(si->cf_id)) {
|
||||
si->mutations_nr++;
|
||||
auto fm = frozen_mutation(*mopt);
|
||||
return do_send_mutations(si, std::move(fm));
|
||||
} else {
|
||||
return make_ready_future<stop_iteration>(stop_iteration::yes);
|
||||
}
|
||||
});
|
||||
}).finally([] {
|
||||
get_local_stream_manager().mutation_send_limiter().signal();
|
||||
});
|
||||
});
|
||||
}).then([si] {
|
||||
@@ -132,7 +132,7 @@ void stream_transfer_task::start() {
|
||||
auto cf_id = this->cf_id;
|
||||
auto id = net::messaging_service::msg_addr{session->peer, session->dst_cpu_id};
|
||||
sslog.debug("[Stream #{}] stream_transfer_task: cf_id={}", plan_id, cf_id);
|
||||
parallel_for_each(_ranges.begin(), _ranges.end(), [this, plan_id, cf_id, id] (auto range) {
|
||||
do_for_each(_ranges.begin(), _ranges.end(), [this, plan_id, cf_id, id] (auto range) {
|
||||
unsigned shard_begin = range.start() ? dht::shard_of(range.start()->value()) : 0;
|
||||
unsigned shard_end = range.end() ? dht::shard_of(range.end()->value()) + 1 : smp::count;
|
||||
auto cf_id = this->cf_id;
|
||||
@@ -153,7 +153,7 @@ void stream_transfer_task::start() {
|
||||
sslog.debug("[Stream #{}] SEND STREAM_MUTATION_DONE to {}, cf_id={}", plan_id, id, cf_id);
|
||||
return session->ms().send_stream_mutation_done(id, plan_id, _ranges,
|
||||
cf_id, session->dst_cpu_id).handle_exception([plan_id, id, cf_id] (auto ep) {
|
||||
sslog.error("[Stream #{}] stream_transfer_task: Fail to send STREAM_MUTATION_DONE to {}: {}", plan_id, id, ep);
|
||||
sslog.warn("[Stream #{}] stream_transfer_task: Fail to send STREAM_MUTATION_DONE to {}: {}", plan_id, id, ep);
|
||||
std::rethrow_exception(ep);
|
||||
});
|
||||
}).then([this, id, plan_id, cf_id] {
|
||||
@@ -161,7 +161,7 @@ void stream_transfer_task::start() {
|
||||
session->start_keep_alive_timer();
|
||||
session->transfer_task_completed(cf_id);
|
||||
}).handle_exception([this, plan_id, id] (auto ep){
|
||||
sslog.error("[Stream #{}] stream_transfer_task: Fail to send to {}: {}", plan_id, id, ep);
|
||||
sslog.warn("[Stream #{}] stream_transfer_task: Fail to send to {}: {}", plan_id, id, ep);
|
||||
this->session->on_error();
|
||||
});
|
||||
}
|
||||
|
||||
@@ -61,7 +61,7 @@ int main(int argc, char** argv) {
|
||||
logalloc::region r;
|
||||
|
||||
with_allocator(r.allocator(), [&] {
|
||||
std::deque<managed_bytes> refs;
|
||||
chunked_fifo<managed_bytes> refs;
|
||||
|
||||
r.make_evictable([&] {
|
||||
return with_allocator(r.allocator(), [&] {
|
||||
|
||||
@@ -1317,3 +1317,54 @@ SEASTAR_TEST_CASE(test_tombstone_purge) {
|
||||
|
||||
return make_ready_future<>();
|
||||
}
|
||||
|
||||
SEASTAR_TEST_CASE(test_trim_rows) {
|
||||
return seastar::async([] {
|
||||
auto s = schema_builder("ks", "cf")
|
||||
.with_column("pk", int32_type, column_kind::partition_key)
|
||||
.with_column("ck", int32_type, column_kind::clustering_key)
|
||||
.with_column("v", int32_type)
|
||||
.build();
|
||||
|
||||
auto pk = partition_key::from_exploded(*s, { int32_type->decompose(0) });
|
||||
mutation m(pk, s);
|
||||
constexpr auto row_count = 8;
|
||||
for (auto i = 0; i < row_count; i++) {
|
||||
m.set_clustered_cell(clustering_key_prefix::from_single_value(*s, int32_type->decompose(i)),
|
||||
to_bytes("v"), data_value(i), api::new_timestamp() - 5);
|
||||
}
|
||||
m.partition().apply(tombstone(api::new_timestamp(), gc_clock::now()));
|
||||
|
||||
auto now = gc_clock::now() + gc_clock::duration(std::chrono::hours(1));
|
||||
|
||||
auto compact_and_expect_empty = [&] (mutation m, std::vector<query::clustering_range> ranges) {
|
||||
mutation m2 = m;
|
||||
m.partition().compact_for_query(*s, now, ranges, false, query::max_rows);
|
||||
BOOST_REQUIRE(m.partition().clustered_rows().empty());
|
||||
|
||||
std::reverse(ranges.begin(), ranges.end());
|
||||
m2.partition().compact_for_query(*s, now, ranges, true, query::max_rows);
|
||||
BOOST_REQUIRE(m2.partition().clustered_rows().empty());
|
||||
};
|
||||
|
||||
std::vector<query::clustering_range> ranges = {
|
||||
query::clustering_range::make_starting_with(clustering_key_prefix::from_single_value(*s, int32_type->decompose(5)))
|
||||
};
|
||||
compact_and_expect_empty(m, ranges);
|
||||
|
||||
ranges = {
|
||||
query::clustering_range::make_starting_with(clustering_key_prefix::from_single_value(*s, int32_type->decompose(50)))
|
||||
};
|
||||
compact_and_expect_empty(m, ranges);
|
||||
|
||||
ranges = {
|
||||
query::clustering_range::make_ending_with(clustering_key_prefix::from_single_value(*s, int32_type->decompose(5)))
|
||||
};
|
||||
compact_and_expect_empty(m, ranges);
|
||||
|
||||
ranges = {
|
||||
query::clustering_range::make_open_ended_both_sides()
|
||||
};
|
||||
compact_and_expect_empty(m, ranges);
|
||||
});
|
||||
}
|
||||
|
||||
@@ -546,27 +546,33 @@ static std::vector<mutation> updated_ring(std::vector<mutation>& mutations) {
|
||||
return result;
|
||||
}
|
||||
|
||||
static mutation_source make_mutation_source(std::vector<lw_shared_ptr<memtable>>& memtables) {
|
||||
return mutation_source([&memtables] (schema_ptr s, const query::partition_range& pr) {
|
||||
std::vector<mutation_reader> readers;
|
||||
for (auto&& mt : memtables) {
|
||||
readers.emplace_back(mt->make_reader(s, pr));
|
||||
}
|
||||
return make_combined_reader(std::move(readers));
|
||||
});
|
||||
}
|
||||
|
||||
static key_source make_key_source(schema_ptr s, std::vector<lw_shared_ptr<memtable>>& memtables) {
|
||||
return key_source([s, &memtables] (const query::partition_range& pr) {
|
||||
std::vector<key_reader> readers;
|
||||
for (auto&& mt : memtables) {
|
||||
readers.emplace_back(mt->as_key_source()(pr));
|
||||
}
|
||||
return make_combined_reader(s, std::move(readers));
|
||||
});
|
||||
}
|
||||
|
||||
SEASTAR_TEST_CASE(test_cache_population_and_update_race) {
|
||||
return seastar::async([] {
|
||||
auto s = make_schema();
|
||||
std::vector<lw_shared_ptr<memtable>> memtables;
|
||||
auto memtables_data_source = mutation_source([&] (schema_ptr s, const query::partition_range& pr) {
|
||||
std::vector<mutation_reader> readers;
|
||||
for (auto&& mt : memtables) {
|
||||
readers.emplace_back(mt->make_reader(s, pr));
|
||||
}
|
||||
return make_combined_reader(std::move(readers));
|
||||
});
|
||||
auto memtables_key_source = key_source([&] (const query::partition_range& pr) {
|
||||
std::vector<key_reader> readers;
|
||||
for (auto&& mt : memtables) {
|
||||
readers.emplace_back(mt->as_key_source()(pr));
|
||||
}
|
||||
return make_combined_reader(s, std::move(readers));
|
||||
});
|
||||
throttled_mutation_source cache_source(memtables_data_source);
|
||||
throttled_mutation_source cache_source(make_mutation_source(memtables));
|
||||
cache_tracker tracker;
|
||||
row_cache cache(s, cache_source, memtables_key_source, tracker);
|
||||
row_cache cache(s, cache_source, make_key_source(s, memtables), tracker);
|
||||
|
||||
auto mt1 = make_lw_shared<memtable>(s);
|
||||
memtables.push_back(mt1);
|
||||
@@ -656,7 +662,7 @@ SEASTAR_TEST_CASE(test_invalidate) {
|
||||
auto some_element = keys_in_cache.begin() + 547;
|
||||
std::vector<dht::decorated_key> keys_not_in_cache;
|
||||
keys_not_in_cache.push_back(*some_element);
|
||||
cache.invalidate(*some_element);
|
||||
cache.invalidate(*some_element).get();
|
||||
keys_in_cache.erase(some_element);
|
||||
|
||||
for (auto&& key : keys_in_cache) {
|
||||
@@ -676,7 +682,7 @@ SEASTAR_TEST_CASE(test_invalidate) {
|
||||
{ *some_range_begin, true }, { *some_range_end, false }
|
||||
);
|
||||
keys_not_in_cache.insert(keys_not_in_cache.end(), some_range_begin, some_range_end);
|
||||
cache.invalidate(range);
|
||||
cache.invalidate(range).get();
|
||||
keys_in_cache.erase(some_range_begin, some_range_end);
|
||||
|
||||
for (auto&& key : keys_in_cache) {
|
||||
@@ -688,6 +694,72 @@ SEASTAR_TEST_CASE(test_invalidate) {
|
||||
});
|
||||
}
|
||||
|
||||
SEASTAR_TEST_CASE(test_cache_population_and_clear_race) {
|
||||
return seastar::async([] {
|
||||
auto s = make_schema();
|
||||
std::vector<lw_shared_ptr<memtable>> memtables;
|
||||
throttled_mutation_source cache_source(make_mutation_source(memtables));
|
||||
cache_tracker tracker;
|
||||
row_cache cache(s, cache_source, make_key_source(s, memtables), tracker);
|
||||
|
||||
auto mt1 = make_lw_shared<memtable>(s);
|
||||
memtables.push_back(mt1);
|
||||
auto ring = make_ring(s, 3);
|
||||
for (auto&& m : ring) {
|
||||
mt1->apply(m);
|
||||
}
|
||||
|
||||
auto mt2 = make_lw_shared<memtable>(s);
|
||||
auto ring2 = updated_ring(ring);
|
||||
for (auto&& m : ring2) {
|
||||
mt2->apply(m);
|
||||
}
|
||||
|
||||
cache_source.block();
|
||||
|
||||
auto rd1 = cache.make_reader(s);
|
||||
auto rd1_result = rd1();
|
||||
|
||||
sleep(10ms).get();
|
||||
|
||||
memtables.clear();
|
||||
memtables.push_back(mt2);
|
||||
|
||||
// This update should miss on all partitions
|
||||
auto cache_cleared = cache.clear();
|
||||
|
||||
auto rd2 = cache.make_reader(s);
|
||||
|
||||
// rd1, which is in progress, should not prevent forward progress of clear()
|
||||
cache_source.unblock();
|
||||
cache_cleared.get();
|
||||
|
||||
// Reads started before memtable flush should return previous value, otherwise this test
|
||||
// doesn't trigger the conditions it is supposed to protect against.
|
||||
|
||||
assert_that(rd1_result.get0()).has_mutation().is_equal_to(ring[0]);
|
||||
assert_that(rd1().get0()).has_mutation().is_equal_to(ring2[1]);
|
||||
assert_that(rd1().get0()).has_mutation().is_equal_to(ring2[2]);
|
||||
assert_that(rd1().get0()).has_no_mutation();
|
||||
|
||||
// Reads started after clear but before previous populations completed
|
||||
// should already see the new data
|
||||
assert_that(std::move(rd2))
|
||||
.produces(ring2[0])
|
||||
.produces(ring2[1])
|
||||
.produces(ring2[2])
|
||||
.produces_end_of_stream();
|
||||
|
||||
// Reads started after clear should see new data
|
||||
assert_that(cache.make_reader(s))
|
||||
.produces(ring2[0])
|
||||
.produces(ring2[1])
|
||||
.produces(ring2[2])
|
||||
.produces_end_of_stream();
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
SEASTAR_TEST_CASE(test_invalidate_works_with_wrap_arounds) {
|
||||
return seastar::async([] {
|
||||
auto s = make_schema();
|
||||
@@ -707,7 +779,7 @@ SEASTAR_TEST_CASE(test_invalidate_works_with_wrap_arounds) {
|
||||
}
|
||||
|
||||
// wrap-around
|
||||
cache.invalidate(query::partition_range({ring[6].ring_position()}, {ring[1].ring_position()}));
|
||||
cache.invalidate(query::partition_range({ring[6].ring_position()}, {ring[1].ring_position()})).get();
|
||||
|
||||
verify_does_not_have(cache, ring[0].decorated_key());
|
||||
verify_does_not_have(cache, ring[1].decorated_key());
|
||||
@@ -719,7 +791,7 @@ SEASTAR_TEST_CASE(test_invalidate_works_with_wrap_arounds) {
|
||||
verify_does_not_have(cache, ring[7].decorated_key());
|
||||
|
||||
// not wrap-around
|
||||
cache.invalidate(query::partition_range({ring[3].ring_position()}, {ring[4].ring_position()}));
|
||||
cache.invalidate(query::partition_range({ring[3].ring_position()}, {ring[4].ring_position()})).get();
|
||||
|
||||
verify_does_not_have(cache, ring[0].decorated_key());
|
||||
verify_does_not_have(cache, ring[1].decorated_key());
|
||||
|
||||
@@ -195,6 +195,7 @@ public:
|
||||
void write_long(int64_t n);
|
||||
void write_short(uint16_t n);
|
||||
void write_string(const sstring& s);
|
||||
void write_bytes_as_string(bytes_view s);
|
||||
void write_long_string(const sstring& s);
|
||||
void write_uuid(utils::UUID uuid);
|
||||
void write_string_list(std::vector<sstring> string_list);
|
||||
@@ -1441,6 +1442,12 @@ void cql_server::response::write_string(const sstring& s)
|
||||
_body.insert(_body.end(), s.begin(), s.end());
|
||||
}
|
||||
|
||||
void cql_server::response::write_bytes_as_string(bytes_view s)
|
||||
{
|
||||
write_short(cast_if_fits<uint16_t>(s.size()));
|
||||
_body.insert(_body.end(), s.begin(), s.end());
|
||||
}
|
||||
|
||||
void cql_server::response::write_long_string(const sstring& s)
|
||||
{
|
||||
write_int(cast_if_fits<int32_t>(s.size()));
|
||||
@@ -1587,6 +1594,18 @@ public:
|
||||
if (type->is_reversed()) {
|
||||
fail(unimplemented::cause::REVERSED);
|
||||
}
|
||||
if (type->is_user_type()) {
|
||||
r.write_short(uint16_t(type_id::UDT));
|
||||
auto udt = static_pointer_cast<const user_type_impl>(type);
|
||||
r.write_string(udt->_keyspace);
|
||||
r.write_bytes_as_string(udt->_name);
|
||||
r.write_short(udt->size());
|
||||
for (auto&& i : boost::irange<size_t>(0, udt->size())) {
|
||||
r.write_bytes_as_string(udt->field_name(i));
|
||||
encode(r, udt->field_type(i));
|
||||
}
|
||||
return;
|
||||
}
|
||||
if (type->is_tuple()) {
|
||||
r.write_short(uint16_t(type_id::TUPLE));
|
||||
auto ttype = static_pointer_cast<const tuple_type_impl>(type);
|
||||
|
||||
2
types.cc
2
types.cc
@@ -2704,7 +2704,7 @@ update_types(const std::vector<data_type> types, const user_type updated) {
|
||||
if (!new_types) {
|
||||
new_types = types;
|
||||
}
|
||||
new_types->emplace(new_types->begin() + i, std::move(*ut));
|
||||
(*new_types)[i] = std::move(*ut);
|
||||
}
|
||||
}
|
||||
return new_types;
|
||||
|
||||
@@ -55,7 +55,7 @@ static thread_local auto reusable_indexes = std::vector<long>();
|
||||
|
||||
void bloom_filter::set_indexes(int64_t base, int64_t inc, int count, long max, std::vector<long>& results) {
|
||||
for (int i = 0; i < count; i++) {
|
||||
results[i] = abs(base % max);
|
||||
results[i] = std::abs(base % max);
|
||||
base = static_cast<int64_t>(static_cast<uint64_t>(base) + static_cast<uint64_t>(inc));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -49,3 +49,17 @@ bool is_system_error_errno(int err_no)
|
||||
code.category() == std::system_category();
|
||||
});
|
||||
}
|
||||
|
||||
bool should_stop_on_system_error(const std::system_error& e) {
|
||||
if (e.code().category() == std::system_category()) {
|
||||
// Whitelist of errors that don't require us to stop the server:
|
||||
switch (e.code().value()) {
|
||||
case EEXIST:
|
||||
case ENOENT:
|
||||
return false;
|
||||
default:
|
||||
break;
|
||||
}
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
@@ -196,7 +196,7 @@ inline ihistogram operator +(ihistogram a, const ihistogram& b) {
|
||||
struct rate_moving_average {
|
||||
uint64_t count = 0;
|
||||
double rates[3] = {0};
|
||||
double mean_rate;
|
||||
double mean_rate = 0;
|
||||
rate_moving_average& operator +=(const rate_moving_average& o) {
|
||||
count += o.count;
|
||||
mean_rate += o.mean_rate;
|
||||
|
||||
Reference in New Issue
Block a user