Compare commits


30 Commits

Author SHA1 Message Date
Glauber Costa
d58af7ded5 commitlog: acquire semaphore earlier
Recently we have changed our shutdown strategy to wait for the
_request_controller semaphore to make sure no other allocations are
in-flight. That was done to fix an actual issue.

The problem is that this wasn't done early enough. We acquire the
semaphore after we have already marked ourselves as _shutdown and
released the timer.

That means that if there is an allocation in flight that needs to use a
new segment, it will never finish - and we'll therefore never acquire
the semaphore.

Fix it by acquiring it first. At this point the allocations will all be
done and gone, and then we can shutdown everything else.

Signed-off-by: Glauber Costa <glauber@scylladb.com>
Message-Id: <5c2a2f20e3832b6ea37d6541897519a9307294ed.1479765782.git.glauber@scylladb.com>
(cherry picked from commit 0b8b5abf16)
2016-11-21 22:23:15 +00:00
Avi Kivity
d9700a2826 storage_proxy: don't query concurrently needlessly during range queries
storage_proxy has an optimization where it tries to query multiple token
ranges concurrently to satisfy very large requests (an optimization which is
likely meaningless when paging is enabled, as it always should be).  However,
the rows-per-range code severely underestimates the number of rows per range,
resulting in a large number of "read-ahead" internal queries being performed,
the results of most of which are discarded.

Fix by disabling this code. We should likely remove it completely, but let's
start with a band-aid that can be backported.

Fixes #1863.

Message-Id: <20161120165741.2488-1-avi@scylladb.com>
(cherry picked from commit 6bdb8ba31d)
2016-11-21 18:19:59 +02:00
Glauber Costa
d2438059a7 database: keep a pointer to the memtable list in a memtable
We currently pass a region group to the memtable, but after so many recent
changes, that is a bit too low level. This patch changes that so we pass
a memtable list instead.

Doing that also has a couple of advantages. Mainly, during flush we must
get from a memtable to a memtable_list. Currently we do that by going
from the memtable to a column family through the schema, and from there
to the memtable_list.

That, however, involves calling virtual functions in a derived class,
because a single column family can have both streaming and normal
memtables. If we pass a memtable_list to the memtable, we can keep a
pointer to it, and when needed get the memtable_list directly.

Not only does that get rid of the inheritance for aesthetic reasons, but
that inheritance is not even correct anymore. Since the introduction of
the big streaming memtables, we now have a plethora of lists per column
family, and this traversal is totally wrong. We hadn't noticed before
because we were flushing the memtables based on their individual sizes,
but it has been wrong all along for edge cases in which we would have to
resort to size-based flush. This could be the case, for instance, with
various plan_ids in flight at the same time.

At this point, there is no more reason to keep the derived classes for
the dirty_memory_manager. I'm only keeping them around to reduce
clutter, although they are useful for the specialized constructors and
to communicate to the reader exactly what they are. But those can be
removed in a follow up patch if we want.

The old memtable constructor signature is kept around for the benefit of
two tests in memtable_tests which have their own flush logic. In the
future we could do something like we do for the SSTable tests, and have
a proxy class that is friends with the memtable class. That too, is left
for the future.

Fixes #1870

Signed-off-by: Glauber Costa <glauber@scylladb.com>
Message-Id: <811ec9e8e123dc5fc26eadbda82b0bae906657a9.1479743266.git.glauber@scylladb.com>
(cherry picked from commit 0ca8c3f162)
2016-11-21 18:18:56 +02:00
Glauber Costa
4098831ebc commitlog: wait for pending allocations to finish before closing gate.
Allocations may enter the gate, so we should wait for them before closing it.

Fixes #1860

Signed-off-by: Glauber Costa <glauber@scylladb.com>
Message-Id: <53cd6996c1cbd8b38bab3b03604bd11e5c20beda.1479650012.git.glauber@scylladb.com>
(cherry picked from commit 21c1e2b48c)
2016-11-20 20:00:32 +02:00
Glauber Costa
4539b8403a database: fix direct flushes of non-durable column families.
If a column family is non-durable, then its flushes will never create a
memtable flush reader. Our current flush logic depends on that reader
being created and destroyed to release the semaphore permits on the flush.

We will remove the permits ourselves if there is an exception, but not
under normal circumstances. Given this issue, however, it is more robust
to always try to remove the permits after we flush. If the permits were
already removed by the flush reader, this check will simply find that the
permit is not in the map and return. But if it is still there, then it is
removed.

Signed-off-by: Glauber Costa <glauber@scylladb.com>
Message-Id: <049334c3b4bef620af2c7c045e6c84347dcf9013.1479498026.git.glauber@scylladb.com>
(cherry picked from commit 1933349654)
2016-11-18 21:33:22 +01:00
Raphael S. Carvalho
558f535fcb db: do not leak deleted sstable when deletion triggers an exception
The leak results in deleted sstables being held open until shutdown, so disk
space isn't released. That's because column_family::rebuild_sstable_list()
will not remove references to deleted sstables if an exception was triggered
in sstables::delete_atomically(). An sstable's files are only closed when its
object is destroyed.

The exception happens when a major compaction is issued in parallel with a
regular one, and one of them will be unable to delete an sstable already
deleted by the other. That results in remove_by_toc_name() triggering
boost::filesystem::filesystem_error because the TOC and temporary TOC don't
exist.

We wouldn't have seen this problem if major compaction were going through
compaction manager, but remove_by_toc_name() and rebuild_sstable_list() should
be made resilient.

Fixes #1840.

Signed-off-by: Raphael S. Carvalho <raphaelsc@scylladb.com>
Message-Id: <d43b2e78f9658e2c3c5bbb7f813756f18874bf92.1479390842.git.raphaelsc@scylladb.com>
(cherry picked from commit 3dc9294023)
Signed-off-by: Raphael S. Carvalho <raphaelsc@scylladb.com>
Message-Id: <760f96d81de0bab7507bb4f52c06b30f21e82577.1479420770.git.raphaelsc@scylladb.com>
2016-11-18 13:10:46 +02:00
Glauber Costa
3d45d0d339 fix shutdown and exception conditions for flush logic
This patch addresses post-merge follow up comments by Tomek.
Basically, what we do is:
- we don't need to signal() from remove_from_flush_manager(), because
  the explicit flushes no longer wait on the condition variable, so we
  don't
- we now wait on the stop() flushes (regardless of their return status)
  so we can make sure that the _flush_queue is indeed drained
- we acquire the semaphore before shutting down the dirty_memory_manager
  to make sure that there are no pending flushes
- the flush manager that holds the semaphore has to match in the exception
  handler

Signed-off-by: Glauber Costa <glauber@scylladb.com>
Message-Id: <a23ab5098934546c660a08de64cd9294bb3a2008.1479400239.git.glauber@scylladb.com>
(cherry picked from commit 461778918b)
2016-11-18 11:53:21 +02:00
Avi Kivity
affc0d9138 Merge "get rid of memtable size parameter and rework flush logic" from Glauber
"This patchset allows Scylla to determine the size of a memtable instead
of relying on the user-provided memtable_cleanup_threshold. It does that
by allowing the region_group to specify a soft limit which will trigger
the allocation as early as it is reached.

Given that, we'll keep the memtables in memory for as long as it takes
to reach that limit, regardless of the individual size of any single one
of them. That limit is set to 1/4 of dirty memory. That's the same as
last submission, except this time I have run some experiments to gauge
behavior of that versus 1/2 of dirty memory, which was a preferred
theoretical value.

After that is done, the flush logic is reworked to guarantee that
flushes are not initiated if we already have one memtable under flush.
That allows us to better take advantage of coalescing opportunities with
new requests and prevents the pending memtable explosion that is
ultimately responsible for Issue 1817.

I have run mainly two workloads with this. The first one a local RF=1
workload with large partitions, sized 128kB and 100 threads. The results
are:

Before:

op rate                   : 632 [WRITE:632]
partition rate            : 632 [WRITE:632]
row rate                  : 632 [WRITE:632]
latency mean              : 157.8 [WRITE:157.8]
latency median            : 115.5 [WRITE:115.5]
latency 95th percentile   : 486.7 [WRITE:486.7]
latency 99th percentile   : 534.8 [WRITE:534.8]
latency 99.9th percentile : 599.0 [WRITE:599.0]
latency max               : 722.6 [WRITE:722.6]
Total partitions          : 189667 [WRITE:189667]
Total errors              : 0 [WRITE:0]
total gc count            : 0
total gc mb               : 0
total gc time (s)         : 0
avg gc time(ms)           : NaN
stdev gc time(ms)         : 0
Total operation time      : 00:05:00
END

After:

op rate                   : 951 [WRITE:951]
partition rate            : 951 [WRITE:951]
row rate                  : 951 [WRITE:951]
latency mean              : 104.8 [WRITE:104.8]
latency median            : 102.5 [WRITE:102.5]
latency 95th percentile   : 155.8 [WRITE:155.8]
latency 99th percentile   : 177.8 [WRITE:177.8]
latency 99.9th percentile : 686.4 [WRITE:686.4]
latency max               : 1081.4 [WRITE:1081.4]
Total partitions          : 285324 [WRITE:285324]
Total errors              : 0 [WRITE:0]
total gc count            : 0
total gc mb               : 0
total gc time (s)         : 0
avg gc time(ms)           : NaN
stdev gc time(ms)         : 0
Total operation time      : 00:05:00
END

The other workload was the workload described in #1817. And the result
is that we now have a load that is very stable around 100k ops/s and
hardly any timeouts, instead of the 1.4 baseline of wild variations
around 100k ops/s and lots of timeouts, or the deep reduction of
1.5-rc1."

* 'issue-1817-v4' of github.com:glommer/scylla:
  database: rework memtable flush logic
  get rid of max_memtable_size
  pass a region to dirty_memory_manager accounting API
  memtable: add a method to expose the region_group
  logalloc: allow region group reclaimer to specify a soft limit
  database: remove outdated comment
  database: uphold virtual dirty for system tables.

(cherry picked from commit 5d067eebf2)
2016-11-17 14:41:23 +02:00
Gleb Natapov
3c68504e54 sstables: fix ad-hoc summary creation
If the sstable Summary is not present, Scylla does not refuse to boot but
instead creates the summary information on the fly. There is a bug in this
code though: the Summary file is a map between keys and offsets into the
Index file, but the code creates a map between keys and Data file offsets
instead. Fix it by keeping the offset of an index entry in the index_entry
structure and using it during Summary file creation.

Fixes #1857.

Reviewed-by: Nadav Har'El <nyh@scylladb.com>
Message-Id: <20161116165421.GA22296@scylladb.com>
(cherry picked from commit ae0a2935b4)
2016-11-17 11:45:29 +02:00
Raphael S. Carvalho
e9b26d547d main: fix exception handling when initializing data or commitlog dirs
Exception handling was broken because, after the io checker, storage_io_error
exceptions are wrapped around system error exceptions. Also, the message
produced when handling the exception wasn't precise enough for all cases, for
example, lack of permission to write to an existing data directory.

Fixes #883.

Signed-off-by: Raphael S. Carvalho <raphaelsc@scylladb.com>
Message-Id: <b2dc75010a06f16ab1b676ce905ae12e930a700a.1478542388.git.raphaelsc@scylladb.com>
(cherry picked from commit 9a9f0d3a0f)
2016-11-16 15:12:48 +02:00
Raphael S. Carvalho
8510389188 sstables: handle unrecognized sstable component
As in C*, unrecognized sstable components should be ignored when
loading a sstable. At the moment, Scylla fails to do so and will
not boot as a result. In addition, unknown components should be
remembered when moving a sstable or changing its generation.

Fixes #1780.

Signed-off-by: Raphael S. Carvalho <raphaelsc@scylladb.com>
Message-Id: <b7af0c28e5b574fd577a7a1d28fb006ac197aa0a.1478025930.git.raphaelsc@scylladb.com>
(cherry picked from commit 53b7b7def3)
Signed-off-by: Raphael S. Carvalho <raphaelsc@scylladb.com>
Message-Id: <e30115e089a4c3c3fb4aad956645c9d006c2ee55.1479141101.git.raphaelsc@scylladb.com>
2016-11-16 15:11:05 +02:00
Amnon Heiman
ea61a8b410 API: cache_capacity should use uint for summing
Using int as the type for the map_reduce sum causes numeric overflow.

Fixes #1801

Signed-off-by: Amnon Heiman <amnon@scylladb.com>
Message-Id: <1479299425-782-1-git-send-email-amnon@scylladb.com>
(cherry picked from commit a4be7afbb0)
2016-11-16 15:03:15 +02:00
Paweł Dziepak
bd694d845e partition_version: make sure that snapshot is destroyed under LSA
Snapshot destructor may free some objects managed by the LSA. That's why
partition_snapshot_reader destructor explicitly destroys the snapshot it
uses. However, it was possible that exception thrown by _read_section
prevented that from happenning making snapshot destoryed implicitly
without current allocator set to LSA.

Refs #1831.

Signed-off-by: Paweł Dziepak <pdziepak@scylladb.com>
Message-Id: <1478778570-2795-1-git-send-email-pdziepak@scylladb.com>
(cherry picked from commit f16d6f9c40)
2016-11-16 14:34:11 +02:00
Paweł Dziepak
01c01d9ac4 query_pagers: distinct queries do not have clustering keys
The query pager needs to handle results that contain partitions with
possibly multiple clustering rows quite differently from results with
just one row per partition (for example, a page may end in the middle of
a partition). However, the logic dealing with partitions with clustering
rows doesn't work correctly for SELECT DISTINCT queries, which are
much more similar to queries on schemas without a clustering key.

The solution is to set _has_clustering_keys to false in the case of
SELECT DISTINCT queries regardless of the schema, which makes the pager
correctly expect each partition to return at most one row.

Fixes #1822.

Signed-off-by: Paweł Dziepak <pdziepak@scylladb.com>
Message-Id: <1478612486-13421-1-git-send-email-pdziepak@scylladb.com>
(cherry picked from commit 055d78ee4c)
2016-11-16 10:17:34 +01:00
Paweł Dziepak
ed39e8c235 row_cache: touch entries read during range queries
Fixes #1847.

Signed-off-by: Paweł Dziepak <pdziepak@scylladb.com>
Message-Id: <1479230809-27547-1-git-send-email-pdziepak@scylladb.com>
(cherry picked from commit 999dafbe57)
2016-11-15 20:34:40 +00:00
Avi Kivity
c57835e7b5 Merge "Fixes for histogram and moving average calculations" from Glauber
"JMX metrics were found to be either not showing, or showing absurd
values.  Turns out there were multiple things wrong with them. The
patches were sent separately but conflict with one another. This series
is a collection of the patches needed to fix the issues we saw.

Fixes #1832, #1836, #1837"

(cherry picked from commit bf20aa722b)
2016-11-13 11:42:53 +02:00
Amnon Heiman
13baa04056 API: fix a typo in storage_proxy
This patch fixes a typo in the URL definition, which caused the metric in
the JMX not to be found.

Fixes #1821

Signed-off-by: Amnon Heiman <amnon@scylladb.com>
Message-Id: <1478563869-20504-1-git-send-email-amnon@scylladb.com>
(cherry picked from commit c8082ccadb)
2016-11-13 09:25:14 +02:00
Glauber Costa
298de37cef histogram: moving averages: fix inverted parameters
The moving_average constructor is defined like this:

    moving_average(latency_counter::duration interval, latency_counter::duration tick_interval)

But when it is time to initialize them, we do this:

	... {tick_interval(), std::chrono::minutes(1)} ...

As can be seen, the interval and tick interval are inverted. This
leads to the metrics being assigned bogus values.

Signed-off-by: Glauber Costa <glauber@scylladb.com>
Message-Id: <d83f09eed20ea2ea007d120544a003b2e0099732.1478798595.git.glauber@scylladb.com>
(cherry picked from commit d3f11fbabf)
2016-11-11 10:15:32 +02:00
Paweł Dziepak
91e5e50647 Merge "Remove quadratic behavior from atomic sstable deletion" from Avi
"The atomic sstable deletion provides exception safety at the cost of
quadratic behavior in the number of sstables awaiting deletion.  This
causes high cpu utilization during startup.

Change the code to avoid quadratic complexity, and add some unit tests.

See #1812."

(cherry picked from commit 985d2f6d4a)
2016-11-08 22:46:01 +02:00
Pekka Enberg
08b1ff53dd release: prepare for 1.5.rc1 2016-11-02 13:39:53 +02:00
Pekka Enberg
0485289741 cql3: Fix selecting same column multiple times
Under the hood, the selectable::add_and_get_index() function
deliberately filters out duplicate columns. This causes
simple_selector::get_output_row() to return a row with all duplicate
columns filtered out, which triggers an assertion because of a row
mismatch with the metadata (which contains the duplicate columns).

The fix is rather simple: just make selection::from_selectors() use
selection_with_processing if the number of selectors and column
definitions doesn't match -- like Apache Cassandra does.

Fixes #1367
Message-Id: <1477989740-6485-1-git-send-email-penberg@scylladb.com>

(cherry picked from commit e1e8ca2788)
2016-11-01 09:33:19 +00:00
Avi Kivity
b3504e5482 Update seastar submodule
* seastar 57a17ca...25137c2 (2):
  > reactor: improve task quota timer resolution
  > future: prioritise continuations that can run immediately

Fixes #1794.
2016-10-28 14:17:26 +03:00
Avi Kivity
6cdb1256bb Update seastar submodule
* seastar e2c2bbc...57a17ca (1):
  > rpc: Avoid using zero-copy interface of output_stream (Fixes #1786)
2016-10-28 14:11:47 +03:00
Pekka Enberg
39b0da51a3 auth: Fix resource level handling
We use the `data_resource` class in the CQL parser, which lets users refer
to a table resource without specifying a keyspace. This asserts out in
get_level() for no good reason, as we already know the intended level
based on the constructor. Therefore, change `data_resource` to track the
level like upstream Cassandra does and use that.

Fixes #1790

Message-Id: <1477599169-2945-1-git-send-email-penberg@scylladb.com>
(cherry picked from commit b54870764f)
2016-10-27 23:37:50 +03:00
Glauber Costa
0656e66f5f auth: always convert string to upper case before comparing
We store all auth permission strings in upper case, but the user might very
well pass them in lower or mixed case.

We could use a custom key comparator / hash here, but since the
strings tend to be small, the new sstring will likely be allocated on
the stack, and this approach yields significantly less code.

Fixes #1791.

Signed-off-by: Glauber Costa <glauber@scylladb.com>
Message-Id: <51df92451e6e0a6325a005c19c95eaa55270da61.1477594199.git.glauber@scylladb.com>
(cherry picked from commit ef3c7ab38e)
2016-10-27 22:11:02 +03:00
Avi Kivity
185fbb8abc Merge "Cache fixes" from Paweł
"5ff699e09fcbd62611e78b9de601f6c8636ab2f0 ("row_cache: rework cache to
use fast forwarding reader") brought some significant changes to the
row cache implementation. Unfortunately, "significant changes" often
translates to "more bugs" and this time was no different.

This series contains fixes for the problems introduced in that rework
and makes failing dtest
bootstrap_test.py:TestBootstrap.local_quorum_bootstrap_test
pass again."

* 'pdziepak/cache-fixes/v1' of github.com:cloudius-systems/seastar-dev:
  row_cache: avoid dereferencing invalid iterator
  row_cache: set _first_element flag correctly
  row_cache: fix clearing continuity flag at eviction

(cherry picked from commit 72d78ffa7e)
2016-10-27 11:45:20 +03:00
Tomasz Grabiec
4ed3d350cc Update seastar submodule
* seastar ab1531e...e2c2bbc (3):
  > rpc: do not assume underling semaphore type
  > rpc: fix default resource limit
  > rpc: Move _connected flag to protocol::connection
2016-10-26 10:00:52 +02:00
Tomasz Grabiec
72d4a26c43 Update seastar submodule
* seastar f8e4e93...ab1531e (1):
  > rpc: Fix crash during connection teardown
2016-10-26 09:49:41 +02:00
Tomasz Grabiec
b582525ad8 Merge seastar upstream
(This time for real)

* seastar 69acec1...f8e4e93 (1):
  > rpc: Do not close client connection on error response for a timed out request

Refs #1778
2016-10-25 13:53:01 +02:00
Tomasz Grabiec
5ca372e852 Merge seastar upstream
* seastar 69acec1...f8e4e93 (1):
  > rpc: Do not close client connection on error response for a timed out request

Refs #1778
2016-10-25 13:45:58 +02:00
44 changed files with 826 additions and 355 deletions

.gitmodules vendored

@@ -1,6 +1,6 @@
 [submodule "seastar"]
 path = seastar
-url = ../seastar
+url = ../scylla-seastar
 ignore = dirty
 [submodule "swagger-ui"]
 path = swagger-ui


@@ -1,6 +1,6 @@
 #!/bin/sh
-VERSION=666.development
+VERSION=1.5.rc1
 if test -f version
 then


@@ -777,7 +777,7 @@
 ]
 },
 {
-"path": "/storage_proxy/metrics/read/moving_avrage_histogram",
+"path": "/storage_proxy/metrics/read/moving_average_histogram",
 "operations": [
 {
 "method": "GET",
@@ -792,7 +792,7 @@
 ]
 },
 {
-"path": "/storage_proxy/metrics/range/moving_avrage_histogram",
+"path": "/storage_proxy/metrics/range/moving_average_histogram",
 "operations": [
 {
 "method": "GET",
@@ -942,7 +942,7 @@
 ]
 },
 {
-"path": "/storage_proxy/metrics/write/moving_avrage_histogram",
+"path": "/storage_proxy/metrics/write/moving_average_histogram",
 "operations": [
 {
 "method": "GET",


@@ -194,7 +194,7 @@ void set_cache_service(http_context& ctx, routes& r) {
 });
 cs::get_row_capacity.set(r, [&ctx] (std::unique_ptr<request> req) {
-return map_reduce_cf(ctx, 0, [](const column_family& cf) {
+return map_reduce_cf(ctx, uint64_t(0), [](const column_family& cf) {
 return cf.get_row_cache().get_cache_tracker().region().occupancy().used_space();
 }, std::plus<uint64_t>());
 });


@@ -47,11 +47,8 @@
 const sstring auth::data_resource::ROOT_NAME("data");
 auth::data_resource::data_resource(level l, const sstring& ks, const sstring& cf)
-: _ks(ks), _cf(cf)
+: _level(l), _ks(ks), _cf(cf)
 {
-if (l != get_level()) {
-throw std::invalid_argument("level/keyspace/column mismatch");
-}
 }
 auth::data_resource::data_resource()
@@ -67,14 +64,7 @@ auth::data_resource::data_resource(const sstring& ks, const sstring& cf)
 {}
 auth::data_resource::level auth::data_resource::get_level() const {
-if (!_cf.empty()) {
-assert(!_ks.empty());
-return level::COLUMN_FAMILY;
-}
-if (!_ks.empty()) {
-return level::KEYSPACE;
-}
-return level::ROOT;
+return _level;
 }
 auth::data_resource auth::data_resource::from_name(


@@ -56,6 +56,7 @@ private:
 static const sstring ROOT_NAME;
+level _level;
 sstring _ks;
 sstring _cf;


@@ -40,6 +40,7 @@
 */
 #include <unordered_map>
+#include <boost/algorithm/string.hpp>
 #include "permission.hh"
 const auth::permission_set auth::permissions::ALL_DATA =
@@ -75,7 +76,9 @@ const sstring& auth::permissions::to_string(permission p) {
 }
 auth::permission auth::permissions::from_string(const sstring& s) {
-return permission_names.at(s);
+sstring upper(s);
+boost::to_upper(upper);
+return permission_names.at(upper);
 }
 std::unordered_set<sstring> auth::permissions::to_strings(const permission_set& set) {


@@ -409,29 +409,6 @@ partitioner: org.apache.cassandra.dht.Murmur3Partitioner
 # the smaller of 1/4 of heap or 512MB.
 # file_cache_size_in_mb: 512
-# Total permitted memory to use for memtables. Scylla will stop
-# accepting writes when the limit is exceeded until a flush completes,
-# and will trigger a flush based on memtable_cleanup_threshold
-# If omitted, Scylla will set both to 1/4 the size of the heap.
-# memtable_heap_space_in_mb: 2048
-# memtable_offheap_space_in_mb: 2048
-# Ratio of occupied non-flushing memtable size to total permitted size
-# that will trigger a flush of the largest memtable. Lager mct will
-# mean larger flushes and hence less compaction, but also less concurrent
-# flush activity which can make it difficult to keep your disks fed
-# under heavy write load.
-#
-# memtable_cleanup_threshold defaults to 1 / (memtable_flush_writers + 1)
-# memtable_cleanup_threshold: 0.11
-# Specify the way Scylla allocates and manages memtable memory.
-# Options are:
-# heap_buffers: on heap nio buffers
-# offheap_buffers: off heap (direct) nio buffers
-# offheap_objects: native memory, eliminating nio buffer heap overhead
-# memtable_allocation_type: heap_buffers
 # Total space to use for commitlogs.
 #
 # If space gets above this value (it will round up to the next nearest
@@ -443,17 +420,6 @@ partitioner: org.apache.cassandra.dht.Murmur3Partitioner
 # available for Scylla.
 commitlog_total_space_in_mb: -1
-# This sets the amount of memtable flush writer threads. These will
-# be blocked by disk io, and each one will hold a memtable in memory
-# while blocked.
-#
-# memtable_flush_writers defaults to the smaller of (number of disks,
-# number of cores), with a minimum of 2 and a maximum of 8.
-#
-# If your data directories are backed by SSD, you should increase this
-# to the number of cores.
-#memtable_flush_writers: 8
 # A fixed memory pool size in MB for for SSTable index summaries. If left
 # empty, this will default to 5% of the heap size. If the memory usage of
 # all index summaries exceeds this limit, SSTables with low read rates will


@@ -221,6 +221,7 @@ scylla_tests = [
 'tests/database_test',
 'tests/nonwrapping_range_test',
 'tests/input_stream_test',
+'tests/sstable_atomic_deletion_test',
 ]
 apps = [
@@ -307,6 +308,7 @@ scylla_core = (['database.cc',
 'sstables/compaction.cc',
 'sstables/compaction_strategy.cc',
 'sstables/compaction_manager.cc',
+'sstables/atomic_deletion.cc',
 'transport/event.cc',
 'transport/event_notifier.cc',
 'transport/server.cc',


@@ -232,7 +232,7 @@ uint32_t selection::add_column_for_ordering(const column_definition& c) {
 raw_selector::to_selectables(raw_selectors, schema), db, schema, defs);
 auto metadata = collect_metadata(schema, raw_selectors, *factories);
-if (processes_selection(raw_selectors)) {
+if (processes_selection(raw_selectors) || raw_selectors.size() != defs.size()) {
 return ::make_shared<selection_with_processing>(schema, std::move(defs), std::move(metadata), std::move(factories));
 } else {
 return ::make_shared<simple_selection>(schema, std::move(defs), std::move(metadata), false);


@@ -97,28 +97,28 @@ lw_shared_ptr<memtable_list>
 column_family::make_memory_only_memtable_list() {
 auto seal = [this] (memtable_list::flush_behavior ignored) { return make_ready_future<>(); };
 auto get_schema = [this] { return schema(); };
-return make_lw_shared<memtable_list>(std::move(seal), std::move(get_schema), _config.max_memtable_size, _config.dirty_memory_manager);
+return make_lw_shared<memtable_list>(std::move(seal), std::move(get_schema), _config.dirty_memory_manager);
 }
 lw_shared_ptr<memtable_list>
 column_family::make_memtable_list() {
 auto seal = [this] (memtable_list::flush_behavior behavior) { return seal_active_memtable(behavior); };
 auto get_schema = [this] { return schema(); };
-return make_lw_shared<memtable_list>(std::move(seal), std::move(get_schema), _config.max_memtable_size, _config.dirty_memory_manager);
+return make_lw_shared<memtable_list>(std::move(seal), std::move(get_schema), _config.dirty_memory_manager);
 }
 lw_shared_ptr<memtable_list>
 column_family::make_streaming_memtable_list() {
 auto seal = [this] (memtable_list::flush_behavior behavior) { return seal_active_streaming_memtable(behavior); };
 auto get_schema = [this] { return schema(); };
-return make_lw_shared<memtable_list>(std::move(seal), std::move(get_schema), _config.max_streaming_memtable_size, _config.streaming_dirty_memory_manager);
+return make_lw_shared<memtable_list>(std::move(seal), std::move(get_schema), _config.streaming_dirty_memory_manager);
 }
 lw_shared_ptr<memtable_list>
 column_family::make_streaming_memtable_big_list(streaming_memtable_big& smb) {
 auto seal = [this, &smb] (memtable_list::flush_behavior) { return seal_active_streaming_memtable_big(smb); };
 auto get_schema = [this] { return schema(); };
-return make_lw_shared<memtable_list>(std::move(seal), std::move(get_schema), _config.max_streaming_memtable_size, _config.streaming_dirty_memory_manager);
+return make_lw_shared<memtable_list>(std::move(seal), std::move(get_schema), _config.streaming_dirty_memory_manager);
 }
 column_family::column_family(schema_ptr schema, config config, db::commitlog* cl, compaction_manager& compaction_manager)
@@ -912,10 +912,6 @@ column_family::seal_active_streaming_memtable_delayed() {
 return make_ready_future<>();
 }
-if (_streaming_memtables->should_flush()) {
-return seal_active_streaming_memtable_immediate();
-}
 if (!_delayed_streaming_flush.armed()) {
 // We don't want to wait for too long, because the incoming mutations will not be available
 // until we flush them to SSTables. On top of that, if the sender ran out of messages, it won't
@@ -946,8 +942,7 @@ column_family::seal_active_streaming_memtable_immediate() {
 auto current_waiters = std::exchange(_waiting_streaming_flushes, shared_promise<>());
 auto f = current_waiters.get_shared_future(); // for this seal
+_config.streaming_dirty_memory_manager->serialize_flush([this, old] {
+return with_lock(_sstables_lock.for_read(), [this, old] {
-with_lock(_sstables_lock.for_read(), [this, old] {
 auto newtab = make_lw_shared<sstables::sstable>(_schema,
 _config.datadir, calculate_generation_for_new_table(),
 sstables::sstable::version_types::ka,
@@ -980,7 +975,6 @@ column_family::seal_active_streaming_memtable_immediate() {
 });
 // We will also not have any retry logic. If we fail here, we'll fail the streaming and let
 // the upper layers know. They can then apply any logic they want here.
+});
 }).then_wrapped([this, current_waiters = std::move(current_waiters)] (future <> f) mutable {
 if (f.failed()) {
 current_waiters.set_exception(f.get_exception());
@@ -1044,12 +1038,10 @@ column_family::seal_active_memtable(memtable_list::flush_behavior ignored) {
 _config.cf_stats->pending_memtables_flushes_count++;
 _config.cf_stats->pending_memtables_flushes_bytes += memtable_size;
+return _config.dirty_memory_manager->serialize_flush([this, old] {
-return repeat([this, old] {
-return with_lock(_sstables_lock.for_read(), [this, old] {
-_flush_queue->check_open_gate();
-return try_flush_memtable_to_sstable(old);
-});
+return repeat([this, old] {
+return with_lock(_sstables_lock.for_read(), [this, old] {
+_flush_queue->check_open_gate();
+return try_flush_memtable_to_sstable(old);
+});
 }).then([this, memtable_size] {
 _config.cf_stats->pending_memtables_flushes_count--;
@@ -1131,15 +1123,15 @@ column_family::start() {
 future<>
 column_family::stop() {
-_memtables->seal_active_memtable(memtable_list::flush_behavior::immediate);
-_streaming_memtables->seal_active_memtable(memtable_list::flush_behavior::immediate);
-return _compaction_manager.remove(this).then([this] {
-// Nest, instead of using when_all, so we don't lose any exceptions.
-return _flush_queue->close().then([this] {
-return _streaming_flush_gate.close();
+return when_all(_memtables->request_flush(), _streaming_memtables->request_flush()).discard_result().finally([this] {
+return _compaction_manager.remove(this).then([this] {
+// Nest, instead of using when_all, so we don't lose any exceptions.
+return _flush_queue->close().then([this] {
+return _streaming_flush_gate.close();
+});
+}).then([this] {
+return _sstable_deletion_gate.close();
+});
-}).then([this] {
-return _sstable_deletion_gate.close();
-});
 }
@@ -1304,7 +1296,17 @@ column_family::rebuild_sstable_list(const std::vector<sstables::shared_sstable>&
// Second, delete the old sstables. This is done in the background, so we can
// consider this compaction completed.
seastar::with_gate(_sstable_deletion_gate, [this, sstables_to_remove] {
return sstables::delete_atomically(sstables_to_remove).then([this, sstables_to_remove] {
return sstables::delete_atomically(sstables_to_remove).then_wrapped([this, sstables_to_remove] (future<> f) {
std::exception_ptr eptr;
try {
f.get();
} catch(...) {
eptr = std::current_exception();
}
// unconditionally remove compacted sstables from _sstables_compacted_but_not_deleted,
// or they could stay in the set forever, resulting in deleted files remaining
// open and disk space not being released until shutdown.
std::unordered_set<sstables::shared_sstable> s(
sstables_to_remove.begin(), sstables_to_remove.end());
auto e = boost::range::remove_if(_sstables_compacted_but_not_deleted, [&] (sstables::shared_sstable sst) -> bool {
@@ -1312,6 +1314,11 @@ column_family::rebuild_sstable_list(const std::vector<sstables::shared_sstable>&
});
_sstables_compacted_but_not_deleted.erase(e, _sstables_compacted_but_not_deleted.end());
rebuild_statistics();
if (eptr) {
return make_exception_future<>(eptr);
}
return make_ready_future<>();
}).handle_exception([] (std::exception_ptr e) {
try {
std::rethrow_exception(e);
@@ -1644,7 +1651,7 @@ database::database(const db::config& cfg)
// Note that even if we didn't allow extra memory, we would still want to keep system requests
// in a different region group. This is because throttled requests are serviced in FIFO order,
// and we don't want system requests to be waiting for a long time behind user requests.
, _system_dirty_memory_manager(*this, _memtable_total_space + (10 << 20))
, _system_dirty_memory_manager(*this, _memtable_total_space / 2 + (10 << 20))
// The total space that can be used by memtables is _memtable_total_space, but we will only
// allow the region_group to grow to half of that. This is because of virtual_dirty: memtables
// can take a long time to flush, and if we are using the maximum amount of memory possible,
@@ -2167,8 +2174,6 @@ keyspace::make_column_family_config(const schema& s, const db::config& db_config
cfg.enable_disk_writes = _config.enable_disk_writes;
cfg.enable_commitlog = _config.enable_commitlog;
cfg.enable_cache = _config.enable_cache;
cfg.max_memtable_size = _config.max_memtable_size;
cfg.max_streaming_memtable_size = _config.max_streaming_memtable_size;
cfg.dirty_memory_manager = _config.dirty_memory_manager;
cfg.streaming_dirty_memory_manager = _config.streaming_dirty_memory_manager;
cfg.read_concurrency_config = _config.read_concurrency_config;
@@ -2468,7 +2473,6 @@ column_family::apply(const mutation& m, const db::replay_position& rp) {
utils::latency_counter lc;
_stats.writes.set_latency(lc);
_memtables->active_memtable().apply(m, rp);
_memtables->seal_on_overflow();
_stats.writes.mark(lc);
if (lc.is_start()) {
_stats.estimated_write.add(lc.latency(), _stats.writes.hist.count);
@@ -2481,7 +2485,6 @@ column_family::apply(const frozen_mutation& m, const schema_ptr& m_schema, const
_stats.writes.set_latency(lc);
check_valid_rp(rp);
_memtables->active_memtable().apply(m, m_schema, rp);
_memtables->seal_on_overflow();
_stats.writes.mark(lc);
if (lc.is_start()) {
_stats.estimated_write.add(lc.latency(), _stats.writes.hist.count);
@@ -2494,7 +2497,6 @@ void column_family::apply_streaming_mutation(schema_ptr m_schema, utils::UUID pl
return;
}
_streaming_memtables->active_memtable().apply(m, m_schema);
_streaming_memtables->seal_on_overflow();
}
void column_family::apply_streaming_big_mutation(schema_ptr m_schema, utils::UUID plan_id, const frozen_mutation& m) {
@@ -2505,7 +2507,6 @@ void column_family::apply_streaming_big_mutation(schema_ptr m_schema, utils::UUI
}
auto entry = it->second;
entry->memtables->active_memtable().apply(m, m_schema);
entry->memtables->seal_on_overflow();
}
void
@@ -2517,51 +2518,113 @@ column_family::check_valid_rp(const db::replay_position& rp) const {
future<> dirty_memory_manager::shutdown() {
_db_shutdown_requested = true;
return _waiting_flush_gate.close().then([this] {
_should_flush.signal();
return std::move(_waiting_flush).then([this] {
return _region_group.shutdown();
});
}
void dirty_memory_manager::maybe_do_active_flush() {
if (!_db || !under_pressure() || _db_shutdown_requested) {
return;
future<> memtable_list::request_flush() {
if (!_flush_coalescing) {
_flush_coalescing = shared_promise<>();
return _dirty_memory_manager->get_flush_permit().then([this] (auto permit) {
auto current_flush = std::move(*_flush_coalescing);
_flush_coalescing = {};
return _dirty_memory_manager->flush_one(*this, std::move(permit)).then_wrapped([this, current_flush = std::move(current_flush)] (auto f) mutable {
if (f.failed()) {
current_flush.set_exception(f.get_exception());
} else {
current_flush.set_value();
}
});
});
} else {
return _flush_coalescing->get_shared_future();
}
// Flush already ongoing. We don't need to initiate an active flush at this moment.
if (_flush_serializer.current() == 0) {
return;
}
// There are many criteria that can be used to select what is the best memtable to
// flush. Most of the time we want some coordination with the commitlog to allow us to
// release commitlog segments as early as we can.
//
// But during pressure condition, we'll just pick the CF that holds the largest
// memtable. The advantage of doing this is that this is objectively the one that will
// release the biggest amount of memory and is less likely to be generating tiny
// SSTables. The disadvantage is that right now, because we only release memory when the
// SSTable is fully written, that may take a bit of time to happen.
//
// However, since we'll very soon have a mechanism in place to account for the memory
// that was already written in one form or another, that disadvantage is mitigated.
memtable& biggest_memtable = memtable::from_region(*_region_group.get_largest_region());
auto& biggest_cf = _db->find_column_family(biggest_memtable.schema());
memtable_list& mtlist = get_memtable_list(biggest_cf);
// Please note that this will eventually take the semaphore and prevent two concurrent flushes.
// We don't need any other extra protection.
mtlist.seal_active_memtable(memtable_list::flush_behavior::immediate);
}
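The `request_flush()` coalescing above can be sketched outside Seastar: while one flush is waiting for its permit, later requests attach to the same pending completion instead of starting another flush. A minimal synchronous sketch, under the assumption that a deferred-task queue stands in for the reactor; `flush_coalescer` and `run_pending` are illustrative names, not Scylla or Seastar APIs:

```cpp
#include <deque>
#include <functional>
#include <optional>
#include <utility>
#include <vector>

// Sketch of flush coalescing: the first request schedules a flush and
// opens a "pending" slot; requests arriving before the flush actually
// starts just join that slot and complete with the same flush.
class flush_coalescer {
    std::optional<std::vector<std::function<void()>>> _pending; // waiters of the coalesced flush
    std::deque<std::function<void()>> _scheduler;               // stands in for the reactor
    int _flushes_started = 0;
public:
    void request_flush(std::function<void()> done) {
        if (!_pending) {
            _pending.emplace();
            _pending->push_back(std::move(done));
            // Defer the real flush, mimicking waiting on the flush permit.
            _scheduler.push_back([this] {
                auto waiters = std::move(*_pending);
                _pending.reset();     // requests arriving from now on start a new cycle
                ++_flushes_started;   // the one real flush
                for (auto& w : waiters) {
                    w();
                }
            });
        } else {
            _pending->push_back(std::move(done)); // coalesce into the pending flush
        }
    }
    void run_pending() {
        while (!_scheduler.empty()) {
            auto task = std::move(_scheduler.front());
            _scheduler.pop_front();
            task();
        }
    }
    int flushes_started() const { return _flushes_started; }
};
```

Three requests issued before the deferred task runs all complete from a single flush.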
memtable_list& memtable_dirty_memory_manager::get_memtable_list(column_family& cf) {
return *(cf._memtables);
future<> dirty_memory_manager::flush_one(memtable_list& mtlist, semaphore_units<> permit) {
if (mtlist.back()->empty()) {
return make_ready_future<>();
}
auto* region = &(mtlist.back()->region());
auto* region_group = mtlist.back()->region_group();
auto schema = mtlist.back()->schema();
// Because the region groups are hierarchical, when we pick the biggest region creating pressure
// (in the memory-driven flush case) we may be picking a memtable that is placed in a region
// group below ours. That's totally fine and we can certainly use our semaphore to account for
// it, but we need to destroy the semaphore units from the right flush manager.
//
// If we abandon size-driven flush and go with another flushing scheme that always guarantees
// that we're picking from this region_group, we can simplify this.
dirty_memory_manager::from_region_group(region_group)._flush_manager.emplace(region, std::move(permit));
auto fut = mtlist.seal_active_memtable(memtable_list::flush_behavior::immediate);
return get_units(_background_work_flush_serializer, 1).then([this, fut = std::move(fut), region, region_group, schema] (auto permit) mutable {
return std::move(fut).then_wrapped([this, region, region_group, schema] (auto f) {
// There are two cases in which we may still need to remove the permits from here.
//
// 1) Some exception happened, and we can't know at which point. It could be that because
// of that, the permits are still dangling. We have to remove it.
// 2) If we are using a memory-only Column Family. That will never create a memtable
// flush object, and we'll never get rid of the permits. So we have to remove it
// here.
dirty_memory_manager::from_region_group(region_group).remove_from_flush_manager(region);
if (f.failed()) {
dblog.error("Failed to flush memtable, {}:{}", schema->ks_name(), schema->cf_name());
}
return std::move(f);
});
});
}
memtable_list& streaming_dirty_memory_manager::get_memtable_list(column_family& cf) {
return *(cf._streaming_memtables);
future<> dirty_memory_manager::flush_when_needed() {
if (!_db) {
return make_ready_future<>();
}
// If there are explicit flushes requested, we must wait for them to finish before we stop.
return do_until([this] { return _db_shutdown_requested && !_flush_serializer.waiters(); }, [this] {
auto has_work = [this] { return _flush_serializer.waiters() || over_soft_limit() || _db_shutdown_requested; };
return _should_flush.wait(std::move(has_work)).then([this] {
return get_flush_permit().then([this] (auto permit) {
// We give priority to explicit flushes. They are mainly user-initiated flushes,
// flushes coming from a DROP statement, or commitlog flushes.
if (_flush_serializer.waiters()) {
return make_ready_future<>();
}
// condition abated while we waited for the semaphore
if (!this->over_soft_limit() || _db_shutdown_requested) {
return make_ready_future<>();
}
// There are many criteria that can be used to select what is the best memtable to
// flush. Most of the time we want some coordination with the commitlog to allow us to
// release commitlog segments as early as we can.
//
// But during pressure condition, we'll just pick the CF that holds the largest
// memtable. The advantage of doing this is that this is objectively the one that will
// release the biggest amount of memory and is less likely to be generating tiny
// SSTables.
memtable& biggest_memtable = memtable::from_region(*(this->_region_group.get_largest_region()));
auto mtlist = biggest_memtable.get_memtable_list();
// Do not wait. The semaphore will protect us against a concurrent flush. But we
// want to start a new one as soon as the permits are destroyed and the semaphore is
// made ready again, not when we are done with the current one.
this->flush_one(*mtlist, std::move(permit));
return make_ready_future<>();
});
});
}).finally([this] {
// We'll try to acquire the permit here to make sure we only really stop when there are no
// in-flight flushes. Our stop condition checks for the presence of waiters, but it could be
// that we have no waiters, but a flush still in flight. We wait for all background work to
// stop. When that stops, we know that the foreground work in the _flush_serializer has
// stopped as well.
return get_units(_background_work_flush_serializer, _max_background_work);
});
}
void dirty_memory_manager::start_reclaiming() {
maybe_do_active_flush();
_should_flush.signal();
}
future<> database::apply_in_memory(const frozen_mutation& m, schema_ptr m_schema, db::replay_position rp) {
@@ -2637,10 +2700,6 @@ database::make_keyspace_config(const keyspace_metadata& ksm) {
cfg.enable_disk_reads = true; // we always read from disk
cfg.enable_commitlog = ksm.durable_writes() && _cfg->enable_commitlog() && !_cfg->enable_in_memory_data_store();
cfg.enable_cache = _cfg->enable_cache();
cfg.max_memtable_size = _memtable_total_space * _cfg->memtable_cleanup_threshold();
// We should guarantee that at least two memtable are available, otherwise after flush, adding another memtable would
// easily take us into throttling until the first one is flushed.
cfg.max_streaming_memtable_size = std::min(cfg.max_memtable_size, _streaming_memtable_total_space / 2);
} else {
cfg.datadir = "";
@@ -2648,9 +2707,6 @@ database::make_keyspace_config(const keyspace_metadata& ksm) {
cfg.enable_disk_reads = false;
cfg.enable_commitlog = false;
cfg.enable_cache = false;
cfg.max_memtable_size = std::numeric_limits<size_t>::max();
// All writes should go to the main memtable list if we're not durable
cfg.max_streaming_memtable_size = 0;
}
cfg.dirty_memory_manager = &_dirty_memory_manager;
cfg.streaming_dirty_memory_manager = &_streaming_dirty_memory_manager;
@@ -3097,21 +3153,17 @@ future<std::unordered_map<sstring, column_family::snapshot_details>> column_fami
future<> column_family::flush() {
_stats.pending_flushes++;
auto fut = _memtables->seal_active_memtable(memtable_list::flush_behavior::immediate);
// this rp is either:
// a.) Done - no-op
// b.) Ours
// c.) The last active flush not finished. If our latest memtable is
// empty it still makes sense for this api call to wait for this.
auto high_rp = _highest_flushed_rp;
return fut.finally([this, high_rp] {
// highest_flushed_rp is only updated when we flush. If the memtable is currently alive, then
// the most up2date replay position is the one that's in there now. Otherwise, if the memtable
// hasn't received any writes yet, that's the one from the last flush we made.
auto desired_rp = _memtables->back()->empty() ? _highest_flushed_rp : _memtables->back()->replay_position();
return _memtables->request_flush().finally([this, desired_rp] {
_stats.pending_flushes--;
// In origin memtable_switch_count is incremented inside
// ColumnFamilyMetrics Flush.run
_stats.memtable_switch_count++;
// wait for all up until us.
return _flush_queue->wait_for_pending(high_rp);
return _flush_queue->wait_for_pending(desired_rp);
});
}
@@ -3128,7 +3180,7 @@ future<> column_family::flush(const db::replay_position& pos) {
// We ignore this for now and just say that if we're asked for
// a CF and it exists, we pretty much have to have data that needs
// flushing. Let's do it.
return _memtables->seal_active_memtable(memtable_list::flush_behavior::immediate);
return _memtables->request_flush();
}
// FIXME: We can do much better than this in terms of cache management. Right
@@ -3169,7 +3221,7 @@ future<> column_family::flush_streaming_big_mutations(utils::UUID plan_id) {
}
auto entry = it->second;
_streaming_memtables_big.erase(it);
return entry->memtables->seal_active_memtable(memtable_list::flush_behavior::immediate).then([entry] {
return entry->memtables->request_flush().then([entry] {
return entry->flush_in_progress.close();
}).then([this, entry] {
return parallel_for_each(entry->sstables, [this] (auto& sst) {


@@ -119,28 +119,54 @@ class dirty_memory_manager: public logalloc::region_group_reclaimer {
// throttled for a long time. Even when we have virtual dirty, that only provides a rough
// estimate, and we can't release requests that early.
semaphore _flush_serializer;
// We will accept a new flush before another one ends, once it is done with the data write.
// That is so we can keep the disk always busy. But there is still some background work that is
// left to be done. Mostly, update the caches and seal the auxiliary components of the SSTable.
// This semaphore will cap the amount of background work that we have. Note that we're not
// overly concerned about memtable memory, because dirty memory will put a limit to that. This
// is mostly about dangling continuations. So that doesn't have to be a small number.
static constexpr unsigned _max_background_work = 20;
semaphore _background_work_flush_serializer = { _max_background_work };
condition_variable _should_flush;
int64_t _dirty_bytes_released_pre_accounted = 0;
seastar::gate _waiting_flush_gate;
std::vector<shared_memtable> _pending_flushes;
void maybe_do_active_flush();
future<> flush_when_needed();
// We need to start a flush before the current one finishes, otherwise
// we'll have a period without significant disk activity when the current
// SSTable is being sealed, the caches are being updated, etc. To do that
// we need to keep track of who is it that we are flushing this memory from.
std::unordered_map<const logalloc::region*, semaphore_units<>> _flush_manager;
void remove_from_flush_manager(const logalloc::region *region) {
// If the flush fails, but the failure happens after we reverted the dirty changes, we
// won't find the region here, because it would have been destroyed already. That's
// ultimately fine, we just need to check it. If we really want to restrict the new attempt
// to run concurrently with a new flush, it can call into the dirty manager to reacquire the
// semaphore. Right now we don't bother.
auto it = _flush_manager.find(region);
if (it != _flush_manager.end()) {
_flush_manager.erase(it);
}
}
future<> _waiting_flush;
protected:
virtual memtable_list& get_memtable_list(column_family& cf) = 0;
virtual void start_reclaiming() override;
public:
future<> shutdown();
dirty_memory_manager(database* db, size_t threshold)
: logalloc::region_group_reclaimer(threshold)
: logalloc::region_group_reclaimer(threshold, threshold * 0.4)
, _db(db)
, _region_group(*this)
, _flush_serializer(1) {}
, _flush_serializer(1)
, _waiting_flush(flush_when_needed()) {}
dirty_memory_manager(database* db, dirty_memory_manager *parent, size_t threshold)
: logalloc::region_group_reclaimer(threshold)
: logalloc::region_group_reclaimer(threshold, threshold * 0.4)
, _db(db)
, _region_group(&parent->_region_group, *this)
, _flush_serializer(1) {}
, _flush_serializer(1)
, _waiting_flush(flush_when_needed()) {}
static dirty_memory_manager& from_region_group(logalloc::region_group *rg) {
return *(boost::intrusive::get_parent_from_member(rg, &dirty_memory_manager::_region_group));
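`from_region_group()` recovers the `dirty_memory_manager` that embeds a given `region_group` via `boost::intrusive::get_parent_from_member`. That helper is essentially the classic container_of trick; a standard-layout sketch with hypothetical stand-in types (not the real Scylla classes):

```cpp
#include <cstddef>

struct region_group_like { int dummy = 0; };

struct manager_like {
    int id = 42;
    region_group_like group; // the member we only have a pointer to
};

// Recover the enclosing manager_like from a pointer to its member:
// subtract the member's offset from the member's address. Valid here
// because manager_like is standard-layout, which offsetof requires.
manager_like* from_member(region_group_like* rg) {
    auto raw = reinterpret_cast<char*>(rg) - offsetof(manager_like, group);
    return reinterpret_cast<manager_like*>(raw);
}
```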
@@ -154,12 +180,19 @@ public:
return _region_group;
}
void revert_potentially_cleaned_up_memory(int64_t delta) {
void revert_potentially_cleaned_up_memory(logalloc::region* from, int64_t delta) {
_region_group.update(delta);
_dirty_bytes_released_pre_accounted -= delta;
// Flushed the current memtable. There is still some work to do, like finish sealing the
// SSTable and updating the cache, but we can already allow the next one to start.
//
// By erasing this memtable from the flush_manager we'll destroy the semaphore_units
// associated with this flush and will allow another one to start. We'll signal the
// condition variable to let them know we might be ready early.
remove_from_flush_manager(from);
}
void account_potentially_cleaned_up_memory(int64_t delta) {
void account_potentially_cleaned_up_memory(logalloc::region* from, int64_t delta) {
_region_group.update(-delta);
_dirty_bytes_released_pre_accounted += delta;
}
@@ -172,25 +205,18 @@ public:
return _region_group.memory_used();
}
template <typename Func>
future<> serialize_flush(Func&& func) {
return seastar::with_gate(_waiting_flush_gate, [this, func] () mutable {
return with_semaphore(_flush_serializer, 1, func).finally([this] {
maybe_do_active_flush();
});
});
future<> flush_one(memtable_list& cf, semaphore_units<> permit);
future<semaphore_units<>> get_flush_permit() {
return get_units(_flush_serializer, 1);
}
};
class streaming_dirty_memory_manager: public dirty_memory_manager {
virtual memtable_list& get_memtable_list(column_family& cf) override;
public:
struct streaming_dirty_memory_manager: public dirty_memory_manager {
streaming_dirty_memory_manager(database& db, dirty_memory_manager *parent, size_t threshold) : dirty_memory_manager(&db, parent, threshold) {}
};
class memtable_dirty_memory_manager: public dirty_memory_manager {
virtual memtable_list& get_memtable_list(column_family& cf) override;
public:
struct memtable_dirty_memory_manager: public dirty_memory_manager {
memtable_dirty_memory_manager(database& db, dirty_memory_manager* parent, size_t threshold) : dirty_memory_manager(&db, parent, threshold) {}
// This constructor will be called for the system tables (no parent). Its flushes are usually driven by us
// and not the user, and tend to be small in size. So we'll allow only two slots.
@@ -225,14 +251,13 @@ private:
std::vector<shared_memtable> _memtables;
std::function<future<> (flush_behavior)> _seal_fn;
std::function<schema_ptr()> _current_schema;
size_t _max_memtable_size;
dirty_memory_manager* _dirty_memory_manager;
std::experimental::optional<shared_promise<>> _flush_coalescing;
public:
memtable_list(std::function<future<> (flush_behavior)> seal_fn, std::function<schema_ptr()> cs, size_t max_memtable_size, dirty_memory_manager* dirty_memory_manager)
memtable_list(std::function<future<> (flush_behavior)> seal_fn, std::function<schema_ptr()> cs, dirty_memory_manager* dirty_memory_manager)
: _memtables({})
, _seal_fn(seal_fn)
, _current_schema(cs)
, _max_memtable_size(max_memtable_size)
, _dirty_memory_manager(dirty_memory_manager) {
add_memtable();
}
@@ -281,20 +306,17 @@ public:
_memtables.emplace_back(new_memtable());
}
bool should_flush() {
return active_memtable().occupancy().total_space() >= _max_memtable_size;
}
void seal_on_overflow() {
if (should_flush()) {
// FIXME: if sparse, do some in-memory compaction first
// FIXME: maybe merge with other in-memory memtables
seal_active_memtable(flush_behavior::immediate);
}
logalloc::region_group& region_group() {
return _dirty_memory_manager->region_group();
}
// This is used for explicit flushes. Will queue the memtable for flushing and proceed when the
// dirty_memory_manager allows us to. We will not seal at this time since the flush itself
// wouldn't happen anyway. Keeping the memtable in memory will potentially increase the time it
// spends in memory allowing for more coalescing opportunities.
future<> request_flush();
private:
lw_shared_ptr<memtable> new_memtable() {
return make_lw_shared<memtable>(_current_schema(), &(_dirty_memory_manager->region_group()));
return make_lw_shared<memtable>(_current_schema(), this);
}
};
@@ -328,8 +350,6 @@ public:
bool enable_cache = true;
bool enable_commitlog = true;
bool enable_incremental_backups = false;
size_t max_memtable_size = 5'000'000;
size_t max_streaming_memtable_size = 5'000'000;
::dirty_memory_manager* dirty_memory_manager = &default_dirty_memory_manager;
::dirty_memory_manager* streaming_dirty_memory_manager = &default_dirty_memory_manager;
restricted_mutation_reader_config read_concurrency_config;
@@ -882,8 +902,6 @@ public:
bool enable_disk_writes = true;
bool enable_cache = true;
bool enable_incremental_backups = false;
size_t max_memtable_size = 5'000'000;
size_t max_streaming_memtable_size = 5'000'000;
::dirty_memory_manager* dirty_memory_manager = &default_dirty_memory_manager;
::dirty_memory_manager* streaming_dirty_memory_manager = &default_dirty_memory_manager;
restricted_mutation_reader_config read_concurrency_config;
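The `_flush_manager` map above holds the semaphore units keyed per region so the right manager releases them, and `remove_from_flush_manager` must tolerate a missing entry. A simplified sketch of that bookkeeping, where `units` and an integer region key stand in for `semaphore_units<>` and `logalloc::region*`:

```cpp
#include <unordered_map>
#include <utility>

// Stand-in for semaphore_units<>: returns one unit on destruction.
struct units {
    int* available;
    explicit units(int* a) : available(a) { --*available; }
    units(units&& o) noexcept : available(o.available) { o.available = nullptr; }
    ~units() { if (available) { ++*available; } }
};

struct flush_manager {
    std::unordered_map<int, units> _permits; // keyed by a region id
    void emplace(int region, units u) {
        _permits.emplace(region, std::move(u));
    }
    // Idempotent removal: after a failed flush the entry may already be
    // gone, so a second removal must be a harmless no-op.
    void remove(int region) {
        auto it = _permits.find(region);
        if (it != _permits.end()) {
            _permits.erase(it); // destroying the units releases the permit
        }
    }
};
```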


@@ -23,6 +23,7 @@
// database.hh
class database;
class memtable_list;
// mutation.hh
class mutation;


@@ -288,6 +288,7 @@ public:
void flush_segments(bool = false);
private:
size_t max_request_controller_units() const;
segment_id_type _ids = 0;
std::vector<sseg_ptr> _segments;
std::deque<sseg_ptr> _reserve_segments;
@@ -911,7 +912,7 @@ db::commitlog::segment_manager::segment_manager(config c)
// an existing in-flight buffer. Since we'll force the cycling() of any buffer that is bigger
// than default_size at the end of the allocation, that allows for every valid mutation to
// always be admitted for processing.
, _request_controller(max_mutation_size + db::commitlog::segment::default_size)
, _request_controller(max_request_controller_units())
{
assert(max_size > 0);
@@ -922,6 +923,10 @@ db::commitlog::segment_manager::segment_manager(config c)
_regs = create_counters();
}
size_t db::commitlog::segment_manager::max_request_controller_units() const {
return max_mutation_size + db::commitlog::segment::default_size;
}
future<std::vector<db::commitlog::descriptor>>
db::commitlog::segment_manager::list_descriptors(sstring dirname) {
struct helper {
@@ -1233,11 +1238,14 @@ future<> db::commitlog::segment_manager::sync_all_segments(bool shutdown) {
future<> db::commitlog::segment_manager::shutdown() {
if (!_shutdown) {
_shutdown = true; // no re-arm, no create new segments.
_timer.cancel(); // no more timer calls
// Now first wait for periodic task to finish, then sync and close all
// segments, flushing out any remaining data.
return _gate.close().then(std::bind(&segment_manager::sync_all_segments, this, true));
// Wait for all pending requests to finish
return get_units(_request_controller, max_request_controller_units()).then([this] (auto permits) {
_timer.cancel(); // no more timer calls
_shutdown = true; // no re-arm, no create new segments.
// Now first wait for periodic task to finish, then sync and close all
// segments, flushing out any remaining data.
return _gate.close().then(std::bind(&segment_manager::sync_all_segments, this, true));
});
}
return make_ready_future<>();
}
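The shutdown fix above reorders the drain: all `_request_controller` units are acquired before the timer is cancelled and `_shutdown` is set, so in-flight allocations can still complete and release their units. A rough sketch of the drain-then-stop pattern; the semaphore here is a minimal stand-in, not Seastar's, and the type names are illustrative:

```cpp
#include <condition_variable>
#include <mutex>

// Minimal counting semaphore (stand-in for Seastar's).
class counting_semaphore {
    std::mutex _m;
    std::condition_variable _cv;
    int _count;
public:
    explicit counting_semaphore(int count) : _count(count) {}
    void acquire() {
        std::unique_lock<std::mutex> lk(_m);
        _cv.wait(lk, [this] { return _count > 0; });
        --_count;
    }
    void release() {
        { std::lock_guard<std::mutex> lk(_m); ++_count; }
        _cv.notify_one();
    }
};

constexpr int max_units = 4; // stands in for max_request_controller_units()

struct segment_manager_sketch {
    counting_semaphore request_controller{max_units};
    bool shutdown_done = false;

    void shutdown() {
        // Drain first: in-flight requests can still finish (nothing has
        // been flagged or cancelled yet), so this acquire eventually
        // collects every unit.
        for (int i = 0; i < max_units; ++i) {
            request_controller.acquire();
        }
        shutdown_done = true; // only now stop timers and close segments
        for (int i = 0; i < max_units; ++i) {
            request_controller.release();
        }
    }
};
```

The buggy order (flag shutdown, then drain) would leave a request that needs a new segment stuck forever, so the acquire would never complete.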


@@ -334,7 +334,7 @@ public:
"\toffheap_buffers Off heap (direct) NIO buffers.\n" \
"\toffheap_objects Native memory, eliminating NIO buffer heap overhead." \
) \
val(memtable_cleanup_threshold, double, .11, Used, \
val(memtable_cleanup_threshold, double, .11, Invalid, \
"Ratio of occupied non-flushing memtable size to total permitted size for triggering a flush of the largest memtable. Larger values mean larger flushes and less compaction, but also less concurrent flush activity, which can make it difficult to keep your disks saturated under heavy write load." \
) \
val(file_cache_size_in_mb, uint32_t, 512, Unused, \


@@ -184,8 +184,8 @@ public:
throw;
}
});
} catch (std::system_error& e) {
startlog.error("Directory '{}' not found. Tried to created it but failed: {}", path, e.what());
} catch (...) {
startlog.error("Directory '{}' cannot be initialized. Tried to do it but failed with: {}", path, std::current_exception());
throw;
}
});


@@ -26,8 +26,16 @@
namespace stdx = std::experimental;
memtable::memtable(schema_ptr schema, logalloc::region_group* dirty_memory_region_group)
memtable::memtable(schema_ptr schema, memtable_list* memtable_list)
: logalloc::region(memtable_list ? logalloc::region(memtable_list->region_group()) : logalloc::region())
, _memtable_list(memtable_list)
, _schema(std::move(schema))
, partitions(memtable_entry::compare(_schema)) {
}
memtable::memtable(schema_ptr schema, logalloc::region_group *dirty_memory_region_group)
: logalloc::region(dirty_memory_region_group ? logalloc::region(*dirty_memory_region_group) : logalloc::region())
, _memtable_list(nullptr)
, _schema(std::move(schema))
, partitions(memtable_entry::compare(_schema)) {
}
@@ -254,7 +262,7 @@ class flush_memory_accounter {
public:
void update_bytes_read(uint64_t delta) {
_bytes_read += delta;
dirty_memory_manager::from_region_group(_region.group()).account_potentially_cleaned_up_memory(delta);
dirty_memory_manager::from_region_group(_region.group()).account_potentially_cleaned_up_memory(&_region, delta);
}
explicit flush_memory_accounter(logalloc::region& region)
@@ -263,7 +271,7 @@ public:
~flush_memory_accounter() {
assert(_bytes_read <= _region.occupancy().used_space());
dirty_memory_manager::from_region_group(_region.group()).revert_potentially_cleaned_up_memory(_bytes_read);
dirty_memory_manager::from_region_group(_region.group()).revert_potentially_cleaned_up_memory(&_region, _bytes_read);
}
void account_component(memtable_entry& e) {
auto delta = _region.allocator().object_memory_size_in_allocator(&e)


@@ -101,6 +101,7 @@ public:
bi::member_hook<memtable_entry, bi::set_member_hook<>, &memtable_entry::_link>,
bi::compare<memtable_entry::compare>>;
private:
memtable_list *_memtable_list;
schema_ptr _schema;
logalloc::allocating_section _read_section;
logalloc::allocating_section _allocating_section;
@@ -116,7 +117,9 @@ private:
partition_entry& find_or_create_partition_slow(partition_key_view key);
void upgrade_entry(memtable_entry&);
public:
explicit memtable(schema_ptr schema, logalloc::region_group* dirty_memory_region_group = nullptr);
explicit memtable(schema_ptr schema, memtable_list *memtable_list);
// Used by tests that want to control the flush process.
explicit memtable(schema_ptr schema, logalloc::region_group *dirty_memory_region_group = nullptr);
~memtable();
schema_ptr schema() const { return _schema; }
void set_schema(schema_ptr) noexcept;
@@ -134,7 +137,15 @@ public:
const logalloc::region& region() const {
return *this;
}
logalloc::region_group* region_group() {
return group();
}
public:
memtable_list* get_memtable_list() {
return _memtable_list;
}
size_t partition_count() const;
logalloc::occupancy_stats occupancy() const;


@@ -474,9 +474,9 @@ public:
try {
_read_section(_lsa_region, [this] {
_snapshot->merge_partition_versions();
_snapshot = {};
});
} catch (...) { }
_snapshot = {};
});
});
}


@@ -74,8 +74,7 @@ cache_tracker::cache_tracker() {
}
cache_entry& ce = _lru.back();
auto it = row_cache::partitions_type::s_iterator_to(ce);
--it;
clear_continuity(*it);
clear_continuity(*std::next(it));
_lru.pop_back_and_dispose(current_deleter<cache_entry>());
--_partitions;
++_evictions;
@@ -365,6 +364,7 @@ public:
++_it;
_last = ce.key();
_cache.upgrade_entry(ce);
_cache._tracker.touch(ce);
_cache.on_hit();
cache_data cd { { }, ce.continuous() };
if (ce.wide_partition()) {
@@ -546,7 +546,6 @@ private:
if (!_first_element) {
return false;
}
_first_element = false;
return _pr.start() && _pr.start()->is_inclusive() && _pr.start()->value().equal(*_schema, dk);
}
@@ -554,6 +553,7 @@ private:
return _primary_reader().then([this] (just_cache_scanning_reader::cache_data cd) {
auto& smopt = cd.mut;
if (cd.continuous || (smopt && is_inclusive_start_bound(smopt->decorated_key()))) {
_first_element = false;
update_last_key(smopt);
return make_ready_future<streamed_mutation_opt>(std::move(smopt));
} else {
@@ -720,7 +720,9 @@ void row_cache::do_find_or_create_entry(const dht::decorated_key& key,
return;
}
if ((!previous->_key && i == _partitions.begin()) || (previous->_key && std::prev(i)->key().equal(*_schema, *previous->_key))) {
if ((!previous->_key && i == _partitions.begin())
|| (previous->_key && i != _partitions.begin()
&& std::prev(i)->key().equal(*_schema, *previous->_key))) {
i->set_continuous(true);
}
});

Submodule seastar updated: 69acec1788...25137c2846


@@ -54,7 +54,7 @@ public:
const cql3::query_options& options,
lw_shared_ptr<query::read_command> cmd,
std::vector<query::partition_range> ranges)
: _has_clustering_keys(s->clustering_key_size() > 0)
: _has_clustering_keys(has_clustering_keys(*s, *cmd))
, _max(cmd->row_limit)
, _schema(std::move(s))
, _selection(selection)
@@ -65,6 +65,11 @@ public:
{}
private:
static bool has_clustering_keys(const schema& s, const query::read_command& cmd) {
return s.clustering_key_size() > 0
&& !cmd.slice.options.contains<query::partition_slice::option::distinct>();
}
future<> fetch_page(cql3::selection::result_set_builder& builder, uint32_t page_size, db_clock::time_point now) override {
auto state = _options.get_paging_state();


@@ -2643,12 +2643,19 @@ storage_proxy::query_partition_key_range(lw_shared_ptr<query::read_command> cmd,
}
}
// estimate_result_rows_per_range() is currently broken, and this is not needed
// when paging is available in any case
#if 0
// our estimate of how many result rows there will be per-range
float result_rows_per_range = estimate_result_rows_per_range(cmd, ks);
// underestimate how many rows we will get per-range in order to increase the likelihood that we'll
// fetch enough rows in the first round
result_rows_per_range -= result_rows_per_range * CONCURRENT_SUBREQUESTS_MARGIN;
int concurrency_factor = result_rows_per_range == 0.0 ? 1 : std::max(1, std::min(int(ranges.size()), int(std::ceil(cmd->row_limit / result_rows_per_range))));
#else
int result_rows_per_range = 0;
int concurrency_factor = 1;
#endif
std::vector<foreign_ptr<lw_shared_ptr<query::result>>> results;
results.reserve(ranges.size()/concurrency_factor + 1);
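For reference, the now-disabled heuristic inside the `#if 0` block computes a concurrency factor from the (deliberately under-)estimated rows per range. As a pure function; the `0.1` margin value is an assumption for illustration, not taken from the source:

```cpp
#include <algorithm>
#include <cmath>

// Assumed value for illustration; the real constant lives in storage_proxy.
constexpr float CONCURRENT_SUBREQUESTS_MARGIN = 0.1f;

// How many token ranges to query concurrently so the first round is
// likely to satisfy row_limit. The margin shrinks the per-range estimate
// to bias toward fetching enough rows up front.
int concurrency_factor(float rows_per_range, int n_ranges, int row_limit) {
    rows_per_range -= rows_per_range * CONCURRENT_SUBREQUESTS_MARGIN;
    if (rows_per_range == 0.0f) {
        return 1;
    }
    return std::max(1, std::min(n_ranges,
                                int(std::ceil(row_limit / rows_per_range))));
}
```

Because `estimate_result_rows_per_range()` severely underestimates, this formula produced far too many concurrent sub-requests, which is why the patch pins the factor to 1.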

sstables/atomic_deletion.cc (new file, 140 lines)

@@ -0,0 +1,140 @@
/*
* Copyright (C) 2016 ScyllaDB
*/
/*
* This file is part of Scylla.
*
* Scylla is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Scylla is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with Scylla. If not, see <http://www.gnu.org/licenses/>.
*/
#include "atomic_deletion.hh"
#include "to_string.hh"
#include <seastar/core/shared_future.hh>
#include <boost/range/adaptor/transformed.hpp>
#include <boost/range/algorithm/copy.hpp>
#include <boost/range/irange.hpp>
#include <functional>
namespace sstables {
atomic_deletion_manager::atomic_deletion_manager(unsigned shard_count,
std::function<future<> (std::vector<sstring> sstables)> delete_sstables)
: _shard_count(shard_count)
, _delete_sstables(std::move(delete_sstables)) {
}
future<>
atomic_deletion_manager::delete_atomically(std::vector<sstable_to_delete> atomic_deletion_set, unsigned deleting_shard) {
// runs on shard 0 only
_deletion_logger.debug("shard {} atomically deleting {}", deleting_shard, atomic_deletion_set);
if (_atomic_deletions_cancelled) {
_deletion_logger.debug("atomic deletions disabled, erroring out");
using boost::adaptors::transformed;
throw atomic_deletion_cancelled(atomic_deletion_set
| transformed(std::mem_fn(&sstable_to_delete::name)));
}
// Insert atomic_deletion_set into the list of sets pending deletion. If the new set
// overlaps with an existing set, merge them (the merged set will be deleted atomically).
std::unordered_map<sstring, lw_shared_ptr<pending_deletion>> new_atomic_deletion_sets;
auto merged_set = make_lw_shared(pending_deletion());
for (auto&& sst_to_delete : atomic_deletion_set) {
merged_set->names.insert(sst_to_delete.name);
if (!sst_to_delete.shared) {
for (auto shard : boost::irange<shard_id>(0, _shard_count)) {
_shards_agreeing_to_delete_sstable[sst_to_delete.name].insert(shard);
}
}
new_atomic_deletion_sets.emplace(sst_to_delete.name, merged_set);
}
auto pr = make_lw_shared<promise<>>();
merged_set->completions.insert(pr);
auto ret = pr->get_future();
for (auto&& sst_to_delete : atomic_deletion_set) {
auto i = _atomic_deletion_sets.find(sst_to_delete.name);
// merge from old deletion set to new deletion set
// i->second can be nullptr, see below why
if (i != _atomic_deletion_sets.end() && i->second) {
boost::copy(i->second->names, std::inserter(merged_set->names, merged_set->names.end()));
boost::copy(i->second->completions, std::inserter(merged_set->completions, merged_set->completions.end()));
}
}
_deletion_logger.debug("new atomic set: {}", merged_set->names);
// we need to merge new_atomic_deletion_sets into _atomic_deletion_sets,
// but beware of exceptions. We do that with a first pass that inserts
// nullptr as the value, so the second pass only replaces, and does not allocate
for (auto&& sst_to_delete : atomic_deletion_set) {
_atomic_deletion_sets.emplace(sst_to_delete.name, nullptr);
}
// now, no allocations are involved, so this commits the operation atomically
for (auto&& n : merged_set->names) {
auto i = _atomic_deletion_sets.find(n);
i->second = merged_set;
}
// Mark each sstable as being deleted from deleting_shard. We have to do
// this in a separate pass, so the consideration whether we can delete or not
// sees all the data from this pass.
for (auto&& sst : atomic_deletion_set) {
_shards_agreeing_to_delete_sstable[sst.name].insert(deleting_shard);
}
// Figure out if the (possibly merged) set can be deleted
for (auto&& sst : merged_set->names) {
if (_shards_agreeing_to_delete_sstable[sst].size() != _shard_count) {
// Not everyone agrees, leave the set pending
_deletion_logger.debug("deferring deletion until all shards agree");
return ret;
}
}
// Cannot recover from a failed deletion
for (auto&& name : merged_set->names) {
_atomic_deletion_sets.erase(name);
_shards_agreeing_to_delete_sstable.erase(name);
}
// Everyone agrees, let's delete
auto names = boost::copy_range<std::vector<sstring>>(merged_set->names);
_deletion_logger.debug("deleting {}", names);
_delete_sstables(names).then_wrapped([this, merged_set] (future<> result) {
_deletion_logger.debug("atomic deletion completed: {}", merged_set->names);
shared_future<> sf(std::move(result));
for (auto&& comp : merged_set->completions) {
sf.get_future().forward_to(std::move(*comp));
}
});
return ret;
}
void
atomic_deletion_manager::cancel_atomic_deletions() {
_atomic_deletions_cancelled = true;
for (auto&& pd : _atomic_deletion_sets) {
if (!pd.second) {
// Could happen if a delete_atomically() failed
continue;
}
for (auto&& c : pd.second->completions) {
c->set_exception(atomic_deletion_cancelled(pd.second->names));
}
// since sets are shared, make sure we don't hit the same one again
pd.second->completions.clear();
}
_atomic_deletion_sets.clear();
_shards_agreeing_to_delete_sstable.clear();
}
}


@@ -0,0 +1,92 @@
/*
* Copyright (C) 2016 ScyllaDB
*/
/*
* This file is part of Scylla.
*
* Scylla is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Scylla is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with Scylla. If not, see <http://www.gnu.org/licenses/>.
*/
#pragma once
// The atomic deletion manager solves the problem of orchestrating
// the deletion of files that must be deleted as a group, where each
// shard has different groups, and a file can only be deleted after
// every shard has agreed to delete it. For example,
//
// shard 0: delete "A"
// we can't delete anything because shard 1 hasn't agreed yet.
// shard 1: delete "A" and "B"
// shard 1 agrees to delete "A", but we can't delete it yet,
// because shard 1 requires that it be deleted together with "B",
// and shard 0 hasn't agreed to delete "B" yet.
// shard 0: delete "B" and "C"
// shards 0 and 1 now both agree to delete "A" and "B", but shard 0
// doesn't allow us to delete "B" without "C".
// shard 1: delete "C"
// finally, we can delete "A", "B", and "C".
#include "log.hh"
#include <seastar/core/future.hh>
#include <seastar/core/future-util.hh>
#include <seastar/core/shared_ptr.hh>
#include <seastar/core/sstring.hh>
#include <seastar/core/reactor.hh> // for shard_id
#include <set>
#include <unordered_set>
#include <unordered_map>
#include <vector>
namespace sstables {
struct sstable_to_delete {
sstable_to_delete(sstring name, bool shared) : name(std::move(name)), shared(shared) {}
sstring name;
bool shared = false;
friend std::ostream& operator<<(std::ostream& os, const sstable_to_delete& std);
};
class atomic_deletion_cancelled : public std::exception {
std::string _msg;
public:
explicit atomic_deletion_cancelled(std::vector<sstring> names);
template <typename StringRange>
explicit atomic_deletion_cancelled(StringRange range)
: atomic_deletion_cancelled(std::vector<sstring>{range.begin(), range.end()}) {
}
const char* what() const noexcept override;
};
class atomic_deletion_manager {
logging::logger _deletion_logger{"sstable-deletion"};
using shards_agreeing_to_delete_sstable_type = std::unordered_set<shard_id>;
using sstables_to_delete_atomically_type = std::set<sstring>;
struct pending_deletion {
sstables_to_delete_atomically_type names;
std::unordered_set<lw_shared_ptr<promise<>>> completions;
};
bool _atomic_deletions_cancelled = false;
// map from sstable name to a set of sstables that must be deleted atomically, including itself
std::unordered_map<sstring, lw_shared_ptr<pending_deletion>> _atomic_deletion_sets;
std::unordered_map<sstring, shards_agreeing_to_delete_sstable_type> _shards_agreeing_to_delete_sstable;
unsigned _shard_count;
std::function<future<> (std::vector<sstring> sstables)> _delete_sstables;
public:
atomic_deletion_manager(unsigned shard_count,
std::function<future<> (std::vector<sstring> sstables)> delete_sstables);
future<> delete_atomically(std::vector<sstable_to_delete> atomic_deletion_set, unsigned deleting_shard);
void cancel_atomic_deletions();
};
}


@@ -305,7 +305,7 @@ public:
_remain = end - _stream_position;
_prestate = prestate::NONE;
state_processor().reset();
state_processor().reset(begin);
return _input.skip(n);
}


@@ -38,7 +38,7 @@ public:
bool should_continue() {
return indexes.size() < max_quantity;
}
void consume_entry(index_entry&& ie) {
void consume_entry(index_entry&& ie, uint64_t offset) {
indexes.push_back(std::move(ie));
}
void reset() {
@@ -49,13 +49,14 @@ public:
// IndexConsumer is a concept that implements:
//
// bool should_continue();
// void consume_entry(index_entry&& ie);
// void consume_entry(index_entry&& ie, uint64_t offset);
template <class IndexConsumer>
class index_consume_entry_context: public data_consumer::continuous_data_consumer<index_consume_entry_context<IndexConsumer>> {
using proceed = data_consumer::proceed;
using continuous_data_consumer = data_consumer::continuous_data_consumer<index_consume_entry_context<IndexConsumer>>;
private:
IndexConsumer& _consumer;
uint64_t _entry_offset;
enum class state {
START,
@@ -113,9 +114,12 @@ public:
_state = state::CONSUME_ENTRY;
break;
}
case state::CONSUME_ENTRY:
_consumer.consume_entry(index_entry(std::move(_key), this->_u64, std::move(_promoted)));
case state::CONSUME_ENTRY: {
auto len = (_key.size() + _promoted.size() + 14);
_consumer.consume_entry(index_entry(std::move(_key), this->_u64, std::move(_promoted)), _entry_offset);
_entry_offset += len;
_state = state::START;
}
break;
default:
throw malformed_sstable_exception("unknown state");
@@ -126,11 +130,12 @@ public:
index_consume_entry_context(IndexConsumer& consumer,
input_stream<char>&& input, uint64_t start, uint64_t maxlen)
: continuous_data_consumer(std::move(input), start, maxlen)
, _consumer(consumer)
, _consumer(consumer), _entry_offset(start)
{}
void reset() {
void reset(uint64_t offset) {
_state = state::START;
_entry_offset = offset;
_consumer.reset();
}
};


@@ -374,7 +374,7 @@ public:
}
}
void reset() {
void reset(uint64_t offset) {
_state = state::ROW_START;
_consumer.reset();
}


@@ -741,10 +741,10 @@ future<> sstable::read_toc() {
continue;
}
try {
_components.insert(reverse_map(c, _component_map));
_components.insert(reverse_map(c, _component_map));
} catch (std::out_of_range& oor) {
_components.clear(); // so subsequent read_toc will be forced to fail again
throw malformed_sstable_exception("Unrecognized TOC component: " + c, file_path);
_unrecognized_components.push_back(c);
sstlog.info("Unrecognized TOC component was found: {} in sstable {}", c, file_path);
}
}
if (!_components.size()) {
@@ -1867,8 +1867,8 @@ future<> sstable::generate_summary(const io_priority_class& pc) {
bool should_continue() {
return true;
}
void consume_entry(index_entry&& ie) {
maybe_add_summary_entry(_summary, ie.get_key_bytes(), ie.position());
void consume_entry(index_entry&& ie, uint64_t offset) {
maybe_add_summary_entry(_summary, ie.get_key_bytes(), offset);
if (!first_key) {
first_key = key(to_bytes(ie.get_key_bytes()));
} else {
@@ -1957,6 +1957,28 @@ const sstring sstable::filename(sstring dir, sstring ks, sstring cf, version_typ
return dir + "/" + strmap[version](entry_descriptor(ks, cf, version, generation, format, component));
}
const sstring sstable::filename(sstring dir, sstring ks, sstring cf, version_types version, int64_t generation,
format_types format, sstring component) {
static std::unordered_map<version_types, const char*, enum_hash<version_types>> fmtmap = {
{ sstable::version_types::ka, "{0}-{1}-{2}-{3}-{5}" },
{ sstable::version_types::la, "{2}-{3}-{4}-{5}" }
};
return dir + "/" + seastar::format(fmtmap[version], ks, cf, _version_string.at(version), to_sstring(generation), _format_string.at(format), component);
}
std::vector<std::pair<sstable::component_type, sstring>> sstable::all_components() const {
std::vector<std::pair<component_type, sstring>> all;
all.reserve(_components.size() + _unrecognized_components.size());
for (auto& c : _components) {
all.push_back(std::make_pair(c, _component_map.at(c)));
}
for (auto& c : _unrecognized_components) {
all.push_back(std::make_pair(component_type::Unknown, c));
}
return all;
}
future<> sstable::create_links(sstring dir, int64_t generation) const {
// TemporaryTOC is always first, TOC is always last
auto dst = sstable::filename(dir, _schema->ks_name(), _schema->cf_name(), _version, generation, _format, component_type::TemporaryTOC);
@@ -1964,12 +1986,13 @@ future<> sstable::create_links(sstring dir, int64_t generation) const {
return sstable_write_io_check(sync_directory, dir);
}).then([this, dir, generation] {
// FIXME: Should clean already-created links if we failed midway.
return parallel_for_each(_components, [this, dir, generation] (auto comp) {
if (comp == component_type::TOC) {
return parallel_for_each(all_components(), [this, dir, generation] (auto p) {
if (p.first == component_type::TOC) {
return make_ready_future<>();
}
auto dst = sstable::filename(dir, _schema->ks_name(), _schema->cf_name(), _version, generation, _format, comp);
return sstable_write_io_check(::link_file, this->filename(comp), dst);
auto src = sstable::filename(_dir, _schema->ks_name(), _schema->cf_name(), _version, _generation, _format, p.second);
auto dst = sstable::filename(dir, _schema->ks_name(), _schema->cf_name(), _version, generation, _format, p.second);
return sstable_write_io_check(::link_file, std::move(src), std::move(dst));
});
}).then([dir] {
return sstable_write_io_check(sync_directory, dir);
@@ -1989,11 +2012,11 @@ future<> sstable::set_generation(int64_t new_generation) {
return remove_file(filename(component_type::TOC)).then([this] {
return sstable_write_io_check(sync_directory, _dir);
}).then([this] {
return parallel_for_each(_components, [this] (auto comp) {
if (comp == component_type::TOC) {
return parallel_for_each(all_components(), [this] (auto p) {
if (p.first == component_type::TOC) {
return make_ready_future<>();
}
return remove_file(this->filename(comp));
return remove_file(sstable::filename(_dir, _schema->ks_name(), _schema->cf_name(), _version, _generation, _format, p.second));
});
});
}).then([this, new_generation] {
@@ -2240,8 +2263,11 @@ remove_by_toc_name(sstring sstable_toc_name) {
dir = dirname(sstable_toc_name);
sstable_write_io_check(rename_file, sstable_toc_name, new_toc_name).get();
sstable_write_io_check(fsync_directory, dir).get();
} else {
} else if (sstable_write_io_check(file_exists, new_toc_name).get0()) {
dir = dirname(new_toc_name);
} else {
sstlog.warn("Unable to delete {} because it doesn't exist.", sstable_toc_name);
return;
}
auto toc_file = open_checked_file_dma(sstable_read_error, new_toc_name, open_flags::ro).get0();
@@ -2427,107 +2453,21 @@ operator<<(std::ostream& os, const sstable_to_delete& std) {
return os << std.name << "(" << (std.shared ? "shared" : "unshared") << ")";
}
using shards_agreeing_to_delete_sstable_type = std::unordered_set<shard_id>;
using sstables_to_delete_atomically_type = std::set<sstring>;
struct pending_deletion {
sstables_to_delete_atomically_type names;
std::vector<lw_shared_ptr<promise<>>> completions;
};
static thread_local bool g_atomic_deletions_cancelled = false;
static thread_local std::list<lw_shared_ptr<pending_deletion>> g_atomic_deletion_sets;
static thread_local std::unordered_map<sstring, shards_agreeing_to_delete_sstable_type> g_shards_agreeing_to_delete_sstable;
static logging::logger deletion_logger("sstable-deletion");
static
future<>
do_delete_atomically(std::vector<sstable_to_delete> atomic_deletion_set, unsigned deleting_shard) {
// runs on shard 0 only
deletion_logger.debug("shard {} atomically deleting {}", deleting_shard, atomic_deletion_set);
if (g_atomic_deletions_cancelled) {
deletion_logger.debug("atomic deletions disabled, erroring out");
using boost::adaptors::transformed;
throw atomic_deletion_cancelled(atomic_deletion_set
| transformed(std::mem_fn(&sstable_to_delete::name)));
}
// Insert atomic_deletion_set into the list of sets pending deletion. If the new set
// overlaps with an existing set, merge them (the merged set will be deleted atomically).
std::list<lw_shared_ptr<pending_deletion>> new_atomic_deletion_sets;
auto merged_set = make_lw_shared(pending_deletion());
for (auto&& sst_to_delete : atomic_deletion_set) {
merged_set->names.insert(sst_to_delete.name);
if (!sst_to_delete.shared) {
for (auto shard : boost::irange<shard_id>(0, smp::count)) {
g_shards_agreeing_to_delete_sstable[sst_to_delete.name].insert(shard);
}
}
}
merged_set->completions.push_back(make_lw_shared<promise<>>());
auto ret = merged_set->completions.back()->get_future();
for (auto&& old_set : g_atomic_deletion_sets) {
auto intersection = sstables_to_delete_atomically_type();
boost::set_intersection(merged_set->names, old_set->names, std::inserter(intersection, intersection.end()));
if (intersection.empty()) {
// We copy old_set to avoid corrupting g_atomic_deletion_sets if we fail
// further on.
new_atomic_deletion_sets.push_back(old_set);
} else {
deletion_logger.debug("merging with {}", old_set->names);
merged_set->names.insert(old_set->names.begin(), old_set->names.end());
boost::push_back(merged_set->completions, old_set->completions);
}
}
deletion_logger.debug("new atomic set: {}", merged_set->names);
new_atomic_deletion_sets.push_back(merged_set);
// can now exception-safely commit:
g_atomic_deletion_sets = std::move(new_atomic_deletion_sets);
// Mark each sstable as being deleted from deleting_shard. We have to do
// this in a separate pass, so the consideration whether we can delete or not
// sees all the data from this pass.
for (auto&& sst : atomic_deletion_set) {
g_shards_agreeing_to_delete_sstable[sst.name].insert(deleting_shard);
}
// Figure out if the (possibly merged) set can be deleted
for (auto&& sst : merged_set->names) {
if (g_shards_agreeing_to_delete_sstable[sst].size() != smp::count) {
// Not everyone agrees, leave the set pending
deletion_logger.debug("deferring deletion until all shards agree");
return ret;
}
}
// Cannot recover from a failed deletion
g_atomic_deletion_sets.pop_back();
for (auto&& name : merged_set->names) {
g_shards_agreeing_to_delete_sstable.erase(name);
}
// Everyone agrees, let's delete
delete_sstables(std::vector<sstring> tocs) {
// FIXME: this needs to be done atomically (using a log file of sstables we intend to delete)
parallel_for_each(merged_set->names, [] (sstring name) {
deletion_logger.debug("deleting {}", name);
return parallel_for_each(tocs, [] (sstring name) {
return remove_by_toc_name(name);
}).then_wrapped([merged_set] (future<> result) {
deletion_logger.debug("atomic deletion completed: {}", merged_set->names);
shared_future<> sf(std::move(result));
for (auto&& comp : merged_set->completions) {
sf.get_future().forward_to(std::move(*comp));
}
});
return ret;
}
static thread_local atomic_deletion_manager g_atomic_deletion_manager(smp::count, delete_sstables);
future<>
delete_atomically(std::vector<sstable_to_delete> ssts) {
auto shard = engine().cpu_id();
return smp::submit_to(0, [=] {
return do_delete_atomically(ssts, shard);
return g_atomic_deletion_manager.delete_atomically(ssts, shard);
});
}
@@ -2540,16 +2480,8 @@ delete_atomically(std::vector<shared_sstable> ssts) {
return delete_atomically(std::move(sstables_to_delete_atomically));
}
void
cancel_atomic_deletions() {
g_atomic_deletions_cancelled = true;
for (auto&& pd : g_atomic_deletion_sets) {
for (auto&& c : pd->completions) {
c->set_exception(atomic_deletion_cancelled(pd->names));
}
}
g_atomic_deletion_sets.clear();
g_shards_agreeing_to_delete_sstable.clear();
void cancel_atomic_deletions() {
g_atomic_deletion_manager.cancel_atomic_deletions();
}
atomic_deletion_cancelled::atomic_deletion_cancelled(std::vector<sstring> names)


@@ -48,6 +48,7 @@
#include "mutation_reader.hh"
#include "query-request.hh"
#include "compound_compat.hh"
#include "atomic_deletion.hh"
namespace sstables {
@@ -130,6 +131,7 @@ public:
Statistics,
TemporaryTOC,
TemporaryStatistics,
Unknown,
};
enum class version_types { ka, la };
enum class format_types { big };
@@ -221,6 +223,8 @@ public:
static format_types format_from_sstring(sstring& s);
static const sstring filename(sstring dir, sstring ks, sstring cf, version_types version, int64_t generation,
format_types format, component_type component);
static const sstring filename(sstring dir, sstring ks, sstring cf, version_types version, int64_t generation,
format_types format, sstring component);
// WARNING: it should only be called to remove components of a sstable with
// a temporary TOC file.
static future<> remove_sstable_with_temp_toc(sstring ks, sstring cf, sstring dir, int64_t generation,
@@ -358,6 +362,8 @@ public:
return _collector;
}
std::vector<std::pair<component_type, sstring>> all_components() const;
future<> create_links(sstring dir, int64_t generation) const;
future<> create_links(sstring dir) const {
@@ -394,6 +400,7 @@ private:
static std::unordered_map<component_type, sstring, enum_hash<component_type>> _component_map;
std::unordered_set<component_type, enum_hash<component_type>> _components;
std::vector<sstring> _unrecognized_components;
bool _shared = true; // across shards; safe default
compression _compression;
@@ -688,14 +695,6 @@ future<> await_background_jobs();
// Invokes await_background_jobs() on all shards
future<> await_background_jobs_on_all_shards();
struct sstable_to_delete {
sstable_to_delete(sstring name, bool shared) : name(std::move(name)), shared(shared) {}
sstring name;
bool shared = false;
friend std::ostream& operator<<(std::ostream& os, const sstable_to_delete& std);
};
// When we compact sstables, we have to atomically instantiate the new
// sstable and delete the old ones. Otherwise, if we compact A+B into C,
// and if A contained some data that was tombstoned by B, and if B was
@@ -714,17 +713,6 @@ struct sstable_to_delete {
future<> delete_atomically(std::vector<shared_sstable> ssts);
future<> delete_atomically(std::vector<sstable_to_delete> ssts);
class atomic_deletion_cancelled : public std::exception {
std::string _msg;
public:
explicit atomic_deletion_cancelled(std::vector<sstring> names);
template <typename StringRange>
explicit atomic_deletion_cancelled(StringRange range)
: atomic_deletion_cancelled(std::vector<sstring>{range.begin(), range.end()}) {
}
const char* what() const noexcept override;
};
// Cancel any deletions scheduled by delete_atomically() and make their
// futures complete (with an atomic_deletion_cancelled exception).
void cancel_atomic_deletions();


@@ -39,6 +39,7 @@ boost_tests = [
'storage_proxy_test',
'schema_change_test',
'sstable_mutation_test',
'sstable_atomic_deletion_test',
'commitlog_test',
'hash_test',
'test-serialization',


@@ -0,0 +1,170 @@
/*
* Copyright (C) 2015 ScyllaDB
*/
/*
* This file is part of Scylla.
*
* Scylla is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Scylla is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with Scylla. If not, see <http://www.gnu.org/licenses/>.
*/
#include "sstables/atomic_deletion.hh"
#include <seastar/tests/test-utils.hh>
#include <deque>
#include <boost/range/numeric.hpp>
#include <boost/range/adaptor/transformed.hpp>
using namespace sstables;
class atomic_deletion_test_env {
public:
using event = std::function<future<> (atomic_deletion_test_env& adm)>;
private:
struct a_hash {
size_t operator()(const std::unordered_set<sstring>& s) const {
auto h = std::hash<sstring>();
return boost::accumulate(s | boost::adaptors::transformed(h), size_t(0)); // sue me
}
};
atomic_deletion_manager _adm;
std::deque<event> _events;
std::unordered_set<std::unordered_set<sstring>, a_hash> _deletes;
semaphore _deletion_counter { 0 };
private:
future<> delete_sstables(std::vector<sstring> names) {
auto&& s1 = boost::copy_range<std::unordered_set<sstring>>(names);
_deletes.insert(s1);
_deletion_counter.signal();
return make_ready_future<>();
}
public:
explicit atomic_deletion_test_env(unsigned shard_count, std::vector<event> events)
: _adm(shard_count, [this] (std::vector<sstring> names) {
return delete_sstables(names);
})
, _events(events.begin(), events.end()) {
}
void expect_no_deletion() {
BOOST_REQUIRE(_deletes.empty());
}
future<> schedule_delete(std::vector<sstable_to_delete> names, unsigned shard) {
_adm.delete_atomically(names, shard).discard_result();
return make_ready_future<>();
}
future<> expect_deletion(std::vector<sstring> names) {
return _deletion_counter.wait().then([this, names] {
auto&& s1 = boost::copy_range<std::unordered_set<sstring>>(names);
auto erased = _deletes.erase(s1);
BOOST_REQUIRE_EQUAL(erased, 1);
});
}
future<> test() {
// run all _events sequentially
return repeat([this] {
if (_events.empty()) {
return make_ready_future<stop_iteration>(stop_iteration::yes);
}
auto ev = std::move(_events.front());
_events.pop_front();
return ev(*this).then([] {
return stop_iteration::no;
});
});
}
};
future<> test_atomic_deletion_manager(unsigned shards, std::vector<atomic_deletion_test_env::event> events) {
auto env = make_lw_shared<atomic_deletion_test_env>(shards, events);
return env->test().finally([env] {});
}
atomic_deletion_test_env::event
delete_many(std::vector<sstable_to_delete> v, unsigned shard) {
return [v, shard] (atomic_deletion_test_env& env) {
// verify we didn't have an early delete from previous deletion
env.expect_no_deletion();
return env.schedule_delete(v, shard);
};
}
atomic_deletion_test_env::event
delete_one(sstable_to_delete s, unsigned shard) {
return delete_many({s}, shard);
}
atomic_deletion_test_env::event
expect_many(std::vector<sstring> names) {
return [names] (atomic_deletion_test_env& env) {
return env.expect_deletion(names);
};
}
atomic_deletion_test_env::event
expect_one(sstring name) {
return expect_many({name});
}
SEASTAR_TEST_CASE(test_single_shard_single_sstable) {
return test_atomic_deletion_manager(1, {
delete_one({"1", false}, 0),
expect_one("1"),
delete_one({"2", true}, 0),
expect_one("2"),
});
}
SEASTAR_TEST_CASE(test_multi_shard_single_sstable) {
return test_atomic_deletion_manager(3, {
delete_one({"1", true}, 0),
delete_one({"1", true}, 1),
delete_one({"1", true}, 2),
expect_one("1"),
delete_one({"2", false}, 1),
expect_one("2"),
});
}
SEASTAR_TEST_CASE(test_nonshared_compaction) {
return test_atomic_deletion_manager(5, {
delete_many({{"1", false}, {"2", false}, {"3", false}}, 2),
expect_many({"1", "2", "3"}),
});
}
SEASTAR_TEST_CASE(test_shared_compaction) {
return test_atomic_deletion_manager(3, {
delete_one({"1", true}, 0),
delete_many({{"1", true}, {"2", false}, {"3", false}}, 2),
delete_one({"1", true}, 1),
expect_many({"1", "2", "3"}),
});
}
SEASTAR_TEST_CASE(test_overlapping_compaction) {
return test_atomic_deletion_manager(3, {
delete_one({"1", true}, 0),
delete_one({"3", true}, 0),
delete_many({{"1", true}, {"2", false}, {"3", true}}, 2),
delete_one({"1", true}, 1),
delete_many({{"3", true}, {"4", false}}, 1),
expect_many({"1", "2", "3", "4"}),
});
}
#include "disk-error-handler.hh"
thread_local disk_error_signal_type commit_error;
thread_local disk_error_signal_type general_disk_error;


@@ -3031,3 +3031,22 @@ SEASTAR_TEST_CASE(test_partition_skipping) {
.produces_end_of_stream();
});
}
SEASTAR_TEST_CASE(test_unknown_component) {
return seastar::async([] {
auto tmp = make_lw_shared<tmpdir>();
auto sstp = reusable_sst(uncompressed_schema(), "tests/sstables/unknown_component", 1).get0();
sstp->create_links(tmp->path).get();
// check that create_links() linked the unknown component into the new dir
BOOST_REQUIRE(file_exists(tmp->path + "/la-1-big-UNKNOWN.txt").get0());
sstp = reusable_sst(uncompressed_schema(), tmp->path, 1).get0();
sstp->set_generation(2).get();
BOOST_REQUIRE(!file_exists(tmp->path + "/la-1-big-UNKNOWN.txt").get0());
BOOST_REQUIRE(file_exists(tmp->path + "/la-2-big-UNKNOWN.txt").get0());
sstables::delete_atomically({sstp}).get();
// assure unknown component is deleted
BOOST_REQUIRE(!file_exists(tmp->path + "/la-2-big-UNKNOWN.txt").get0());
});
}


@@ -0,0 +1 @@
748507322


@@ -0,0 +1,9 @@
Data.db
Filter.db
CRC.db
Statistics.db
Summary.db
Digest.sha1
Index.db
TOC.txt
UNKNOWN.txt


@@ -39,8 +39,8 @@ class moving_average {
public:
moving_average(latency_counter::duration interval, latency_counter::duration tick_interval) :
_tick_interval(tick_interval) {
_alpha = 1 - std::exp(-std::chrono::duration_cast<std::chrono::nanoseconds>(interval).count()/
static_cast<double>(std::chrono::duration_cast<std::chrono::nanoseconds>(tick_interval).count()));
_alpha = 1 - std::exp(-std::chrono::duration_cast<std::chrono::seconds>(tick_interval).count()/
static_cast<double>(std::chrono::duration_cast<std::chrono::seconds>(interval).count()));
}
void add(uint64_t val = 1) {
@@ -48,7 +48,7 @@ public:
}
void update() {
double instant_rate = _count / static_cast<double>(std::chrono::duration_cast<std::chrono::nanoseconds>(_tick_interval).count());
double instant_rate = _count / static_cast<double>(std::chrono::duration_cast<std::chrono::seconds>(_tick_interval).count());
if (_initialized) {
_rate += (_alpha * (instant_rate - _rate));
} else {
@@ -70,7 +70,8 @@ public:
}
};
class ihistogram {
template <typename Unit>
class basic_ihistogram {
public:
// count holds all the events
int64_t count;
@@ -84,12 +85,13 @@ public:
double variance;
int64_t sample_mask;
boost::circular_buffer<int64_t> sample;
ihistogram(size_t size = 1024, int64_t _sample_mask = 0x80)
basic_ihistogram(size_t size = 1024, int64_t _sample_mask = 0x80)
: count(0), total(0), min(0), max(0), sum(0), started(0), mean(0), variance(0),
sample_mask(_sample_mask), sample(
size) {
}
void mark(int64_t value) {
void mark(int64_t ns_value) {
auto value = std::chrono::duration_cast<Unit>(std::chrono::nanoseconds(ns_value)).count();
if (total == 0 || value < min) {
min = value;
}
@@ -131,7 +133,7 @@ public:
/**
* Set the latency according to the sample rate.
*/
ihistogram& set_latency(latency_counter& lc) {
basic_ihistogram& set_latency(latency_counter& lc) {
if (should_sample()) {
lc.start();
}
@@ -144,7 +146,7 @@ public:
* Increment the total number of events without
* sampling the value.
*/
ihistogram& inc() {
basic_ihistogram& inc() {
count++;
return *this;
}
@@ -157,7 +159,7 @@ public:
return a * a;
}
ihistogram& operator +=(const ihistogram& o) {
basic_ihistogram& operator +=(const basic_ihistogram& o) {
if (count == 0) {
*this = o;
} else if (o.count > 0) {
@@ -190,14 +192,18 @@ public:
return mean * count;
}
friend ihistogram operator +(ihistogram a, const ihistogram& b);
template <typename U>
friend basic_ihistogram<U> operator +(basic_ihistogram<U> a, const basic_ihistogram<U>& b);
};
inline ihistogram operator +(ihistogram a, const ihistogram& b) {
template <typename Unit>
inline basic_ihistogram<Unit> operator +(basic_ihistogram<Unit> a, const basic_ihistogram<Unit>& b) {
a += b;
return a;
}
using ihistogram = basic_ihistogram<std::chrono::microseconds>;
struct rate_moving_average {
uint64_t count = 0;
double rates[3] = {0};
@@ -222,7 +228,7 @@ class timed_rate_moving_average {
static constexpr latency_counter::duration tick_interval() {
return std::chrono::seconds(10);
}
moving_average rates[3] = {{tick_interval(), std::chrono::minutes(1)}, {tick_interval(), std::chrono::minutes(5)}, {tick_interval(), std::chrono::minutes(15)}};
moving_average rates[3] = {{std::chrono::minutes(1), tick_interval()}, {std::chrono::minutes(5), tick_interval()}, {std::chrono::minutes(15), tick_interval()}};
latency_counter::time_point start_time;
timer<> _timer;
@@ -246,7 +252,7 @@ public:
rate_moving_average rate() const {
rate_moving_average res;
if (_count > 0) {
double elapsed = std::chrono::duration_cast<std::chrono::nanoseconds>(latency_counter::now() - start_time).count();
double elapsed = std::chrono::duration_cast<std::chrono::seconds>(latency_counter::now() - start_time).count();
res.mean_rate = (_count / elapsed);
}
res.count = _count;


@@ -61,7 +61,9 @@ using eviction_fn = std::function<memory::reclaiming_result()>;
class region_group_reclaimer {
protected:
size_t _threshold;
size_t _soft_limit;
bool _under_pressure = false;
bool _under_soft_pressure = false;
virtual void start_reclaiming() {}
virtual void stop_reclaiming() {}
public:
@@ -69,6 +71,24 @@ public:
return _under_pressure;
}
bool over_soft_limit() const {
return _under_soft_pressure;
}
void notify_soft_pressure() {
if (!_under_soft_pressure) {
_under_soft_pressure = true;
start_reclaiming();
}
}
void notify_soft_relief() {
if (_under_soft_pressure) {
_under_soft_pressure = false;
stop_reclaiming();
}
}
void notify_pressure() {
if (!_under_pressure) {
_under_pressure = true;
@@ -83,12 +103,21 @@ public:
}
}
region_group_reclaimer(size_t threshold = std::numeric_limits<size_t>::max()) : _threshold(threshold) {}
region_group_reclaimer()
: _threshold(std::numeric_limits<size_t>::max()), _soft_limit(std::numeric_limits<size_t>::max()) {}
region_group_reclaimer(size_t threshold)
: _threshold(threshold), _soft_limit(threshold) {}
region_group_reclaimer(size_t threshold, size_t soft)
: _threshold(threshold), _soft_limit(soft) {}
virtual ~region_group_reclaimer() {}
size_t throttle_threshold() const {
return _threshold;
}
size_t soft_limit_threshold() const {
return _soft_limit;
}
};
// Groups regions for the purpose of statistics. Can be nested.
@@ -232,6 +261,11 @@ public:
if (rg->execution_permitted()) {
rg->release_requests();
}
if (rg->_total_memory >= rg->_reclaimer.soft_limit_threshold()) {
rg->_reclaimer.notify_soft_pressure();
} else {
rg->_reclaimer.notify_soft_relief();
}
return stop_iteration::no;
});
}