Compare commits
7 Commits
scylla-4.2
...
next-4.2
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
44ec73cfc4 | ||
|
|
df6f9a200f | ||
|
|
2f4a3c271c | ||
|
|
6a11c20b4a | ||
|
|
cccdd6aaae | ||
|
|
92871a88c3 | ||
|
|
85bbf6751d |
@@ -519,7 +519,7 @@ public:
|
||||
table& t = db.local().find_column_family(_schema->id());
|
||||
auto writer = shared_from_this();
|
||||
_writer_done[node_idx] = mutation_writer::distribute_reader_and_consume_on_shards(_schema, std::move(queue_reader),
|
||||
[&db, reason = this->_reason, estimated_partitions = this->_estimated_partitions, writer] (flat_mutation_reader reader) {
|
||||
[&db, reason = this->_reason, estimated_partitions = this->_estimated_partitions] (flat_mutation_reader reader) {
|
||||
auto& t = db.local().find_column_family(reader.schema());
|
||||
return db::view::check_needs_view_update_path(_sys_dist_ks->local(), t, reason).then([t = t.shared_from_this(), estimated_partitions, reader = std::move(reader)] (bool use_view_update_path) mutable {
|
||||
//FIXME: for better estimations this should be transmitted from remote
|
||||
|
||||
@@ -827,7 +827,7 @@ std::ostream& schema::describe(std::ostream& os) const {
|
||||
os << "}";
|
||||
os << "\n AND comment = '" << comment()<< "'";
|
||||
os << "\n AND compaction = {'class': '" << sstables::compaction_strategy::name(compaction_strategy()) << "'";
|
||||
map_as_cql_param(os, compaction_strategy_options()) << "}";
|
||||
map_as_cql_param(os, compaction_strategy_options(), false) << "}";
|
||||
os << "\n AND compression = {";
|
||||
map_as_cql_param(os, get_compressor_params().get_options());
|
||||
os << "}";
|
||||
|
||||
@@ -2546,7 +2546,7 @@ future<> storage_service::rebuild(sstring source_dc) {
|
||||
slogger.info("Streaming for rebuild successful");
|
||||
}).handle_exception([] (auto ep) {
|
||||
// This is used exclusively through JMX, so log the full trace but only throw a simple RTE
|
||||
slogger.warn("Error while rebuilding node: {}", std::current_exception());
|
||||
slogger.warn("Error while rebuilding node: {}", ep);
|
||||
return make_exception_future<>(std::move(ep));
|
||||
});
|
||||
});
|
||||
|
||||
@@ -646,10 +646,11 @@ future<> compaction_manager::rewrite_sstables(column_family* cf, sstables::compa
|
||||
_tasks.push_back(task);
|
||||
|
||||
auto sstables = std::make_unique<std::vector<sstables::shared_sstable>>(get_func(*cf));
|
||||
auto compacting = make_lw_shared<compacting_sstable_registration>(this, *sstables);
|
||||
auto sstables_ptr = sstables.get();
|
||||
_stats.pending_tasks += sstables->size();
|
||||
|
||||
task->compaction_done = do_until([sstables_ptr] { return sstables_ptr->empty(); }, [this, task, options, sstables_ptr] () mutable {
|
||||
task->compaction_done = do_until([sstables_ptr] { return sstables_ptr->empty(); }, [this, task, options, sstables_ptr, compacting] () mutable {
|
||||
|
||||
// FIXME: lock cf here
|
||||
if (!can_proceed(task)) {
|
||||
@@ -659,7 +660,7 @@ future<> compaction_manager::rewrite_sstables(column_family* cf, sstables::compa
|
||||
auto sst = sstables_ptr->back();
|
||||
sstables_ptr->pop_back();
|
||||
|
||||
return repeat([this, task, options, sst = std::move(sst)] () mutable {
|
||||
return repeat([this, task, options, sst = std::move(sst), compacting] () mutable {
|
||||
column_family& cf = *task->compacting_cf;
|
||||
auto sstable_level = sst->get_sstable_level();
|
||||
auto run_identifier = sst->run_identifier();
|
||||
@@ -667,21 +668,22 @@ future<> compaction_manager::rewrite_sstables(column_family* cf, sstables::compa
|
||||
auto descriptor = sstables::compaction_descriptor({ sst }, cf.get_sstable_set(), service::get_local_compaction_priority(),
|
||||
sstable_level, sstables::compaction_descriptor::default_max_sstable_bytes, run_identifier, options);
|
||||
|
||||
auto compacting = make_lw_shared<compacting_sstable_registration>(this, descriptor.sstables);
|
||||
// Releases reference to cleaned sstable such that respective used disk space can be freed.
|
||||
descriptor.release_exhausted = [compacting] (const std::vector<sstables::shared_sstable>& exhausted_sstables) {
|
||||
compacting->release_compacting(exhausted_sstables);
|
||||
};
|
||||
|
||||
_stats.pending_tasks--;
|
||||
_stats.active_tasks++;
|
||||
task->compaction_running = true;
|
||||
compaction_backlog_tracker user_initiated(std::make_unique<user_initiated_backlog_tracker>(_compaction_controller.backlog_of_shares(200), _available_memory));
|
||||
return do_with(std::move(user_initiated), [this, &cf, descriptor = std::move(descriptor)] (compaction_backlog_tracker& bt) mutable {
|
||||
return with_scheduling_group(_scheduling_group, [this, &cf, descriptor = std::move(descriptor)] () mutable {
|
||||
return cf.run_compaction(std::move(descriptor));
|
||||
return with_semaphore(_rewrite_sstables_sem, 1, [this, task, &cf, descriptor = std::move(descriptor)] () mutable {
|
||||
_stats.pending_tasks--;
|
||||
_stats.active_tasks++;
|
||||
task->compaction_running = true;
|
||||
compaction_backlog_tracker user_initiated(std::make_unique<user_initiated_backlog_tracker>(_compaction_controller.backlog_of_shares(200), _available_memory));
|
||||
return do_with(std::move(user_initiated), [this, &cf, descriptor = std::move(descriptor)] (compaction_backlog_tracker& bt) mutable {
|
||||
return with_scheduling_group(_scheduling_group, [this, &cf, descriptor = std::move(descriptor)]() mutable {
|
||||
return cf.run_compaction(std::move(descriptor));
|
||||
});
|
||||
});
|
||||
}).then_wrapped([this, task, compacting = std::move(compacting)] (future<> f) mutable {
|
||||
}).then_wrapped([this, task, compacting] (future<> f) mutable {
|
||||
task->compaction_running = false;
|
||||
_stats.active_tasks--;
|
||||
if (!can_proceed(task)) {
|
||||
|
||||
@@ -110,6 +110,7 @@ private:
|
||||
std::unordered_map<column_family*, rwlock> _compaction_locks;
|
||||
|
||||
semaphore _custom_job_sem{1};
|
||||
seastar::named_semaphore _rewrite_sstables_sem = {1, named_semaphore_exception_factory{"rewrite sstables"}};
|
||||
|
||||
std::function<void()> compaction_submission_callback();
|
||||
// all registered column families are submitted for compaction at a constant interval.
|
||||
|
||||
@@ -176,7 +176,7 @@ leveled_compaction_strategy::get_reshaping_job(std::vector<shared_sstable> input
|
||||
|
||||
unsigned max_filled_level = 0;
|
||||
|
||||
size_t offstrategy_threshold = std::max(schema->min_compaction_threshold(), 4);
|
||||
size_t offstrategy_threshold = (mode == reshape_mode::strict) ? std::max(schema->min_compaction_threshold(), 4) : std::max(schema->max_compaction_threshold(), 32);
|
||||
size_t max_sstables = std::max(schema->max_compaction_threshold(), int(offstrategy_threshold));
|
||||
auto tolerance = [mode] (unsigned level) -> unsigned {
|
||||
if (mode == reshape_mode::strict) {
|
||||
|
||||
@@ -1734,8 +1734,8 @@ void sstable::write_collection(file_writer& out, const composite& clustering_key
|
||||
void sstable::write_clustered_row(file_writer& out, const schema& schema, const clustering_row& clustered_row) {
|
||||
auto clustering_key = composite::from_clustering_element(schema, clustered_row.key());
|
||||
|
||||
maybe_write_row_marker(out, schema, clustered_row.marker(), clustering_key);
|
||||
maybe_write_row_tombstone(out, clustering_key, clustered_row);
|
||||
maybe_write_row_marker(out, schema, clustered_row.marker(), clustering_key);
|
||||
|
||||
if (schema.clustering_key_size()) {
|
||||
column_name_helper::min_max_components(schema, _collector.min_column_names(), _collector.max_column_names(),
|
||||
|
||||
Reference in New Issue
Block a user