Compare commits
170 Commits
copilot/fi
...
branch-2.2
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
b95be056b4 | ||
|
|
d91d9fc198 | ||
|
|
aed8102269 | ||
|
|
b249d459f5 | ||
|
|
4fdb5de611 | ||
|
|
951a50f1bc | ||
|
|
b4d2f306f7 | ||
|
|
fd7c4aaed7 | ||
|
|
97947dbb1b | ||
|
|
1c65fad3c2 | ||
|
|
f39891a999 | ||
|
|
e2f2f712e8 | ||
|
|
31be029b7d | ||
|
|
0881bdc7a0 | ||
|
|
c9dc9d4e99 | ||
|
|
859a0c2c90 | ||
|
|
799dbb4f2e | ||
|
|
a2fe669dd3 | ||
|
|
56de761daf | ||
|
|
c3187093a3 | ||
|
|
111c2ecf5d | ||
|
|
a6ecdbbba6 | ||
|
|
36e44395d5 | ||
|
|
b9d9815adc | ||
|
|
e7e4541c72 | ||
|
|
e22e7a1079 | ||
|
|
a690e6cf8a | ||
|
|
d0ad290567 | ||
|
|
982755eb86 | ||
|
|
e5505aa01e | ||
|
|
7c69b6e5d0 | ||
|
|
fecef90f28 | ||
|
|
6a55ecf3a2 | ||
|
|
5c4129c027 | ||
|
|
45e4094d60 | ||
|
|
d38ef5059c | ||
|
|
8d79b83e04 | ||
|
|
64aa23b6e2 | ||
|
|
58ab46c60c | ||
|
|
3c0cf145af | ||
|
|
905d9c620a | ||
|
|
d145a6cb97 | ||
|
|
17cc62d0b3 | ||
|
|
6c6d6711f2 | ||
|
|
8f2ef52b47 | ||
|
|
d3b901d545 | ||
|
|
2eb6a5718a | ||
|
|
3c6745ae2c | ||
|
|
d1de3dd109 | ||
|
|
11d36b25e9 | ||
|
|
b011079ce3 | ||
|
|
efc7e966e1 | ||
|
|
82a89d452d | ||
|
|
98f0a4f8e5 | ||
|
|
2f6632a76d | ||
|
|
64354f2114 | ||
|
|
dc116dc0f9 | ||
|
|
32baa88e38 | ||
|
|
c5cd6bf57f | ||
|
|
b162ba1daf | ||
|
|
f207c1e652 | ||
|
|
7b9f856883 | ||
|
|
42bd490f8a | ||
|
|
430946e000 | ||
|
|
434fb2149c | ||
|
|
677fcf2532 | ||
|
|
512b2177fe | ||
|
|
4c31a14093 | ||
|
|
e0cd39bbf7 | ||
|
|
6e9789067d | ||
|
|
d231f8570e | ||
|
|
88702df344 | ||
|
|
a4904d51b4 | ||
|
|
c2e7b3395f | ||
|
|
d0f5945840 | ||
|
|
f2f6b172b6 | ||
|
|
64dae389e8 | ||
|
|
fa3a0e4741 | ||
|
|
c0ba555b5d | ||
|
|
ac87023c26 | ||
|
|
e94394bbdd | ||
|
|
8cc532bb16 | ||
|
|
d1b5442cc5 | ||
|
|
789ad72fc8 | ||
|
|
9a86c0ee1f | ||
|
|
49bbb6bda1 | ||
|
|
9eeeb82bd3 | ||
|
|
313754eb53 | ||
|
|
450d1a8327 | ||
|
|
0eaefb5ec3 | ||
|
|
c0c837d95d | ||
|
|
9d5ec17087 | ||
|
|
f18558e01a | ||
|
|
a685cf325f | ||
|
|
fe14353b9b | ||
|
|
9c0aaa2132 | ||
|
|
85bb77d0d2 | ||
|
|
5ab14c5e1f | ||
|
|
f4e1f60468 | ||
|
|
46355f8f89 | ||
|
|
3dc017a043 | ||
|
|
e2a641062d | ||
|
|
350048a35f | ||
|
|
5631cc9ead | ||
|
|
cca9047c86 | ||
|
|
97ff7b640c | ||
|
|
dc171ba2ab | ||
|
|
2b90e09468 | ||
|
|
961e263693 | ||
|
|
56fb464735 | ||
|
|
86ad5f87ff | ||
|
|
566e47d757 | ||
|
|
8b4ee2b2be | ||
|
|
a19c3c8908 | ||
|
|
bcae6cdb8e | ||
|
|
eb646c61ed | ||
|
|
b146d2dda1 | ||
|
|
e4deae4eab | ||
|
|
4a360aab87 | ||
|
|
b65aec4824 | ||
|
|
fd39a84b7b | ||
|
|
cc6dcbe250 | ||
|
|
136637401d | ||
|
|
782d817e84 | ||
|
|
bec3241fe8 | ||
|
|
68a2644b50 | ||
|
|
a02dcf15cf | ||
|
|
3ed5e63e8a | ||
|
|
d17ce46983 | ||
|
|
7ca5e7e993 | ||
|
|
07b0ce27fa | ||
|
|
27be3cd242 | ||
|
|
abf50aafef | ||
|
|
dfe5b38a43 | ||
|
|
9bdc8c25f5 | ||
|
|
e75c55b2db | ||
|
|
756feae052 | ||
|
|
202b4e6797 | ||
|
|
76ac200eff | ||
|
|
9aa172fe8e | ||
|
|
c4af043ef7 | ||
|
|
06b25320be | ||
|
|
ff70d9f15c | ||
|
|
9bbd5821a2 | ||
|
|
a7841f1f2e | ||
|
|
84859e0745 | ||
|
|
6b74e1f02d | ||
|
|
520f17b315 | ||
|
|
9fe3d04f31 | ||
|
|
a74183eb1e | ||
|
|
e059f17bf2 | ||
|
|
0e8e005357 | ||
|
|
8bf6f39392 | ||
|
|
04ba51986e | ||
|
|
1d5379c462 | ||
|
|
cb5dc56bfd | ||
|
|
b578b492cd | ||
|
|
30c950a7f6 | ||
|
|
f0d1e9c518 | ||
|
|
597aeca93d | ||
|
|
1a94b90a4d | ||
|
|
acdd42c7c8 | ||
|
|
bd4f658555 | ||
|
|
a983ba7aad | ||
|
|
0a561fc326 | ||
|
|
1f10549056 | ||
|
|
c2a2560ea3 | ||
|
|
237e36a0b4 | ||
|
|
e78c137bfc | ||
|
|
fb99a7c902 |
2
.gitmodules
vendored
2
.gitmodules
vendored
@@ -1,6 +1,6 @@
|
||||
[submodule "seastar"]
|
||||
path = seastar
|
||||
url = ../seastar
|
||||
url = ../scylla-seastar
|
||||
ignore = dirty
|
||||
[submodule "swagger-ui"]
|
||||
path = swagger-ui
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
#!/bin/sh
|
||||
|
||||
VERSION=666.development
|
||||
VERSION=2.2.rc2
|
||||
|
||||
if test -f version
|
||||
then
|
||||
@@ -14,7 +14,7 @@ else
|
||||
# where counter starts at 1 and increments for successive versions.
|
||||
# This ensures that the package manager will select your custom
|
||||
# package over the standard release.
|
||||
SCYLLA_BUILD=0
|
||||
SCYLLA_BUILD=1.mv
|
||||
SCYLLA_RELEASE=$SCYLLA_BUILD.$DATE.$GIT_COMMIT
|
||||
fi
|
||||
|
||||
|
||||
@@ -2129,6 +2129,41 @@
|
||||
]
|
||||
}
|
||||
]
|
||||
},
|
||||
{
|
||||
"path":"/storage_service/view_build_statuses/{keyspace}/{view}",
|
||||
"operations":[
|
||||
{
|
||||
"method":"GET",
|
||||
"summary":"Gets the progress of a materialized view build",
|
||||
"type":"array",
|
||||
"items":{
|
||||
"type":"mapper"
|
||||
},
|
||||
"nickname":"view_build_statuses",
|
||||
"produces":[
|
||||
"application/json"
|
||||
],
|
||||
"parameters":[
|
||||
{
|
||||
"name":"keyspace",
|
||||
"description":"The keyspace",
|
||||
"required":true,
|
||||
"allowMultiple":false,
|
||||
"type":"string",
|
||||
"paramType":"path"
|
||||
},
|
||||
{
|
||||
"name":"view",
|
||||
"description":"View name",
|
||||
"required":true,
|
||||
"allowMultiple":false,
|
||||
"type":"string",
|
||||
"paramType":"path"
|
||||
}
|
||||
]
|
||||
}
|
||||
]
|
||||
}
|
||||
],
|
||||
"models":{
|
||||
|
||||
@@ -852,6 +852,15 @@ void set_storage_service(http_context& ctx, routes& r) {
|
||||
return make_ready_future<json::json_return_type>(map_to_key_value(ownership, res));
|
||||
});
|
||||
});
|
||||
|
||||
ss::view_build_statuses.set(r, [&ctx] (std::unique_ptr<request> req) {
|
||||
auto keyspace = validate_keyspace(ctx, req->param);
|
||||
auto view = req->param["view"];
|
||||
return service::get_local_storage_service().view_build_statuses(std::move(keyspace), std::move(view)).then([] (std::unordered_map<sstring, sstring> status) {
|
||||
std::vector<storage_service_json::mapper> res;
|
||||
return make_ready_future<json::json_return_type>(map_to_key_value(std::move(status), res));
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -72,18 +72,22 @@ public:
|
||||
return make_ready_future<authenticated_user>(anonymous_user());
|
||||
}
|
||||
|
||||
virtual future<> create(stdx::string_view, const authentication_options& options) override {
|
||||
virtual future<> create(stdx::string_view, const authentication_options& options) const override {
|
||||
return make_ready_future();
|
||||
}
|
||||
|
||||
virtual future<> alter(stdx::string_view, const authentication_options& options) override {
|
||||
virtual future<> alter(stdx::string_view, const authentication_options& options) const override {
|
||||
return make_ready_future();
|
||||
}
|
||||
|
||||
virtual future<> drop(stdx::string_view) override {
|
||||
virtual future<> drop(stdx::string_view) const override {
|
||||
return make_ready_future();
|
||||
}
|
||||
|
||||
virtual future<custom_options> query_custom_options(stdx::string_view role_name) const override {
|
||||
return make_ready_future<custom_options>();
|
||||
}
|
||||
|
||||
virtual const resource_set& protected_resources() const override {
|
||||
static const resource_set resources;
|
||||
return resources;
|
||||
|
||||
@@ -58,24 +58,30 @@ public:
|
||||
return make_ready_future<permission_set>(permissions::ALL);
|
||||
}
|
||||
|
||||
virtual future<> grant(stdx::string_view, permission_set, const resource&) override {
|
||||
throw exceptions::invalid_request_exception("GRANT operation is not supported by AllowAllAuthorizer");
|
||||
virtual future<> grant(stdx::string_view, permission_set, const resource&) const override {
|
||||
return make_exception_future<>(
|
||||
unsupported_authorization_operation("GRANT operation is not supported by AllowAllAuthorizer"));
|
||||
}
|
||||
|
||||
virtual future<> revoke(stdx::string_view, permission_set, const resource&) override {
|
||||
throw exceptions::invalid_request_exception("REVOKE operation is not supported by AllowAllAuthorizer");
|
||||
virtual future<> revoke(stdx::string_view, permission_set, const resource&) const override {
|
||||
return make_exception_future<>(
|
||||
unsupported_authorization_operation("REVOKE operation is not supported by AllowAllAuthorizer"));
|
||||
}
|
||||
|
||||
virtual future<std::vector<permission_details>> list_all() const override {
|
||||
throw exceptions::invalid_request_exception("LIST PERMISSIONS operation is not supported by AllowAllAuthorizer");
|
||||
return make_exception_future<std::vector<permission_details>>(
|
||||
unsupported_authorization_operation(
|
||||
"LIST PERMISSIONS operation is not supported by AllowAllAuthorizer"));
|
||||
}
|
||||
|
||||
virtual future<> revoke_all(stdx::string_view) override {
|
||||
return make_ready_future();
|
||||
virtual future<> revoke_all(stdx::string_view) const override {
|
||||
return make_exception_future(
|
||||
unsupported_authorization_operation("REVOKE operation is not supported by AllowAllAuthorizer"));
|
||||
}
|
||||
|
||||
virtual future<> revoke_all(const resource&) override {
|
||||
return make_ready_future();
|
||||
virtual future<> revoke_all(const resource&) const override {
|
||||
return make_exception_future(
|
||||
unsupported_authorization_operation("REVOKE operation is not supported by AllowAllAuthorizer"));
|
||||
}
|
||||
|
||||
virtual const resource_set& protected_resources() const override {
|
||||
|
||||
@@ -43,9 +43,11 @@ std::ostream& operator<<(std::ostream&, authentication_option);
|
||||
|
||||
using authentication_option_set = std::unordered_set<authentication_option>;
|
||||
|
||||
using custom_options = std::unordered_map<sstring, sstring>;
|
||||
|
||||
struct authentication_options final {
|
||||
std::optional<sstring> password;
|
||||
std::optional<std::unordered_map<sstring, sstring>> options;
|
||||
std::optional<custom_options> options;
|
||||
};
|
||||
|
||||
inline bool any_authentication_options(const authentication_options& aos) noexcept {
|
||||
|
||||
@@ -69,7 +69,9 @@ namespace auth {
|
||||
class authenticated_user;
|
||||
|
||||
///
|
||||
/// Abstract interface for authenticating users.
|
||||
/// Abstract client for authenticating role identity.
|
||||
///
|
||||
/// All state necessary to authorize a role is stored externally to the client instance.
|
||||
///
|
||||
class authenticator {
|
||||
public:
|
||||
@@ -120,7 +122,7 @@ public:
|
||||
///
|
||||
/// The options provided must be a subset of `supported_options()`.
|
||||
///
|
||||
virtual future<> create(stdx::string_view role_name, const authentication_options& options) = 0;
|
||||
virtual future<> create(stdx::string_view role_name, const authentication_options& options) const = 0;
|
||||
|
||||
///
|
||||
/// Alter the authentication record of an existing user.
|
||||
@@ -129,12 +131,19 @@ public:
|
||||
///
|
||||
/// Callers must ensure that the specification of `alterable_options()` is adhered to.
|
||||
///
|
||||
virtual future<> alter(stdx::string_view role_name, const authentication_options& options) = 0;
|
||||
virtual future<> alter(stdx::string_view role_name, const authentication_options& options) const = 0;
|
||||
|
||||
///
|
||||
/// Delete the authentication record for a user. This will disallow the user from logging in.
|
||||
///
|
||||
virtual future<> drop(stdx::string_view role_name) = 0;
|
||||
virtual future<> drop(stdx::string_view role_name) const = 0;
|
||||
|
||||
///
|
||||
/// Query for custom options (those corresponding to \ref authentication_options::options).
|
||||
///
|
||||
/// If no options are set the result is an empty container.
|
||||
///
|
||||
virtual future<custom_options> query_custom_options(stdx::string_view role_name) const = 0;
|
||||
|
||||
///
|
||||
/// System resources used internally as part of the implementation. These are made inaccessible to users.
|
||||
|
||||
@@ -44,6 +44,7 @@
|
||||
#include <experimental/string_view>
|
||||
#include <functional>
|
||||
#include <optional>
|
||||
#include <stdexcept>
|
||||
#include <tuple>
|
||||
#include <vector>
|
||||
|
||||
@@ -79,8 +80,15 @@ inline bool operator<(const permission_details& pd1, const permission_details& p
|
||||
< std::forward_as_tuple(pd2.role_name, pd2.resource, pd2.permissions);
|
||||
}
|
||||
|
||||
class unsupported_authorization_operation : public std::invalid_argument {
|
||||
public:
|
||||
using std::invalid_argument::invalid_argument;
|
||||
};
|
||||
|
||||
///
|
||||
/// Abstract interface for authorizing users to access resources.
|
||||
/// Abstract client for authorizing roles to access resources.
|
||||
///
|
||||
/// All state necessary to authorize a role is stored externally to the client instance.
|
||||
///
|
||||
class authorizer {
|
||||
public:
|
||||
@@ -107,27 +115,37 @@ public:
|
||||
///
|
||||
/// Grant a set of permissions to a role for a particular \ref resource.
|
||||
///
|
||||
virtual future<> grant(stdx::string_view role_name, permission_set, const resource&) = 0;
|
||||
/// \throws \ref unsupported_authorization_operation if granting permissions is not supported.
|
||||
///
|
||||
virtual future<> grant(stdx::string_view role_name, permission_set, const resource&) const = 0;
|
||||
|
||||
///
|
||||
/// Revoke a set of permissions from a role for a particular \ref resource.
|
||||
///
|
||||
virtual future<> revoke(stdx::string_view role_name, permission_set, const resource&) = 0;
|
||||
/// \throws \ref unsupported_authorization_operation if revoking permissions is not supported.
|
||||
///
|
||||
virtual future<> revoke(stdx::string_view role_name, permission_set, const resource&) const = 0;
|
||||
|
||||
///
|
||||
/// Query for all directly granted permissions.
|
||||
///
|
||||
/// \throws \ref unsupported_authorization_operation if listing permissions is not supported.
|
||||
///
|
||||
virtual future<std::vector<permission_details>> list_all() const = 0;
|
||||
|
||||
///
|
||||
/// Revoke all permissions granted directly to a particular role.
|
||||
///
|
||||
virtual future<> revoke_all(stdx::string_view role_name) = 0;
|
||||
/// \throws \ref unsupported_authorization_operation if revoking permissions is not supported.
|
||||
///
|
||||
virtual future<> revoke_all(stdx::string_view role_name) const = 0;
|
||||
|
||||
///
|
||||
/// Revoke all permissions granted to any role for a particular resource.
|
||||
///
|
||||
virtual future<> revoke_all(const resource&) = 0;
|
||||
/// \throws \ref unsupported_authorization_operation if revoking permissions is not supported.
|
||||
///
|
||||
virtual future<> revoke_all(const resource&) const = 0;
|
||||
|
||||
///
|
||||
/// System resources used internally as part of the implementation. These are made inaccessible to users.
|
||||
|
||||
@@ -25,6 +25,7 @@
|
||||
|
||||
#include "cql3/query_processor.hh"
|
||||
#include "cql3/statements/create_table_statement.hh"
|
||||
#include "database.hh"
|
||||
#include "schema_builder.hh"
|
||||
#include "service/migration_manager.hh"
|
||||
|
||||
@@ -48,7 +49,7 @@ future<> do_after_system_ready(seastar::abort_source& as, seastar::noncopyable_f
|
||||
return exponential_backoff_retry::do_until_value(1s, 1min, as, [func = std::move(func)] {
|
||||
return func().then_wrapped([] (auto&& f) -> stdx::optional<empty_state> {
|
||||
if (f.failed()) {
|
||||
auth_log.warn("Auth task failed with error, rescheduling: {}", f.get_exception());
|
||||
auth_log.info("Auth task failed with error, rescheduling: {}", f.get_exception());
|
||||
return { };
|
||||
}
|
||||
return { empty_state() };
|
||||
@@ -58,13 +59,13 @@ future<> do_after_system_ready(seastar::abort_source& as, seastar::noncopyable_f
|
||||
}
|
||||
|
||||
future<> create_metadata_table_if_missing(
|
||||
const sstring& table_name,
|
||||
stdx::string_view table_name,
|
||||
cql3::query_processor& qp,
|
||||
const sstring& cql,
|
||||
stdx::string_view cql,
|
||||
::service::migration_manager& mm) {
|
||||
auto& db = qp.db().local();
|
||||
|
||||
if (db.has_schema(meta::AUTH_KS, table_name)) {
|
||||
if (db.has_schema(meta::AUTH_KS, sstring(table_name))) {
|
||||
return make_ready_future<>();
|
||||
}
|
||||
|
||||
@@ -85,4 +86,12 @@ future<> create_metadata_table_if_missing(
|
||||
return mm.announce_new_column_family(b.build(), false);
|
||||
}
|
||||
|
||||
future<> wait_for_schema_agreement(::service::migration_manager& mm, const database& db) {
|
||||
static const auto pause = [] { return sleep(std::chrono::milliseconds(500)); };
|
||||
|
||||
return do_until([&db] { return db.get_version() != database::empty_version; }, pause).then([&mm] {
|
||||
return do_until([&mm] { return mm.have_schema_agreement(); }, pause);
|
||||
});
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -22,6 +22,7 @@
|
||||
#pragma once
|
||||
|
||||
#include <chrono>
|
||||
#include <experimental/string_view>
|
||||
|
||||
#include <seastar/core/future.hh>
|
||||
#include <seastar/core/abort_source.hh>
|
||||
@@ -36,6 +37,8 @@
|
||||
|
||||
using namespace std::chrono_literals;
|
||||
|
||||
class database;
|
||||
|
||||
namespace service {
|
||||
class migration_manager;
|
||||
}
|
||||
@@ -65,16 +68,18 @@ future<> once_among_shards(Task&& f) {
|
||||
}
|
||||
|
||||
inline future<> delay_until_system_ready(seastar::abort_source& as) {
|
||||
return sleep_abortable(10s, as);
|
||||
return sleep_abortable(15s, as);
|
||||
}
|
||||
|
||||
// Func must support being invoked more than once.
|
||||
future<> do_after_system_ready(seastar::abort_source& as, seastar::noncopyable_function<future<>()> func);
|
||||
|
||||
future<> create_metadata_table_if_missing(
|
||||
const sstring& table_name,
|
||||
stdx::string_view table_name,
|
||||
cql3::query_processor&,
|
||||
const sstring& cql,
|
||||
stdx::string_view cql,
|
||||
::service::migration_manager&);
|
||||
|
||||
future<> wait_for_schema_agreement(::service::migration_manager&, const database&);
|
||||
|
||||
}
|
||||
|
||||
@@ -109,7 +109,7 @@ future<bool> default_authorizer::any_granted() const {
|
||||
});
|
||||
}
|
||||
|
||||
future<> default_authorizer::migrate_legacy_metadata() {
|
||||
future<> default_authorizer::migrate_legacy_metadata() const {
|
||||
alogger.info("Starting migration of legacy permissions metadata.");
|
||||
static const sstring query = sprint("SELECT * FROM %s.%s", meta::AUTH_KS, legacy_table_name);
|
||||
|
||||
@@ -157,18 +157,18 @@ future<> default_authorizer::start() {
|
||||
create_table,
|
||||
_migration_manager).then([this] {
|
||||
_finished = do_after_system_ready(_as, [this] {
|
||||
if (legacy_metadata_exists()) {
|
||||
return any_granted().then([this](bool any) {
|
||||
if (!any) {
|
||||
return migrate_legacy_metadata();
|
||||
}
|
||||
return async([this] {
|
||||
wait_for_schema_agreement(_migration_manager, _qp.db().local()).get0();
|
||||
|
||||
alogger.warn("Ignoring legacy permissions metadata since role permissions exist.");
|
||||
return make_ready_future<>();
|
||||
});
|
||||
}
|
||||
if (legacy_metadata_exists()) {
|
||||
if (!any_granted().get0()) {
|
||||
migrate_legacy_metadata().get0();
|
||||
return;
|
||||
}
|
||||
|
||||
return make_ready_future<>();
|
||||
alogger.warn("Ignoring legacy permissions metadata since role permissions exist.");
|
||||
}
|
||||
});
|
||||
});
|
||||
});
|
||||
});
|
||||
@@ -210,7 +210,7 @@ default_authorizer::modify(
|
||||
stdx::string_view role_name,
|
||||
permission_set set,
|
||||
const resource& resource,
|
||||
stdx::string_view op) {
|
||||
stdx::string_view op) const {
|
||||
return do_with(
|
||||
sprint(
|
||||
"UPDATE %s.%s SET %s = %s %s ? WHERE %s = ? AND %s = ?",
|
||||
@@ -230,11 +230,11 @@ default_authorizer::modify(
|
||||
}
|
||||
|
||||
|
||||
future<> default_authorizer::grant(stdx::string_view role_name, permission_set set, const resource& resource) {
|
||||
future<> default_authorizer::grant(stdx::string_view role_name, permission_set set, const resource& resource) const {
|
||||
return modify(role_name, std::move(set), resource, "+");
|
||||
}
|
||||
|
||||
future<> default_authorizer::revoke(stdx::string_view role_name, permission_set set, const resource& resource) {
|
||||
future<> default_authorizer::revoke(stdx::string_view role_name, permission_set set, const resource& resource) const {
|
||||
return modify(role_name, std::move(set), resource, "-");
|
||||
}
|
||||
|
||||
@@ -267,7 +267,7 @@ future<std::vector<permission_details>> default_authorizer::list_all() const {
|
||||
});
|
||||
}
|
||||
|
||||
future<> default_authorizer::revoke_all(stdx::string_view role_name) {
|
||||
future<> default_authorizer::revoke_all(stdx::string_view role_name) const {
|
||||
static const sstring query = sprint(
|
||||
"DELETE FROM %s.%s WHERE %s = ?",
|
||||
meta::AUTH_KS,
|
||||
@@ -286,7 +286,7 @@ future<> default_authorizer::revoke_all(stdx::string_view role_name) {
|
||||
});
|
||||
}
|
||||
|
||||
future<> default_authorizer::revoke_all(const resource& resource) {
|
||||
future<> default_authorizer::revoke_all(const resource& resource) const {
|
||||
static const sstring query = sprint(
|
||||
"SELECT %s FROM %s.%s WHERE %s = ? ALLOW FILTERING",
|
||||
ROLE_NAME,
|
||||
|
||||
@@ -77,15 +77,15 @@ public:
|
||||
|
||||
virtual future<permission_set> authorize(const role_or_anonymous&, const resource&) const override;
|
||||
|
||||
virtual future<> grant(stdx::string_view, permission_set, const resource&) override;
|
||||
virtual future<> grant(stdx::string_view, permission_set, const resource&) const override;
|
||||
|
||||
virtual future<> revoke( stdx::string_view, permission_set, const resource&) override;
|
||||
virtual future<> revoke( stdx::string_view, permission_set, const resource&) const override;
|
||||
|
||||
virtual future<std::vector<permission_details>> list_all() const override;
|
||||
|
||||
virtual future<> revoke_all(stdx::string_view) override;
|
||||
virtual future<> revoke_all(stdx::string_view) const override;
|
||||
|
||||
virtual future<> revoke_all(const resource&) override;
|
||||
virtual future<> revoke_all(const resource&) const override;
|
||||
|
||||
virtual const resource_set& protected_resources() const override;
|
||||
|
||||
@@ -94,9 +94,9 @@ private:
|
||||
|
||||
future<bool> any_granted() const;
|
||||
|
||||
future<> migrate_legacy_metadata();
|
||||
future<> migrate_legacy_metadata() const;
|
||||
|
||||
future<> modify(stdx::string_view, permission_set, const resource&, stdx::string_view);
|
||||
future<> modify(stdx::string_view, permission_set, const resource&, stdx::string_view) const;
|
||||
};
|
||||
|
||||
} /* namespace auth */
|
||||
|
||||
@@ -177,7 +177,7 @@ bool password_authenticator::legacy_metadata_exists() const {
|
||||
return _qp.db().local().has_schema(meta::AUTH_KS, legacy_table_name);
|
||||
}
|
||||
|
||||
future<> password_authenticator::migrate_legacy_metadata() {
|
||||
future<> password_authenticator::migrate_legacy_metadata() const {
|
||||
plogger.info("Starting migration of legacy authentication metadata.");
|
||||
static const sstring query = sprint("SELECT * FROM %s.%s", meta::AUTH_KS, legacy_table_name);
|
||||
|
||||
@@ -201,7 +201,7 @@ future<> password_authenticator::migrate_legacy_metadata() {
|
||||
});
|
||||
}
|
||||
|
||||
future<> password_authenticator::create_default_if_missing() {
|
||||
future<> password_authenticator::create_default_if_missing() const {
|
||||
return default_role_row_satisfies(_qp, &has_salted_hash).then([this](bool exists) {
|
||||
if (!exists) {
|
||||
return _qp.process(
|
||||
@@ -220,8 +220,16 @@ future<> password_authenticator::start() {
|
||||
return once_among_shards([this] {
|
||||
gensalt(); // do this once to determine usable hashing
|
||||
|
||||
auto f = create_metadata_table_if_missing(
|
||||
meta::roles_table::name,
|
||||
_qp,
|
||||
meta::roles_table::creation_query(),
|
||||
_migration_manager);
|
||||
|
||||
_stopped = do_after_system_ready(_as, [this] {
|
||||
return async([this] {
|
||||
wait_for_schema_agreement(_migration_manager, _qp.db().local()).get0();
|
||||
|
||||
if (any_nondefault_role_row_satisfies(_qp, &has_salted_hash).get0()) {
|
||||
if (legacy_metadata_exists()) {
|
||||
plogger.warn("Ignoring legacy authentication metadata since nondefault data already exist.");
|
||||
@@ -239,7 +247,7 @@ future<> password_authenticator::start() {
|
||||
});
|
||||
});
|
||||
|
||||
return make_ready_future<>();
|
||||
return f;
|
||||
});
|
||||
}
|
||||
|
||||
@@ -317,7 +325,7 @@ future<authenticated_user> password_authenticator::authenticate(
|
||||
});
|
||||
}
|
||||
|
||||
future<> password_authenticator::create(stdx::string_view role_name, const authentication_options& options) {
|
||||
future<> password_authenticator::create(stdx::string_view role_name, const authentication_options& options) const {
|
||||
if (!options.password) {
|
||||
return make_ready_future<>();
|
||||
}
|
||||
@@ -328,7 +336,7 @@ future<> password_authenticator::create(stdx::string_view role_name, const authe
|
||||
{hashpw(*options.password), sstring(role_name)}).discard_result();
|
||||
}
|
||||
|
||||
future<> password_authenticator::alter(stdx::string_view role_name, const authentication_options& options) {
|
||||
future<> password_authenticator::alter(stdx::string_view role_name, const authentication_options& options) const {
|
||||
if (!options.password) {
|
||||
return make_ready_future<>();
|
||||
}
|
||||
@@ -345,7 +353,7 @@ future<> password_authenticator::alter(stdx::string_view role_name, const authen
|
||||
{hashpw(*options.password), sstring(role_name)}).discard_result();
|
||||
}
|
||||
|
||||
future<> password_authenticator::drop(stdx::string_view name) {
|
||||
future<> password_authenticator::drop(stdx::string_view name) const {
|
||||
static const sstring query = sprint(
|
||||
"DELETE %s FROM %s WHERE %s = ?",
|
||||
SALTED_HASH,
|
||||
@@ -355,6 +363,10 @@ future<> password_authenticator::drop(stdx::string_view name) {
|
||||
return _qp.process(query, consistency_for_user(name), {sstring(name)}).discard_result();
|
||||
}
|
||||
|
||||
future<custom_options> password_authenticator::query_custom_options(stdx::string_view role_name) const {
|
||||
return make_ready_future<custom_options>();
|
||||
}
|
||||
|
||||
const resource_set& password_authenticator::protected_resources() const {
|
||||
static const resource_set resources({make_data_resource(meta::AUTH_KS, meta::roles_table::name)});
|
||||
return resources;
|
||||
|
||||
@@ -81,11 +81,13 @@ public:
|
||||
|
||||
virtual future<authenticated_user> authenticate(const credentials_map& credentials) const override;
|
||||
|
||||
virtual future<> create(stdx::string_view role_name, const authentication_options& options) override;
|
||||
virtual future<> create(stdx::string_view role_name, const authentication_options& options) const override;
|
||||
|
||||
virtual future<> alter(stdx::string_view role_name, const authentication_options& options) override;
|
||||
virtual future<> alter(stdx::string_view role_name, const authentication_options& options) const override;
|
||||
|
||||
virtual future<> drop(stdx::string_view role_name) override;
|
||||
virtual future<> drop(stdx::string_view role_name) const override;
|
||||
|
||||
virtual future<custom_options> query_custom_options(stdx::string_view role_name) const override;
|
||||
|
||||
virtual const resource_set& protected_resources() const override;
|
||||
|
||||
@@ -94,9 +96,9 @@ public:
|
||||
private:
|
||||
bool legacy_metadata_exists() const;
|
||||
|
||||
future<> migrate_legacy_metadata();
|
||||
future<> migrate_legacy_metadata() const;
|
||||
|
||||
future<> create_default_if_missing();
|
||||
future<> create_default_if_missing() const;
|
||||
};
|
||||
|
||||
}
|
||||
|
||||
@@ -93,10 +93,12 @@ using role_set = std::unordered_set<sstring>;
|
||||
enum class recursive_role_query { yes, no };
|
||||
|
||||
///
|
||||
/// Abstract role manager.
|
||||
/// Abstract client for managing roles.
|
||||
///
|
||||
/// All implementations should throw role-related exceptions as documented, but authorization-related checking is
|
||||
/// handled by the CQL layer, and not here.
|
||||
/// All state necessary for managing roles is stored externally to the client instance.
|
||||
///
|
||||
/// All implementations should throw role-related exceptions as documented. Authorization is not addressed here, and
|
||||
/// access-control should never be enforced in implementations.
|
||||
///
|
||||
class role_manager {
|
||||
public:
|
||||
@@ -113,17 +115,17 @@ public:
|
||||
///
|
||||
/// \returns an exceptional future with \ref role_already_exists for a role that has previously been created.
|
||||
///
|
||||
virtual future<> create(stdx::string_view role_name, const role_config&) = 0;
|
||||
virtual future<> create(stdx::string_view role_name, const role_config&) const = 0;
|
||||
|
||||
///
|
||||
/// \returns an exceptional future with \ref nonexistant_role if the role does not exist.
|
||||
///
|
||||
virtual future<> drop(stdx::string_view role_name) = 0;
|
||||
virtual future<> drop(stdx::string_view role_name) const = 0;
|
||||
|
||||
///
|
||||
/// \returns an exceptional future with \ref nonexistant_role if the role does not exist.
|
||||
///
|
||||
virtual future<> alter(stdx::string_view role_name, const role_config_update&) = 0;
|
||||
virtual future<> alter(stdx::string_view role_name, const role_config_update&) const = 0;
|
||||
|
||||
///
|
||||
/// Grant `role_name` to `grantee_name`.
|
||||
@@ -133,7 +135,7 @@ public:
|
||||
/// \returns an exceptional future with \ref role_already_included if granting the role would be redundant, or
|
||||
/// create a cycle.
|
||||
///
|
||||
virtual future<> grant(stdx::string_view grantee_name, stdx::string_view role_name) = 0;
|
||||
virtual future<> grant(stdx::string_view grantee_name, stdx::string_view role_name) const = 0;
|
||||
|
||||
///
|
||||
/// Revoke `role_name` from `revokee_name`.
|
||||
@@ -142,7 +144,7 @@ public:
|
||||
///
|
||||
/// \returns an exceptional future with \ref revoke_ungranted_role if the role was not granted.
|
||||
///
|
||||
virtual future<> revoke(stdx::string_view revokee_name, stdx::string_view role_name) = 0;
|
||||
virtual future<> revoke(stdx::string_view revokee_name, stdx::string_view role_name) const = 0;
|
||||
|
||||
///
|
||||
/// \returns an exceptional future with \ref nonexistant_role if the role does not exist.
|
||||
|
||||
@@ -36,6 +36,21 @@ namespace meta {
|
||||
|
||||
namespace roles_table {
|
||||
|
||||
stdx::string_view creation_query() {
|
||||
static const sstring instance = sprint(
|
||||
"CREATE TABLE %s ("
|
||||
" %s text PRIMARY KEY,"
|
||||
" can_login boolean,"
|
||||
" is_superuser boolean,"
|
||||
" member_of set<text>,"
|
||||
" salted_hash text"
|
||||
")",
|
||||
qualified_name(),
|
||||
role_col_name);
|
||||
|
||||
return instance;
|
||||
}
|
||||
|
||||
stdx::string_view qualified_name() noexcept {
|
||||
static const sstring instance = AUTH_KS + "." + sstring(name);
|
||||
return instance;
|
||||
|
||||
@@ -40,6 +40,8 @@ namespace meta {
|
||||
|
||||
namespace roles_table {
|
||||
|
||||
stdx::string_view creation_query();
|
||||
|
||||
constexpr stdx::string_view name{"roles", 5};
|
||||
|
||||
stdx::string_view qualified_name() noexcept;
|
||||
|
||||
@@ -77,11 +77,18 @@ private:
|
||||
void on_update_view(const sstring& ks_name, const sstring& view_name, bool columns_changed) override {}
|
||||
|
||||
void on_drop_keyspace(const sstring& ks_name) override {
|
||||
_authorizer.revoke_all(auth::make_data_resource(ks_name));
|
||||
_authorizer.revoke_all(
|
||||
auth::make_data_resource(ks_name)).handle_exception_type([](const unsupported_authorization_operation&) {
|
||||
// Nothing.
|
||||
});
|
||||
}
|
||||
|
||||
void on_drop_column_family(const sstring& ks_name, const sstring& cf_name) override {
|
||||
_authorizer.revoke_all(auth::make_data_resource(ks_name, cf_name));
|
||||
_authorizer.revoke_all(
|
||||
auth::make_data_resource(
|
||||
ks_name, cf_name)).handle_exception_type([](const unsupported_authorization_operation&) {
|
||||
// Nothing.
|
||||
});
|
||||
}
|
||||
|
||||
void on_drop_user_type(const sstring& ks_name, const sstring& type_name) override {}
|
||||
@@ -177,9 +184,7 @@ future<> service::start() {
|
||||
return once_among_shards([this] {
|
||||
return create_keyspace_if_missing();
|
||||
}).then([this] {
|
||||
return _role_manager->start();
|
||||
}).then([this] {
|
||||
return when_all_succeed(_authorizer->start(), _authenticator->start());
|
||||
return when_all_succeed(_role_manager->start(), _authorizer->start(), _authenticator->start());
|
||||
}).then([this] {
|
||||
_permissions_cache = std::make_unique<permissions_cache>(_permissions_cache_config, *this, log);
|
||||
}).then([this] {
|
||||
@@ -402,7 +407,7 @@ static void validate_authentication_options_are_supported(
|
||||
|
||||
|
||||
future<> create_role(
|
||||
service& ser,
|
||||
const service& ser,
|
||||
stdx::string_view name,
|
||||
const role_config& config,
|
||||
const authentication_options& options) {
|
||||
@@ -415,7 +420,7 @@ future<> create_role(
|
||||
&validate_authentication_options_are_supported,
|
||||
options,
|
||||
ser.underlying_authenticator().supported_options()).then([&ser, name, &options] {
|
||||
return ser.underlying_authenticator().create(sstring(name), options);
|
||||
return ser.underlying_authenticator().create(name, options);
|
||||
}).handle_exception([&ser, &name](std::exception_ptr ep) {
|
||||
// Roll-back.
|
||||
return ser.underlying_role_manager().drop(name).then([ep = std::move(ep)] {
|
||||
@@ -426,7 +431,7 @@ future<> create_role(
|
||||
}
|
||||
|
||||
future<> alter_role(
|
||||
service& ser,
|
||||
const service& ser,
|
||||
stdx::string_view name,
|
||||
const role_config_update& config_update,
|
||||
const authentication_options& options) {
|
||||
@@ -444,10 +449,15 @@ future<> alter_role(
|
||||
});
|
||||
}
|
||||
|
||||
future<> drop_role(service& ser, stdx::string_view name) {
|
||||
future<> drop_role(const service& ser, stdx::string_view name) {
|
||||
return do_with(make_role_resource(name), [&ser, name](const resource& r) {
|
||||
auto& a = ser.underlying_authorizer();
|
||||
return when_all_succeed(a.revoke_all(name), a.revoke_all(r));
|
||||
|
||||
return when_all_succeed(
|
||||
a.revoke_all(name),
|
||||
a.revoke_all(r)).handle_exception_type([](const unsupported_authorization_operation&) {
|
||||
// Nothing.
|
||||
});
|
||||
}).then([&ser, name] {
|
||||
return ser.underlying_authenticator().drop(name);
|
||||
}).then([&ser, name] {
|
||||
@@ -471,7 +481,7 @@ future<bool> has_role(const service& ser, const authenticated_user& u, stdx::str
|
||||
}
|
||||
|
||||
future<> grant_permissions(
|
||||
service& ser,
|
||||
const service& ser,
|
||||
stdx::string_view role_name,
|
||||
permission_set perms,
|
||||
const resource& r) {
|
||||
@@ -480,8 +490,19 @@ future<> grant_permissions(
|
||||
});
|
||||
}
|
||||
|
||||
future<> grant_applicable_permissions(const service& ser, stdx::string_view role_name, const resource& r) {
|
||||
return grant_permissions(ser, role_name, r.applicable_permissions(), r);
|
||||
}
|
||||
future<> grant_applicable_permissions(const service& ser, const authenticated_user& u, const resource& r) {
|
||||
if (is_anonymous(u)) {
|
||||
return make_ready_future<>();
|
||||
}
|
||||
|
||||
return grant_applicable_permissions(ser, *u.name, r);
|
||||
}
|
||||
|
||||
future<> revoke_permissions(
|
||||
service& ser,
|
||||
const service& ser,
|
||||
stdx::string_view role_name,
|
||||
permission_set perms,
|
||||
const resource& r) {
|
||||
|
||||
@@ -75,12 +75,14 @@ public:
|
||||
};
|
||||
|
||||
///
|
||||
/// Central interface into access-control for the system.
|
||||
/// Client for access-control in the system.
|
||||
///
|
||||
/// Access control encompasses user/role management, authentication, and authorization. This class provides access to
|
||||
/// Access control encompasses user/role management, authentication, and authorization. This client provides access to
|
||||
/// the dynamically-loaded implementations of these modules (through the `underlying_*` member functions), but also
|
||||
/// builds on their functionality with caching and abstractions for common operations.
|
||||
///
|
||||
/// All state associated with access-control is stored externally to any particular instance of this class.
|
||||
///
|
||||
class service final {
|
||||
permissions_cache_config _permissions_cache_config;
|
||||
std::unique_ptr<permissions_cache> _permissions_cache;
|
||||
@@ -149,26 +151,14 @@ public:
|
||||
|
||||
future<bool> exists(const resource&) const;
|
||||
|
||||
authenticator& underlying_authenticator() {
|
||||
return *_authenticator;
|
||||
}
|
||||
|
||||
const authenticator& underlying_authenticator() const {
|
||||
return *_authenticator;
|
||||
}
|
||||
|
||||
authorizer& underlying_authorizer() {
|
||||
return *_authorizer;
|
||||
}
|
||||
|
||||
const authorizer& underlying_authorizer() const {
|
||||
return *_authorizer;
|
||||
}
|
||||
|
||||
role_manager& underlying_role_manager() {
|
||||
return *_role_manager;
|
||||
}
|
||||
|
||||
const role_manager& underlying_role_manager() const {
|
||||
return *_role_manager;
|
||||
}
|
||||
@@ -206,7 +196,7 @@ bool is_protected(const service&, const resource&) noexcept;
|
||||
/// \returns an exceptional future with \ref unsupported_authentication_option if an unsupported option is included.
|
||||
///
|
||||
future<> create_role(
|
||||
service&,
|
||||
const service&,
|
||||
stdx::string_view name,
|
||||
const role_config&,
|
||||
const authentication_options&);
|
||||
@@ -219,7 +209,7 @@ future<> create_role(
|
||||
/// \returns an exceptional future with \ref unsupported_authentication_option if an unsupported option is included.
|
||||
///
|
||||
future<> alter_role(
|
||||
service&,
|
||||
const service&,
|
||||
stdx::string_view name,
|
||||
const role_config_update&,
|
||||
const authentication_options&);
|
||||
@@ -229,7 +219,7 @@ future<> alter_role(
|
||||
///
|
||||
/// \returns an exceptional future with \ref nonexistant_role if the named role does not exist.
|
||||
///
|
||||
future<> drop_role(service&, stdx::string_view name);
|
||||
future<> drop_role(const service&, stdx::string_view name);
|
||||
|
||||
///
|
||||
/// Check if `grantee` has been granted the named role.
|
||||
@@ -247,17 +237,34 @@ future<bool> has_role(const service&, const authenticated_user&, stdx::string_vi
|
||||
///
|
||||
/// \returns an exceptional future with \ref nonexistant_role if the named role does not exist.
|
||||
///
|
||||
/// \returns an exceptional future with \ref unsupported_authorization_operation if granting permissions is not
|
||||
/// supported.
|
||||
///
|
||||
future<> grant_permissions(
|
||||
service&,
|
||||
const service&,
|
||||
stdx::string_view role_name,
|
||||
permission_set,
|
||||
const resource&);
|
||||
|
||||
///
|
||||
/// Like \ref grant_permissions, but grants all applicable permissions on the resource.
|
||||
///
|
||||
/// \returns an exceptional future with \ref nonexistant_role if the named role does not exist.
|
||||
///
|
||||
/// \returns an exceptional future with \ref unsupported_authorization_operation if granting permissions is not
|
||||
/// supported.
|
||||
///
|
||||
future<> grant_applicable_permissions(const service&, stdx::string_view role_name, const resource&);
|
||||
future<> grant_applicable_permissions(const service&, const authenticated_user&, const resource&);
|
||||
|
||||
///
|
||||
/// \returns an exceptional future with \ref nonexistant_role if the named role does not exist.
|
||||
///
|
||||
/// \returns an exceptional future with \ref unsupported_authorization_operation if revoking permissions is not
|
||||
/// supported.
|
||||
///
|
||||
future<> revoke_permissions(
|
||||
service&,
|
||||
const service&,
|
||||
stdx::string_view role_name,
|
||||
permission_set,
|
||||
const resource&);
|
||||
@@ -277,6 +284,9 @@ using recursive_permissions = bool_class<struct recursive_permissions_tag>;
|
||||
/// \returns an exceptional future with \ref nonexistant_role if a role name is included which refers to a role that
|
||||
/// does not exist.
|
||||
///
|
||||
/// \returns an exceptional future with \ref unsupported_authorization_operation if listing permissions is not
|
||||
/// supported.
|
||||
///
|
||||
future<std::vector<permission_details>> list_filtered_permissions(
|
||||
const service&,
|
||||
permission_set,
|
||||
|
||||
@@ -118,6 +118,10 @@ static future<record> require_record(cql3::query_processor& qp, stdx::string_vie
|
||||
});
|
||||
}
|
||||
|
||||
static bool has_can_login(const cql3::untyped_result_set_row& row) {
|
||||
return row.has("can_login") && !(boolean_type->deserialize(row.get_blob("can_login")).is_null());
|
||||
}
|
||||
|
||||
stdx::string_view standard_role_manager_name() noexcept {
|
||||
static const sstring instance = meta::AUTH_PACKAGE_NAME + "CassandraRoleManager";
|
||||
return instance;
|
||||
@@ -135,18 +139,7 @@ const resource_set& standard_role_manager::protected_resources() const {
|
||||
return resources;
|
||||
}
|
||||
|
||||
future<> standard_role_manager::create_metadata_tables_if_missing() {
|
||||
static const sstring create_roles_query = sprint(
|
||||
"CREATE TABLE %s ("
|
||||
" %s text PRIMARY KEY,"
|
||||
" can_login boolean,"
|
||||
" is_superuser boolean,"
|
||||
" member_of set<text>,"
|
||||
" salted_hash text"
|
||||
")",
|
||||
meta::roles_table::qualified_name(),
|
||||
meta::roles_table::role_col_name);
|
||||
|
||||
future<> standard_role_manager::create_metadata_tables_if_missing() const {
|
||||
static const sstring create_role_members_query = sprint(
|
||||
"CREATE TABLE %s ("
|
||||
" role text,"
|
||||
@@ -158,19 +151,19 @@ future<> standard_role_manager::create_metadata_tables_if_missing() {
|
||||
|
||||
return when_all_succeed(
|
||||
create_metadata_table_if_missing(
|
||||
sstring(meta::roles_table::name),
|
||||
meta::roles_table::name,
|
||||
_qp,
|
||||
create_roles_query,
|
||||
meta::roles_table::creation_query(),
|
||||
_migration_manager),
|
||||
create_metadata_table_if_missing(
|
||||
sstring(meta::role_members_table::name),
|
||||
meta::role_members_table::name,
|
||||
_qp,
|
||||
create_role_members_query,
|
||||
_migration_manager));
|
||||
}
|
||||
|
||||
future<> standard_role_manager::create_default_role_if_missing() {
|
||||
return default_role_row_satisfies(_qp, [](auto&&) { return true; }).then([this](bool exists) {
|
||||
future<> standard_role_manager::create_default_role_if_missing() const {
|
||||
return default_role_row_satisfies(_qp, &has_can_login).then([this](bool exists) {
|
||||
if (!exists) {
|
||||
static const sstring query = sprint(
|
||||
"INSERT INTO %s (%s, is_superuser, can_login) VALUES (?, true, true)",
|
||||
@@ -199,7 +192,7 @@ bool standard_role_manager::legacy_metadata_exists() const {
|
||||
return _qp.db().local().has_schema(meta::AUTH_KS, legacy_table_name);
|
||||
}
|
||||
|
||||
future<> standard_role_manager::migrate_legacy_metadata() {
|
||||
future<> standard_role_manager::migrate_legacy_metadata() const {
|
||||
log.info("Starting migration of legacy user metadata.");
|
||||
static const sstring query = sprint("SELECT * FROM %s.%s", meta::AUTH_KS, legacy_table_name);
|
||||
|
||||
@@ -231,7 +224,9 @@ future<> standard_role_manager::start() {
|
||||
return this->create_metadata_tables_if_missing().then([this] {
|
||||
_stopped = auth::do_after_system_ready(_as, [this] {
|
||||
return seastar::async([this] {
|
||||
if (any_nondefault_role_row_satisfies(_qp, [](auto&&) { return true; }).get0()) {
|
||||
wait_for_schema_agreement(_migration_manager, _qp.db().local()).get0();
|
||||
|
||||
if (any_nondefault_role_row_satisfies(_qp, &has_can_login).get0()) {
|
||||
if (this->legacy_metadata_exists()) {
|
||||
log.warn("Ignoring legacy user metadata since nondefault roles already exist.");
|
||||
}
|
||||
@@ -256,7 +251,7 @@ future<> standard_role_manager::stop() {
|
||||
return _stopped.handle_exception_type([] (const sleep_aborted&) { });
|
||||
}
|
||||
|
||||
future<> standard_role_manager::create_or_replace(stdx::string_view role_name, const role_config& c) {
|
||||
future<> standard_role_manager::create_or_replace(stdx::string_view role_name, const role_config& c) const {
|
||||
static const sstring query = sprint(
|
||||
"INSERT INTO %s (%s, is_superuser, can_login) VALUES (?, ?, ?)",
|
||||
meta::roles_table::qualified_name(),
|
||||
@@ -270,7 +265,7 @@ future<> standard_role_manager::create_or_replace(stdx::string_view role_name, c
|
||||
}
|
||||
|
||||
future<>
|
||||
standard_role_manager::create(stdx::string_view role_name, const role_config& c) {
|
||||
standard_role_manager::create(stdx::string_view role_name, const role_config& c) const {
|
||||
return this->exists(role_name).then([this, role_name, &c](bool role_exists) {
|
||||
if (role_exists) {
|
||||
throw role_already_exists(role_name);
|
||||
@@ -281,7 +276,7 @@ standard_role_manager::create(stdx::string_view role_name, const role_config& c)
|
||||
}
|
||||
|
||||
future<>
|
||||
standard_role_manager::alter(stdx::string_view role_name, const role_config_update& u) {
|
||||
standard_role_manager::alter(stdx::string_view role_name, const role_config_update& u) const {
|
||||
static const auto build_column_assignments = [](const role_config_update& u) -> sstring {
|
||||
std::vector<sstring> assignments;
|
||||
|
||||
@@ -312,7 +307,7 @@ standard_role_manager::alter(stdx::string_view role_name, const role_config_upda
|
||||
});
|
||||
}
|
||||
|
||||
future<> standard_role_manager::drop(stdx::string_view role_name) {
|
||||
future<> standard_role_manager::drop(stdx::string_view role_name) const {
|
||||
return this->exists(role_name).then([this, role_name](bool role_exists) {
|
||||
if (!role_exists) {
|
||||
throw nonexistant_role(role_name);
|
||||
@@ -379,7 +374,7 @@ future<>
|
||||
standard_role_manager::modify_membership(
|
||||
stdx::string_view grantee_name,
|
||||
stdx::string_view role_name,
|
||||
membership_change ch) {
|
||||
membership_change ch) const {
|
||||
|
||||
|
||||
const auto modify_roles = [this, role_name, grantee_name, ch] {
|
||||
@@ -421,7 +416,7 @@ standard_role_manager::modify_membership(
|
||||
}
|
||||
|
||||
future<>
|
||||
standard_role_manager::grant(stdx::string_view grantee_name, stdx::string_view role_name) {
|
||||
standard_role_manager::grant(stdx::string_view grantee_name, stdx::string_view role_name) const {
|
||||
const auto check_redundant = [this, role_name, grantee_name] {
|
||||
return this->query_granted(
|
||||
grantee_name,
|
||||
@@ -452,7 +447,7 @@ standard_role_manager::grant(stdx::string_view grantee_name, stdx::string_view r
|
||||
}
|
||||
|
||||
future<>
|
||||
standard_role_manager::revoke(stdx::string_view revokee_name, stdx::string_view role_name) {
|
||||
standard_role_manager::revoke(stdx::string_view revokee_name, stdx::string_view role_name) const {
|
||||
return this->exists(role_name).then([this, revokee_name, role_name](bool role_exists) {
|
||||
if (!role_exists) {
|
||||
throw nonexistant_role(sstring(role_name));
|
||||
|
||||
@@ -66,15 +66,15 @@ public:
|
||||
|
||||
virtual future<> stop() override;
|
||||
|
||||
virtual future<> create(stdx::string_view role_name, const role_config&) override;
|
||||
virtual future<> create(stdx::string_view role_name, const role_config&) const override;
|
||||
|
||||
virtual future<> drop(stdx::string_view role_name) override;
|
||||
virtual future<> drop(stdx::string_view role_name) const override;
|
||||
|
||||
virtual future<> alter(stdx::string_view role_name, const role_config_update&) override;
|
||||
virtual future<> alter(stdx::string_view role_name, const role_config_update&) const override;
|
||||
|
||||
virtual future<> grant(stdx::string_view grantee_name, stdx::string_view role_name) override;
|
||||
virtual future<> grant(stdx::string_view grantee_name, stdx::string_view role_name) const override;
|
||||
|
||||
virtual future<> revoke(stdx::string_view revokee_name, stdx::string_view role_name) override;
|
||||
virtual future<> revoke(stdx::string_view revokee_name, stdx::string_view role_name) const override;
|
||||
|
||||
virtual future<role_set> query_granted(stdx::string_view grantee_name, recursive_role_query) const override;
|
||||
|
||||
@@ -89,17 +89,17 @@ public:
|
||||
private:
|
||||
enum class membership_change { add, remove };
|
||||
|
||||
future<> create_metadata_tables_if_missing();
|
||||
future<> create_metadata_tables_if_missing() const;
|
||||
|
||||
bool legacy_metadata_exists() const;
|
||||
|
||||
future<> migrate_legacy_metadata();
|
||||
future<> migrate_legacy_metadata() const;
|
||||
|
||||
future<> create_default_role_if_missing();
|
||||
future<> create_default_role_if_missing() const;
|
||||
|
||||
future<> create_or_replace(stdx::string_view role_name, const role_config&);
|
||||
future<> create_or_replace(stdx::string_view role_name, const role_config&) const;
|
||||
|
||||
future<> modify_membership(stdx::string_view role_name, stdx::string_view grantee_name, membership_change);
|
||||
future<> modify_membership(stdx::string_view role_name, stdx::string_view grantee_name, membership_change) const;
|
||||
};
|
||||
|
||||
}
|
||||
|
||||
@@ -118,18 +118,22 @@ public:
|
||||
});
|
||||
}
|
||||
|
||||
virtual future<> create(stdx::string_view role_name, const authentication_options& options) override {
|
||||
virtual future<> create(stdx::string_view role_name, const authentication_options& options) const override {
|
||||
return _authenticator->create(role_name, options);
|
||||
}
|
||||
|
||||
virtual future<> alter(stdx::string_view role_name, const authentication_options& options) override {
|
||||
virtual future<> alter(stdx::string_view role_name, const authentication_options& options) const override {
|
||||
return _authenticator->alter(role_name, options);
|
||||
}
|
||||
|
||||
virtual future<> drop(stdx::string_view role_name) override {
|
||||
virtual future<> drop(stdx::string_view role_name) const override {
|
||||
return _authenticator->drop(role_name);
|
||||
}
|
||||
|
||||
virtual future<custom_options> query_custom_options(stdx::string_view role_name) const override {
|
||||
return _authenticator->query_custom_options(role_name);
|
||||
}
|
||||
|
||||
virtual const resource_set& protected_resources() const override {
|
||||
return _authenticator->protected_resources();
|
||||
}
|
||||
@@ -214,11 +218,11 @@ public:
|
||||
return make_ready_future<permission_set>(transitional_permissions);
|
||||
}
|
||||
|
||||
virtual future<> grant(stdx::string_view s, permission_set ps, const resource& r) override {
|
||||
virtual future<> grant(stdx::string_view s, permission_set ps, const resource& r) const override {
|
||||
return _authorizer->grant(s, std::move(ps), r);
|
||||
}
|
||||
|
||||
virtual future<> revoke(stdx::string_view s, permission_set ps, const resource& r) override {
|
||||
virtual future<> revoke(stdx::string_view s, permission_set ps, const resource& r) const override {
|
||||
return _authorizer->revoke(s, std::move(ps), r);
|
||||
}
|
||||
|
||||
@@ -226,11 +230,11 @@ public:
|
||||
return _authorizer->list_all();
|
||||
}
|
||||
|
||||
virtual future<> revoke_all(stdx::string_view s) override {
|
||||
virtual future<> revoke_all(stdx::string_view s) const override {
|
||||
return _authorizer->revoke_all(s);
|
||||
}
|
||||
|
||||
virtual future<> revoke_all(const resource& r) override {
|
||||
virtual future<> revoke_all(const resource& r) const override {
|
||||
return _authorizer->revoke_all(r);
|
||||
}
|
||||
|
||||
|
||||
@@ -47,6 +47,7 @@
|
||||
class backlog_controller {
|
||||
public:
|
||||
future<> shutdown() {
|
||||
_update_timer.cancel();
|
||||
return std::move(_inflight_update);
|
||||
}
|
||||
protected:
|
||||
|
||||
@@ -70,7 +70,7 @@ public:
|
||||
{
|
||||
if (!with_static_row) {
|
||||
if (_current == _end) {
|
||||
_current_start = _current_end = position_in_partition_view::after_all_clustered_rows();
|
||||
_current_start = position_in_partition_view::before_all_clustered_rows();
|
||||
} else {
|
||||
_current_start = position_in_partition_view::for_range_start(*_current);
|
||||
_current_end = position_in_partition_view::for_range_end(*_current);
|
||||
|
||||
@@ -228,6 +228,7 @@ scylla_tests = [
|
||||
'tests/memory_footprint',
|
||||
'tests/perf/perf_sstable',
|
||||
'tests/cql_query_test',
|
||||
'tests/secondary_index_test',
|
||||
'tests/storage_proxy_test',
|
||||
'tests/schema_change_test',
|
||||
'tests/mutation_reader_test',
|
||||
@@ -273,6 +274,8 @@ scylla_tests = [
|
||||
'tests/input_stream_test',
|
||||
'tests/virtual_reader_test',
|
||||
'tests/view_schema_test',
|
||||
'tests/view_build_test',
|
||||
'tests/view_complex_test',
|
||||
'tests/counter_test',
|
||||
'tests/cell_locker_test',
|
||||
'tests/row_locker_test',
|
||||
@@ -492,6 +495,7 @@ scylla_core = (['database.cc',
|
||||
'cql3/variable_specifications.cc',
|
||||
'db/consistency_level.cc',
|
||||
'db/system_keyspace.cc',
|
||||
'db/system_distributed_keyspace.cc',
|
||||
'db/schema_tables.cc',
|
||||
'db/cql_type_parser.cc',
|
||||
'db/legacy_schema_migrator.cc',
|
||||
@@ -502,12 +506,12 @@ scylla_core = (['database.cc',
|
||||
'db/config.cc',
|
||||
'db/extensions.cc',
|
||||
'db/heat_load_balance.cc',
|
||||
'db/index/secondary_index.cc',
|
||||
'db/marshal/type_parser.cc',
|
||||
'db/batchlog_manager.cc',
|
||||
'db/view/view.cc',
|
||||
'db/view/row_locking.cc',
|
||||
'index/secondary_index_manager.cc',
|
||||
'index/secondary_index.cc',
|
||||
'utils/UUID_gen.cc',
|
||||
'utils/i_filter.cc',
|
||||
'utils/bloom_filter.cc',
|
||||
|
||||
@@ -22,6 +22,7 @@
|
||||
#include "cql3/column_identifier.hh"
|
||||
#include "exceptions/exceptions.hh"
|
||||
#include "cql3/selection/simple_selector.hh"
|
||||
#include "cql3/util.hh"
|
||||
|
||||
#include <regex>
|
||||
|
||||
@@ -62,14 +63,11 @@ sstring column_identifier::to_string() const {
|
||||
}
|
||||
|
||||
sstring column_identifier::to_cql_string() const {
|
||||
static const std::regex unquoted_identifier_re("[a-z][a-z0-9_]*");
|
||||
if (std::regex_match(_text.begin(), _text.end(), unquoted_identifier_re)) {
|
||||
return _text;
|
||||
}
|
||||
static const std::regex double_quote_re("\"");
|
||||
std::string result = _text;
|
||||
std::regex_replace(result, double_quote_re, "\"\"");
|
||||
return '"' + result + '"';
|
||||
return util::maybe_quote(_text);
|
||||
}
|
||||
|
||||
sstring column_identifier::raw::to_cql_string() const {
|
||||
return util::maybe_quote(_text);
|
||||
}
|
||||
|
||||
column_identifier::raw::raw(sstring raw_text, bool keep_case)
|
||||
|
||||
@@ -123,6 +123,7 @@ public:
|
||||
bool operator!=(const raw& other) const;
|
||||
|
||||
virtual sstring to_string() const;
|
||||
sstring to_cql_string() const;
|
||||
|
||||
friend std::hash<column_identifier::raw>;
|
||||
friend std::ostream& operator<<(std::ostream& out, const column_identifier::raw& id);
|
||||
|
||||
@@ -395,18 +395,15 @@ operator<<(std::ostream& os, const cql3_type::raw& r) {
|
||||
|
||||
namespace util {
|
||||
|
||||
sstring maybe_quote(const sstring& s) {
|
||||
static const std::regex unquoted("\\w*");
|
||||
static const std::regex double_quote("\"");
|
||||
|
||||
if (std::regex_match(s.begin(), s.end(), unquoted)) {
|
||||
return s;
|
||||
sstring maybe_quote(const sstring& identifier) {
|
||||
static const std::regex unquoted_identifier_re("[a-z][a-z0-9_]*");
|
||||
if (std::regex_match(identifier.begin(), identifier.end(), unquoted_identifier_re)) {
|
||||
return identifier;
|
||||
}
|
||||
std::ostringstream ss;
|
||||
ss << "\"";
|
||||
std::regex_replace(std::ostreambuf_iterator<char>(ss), s.begin(), s.end(), double_quote, "\"\"");
|
||||
ss << "\"";
|
||||
return ss.str();
|
||||
static const std::regex double_quote_re("\"");
|
||||
std::string result = identifier;
|
||||
std::regex_replace(result, double_quote_re, "\"\"");
|
||||
return '"' + result + '"';
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -172,7 +172,30 @@ query_processor::query_processor(distributed<service::storage_proxy>& proxy, dis
|
||||
sm::make_gauge(
|
||||
"prepared_cache_memory_footprint",
|
||||
[this] { return _prepared_cache.memory_footprint(); },
|
||||
sm::description("Size (in bytes) of the prepared statements cache."))});
|
||||
sm::description("Size (in bytes) of the prepared statements cache.")),
|
||||
|
||||
sm::make_derive(
|
||||
"secondary_index_creates",
|
||||
_cql_stats.secondary_index_creates,
|
||||
sm::description("Counts a total number of CQL CREATE INDEX requests.")),
|
||||
|
||||
sm::make_derive(
|
||||
"secondary_index_drops",
|
||||
_cql_stats.secondary_index_drops,
|
||||
sm::description("Counts a total number of CQL DROP INDEX requests.")),
|
||||
|
||||
// secondary_index_reads total count is also included in all cql reads
|
||||
sm::make_derive(
|
||||
"secondary_index_reads",
|
||||
_cql_stats.secondary_index_reads,
|
||||
sm::description("Counts a total number of CQL read requests performed using secondary indexes.")),
|
||||
|
||||
// secondary_index_rows_read total count is also included in all cql rows read
|
||||
sm::make_derive(
|
||||
"secondary_index_rows_read",
|
||||
_cql_stats.secondary_index_rows_read,
|
||||
sm::description("Counts a total number of rows read during CQL requests performed using secondary indexes."))
|
||||
});
|
||||
|
||||
service::get_local_migration_manager().register_listener(_migration_subscriber.get());
|
||||
}
|
||||
|
||||
@@ -64,13 +64,15 @@ class single_column_primary_key_restrictions : public primary_key_restrictions<V
|
||||
using bounds_range_type = typename primary_key_restrictions<ValueType>::bounds_range_type;
|
||||
private:
|
||||
schema_ptr _schema;
|
||||
bool _allow_filtering;
|
||||
::shared_ptr<single_column_restrictions> _restrictions;
|
||||
bool _slice;
|
||||
bool _contains;
|
||||
bool _in;
|
||||
public:
|
||||
single_column_primary_key_restrictions(schema_ptr schema)
|
||||
single_column_primary_key_restrictions(schema_ptr schema, bool allow_filtering)
|
||||
: _schema(schema)
|
||||
, _allow_filtering(allow_filtering)
|
||||
, _restrictions(::make_shared<single_column_restrictions>(schema))
|
||||
, _slice(false)
|
||||
, _contains(false)
|
||||
@@ -110,7 +112,7 @@ public:
|
||||
}
|
||||
|
||||
void do_merge_with(::shared_ptr<single_column_restriction> restriction) {
|
||||
if (!_restrictions->empty()) {
|
||||
if (!_restrictions->empty() && !_allow_filtering) {
|
||||
auto last_column = *_restrictions->last_column();
|
||||
auto new_column = restriction->get_column_def();
|
||||
|
||||
|
||||
@@ -41,14 +41,17 @@ using boost::adaptors::transformed;
|
||||
|
||||
template<typename T>
|
||||
class statement_restrictions::initial_key_restrictions : public primary_key_restrictions<T> {
|
||||
bool _allow_filtering;
|
||||
public:
|
||||
initial_key_restrictions(bool allow_filtering)
|
||||
: _allow_filtering(allow_filtering) {}
|
||||
using bounds_range_type = typename primary_key_restrictions<T>::bounds_range_type;
|
||||
|
||||
::shared_ptr<primary_key_restrictions<T>> do_merge_to(schema_ptr schema, ::shared_ptr<restriction> restriction) const {
|
||||
if (restriction->is_multi_column()) {
|
||||
throw std::runtime_error(sprint("%s not implemented", __PRETTY_FUNCTION__));
|
||||
}
|
||||
return ::make_shared<single_column_primary_key_restrictions<T>>(schema)->merge_to(schema, restriction);
|
||||
return ::make_shared<single_column_primary_key_restrictions<T>>(schema, _allow_filtering)->merge_to(schema, restriction);
|
||||
}
|
||||
::shared_ptr<primary_key_restrictions<T>> merge_to(schema_ptr schema, ::shared_ptr<restriction> restriction) override {
|
||||
if (restriction->is_multi_column()) {
|
||||
@@ -57,7 +60,7 @@ public:
|
||||
if (restriction->is_on_token()) {
|
||||
return static_pointer_cast<token_restriction>(restriction);
|
||||
}
|
||||
return ::make_shared<single_column_primary_key_restrictions<T>>(schema)->merge_to(restriction);
|
||||
return ::make_shared<single_column_primary_key_restrictions<T>>(schema, _allow_filtering)->merge_to(restriction);
|
||||
}
|
||||
void merge_with(::shared_ptr<restriction> restriction) override {
|
||||
throw exceptions::unsupported_operation_exception();
|
||||
@@ -122,9 +125,10 @@ statement_restrictions::initial_key_restrictions<clustering_key_prefix>::merge_t
|
||||
}
|
||||
|
||||
template<typename T>
|
||||
::shared_ptr<primary_key_restrictions<T>> statement_restrictions::get_initial_key_restrictions() {
|
||||
static thread_local ::shared_ptr<primary_key_restrictions<T>> initial_kr = ::make_shared<initial_key_restrictions<T>>();
|
||||
return initial_kr;
|
||||
::shared_ptr<primary_key_restrictions<T>> statement_restrictions::get_initial_key_restrictions(bool allow_filtering) {
|
||||
static thread_local ::shared_ptr<primary_key_restrictions<T>> initial_kr_true = ::make_shared<initial_key_restrictions<T>>(true);
|
||||
static thread_local ::shared_ptr<primary_key_restrictions<T>> initial_kr_false = ::make_shared<initial_key_restrictions<T>>(false);
|
||||
return allow_filtering ? initial_kr_true : initial_kr_false;
|
||||
}
|
||||
|
||||
std::vector<::shared_ptr<column_identifier>>
|
||||
@@ -141,10 +145,10 @@ statement_restrictions::get_partition_key_unrestricted_components() const {
|
||||
return r;
|
||||
}
|
||||
|
||||
statement_restrictions::statement_restrictions(schema_ptr schema)
|
||||
statement_restrictions::statement_restrictions(schema_ptr schema, bool allow_filtering)
|
||||
: _schema(schema)
|
||||
, _partition_key_restrictions(get_initial_key_restrictions<partition_key>())
|
||||
, _clustering_columns_restrictions(get_initial_key_restrictions<clustering_key_prefix>())
|
||||
, _partition_key_restrictions(get_initial_key_restrictions<partition_key>(allow_filtering))
|
||||
, _clustering_columns_restrictions(get_initial_key_restrictions<clustering_key_prefix>(allow_filtering))
|
||||
, _nonprimary_key_restrictions(::make_shared<single_column_restrictions>(schema))
|
||||
{ }
|
||||
#if 0
|
||||
@@ -162,8 +166,9 @@ statement_restrictions::statement_restrictions(database& db,
|
||||
::shared_ptr<variable_specifications> bound_names,
|
||||
bool selects_only_static_columns,
|
||||
bool select_a_collection,
|
||||
bool for_view)
|
||||
: statement_restrictions(schema)
|
||||
bool for_view,
|
||||
bool allow_filtering)
|
||||
: statement_restrictions(schema, allow_filtering)
|
||||
{
|
||||
/*
|
||||
* WHERE clause. For a given entity, rules are: - EQ relation conflicts with anything else (including a 2nd EQ)
|
||||
@@ -327,6 +332,17 @@ void statement_restrictions::process_partition_key_restrictions(bool has_queriab
|
||||
_is_key_range = true;
|
||||
_uses_secondary_indexing = has_queriable_index;
|
||||
}
|
||||
if (_partition_key_restrictions->is_slice() && !_partition_key_restrictions->is_on_token() && !for_view) {
|
||||
// A SELECT query may not request a slice (range) of partition keys
|
||||
// without using token(). This is because there is no way to do this
|
||||
// query efficiently: mumur3 turns a contiguous range of partition
|
||||
// keys into tokens all over the token space.
|
||||
// However, in a SELECT statement used to define a materialized view,
|
||||
// such a slice is fine - it is used to check whether individual
|
||||
// partitions, match, and does not present a performance problem.
|
||||
throw exceptions::invalid_request_exception(
|
||||
"Only EQ and IN relation are supported on the partition key (unless you use the token() function)");
|
||||
}
|
||||
}
|
||||
|
||||
bool statement_restrictions::has_partition_key_unrestricted_components() const {
|
||||
|
||||
@@ -67,7 +67,7 @@ private:
|
||||
class initial_key_restrictions;
|
||||
|
||||
template<typename T>
|
||||
static ::shared_ptr<primary_key_restrictions<T>> get_initial_key_restrictions();
|
||||
static ::shared_ptr<primary_key_restrictions<T>> get_initial_key_restrictions(bool allow_filtering);
|
||||
|
||||
/**
|
||||
* Restrictions on partitioning columns
|
||||
@@ -108,7 +108,7 @@ public:
|
||||
* @param cfm the column family meta data
|
||||
* @return a new empty <code>StatementRestrictions</code>.
|
||||
*/
|
||||
statement_restrictions(schema_ptr schema);
|
||||
statement_restrictions(schema_ptr schema, bool allow_filtering);
|
||||
|
||||
statement_restrictions(database& db,
|
||||
schema_ptr schema,
|
||||
@@ -117,7 +117,8 @@ public:
|
||||
::shared_ptr<variable_specifications> bound_names,
|
||||
bool selects_only_static_columns,
|
||||
bool select_a_collection,
|
||||
bool for_view = false);
|
||||
bool for_view = false,
|
||||
bool allow_filtering = false);
|
||||
private:
|
||||
void add_restriction(::shared_ptr<restriction> restriction);
|
||||
void add_single_column_restriction(::shared_ptr<single_column_restriction> restriction);
|
||||
|
||||
@@ -116,18 +116,6 @@ single_column_relation::to_receivers(schema_ptr schema, const column_definition&
|
||||
throw exceptions::invalid_request_exception(sprint(
|
||||
"IN predicates on non-primary-key columns (%s) is not yet supported", column_def.name_as_text()));
|
||||
}
|
||||
} else if (is_slice()) {
|
||||
// Non EQ relation is not supported without token(), even if we have a 2ndary index (since even those
|
||||
// are ordered by partitioner).
|
||||
// Note: In theory we could allow it for 2ndary index queries with ALLOW FILTERING, but that would
|
||||
// probably require some special casing
|
||||
// Note bis: This is also why we don't bother handling the 'tuple' notation of #4851 for keys. If we
|
||||
// lift the limitation for 2ndary
|
||||
// index with filtering, we'll need to handle it though.
|
||||
if (column_def.is_partition_key()) {
|
||||
throw exceptions::invalid_request_exception(
|
||||
"Only EQ and IN relation are supported on the partition key (unless you use the token() function)");
|
||||
}
|
||||
}
|
||||
|
||||
if (is_contains() && !receiver->type->is_collection()) {
|
||||
|
||||
@@ -134,7 +134,7 @@ protected:
|
||||
#endif
|
||||
|
||||
virtual sstring to_string() const override {
|
||||
auto entity_as_string = _entity->to_string();
|
||||
auto entity_as_string = _entity->to_cql_string();
|
||||
if (_map_key) {
|
||||
entity_as_string = sprint("%s[%s]", std::move(entity_as_string), _map_key->to_string());
|
||||
}
|
||||
|
||||
@@ -247,10 +247,11 @@ future<shared_ptr<cql_transport::event::schema_change>> alter_table_statement::a
|
||||
cfm.with_column(column_name->name(), type, _is_static ? column_kind::static_column : column_kind::regular_column);
|
||||
|
||||
// Adding a column to a table which has an include all view requires the column to be added to the view
|
||||
// as well
|
||||
// as well. If the view has a regular base column in its PK, then the column ID needs to be updated in
|
||||
// view_info; for that, rebuild the schema.
|
||||
if (!_is_static) {
|
||||
for (auto&& view : cf.views()) {
|
||||
if (view->view_info()->include_all_columns()) {
|
||||
if (view->view_info()->include_all_columns() || view->view_info()->base_non_pk_column_in_view_pk()) {
|
||||
schema_builder builder(view);
|
||||
builder.with_column(column_name->name(), type);
|
||||
view_updates.push_back(view_ptr(builder.build()));
|
||||
@@ -305,14 +306,10 @@ future<shared_ptr<cql_transport::event::schema_change>> alter_table_statement::a
|
||||
}
|
||||
}
|
||||
|
||||
// If a column is dropped which is included in a view, we don't allow the drop to take place.
|
||||
auto view_names = ::join(", ", cf.views()
|
||||
| boost::adaptors::filtered([&] (auto&& v) { return bool(v->get_column_definition(column_name->name())); })
|
||||
| boost::adaptors::transformed([] (auto&& v) { return v->cf_name(); }));
|
||||
if (!view_names.empty()) {
|
||||
if (!cf.views().empty()) {
|
||||
throw exceptions::invalid_request_exception(sprint(
|
||||
"Cannot drop column %s, depended on by materialized views (%s.{%s})",
|
||||
column_name, keyspace(), view_names));
|
||||
"Cannot drop column %s on base table %s.%s with materialized views",
|
||||
column_name, keyspace(), column_family()));
|
||||
}
|
||||
break;
|
||||
}
|
||||
|
||||
@@ -252,6 +252,7 @@ create_index_statement::announce_migration(distributed<service::storage_proxy>&
|
||||
sprint("Index %s is a duplicate of existing index %s", index.name(), existing_index.value().name()));
|
||||
}
|
||||
}
|
||||
++_cql_stats->secondary_index_creates;
|
||||
schema_builder builder{schema};
|
||||
builder.with_index(index);
|
||||
return service::get_local_migration_manager().announce_column_family_update(
|
||||
@@ -267,6 +268,7 @@ create_index_statement::announce_migration(distributed<service::storage_proxy>&
|
||||
|
||||
std::unique_ptr<cql3::statements::prepared_statement>
|
||||
create_index_statement::prepare(database& db, cql_stats& stats) {
|
||||
_cql_stats = &stats;
|
||||
return std::make_unique<prepared_statement>(make_shared<create_index_statement>(*this));
|
||||
}
|
||||
|
||||
@@ -279,7 +281,7 @@ index_metadata create_index_statement::make_index_metadata(schema_ptr schema,
|
||||
index_options_map new_options = options;
|
||||
auto target_option = boost::algorithm::join(targets | boost::adaptors::transformed(
|
||||
[schema](const auto &target) -> sstring {
|
||||
return target->as_cql_string(schema);
|
||||
return target->as_string();
|
||||
}), ",");
|
||||
new_options.emplace(index_target::target_option_name, target_option);
|
||||
return index_metadata{name, new_options, kind};
|
||||
|
||||
@@ -70,7 +70,7 @@ class create_index_statement : public schema_altering_statement {
|
||||
const std::vector<::shared_ptr<index_target::raw>> _raw_targets;
|
||||
const ::shared_ptr<index_prop_defs> _properties;
|
||||
const bool _if_not_exists;
|
||||
|
||||
cql_stats* _cql_stats = nullptr;
|
||||
|
||||
public:
|
||||
create_index_statement(::shared_ptr<cf_name> name, ::shared_ptr<index_name> index_name,
|
||||
|
||||
@@ -128,6 +128,17 @@ cql3::statements::create_keyspace_statement::prepare(database& db, cql_stats& st
|
||||
return std::make_unique<prepared_statement>(make_shared<create_keyspace_statement>(*this));
|
||||
}
|
||||
|
||||
future<> cql3::statements::create_keyspace_statement::grant_permissions_to_creator(const service::client_state& cs) {
|
||||
return do_with(auth::make_data_resource(keyspace()), [&cs](const auth::resource& r) {
|
||||
return auth::grant_applicable_permissions(
|
||||
*cs.get_auth_service(),
|
||||
*cs.user(),
|
||||
r).handle_exception_type([](const auth::unsupported_authorization_operation&) {
|
||||
// Nothing.
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -84,6 +84,8 @@ public:
|
||||
virtual future<shared_ptr<cql_transport::event::schema_change>> announce_migration(distributed<service::storage_proxy>& proxy, bool is_local_only) override;
|
||||
|
||||
virtual std::unique_ptr<prepared> prepare(database& db, cql_stats& stats) override;
|
||||
|
||||
virtual future<> grant_permissions_to_creator(const service::client_state&) override;
|
||||
};
|
||||
|
||||
}
|
||||
|
||||
@@ -70,6 +70,8 @@ public:
|
||||
, _if_not_exists(if_not_exists) {
|
||||
}
|
||||
|
||||
future<> grant_permissions_to_creator(const service::client_state&) const;
|
||||
|
||||
void validate(distributed<service::storage_proxy>&, const service::client_state&) override;
|
||||
|
||||
virtual future<> check_access(const service::client_state&) override;
|
||||
|
||||
@@ -49,6 +49,8 @@
|
||||
#include "cql3/statements/create_table_statement.hh"
|
||||
#include "cql3/statements/prepared_statement.hh"
|
||||
|
||||
#include "auth/resource.hh"
|
||||
#include "auth/service.hh"
|
||||
#include "schema_builder.hh"
|
||||
#include "service/storage_service.hh"
|
||||
|
||||
@@ -162,6 +164,16 @@ create_table_statement::prepare(database& db, cql_stats& stats) {
|
||||
abort();
|
||||
}
|
||||
|
||||
future<> create_table_statement::grant_permissions_to_creator(const service::client_state& cs) {
|
||||
return do_with(auth::make_data_resource(keyspace(), column_family()), [&cs](const auth::resource& r) {
|
||||
return auth::grant_applicable_permissions(
|
||||
*cs.get_auth_service(),
|
||||
*cs.user(),
|
||||
r).handle_exception_type([](const auth::unsupported_authorization_operation&) {
|
||||
// Nothing.
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
create_table_statement::raw_statement::raw_statement(::shared_ptr<cf_name> name, bool if_not_exists)
|
||||
: cf_statement{std::move(name)}
|
||||
|
||||
@@ -106,6 +106,8 @@ public:
|
||||
|
||||
virtual std::unique_ptr<prepared> prepare(database& db, cql_stats& stats) override;
|
||||
|
||||
virtual future<> grant_permissions_to_creator(const service::client_state&) override;
|
||||
|
||||
schema_ptr get_cf_meta_data(const database&);
|
||||
|
||||
class raw_statement;
|
||||
|
||||
@@ -127,22 +127,25 @@ static bool validate_primary_key(
|
||||
"Cannot use Static column '%s' in PRIMARY KEY of materialized view", def->name_as_text()));
|
||||
}
|
||||
|
||||
bool new_non_pk_column = false;
|
||||
if (base_pk.find(def) == base_pk.end()) {
|
||||
if (has_non_pk_column) {
|
||||
throw exceptions::invalid_request_exception(sprint(
|
||||
"Cannot include more than one non-primary key column '%s' in materialized view primary key", def->name_as_text()));
|
||||
}
|
||||
return true;
|
||||
new_non_pk_column = true;
|
||||
}
|
||||
|
||||
// We don't need to include the "IS NOT NULL" filter on a non-composite partition key
|
||||
// because we will never allow a single partition key to be NULL
|
||||
if (schema->partition_key_columns().size() > 1 && !restrictions.is_restricted(def)) {
|
||||
bool is_non_composite_partition_key = def->is_partition_key() &&
|
||||
schema->partition_key_columns().size() == 1;
|
||||
if (!is_non_composite_partition_key && !restrictions.is_restricted(def)) {
|
||||
throw exceptions::invalid_request_exception(sprint(
|
||||
"Primary key column '%s' is required to be filtered by 'IS NOT NULL'", def->name_as_text()));
|
||||
}
|
||||
|
||||
return false;
|
||||
return new_non_pk_column;
|
||||
}
|
||||
|
||||
future<shared_ptr<cql_transport::event::schema_change>> create_view_statement::announce_migration(distributed<service::storage_proxy>& proxy, bool is_local_only) {
|
||||
@@ -247,13 +250,6 @@ future<shared_ptr<cql_transport::event::schema_change>> create_view_statement::a
|
||||
boost::range::join(schema->partition_key_columns(), schema->clustering_key_columns())
|
||||
| boost::adaptors::transformed([](auto&& def) { return &def; }));
|
||||
|
||||
if (_partition_keys.empty()) {
|
||||
throw exceptions::invalid_request_exception(sprint("Must select at least a column for a Materialized View"));
|
||||
}
|
||||
if (_clustering_keys.empty()) {
|
||||
throw exceptions::invalid_request_exception(sprint("No columns are defined for Materialized View other than primary key"));
|
||||
}
|
||||
|
||||
// Validate the primary key clause, ensuring only one non-PK base column is used in the view's PK.
|
||||
bool has_non_pk_column = false;
|
||||
std::unordered_set<const column_definition*> target_primary_keys;
|
||||
@@ -295,18 +291,26 @@ future<shared_ptr<cql_transport::event::schema_change>> create_view_statement::a
|
||||
bool def_in_target_pk = std::find(target_primary_keys.begin(), target_primary_keys.end(), &def) != target_primary_keys.end();
|
||||
if (included_def && !def_in_target_pk) {
|
||||
target_non_pk_columns.push_back(&def);
|
||||
} else if (def.is_primary_key() && !def_in_target_pk) {
|
||||
}
|
||||
if (def.is_primary_key() && !def_in_target_pk) {
|
||||
missing_pk_columns.push_back(&def);
|
||||
}
|
||||
}
|
||||
|
||||
if (!missing_pk_columns.empty()) {
|
||||
auto column_names = ::join(", ", missing_pk_columns | boost::adaptors::transformed(std::mem_fn(&column_definition::name)));
|
||||
auto column_names = ::join(", ", missing_pk_columns | boost::adaptors::transformed(std::mem_fn(&column_definition::name_as_text)));
|
||||
throw exceptions::invalid_request_exception(sprint(
|
||||
"Cannot create Materialized View %s without primary key columns from base %s (%s)",
|
||||
column_family(), _base_name->get_column_family(), column_names));
|
||||
}
|
||||
|
||||
if (_partition_keys.empty()) {
|
||||
throw exceptions::invalid_request_exception(sprint("Must select at least a column for a Materialized View"));
|
||||
}
|
||||
if (_clustering_keys.empty()) {
|
||||
throw exceptions::invalid_request_exception(sprint("No columns are defined for Materialized View other than primary key"));
|
||||
}
|
||||
|
||||
schema_builder builder{keyspace(), column_family()};
|
||||
auto add_columns = [this, &builder] (std::vector<const column_definition*>& defs, column_kind kind) mutable {
|
||||
for (auto* def : defs) {
|
||||
|
||||
@@ -86,6 +86,7 @@ future<shared_ptr<cql_transport::event::schema_change>> drop_index_statement::an
|
||||
if (!cfm) {
|
||||
return make_ready_future<::shared_ptr<cql_transport::event::schema_change>>(nullptr);
|
||||
}
|
||||
++_cql_stats->secondary_index_drops;
|
||||
auto builder = schema_builder(cfm);
|
||||
builder.without_index(_index_name);
|
||||
return service::get_local_migration_manager().announce_column_family_update(builder.build(), false, {}, is_local_only).then([cfm] {
|
||||
@@ -102,6 +103,7 @@ future<shared_ptr<cql_transport::event::schema_change>> drop_index_statement::an
|
||||
|
||||
std::unique_ptr<cql3::statements::prepared_statement>
|
||||
drop_index_statement::prepare(database& db, cql_stats& stats) {
|
||||
_cql_stats = &stats;
|
||||
return std::make_unique<prepared_statement>(make_shared<drop_index_statement>(*this));
|
||||
}
|
||||
|
||||
|
||||
@@ -56,6 +56,7 @@ namespace statements {
|
||||
class drop_index_statement : public schema_altering_statement {
|
||||
sstring _index_name;
|
||||
bool _if_exists;
|
||||
cql_stats* _cql_stats = nullptr;
|
||||
public:
|
||||
drop_index_statement(::shared_ptr<index_name> index_name, bool if_exists);
|
||||
|
||||
@@ -74,4 +75,4 @@ private:
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
@@ -51,5 +51,8 @@ cql3::statements::grant_statement::execute(distributed<service::storage_proxy>&
|
||||
}).handle_exception_type([](const auth::nonexistant_role& e) {
|
||||
return make_exception_future<::shared_ptr<cql_transport::messages::result_message>>(
|
||||
exceptions::invalid_request_exception(e.what()));
|
||||
}).handle_exception_type([](const auth::unsupported_authorization_operation& e) {
|
||||
return make_exception_future<::shared_ptr<cql_transport::messages::result_message>>(
|
||||
exceptions::invalid_request_exception(e.what()));
|
||||
});
|
||||
}
|
||||
|
||||
@@ -41,7 +41,7 @@
|
||||
|
||||
#include <set>
|
||||
#include "index_prop_defs.hh"
|
||||
#include "db/index/secondary_index.hh"
|
||||
#include "index/secondary_index.hh"
|
||||
|
||||
void cql3::statements::index_prop_defs::validate() {
|
||||
static std::set<sstring> keywords({ sstring(KW_OPTIONS) });
|
||||
|
||||
@@ -41,7 +41,7 @@
|
||||
|
||||
#include <stdexcept>
|
||||
#include "index_target.hh"
|
||||
#include "db/index/secondary_index.hh"
|
||||
#include "index/secondary_index.hh"
|
||||
|
||||
namespace cql3 {
|
||||
|
||||
@@ -59,6 +59,10 @@ sstring index_target::as_cql_string(schema_ptr schema) const {
|
||||
return sprint("%s(%s)", to_sstring(type), column->to_cql_string());
|
||||
}
|
||||
|
||||
sstring index_target::as_string() const {
|
||||
return column->to_string();
|
||||
}
|
||||
|
||||
index_target::target_type index_target::from_sstring(const sstring& s)
|
||||
{
|
||||
if (s == "keys") {
|
||||
|
||||
@@ -43,7 +43,6 @@
|
||||
|
||||
#include "core/shared_ptr.hh"
|
||||
#include "cql3/column_identifier.hh"
|
||||
#include "db/index/secondary_index.hh"
|
||||
|
||||
namespace cql3 {
|
||||
|
||||
@@ -65,6 +64,7 @@ struct index_target {
|
||||
}
|
||||
|
||||
sstring as_cql_string(schema_ptr schema) const;
|
||||
sstring as_string() const;
|
||||
|
||||
static sstring index_option(target_type type);
|
||||
static target_type from_column_definition(const column_definition& cd);
|
||||
|
||||
@@ -171,6 +171,9 @@ cql3::statements::list_permissions_statement::execute(
|
||||
}).handle_exception_type([](const auth::nonexistant_role& e) {
|
||||
return make_exception_future<::shared_ptr<cql_transport::messages::result_message>>(
|
||||
exceptions::invalid_request_exception(e.what()));
|
||||
}).handle_exception_type([](const auth::unsupported_authorization_operation& e) {
|
||||
return make_exception_future<::shared_ptr<cql_transport::messages::result_message>>(
|
||||
exceptions::invalid_request_exception(e.what()));
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
@@ -70,7 +70,7 @@ cql3::statements::list_users_statement::execute(distributed<service::storage_pro
|
||||
make_column_spec("name", utf8_type),
|
||||
make_column_spec("super", boolean_type)});
|
||||
|
||||
static const auto make_results = [](auth::service& as, std::unordered_set<sstring>&& roles) {
|
||||
static const auto make_results = [](const auth::service& as, std::unordered_set<sstring>&& roles) {
|
||||
using cql_transport::messages::result_message;
|
||||
|
||||
auto results = std::make_unique<result_set>(metadata);
|
||||
@@ -98,8 +98,8 @@ cql3::statements::list_users_statement::execute(distributed<service::storage_pro
|
||||
});
|
||||
};
|
||||
|
||||
auto& cs = state.get_client_state();
|
||||
auto& as = *cs.get_auth_service();
|
||||
const auto& cs = state.get_client_state();
|
||||
const auto& as = *cs.get_auth_service();
|
||||
const auto user = cs.user();
|
||||
|
||||
return auth::has_superuser(as, *user).then([&cs, &as, user](bool has_superuser) {
|
||||
|
||||
@@ -51,5 +51,8 @@ cql3::statements::revoke_statement::execute(distributed<service::storage_proxy>&
|
||||
}).handle_exception_type([](const auth::nonexistant_role& e) {
|
||||
return make_exception_future<::shared_ptr<cql_transport::messages::result_message>>(
|
||||
exceptions::invalid_request_exception(e.what()));
|
||||
}).handle_exception_type([](const auth::unsupported_authorization_operation& e) {
|
||||
return make_exception_future<::shared_ptr<cql_transport::messages::result_message>>(
|
||||
exceptions::invalid_request_exception(e.what()));
|
||||
});
|
||||
}
|
||||
|
||||
@@ -93,6 +93,17 @@ void validate_cluster_support() {
|
||||
// `create_role_statement`
|
||||
//
|
||||
|
||||
future<> create_role_statement::grant_permissions_to_creator(const service::client_state& cs) const {
|
||||
return do_with(auth::make_role_resource(_role), [&cs](const auth::resource& r) {
|
||||
return auth::grant_applicable_permissions(
|
||||
*cs.get_auth_service(),
|
||||
*cs.user(),
|
||||
r).handle_exception_type([](const auth::unsupported_authorization_operation&) {
|
||||
// Nothing.
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
void create_role_statement::validate(distributed<service::storage_proxy>&, const service::client_state&) {
|
||||
validate_cluster_support();
|
||||
}
|
||||
@@ -123,9 +134,12 @@ create_role_statement::execute(distributed<service::storage_proxy>&,
|
||||
std::move(config),
|
||||
extract_authentication_options(_options),
|
||||
[this, &state](const auth::role_config& config, const auth::authentication_options& authen_options) {
|
||||
auto& as = *state.get_client_state().get_auth_service();
|
||||
const auto& cs = state.get_client_state();
|
||||
auto& as = *cs.get_auth_service();
|
||||
|
||||
return auth::create_role(as, _role, config, authen_options).then([] {
|
||||
return auth::create_role(as, _role, config, authen_options).then([this, &cs] {
|
||||
return grant_permissions_to_creator(cs);
|
||||
}).then([] {
|
||||
return void_result_message();
|
||||
}).handle_exception_type([this](const auth::role_already_exists& e) {
|
||||
if (!_if_not_exists) {
|
||||
@@ -300,8 +314,6 @@ future<> list_roles_statement::check_access(const service::client_state& state)
|
||||
|
||||
future<result_message_ptr>
|
||||
list_roles_statement::execute(distributed<service::storage_proxy>&, service::query_state& state, const query_options&) {
|
||||
unimplemented::warn(unimplemented::cause::ROLES);
|
||||
|
||||
static const sstring virtual_table_name("roles");
|
||||
|
||||
static const auto make_column_spec = [](const sstring& name, const ::shared_ptr<const abstract_type>& ty) {
|
||||
@@ -312,14 +324,19 @@ list_roles_statement::execute(distributed<service::storage_proxy>&, service::que
|
||||
ty);
|
||||
};
|
||||
|
||||
static const thread_local auto custom_options_type = map_type_impl::get_instance(utf8_type, utf8_type, true);
|
||||
|
||||
static const thread_local auto metadata = ::make_shared<cql3::metadata>(
|
||||
std::vector<::shared_ptr<column_specification>>{
|
||||
make_column_spec("role", utf8_type),
|
||||
make_column_spec("super", boolean_type),
|
||||
make_column_spec("login", boolean_type)});
|
||||
make_column_spec("login", boolean_type),
|
||||
make_column_spec("options", custom_options_type)});
|
||||
|
||||
static const auto make_results = [](auth::role_manager& rm, auth::role_set&& roles)
|
||||
-> future<result_message_ptr> {
|
||||
static const auto make_results = [](
|
||||
const auth::role_manager& rm,
|
||||
const auth::authenticator& a,
|
||||
auth::role_set&& roles) -> future<result_message_ptr> {
|
||||
auto results = std::make_unique<result_set>(metadata);
|
||||
|
||||
if (roles.empty()) {
|
||||
@@ -333,14 +350,26 @@ list_roles_statement::execute(distributed<service::storage_proxy>&, service::que
|
||||
return do_with(
|
||||
std::move(sorted_roles),
|
||||
std::move(results),
|
||||
[&rm](const std::vector<sstring>& sorted_roles, std::unique_ptr<result_set>& results) {
|
||||
return do_for_each(sorted_roles, [&results, &rm](const sstring& role) {
|
||||
[&rm, &a](const std::vector<sstring>& sorted_roles, std::unique_ptr<result_set>& results) {
|
||||
return do_for_each(sorted_roles, [&results, &rm, &a](const sstring& role) {
|
||||
return when_all_succeed(
|
||||
rm.can_login(role),
|
||||
rm.is_superuser(role)).then([&results, &role](bool login, bool super) {
|
||||
rm.is_superuser(role),
|
||||
a.query_custom_options(role)).then([&results, &role](
|
||||
bool login,
|
||||
bool super,
|
||||
auth::custom_options os) {
|
||||
results->add_column_value(utf8_type->decompose(role));
|
||||
results->add_column_value(boolean_type->decompose(super));
|
||||
results->add_column_value(boolean_type->decompose(login));
|
||||
|
||||
results->add_column_value(
|
||||
custom_options_type->decompose(
|
||||
make_map_value(
|
||||
custom_options_type,
|
||||
map_type_impl::native_type(
|
||||
std::make_move_iterator(os.begin()),
|
||||
std::make_move_iterator(os.end())))));
|
||||
});
|
||||
}).then([&results] {
|
||||
return make_ready_future<result_message_ptr>(::make_shared<result_message::rows>(std::move(results)));
|
||||
@@ -348,12 +377,13 @@ list_roles_statement::execute(distributed<service::storage_proxy>&, service::que
|
||||
});
|
||||
};
|
||||
|
||||
auto& cs = state.get_client_state();
|
||||
auto& as = *cs.get_auth_service();
|
||||
const auto& cs = state.get_client_state();
|
||||
const auto& as = *cs.get_auth_service();
|
||||
const auto user = cs.user();
|
||||
|
||||
return auth::has_superuser(as, *user).then([this, &state, &cs, &as, user](bool super) {
|
||||
auto& rm = as.underlying_role_manager();
|
||||
const auto& rm = as.underlying_role_manager();
|
||||
const auto& a = as.underlying_authenticator();
|
||||
const auto query_mode = _recursive ? auth::recursive_role_query::yes : auth::recursive_role_query::no;
|
||||
|
||||
if (!_grantee) {
|
||||
@@ -361,19 +391,21 @@ list_roles_statement::execute(distributed<service::storage_proxy>&, service::que
|
||||
// only the roles granted to them.
|
||||
return cs.check_has_permission(
|
||||
auth::permission::DESCRIBE,
|
||||
auth::root_role_resource()).then([&cs, &rm, user, query_mode](bool has_describe) {
|
||||
auth::root_role_resource()).then([&cs, &rm, &a, user, query_mode](bool has_describe) {
|
||||
if (has_describe) {
|
||||
return rm.query_all().then([&rm](auto&& roles) { return make_results(rm, std::move(roles)); });
|
||||
return rm.query_all().then([&rm, &a](auto&& roles) {
|
||||
return make_results(rm, a, std::move(roles));
|
||||
});
|
||||
}
|
||||
|
||||
return rm.query_granted(*user->name, query_mode).then([&rm](auth::role_set roles) {
|
||||
return make_results(rm, std::move(roles));
|
||||
return rm.query_granted(*user->name, query_mode).then([&rm, &a](auth::role_set roles) {
|
||||
return make_results(rm, a, std::move(roles));
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
return rm.query_granted(*_grantee, query_mode).then([&rm](auth::role_set roles) {
|
||||
return make_results(rm, std::move(roles));
|
||||
return rm.query_granted(*_grantee, query_mode).then([&rm, &a](auth::role_set roles) {
|
||||
return make_results(rm, a, std::move(roles));
|
||||
});
|
||||
}).handle_exception_type([](const auth::nonexistant_role& e) {
|
||||
return make_exception_future<result_message_ptr>(exceptions::invalid_request_exception(e.what()));
|
||||
@@ -394,8 +426,6 @@ future<> grant_role_statement::check_access(const service::client_state& state)
|
||||
|
||||
future<result_message_ptr>
|
||||
grant_role_statement::execute(distributed<service::storage_proxy>&, service::query_state& state, const query_options&) {
|
||||
unimplemented::warn(unimplemented::cause::ROLES);
|
||||
|
||||
auto& as = *state.get_client_state().get_auth_service();
|
||||
|
||||
return as.underlying_role_manager().grant(_grantee, _role).then([] {
|
||||
@@ -421,8 +451,6 @@ future<result_message_ptr> revoke_role_statement::execute(
|
||||
distributed<service::storage_proxy>&,
|
||||
service::query_state& state,
|
||||
const query_options&) {
|
||||
unimplemented::warn(unimplemented::cause::ROLES);
|
||||
|
||||
auto& rm = state.get_client_state().get_auth_service()->underlying_role_manager();
|
||||
|
||||
return rm.revoke(_revokee, _role).then([] {
|
||||
|
||||
@@ -59,6 +59,10 @@ schema_altering_statement::schema_altering_statement(::shared_ptr<cf_name> name)
|
||||
{
|
||||
}
|
||||
|
||||
future<> schema_altering_statement::grant_permissions_to_creator(const service::client_state&) {
|
||||
return make_ready_future<>();
|
||||
}
|
||||
|
||||
bool schema_altering_statement::uses_function(const sstring& ks_name, const sstring& function_name) const
|
||||
{
|
||||
return cf_statement::uses_function(ks_name, function_name);
|
||||
@@ -103,7 +107,11 @@ schema_altering_statement::execute0(distributed<service::storage_proxy>& proxy,
|
||||
|
||||
future<::shared_ptr<messages::result_message>>
|
||||
schema_altering_statement::execute(distributed<service::storage_proxy>& proxy, service::query_state& state, const query_options& options) {
|
||||
return execute0(proxy, state, options, false);
|
||||
return execute0(proxy, state, options, false).then([this, &state](::shared_ptr<messages::result_message> result) {
|
||||
return grant_permissions_to_creator(state.get_client_state()).then([result = std::move(result)] {
|
||||
return result;
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
future<::shared_ptr<messages::result_message>>
|
||||
|
||||
@@ -71,6 +71,14 @@ protected:
|
||||
|
||||
schema_altering_statement(::shared_ptr<cf_name> name);
|
||||
|
||||
/**
|
||||
* When a new database object (keyspace, table) is created, the creator needs to be granted all applicable
|
||||
* permissions on it.
|
||||
*
|
||||
* By default, this function does nothing.
|
||||
*/
|
||||
virtual future<> grant_permissions_to_creator(const service::client_state&);
|
||||
|
||||
virtual bool uses_function(const sstring& ks_name, const sstring& function_name) const override;
|
||||
|
||||
virtual bool depends_on_keyspace(const sstring& ks_name) const override;
|
||||
|
||||
@@ -276,13 +276,14 @@ select_statement::do_execute(distributed<service::storage_proxy>& proxy,
|
||||
return do_with(
|
||||
cql3::selection::result_set_builder(*_selection, now,
|
||||
options.get_cql_serialization_format()),
|
||||
[p, page_size, now](auto& builder) {
|
||||
[this, p, page_size, now](auto& builder) {
|
||||
return do_until([p] {return p->is_exhausted();},
|
||||
[p, &builder, page_size, now] {
|
||||
return p->fetch_page(builder, page_size, now);
|
||||
}
|
||||
).then([&builder] {
|
||||
).then([this, &builder] {
|
||||
auto rs = builder.build();
|
||||
update_stats_rows_read(rs->size());
|
||||
auto msg = ::make_shared<cql_transport::messages::result_message::rows>(std::move(rs));
|
||||
return make_ready_future<shared_ptr<cql_transport::messages::result_message>>(std::move(msg));
|
||||
});
|
||||
@@ -302,6 +303,7 @@ select_statement::do_execute(distributed<service::storage_proxy>& proxy,
|
||||
rs->get_metadata().set_has_more_pages(p->state());
|
||||
}
|
||||
|
||||
update_stats_rows_read(rs->size());
|
||||
auto msg = ::make_shared<cql_transport::messages::result_message::rows>(std::move(rs));
|
||||
return make_ready_future<shared_ptr<cql_transport::messages::result_message>>(std::move(msg));
|
||||
});
|
||||
@@ -320,10 +322,10 @@ select_statement::execute(distributed<service::storage_proxy>& proxy,
|
||||
// is no way to tell which of these rows belong to the query result before
|
||||
// doing post-query ordering.
|
||||
if (needs_post_query_ordering() && _limit) {
|
||||
return do_with(std::forward<dht::partition_range_vector>(partition_ranges), [this, &proxy, &state, &options, cmd](auto prs) {
|
||||
return do_with(std::forward<dht::partition_range_vector>(partition_ranges), [this, &proxy, &state, &options, cmd](auto& prs) {
|
||||
assert(cmd->partition_limit == query::max_partitions);
|
||||
query::result_merger merger(cmd->row_limit * prs.size(), query::max_partitions);
|
||||
return map_reduce(prs.begin(), prs.end(), [this, &proxy, &state, &options, cmd] (auto pr) {
|
||||
return map_reduce(prs.begin(), prs.end(), [this, &proxy, &state, &options, cmd] (auto& pr) {
|
||||
dht::partition_range_vector prange { pr };
|
||||
auto command = ::make_lw_shared<query::read_command>(*cmd);
|
||||
return proxy.local().query(_schema,
|
||||
@@ -345,6 +347,54 @@ select_statement::execute(distributed<service::storage_proxy>& proxy,
|
||||
}
|
||||
}
|
||||
|
||||
// Function for fetching the selected columns from a list of clustering rows.
|
||||
// It is currently used only in our Secondary Index implementation - ordinary
|
||||
// CQL SELECT statements do not have the syntax to request a list of rows.
|
||||
// FIXME: The current implementation is very inefficient - it requests each
|
||||
// row separately (and all in parallel). Even multiple rows from a single
|
||||
// partition are requested separately. This last case can be easily improved,
|
||||
// but to implement the general case (multiple rows from multiple partitions)
|
||||
// efficiently, we will need more support from other layers.
|
||||
// Note that currently we do not make any assumptions on the order of the keys
|
||||
// given to this function; for a more efficient implementation with a large
|
||||
// list, we should probably require that the keys be ordered in token order
|
||||
// (see also issue #3423).
|
||||
future<shared_ptr<cql_transport::messages::result_message>>
|
||||
select_statement::execute(distributed<service::storage_proxy>& proxy,
|
||||
lw_shared_ptr<query::read_command> cmd,
|
||||
std::vector<primary_key>&& primary_keys,
|
||||
service::query_state& state,
|
||||
const query_options& options,
|
||||
gc_clock::time_point now)
|
||||
{
|
||||
return do_with(std::move(primary_keys), [this, &proxy, &state, &options, cmd] (auto& keys) {
|
||||
assert(cmd->partition_limit == query::max_partitions);
|
||||
query::result_merger merger(cmd->row_limit, query::max_partitions);
|
||||
// there is no point in producing rows beyond the first row_limit:
|
||||
auto end = keys.size() <= cmd->row_limit ? keys.end() : keys.begin() + cmd->row_limit;
|
||||
return map_reduce(keys.begin(), end, [this, &proxy, &state, &options, cmd] (auto& key) {
|
||||
auto command = ::make_lw_shared<query::read_command>(*cmd);
|
||||
// for each partition, read just one clustering row (TODO: can
|
||||
// get all needed rows of one partition at once.)
|
||||
command->slice._row_ranges.clear();
|
||||
if (key.clustering) {
|
||||
command->slice._row_ranges.push_back(query::clustering_range::make_singular(key.clustering));
|
||||
}
|
||||
return proxy.local().query(_schema,
|
||||
command,
|
||||
{dht::partition_range::make_singular(key.partition)},
|
||||
options.get_consistency(),
|
||||
state.get_trace_state()).then([] (foreign_ptr<lw_shared_ptr<query::result>>&& result, service::replicas_per_token_range) {
|
||||
return std::move(result);
|
||||
});
|
||||
}, std::move(merger));
|
||||
}).then([this, &options, now, cmd] (auto result) {
|
||||
// note that cmd here still has the garbage clustering range in slice,
|
||||
// but process_results() ignores this part of the slice setting.
|
||||
return this->process_results(std::move(result), cmd, options, now);
|
||||
});
|
||||
}
|
||||
|
||||
future<::shared_ptr<cql_transport::messages::result_message>>
|
||||
select_statement::execute_internal(distributed<service::storage_proxy>& proxy,
|
||||
service::query_state& state,
|
||||
@@ -365,10 +415,10 @@ select_statement::execute_internal(distributed<service::storage_proxy>& proxy,
|
||||
++_stats.reads;
|
||||
|
||||
if (needs_post_query_ordering() && _limit) {
|
||||
return do_with(std::move(partition_ranges), [this, &proxy, &state, command] (auto prs) {
|
||||
return do_with(std::move(partition_ranges), [this, &proxy, &state, command] (auto& prs) {
|
||||
assert(command->partition_limit == query::max_partitions);
|
||||
query::result_merger merger(command->row_limit * prs.size(), query::max_partitions);
|
||||
return map_reduce(prs.begin(), prs.end(), [this, &proxy, &state, command] (auto pr) {
|
||||
return map_reduce(prs.begin(), prs.end(), [this, &proxy, &state, command] (auto& pr) {
|
||||
dht::partition_range_vector prange { pr };
|
||||
auto cmd = ::make_lw_shared<query::read_command>(*command);
|
||||
return proxy.local().query(_schema, cmd, std::move(prange), db::consistency_level::ONE, state.get_trace_state(),
|
||||
@@ -407,6 +457,7 @@ select_statement::process_results(foreign_ptr<lw_shared_ptr<query::result>> resu
|
||||
}
|
||||
rs->trim(cmd->row_limit);
|
||||
}
|
||||
update_stats_rows_read(rs->size());
|
||||
return ::make_shared<cql_transport::messages::result_message::rows>(std::move(rs));
|
||||
}
|
||||
|
||||
@@ -498,10 +549,47 @@ indexed_table_select_statement::do_execute(distributed<service::storage_proxy>&
|
||||
auto now = gc_clock::now();
|
||||
|
||||
++_stats.reads;
|
||||
++_stats.secondary_index_reads;
|
||||
|
||||
assert(_restrictions->uses_secondary_indexing());
|
||||
return find_index_partition_ranges(proxy, state, options).then([limit, now, &state, &options, &proxy, this] (dht::partition_range_vector partition_ranges) {
|
||||
auto command = ::make_lw_shared<query::read_command>(
|
||||
|
||||
// Secondary index search has two steps: 1. use the index table to find a
|
||||
// list of primary keys matching the query. 2. read the rows matching
|
||||
// these primary keys from the base table and return the selected columns.
|
||||
// In "whole_partitions" case, we can do the above in whole partition
|
||||
// granularity. "partition_slices" is similar, but we fetch the same
|
||||
// clustering prefix (make_partition_slice()) from a list of partitions.
|
||||
// In other cases we need to list, and retrieve, individual rows and
|
||||
// not entire partitions. See issue #3405 for more details.
|
||||
bool whole_partitions = false;
|
||||
bool partition_slices = false;
|
||||
if (_schema->clustering_key_size() == 0) {
|
||||
// Obviously, if there are no clustering columns, then we can work at
|
||||
// the granularity of whole partitions.
|
||||
whole_partitions = true;
|
||||
} else {
|
||||
if (_index.depends_on(*(_schema->clustering_key_columns().begin()))) {
|
||||
// Searching on the *first* clustering column means in each of
|
||||
// matching partition, we can take the same contiguous clustering
|
||||
// slice (clustering prefix).
|
||||
partition_slices = true;
|
||||
} else {
|
||||
// Search on any partition column means that either all rows
|
||||
// match or all don't, so we can work with whole partitions.
|
||||
for (auto& cdef : _schema->partition_key_columns()) {
|
||||
if (_index.depends_on(cdef)) {
|
||||
whole_partitions = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (whole_partitions || partition_slices) {
|
||||
// In this case, can use our normal query machinery, which retrieves
|
||||
// entire partitions or the same slice for many partitions.
|
||||
return find_index_partition_ranges(proxy, state, options).then([limit, now, &state, &options, &proxy, this] (dht::partition_range_vector partition_ranges) {
|
||||
auto command = ::make_lw_shared<query::read_command>(
|
||||
_schema->id(),
|
||||
_schema->version(),
|
||||
make_partition_slice(options),
|
||||
@@ -511,35 +599,70 @@ indexed_table_select_statement::do_execute(distributed<service::storage_proxy>&
|
||||
query::max_partitions,
|
||||
utils::UUID(),
|
||||
options.get_timestamp(state));
|
||||
return this->execute(proxy, command, std::move(partition_ranges), state, options, now);
|
||||
});
|
||||
return this->execute(proxy, command, std::move(partition_ranges), state, options, now);
|
||||
});
|
||||
} else {
|
||||
// In this case, we need to retrieve a list of rows (not entire
|
||||
// partitions) and then retrieve those specific rows.
|
||||
return find_index_clustering_rows(proxy, state, options).then([limit, now, &state, &options, &proxy, this] (std::vector<primary_key> primary_keys) {
|
||||
auto command = ::make_lw_shared<query::read_command>(
|
||||
_schema->id(),
|
||||
_schema->version(),
|
||||
// Note: the "clustering bounds" set in make_partition_slice()
|
||||
// here is garbage, and will be overridden by execute() anyway
|
||||
make_partition_slice(options),
|
||||
limit,
|
||||
now,
|
||||
tracing::make_trace_info(state.get_trace_state()),
|
||||
query::max_partitions,
|
||||
utils::UUID(),
|
||||
options.get_timestamp(state));
|
||||
return this->execute(proxy, command, std::move(primary_keys), state, options, now);
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
future<dht::partition_range_vector>
|
||||
indexed_table_select_statement::find_index_partition_ranges(distributed<service::storage_proxy>& proxy,
|
||||
service::query_state& state,
|
||||
const query_options& options)
|
||||
// Utility function for getting the schema of the materialized view used for
|
||||
// the secondary index implementation.
|
||||
// Looks up the schema of the table named "<index name>_index" in the base
// table's keyspace, and records that table name on the given trace state.
//
// proxy       - used to reach the local database instance.
// index       - the secondary index whose metadata name is used.
// schema      - base table schema; only its keyspace name is used here.
// trace_state - trace state on which the index table name is registered.
//
// NOTE(review): what happens when no such table exists depends on
// find_schema(), which is not visible here.
static schema_ptr
|
||||
get_index_schema(distributed<service::storage_proxy>& proxy,
|
||||
const secondary_index::index& index,
|
||||
const schema_ptr& schema,
|
||||
tracing::trace_state_ptr& trace_state)
|
||||
{
|
||||
const auto& im = index.metadata();
|
||||
// The backing table's name is the index name with an "_index" suffix.
sstring index_table_name = im.name() + "_index";
|
||||
tracing::add_table_name(trace_state, schema->ks_name(), index_table_name);
|
||||
return proxy.local().get_db().local().find_schema(schema->ks_name(), index_table_name);
|
||||
}
|
||||
|
||||
// Utility function for reading from the index view (see get_index_schema())
|
||||
// the posting-list for a particular value of the indexed column.
|
||||
// Remember a secondary index can only be created on a single column.
|
||||
//static future<service::storage_proxy::coordinator_query_result>
|
||||
static future<foreign_ptr<lw_shared_ptr<query::result>>, std::unordered_map<nonwrapping_range<dht::token>, std::vector<utils::UUID>>>
|
||||
read_posting_list(distributed<service::storage_proxy>& proxy,
|
||||
schema_ptr view_schema,
|
||||
const std::vector<::shared_ptr<restrictions::restrictions>>& index_restrictions,
|
||||
const query_options& options,
|
||||
int32_t limit,
|
||||
service::query_state& state,
|
||||
gc_clock::time_point now)
|
||||
{
|
||||
const auto& im = _index.metadata();
|
||||
sstring index_table_name = sprint("%s_index", im.name());
|
||||
tracing::add_table_name(state.get_trace_state(), keyspace(), index_table_name);
|
||||
auto& db = proxy.local().get_db().local();
|
||||
const auto& view = db.find_column_family(_schema->ks_name(), index_table_name);
|
||||
dht::partition_range_vector partition_ranges;
|
||||
for (const auto& restriction : _restrictions->index_restrictions()) {
|
||||
auto pk = partition_key::from_optional_exploded(*view.schema(), restriction->values(options));
|
||||
auto dk = dht::global_partitioner().decorate_key(*view.schema(), pk);
|
||||
// FIXME: there should be only one index restriction for this index!
|
||||
// Perhaps even one index restriction entirely (do we support
|
||||
// intersection queries?).
|
||||
for (const auto& restriction : index_restrictions) {
|
||||
auto pk = partition_key::from_optional_exploded(*view_schema, restriction->values(options));
|
||||
auto dk = dht::global_partitioner().decorate_key(*view_schema, pk);
|
||||
auto range = dht::partition_range::make_singular(dk);
|
||||
partition_ranges.emplace_back(range);
|
||||
}
|
||||
|
||||
auto now = gc_clock::now();
|
||||
int32_t limit = get_limit(options);
|
||||
|
||||
partition_slice_builder partition_slice_builder{*view.schema()};
|
||||
partition_slice_builder partition_slice_builder{*view_schema};
|
||||
auto cmd = ::make_lw_shared<query::read_command>(
|
||||
view.schema()->id(),
|
||||
view.schema()->version(),
|
||||
view_schema->id(),
|
||||
view_schema->version(),
|
||||
partition_slice_builder.build(),
|
||||
limit,
|
||||
now,
|
||||
@@ -547,35 +670,111 @@ indexed_table_select_statement::find_index_partition_ranges(distributed<service:
|
||||
query::max_partitions,
|
||||
utils::UUID(),
|
||||
options.get_timestamp(state));
|
||||
return proxy.local().query(view.schema(),
|
||||
return proxy.local().query(view_schema,
|
||||
cmd,
|
||||
std::move(partition_ranges),
|
||||
options.get_consistency(),
|
||||
state.get_trace_state()).then([cmd, this, &options, now, &view] (foreign_ptr<lw_shared_ptr<query::result>> result,
|
||||
service::replicas_per_token_range) {
|
||||
state.get_trace_state());
|
||||
}
|
||||
|
||||
// Note: the partition keys returned by this function will be sorted in
|
||||
// lexicographical order of the partition key columns (in the way that
|
||||
// clustering keys are sorted) - NOT in token order. See issue #3423.
|
||||
future<dht::partition_range_vector>
|
||||
indexed_table_select_statement::find_index_partition_ranges(distributed<service::storage_proxy>& proxy,
|
||||
service::query_state& state,
|
||||
const query_options& options)
|
||||
{
|
||||
schema_ptr view = get_index_schema(proxy, _index, _schema, state.get_trace_state());
|
||||
auto now = gc_clock::now();
|
||||
return read_posting_list(proxy, view, _restrictions->index_restrictions(), options, get_limit(options), state, now).then(
|
||||
[this, now, &options, view] (foreign_ptr<lw_shared_ptr<query::result>> result, service::replicas_per_token_range) {
|
||||
std::vector<const column_definition*> columns;
|
||||
for (const column_definition& cdef : _schema->partition_key_columns()) {
|
||||
columns.emplace_back(view.schema()->get_column_definition(cdef.name()));
|
||||
columns.emplace_back(view->get_column_definition(cdef.name()));
|
||||
}
|
||||
auto selection = selection::selection::for_columns(view.schema(), columns);
|
||||
auto selection = selection::selection::for_columns(view, columns);
|
||||
cql3::selection::result_set_builder builder(*selection, now, options.get_cql_serialization_format());
|
||||
// FIXME: read_posting_list already asks to read primary keys only.
|
||||
// why do we need to specify this again?
|
||||
auto slice = partition_slice_builder(*view).build();
|
||||
query::result_view::consume(*result,
|
||||
cmd->slice,
|
||||
cql3::selection::result_set_builder::visitor(builder, *view.schema(), *selection));
|
||||
slice,
|
||||
cql3::selection::result_set_builder::visitor(builder, *view, *selection));
|
||||
auto rs = cql3::untyped_result_set(::make_shared<cql_transport::messages::result_message::rows>(std::move(builder.build())));
|
||||
dht::partition_range_vector partition_ranges;
|
||||
partition_ranges.reserve(rs.size());
|
||||
// We are reading the list of primary keys as rows of a single
|
||||
// partition (in the index view), so they are sorted in
|
||||
// lexicographical order (N.B. this is NOT token order!). We need
|
||||
// to avoid outputting the same partition key twice, but luckily in
|
||||
// the sorted order, these will be adjacent.
|
||||
stdx::optional<dht::decorated_key> last_dk;
|
||||
for (size_t i = 0; i < rs.size(); i++) {
|
||||
const auto& row = rs.at(i);
|
||||
std::vector<bytes> pk_columns;
|
||||
for (const auto& column : row.get_columns()) {
|
||||
auto blob = row.get_blob(column->name->to_cql_string());
|
||||
auto pk = partition_key::from_exploded(*_schema, { blob });
|
||||
auto dk = dht::global_partitioner().decorate_key(*_schema, pk);
|
||||
auto range = dht::partition_range::make_singular(dk);
|
||||
partition_ranges.emplace_back(range);
|
||||
pk_columns.push_back(row.get_blob(column->name->to_string()));
|
||||
}
|
||||
auto pk = partition_key::from_exploded(*_schema, pk_columns);
|
||||
auto dk = dht::global_partitioner().decorate_key(*_schema, pk);
|
||||
if (last_dk && last_dk->equal(*_schema, dk)) {
|
||||
// Another row of the same partition, no need to output the
|
||||
// same partition key again.
|
||||
continue;
|
||||
}
|
||||
last_dk = dk;
|
||||
auto range = dht::partition_range::make_singular(dk);
|
||||
partition_ranges.emplace_back(range);
|
||||
}
|
||||
return make_ready_future<dht::partition_range_vector>(partition_ranges);
|
||||
}).finally([cmd] {});
|
||||
return partition_ranges;
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
// Note: the partition keys returned by this function will be sorted in
|
||||
// lexicographical order of the partition key columns (in the way that
|
||||
// clustering keys are sorted) - NOT in token order. See issue #3423.
|
||||
// Reads the index view for the statement's index restrictions and returns the
// primary keys (decorated partition key + clustering key) of the rows found,
// in the order in which the rows appear in the result set.
//
// proxy   - storage proxy used for the index view lookup and read.
// state   - query state; supplies the trace state.
// options - query options; supplies the limit (via get_limit()) and the CQL
//           serialization format.
future<std::vector<indexed_table_select_statement::primary_key>>
|
||||
indexed_table_select_statement::find_index_clustering_rows(distributed<service::storage_proxy>& proxy, service::query_state& state, const query_options& options)
|
||||
{
|
||||
schema_ptr view = get_index_schema(proxy, _index, _schema, state.get_trace_state());
|
||||
auto now = gc_clock::now();
|
||||
return read_posting_list(proxy, view, _restrictions->index_restrictions(), options, get_limit(options), state, now).then(
|
||||
[this, now, &options, view] (foreign_ptr<lw_shared_ptr<query::result>> result, service::replicas_per_token_range) {
|
||||
// Select, from the view, the columns carrying the same names as the
// base table's partition key and clustering key columns.
std::vector<const column_definition*> columns;
|
||||
for (const column_definition& cdef : _schema->partition_key_columns()) {
|
||||
columns.emplace_back(view->get_column_definition(cdef.name()));
|
||||
}
|
||||
for (const column_definition& cdef : _schema->clustering_key_columns()) {
|
||||
columns.emplace_back(view->get_column_definition(cdef.name()));
|
||||
}
|
||||
auto selection = selection::selection::for_columns(view, columns);
|
||||
cql3::selection::result_set_builder builder(*selection, now, options.get_cql_serialization_format());
|
||||
// FIXME: read_posting_list already asks to read primary keys only.
|
||||
// why do we need to specify this again?
|
||||
auto slice = partition_slice_builder(*view).build();
|
||||
query::result_view::consume(*result,
|
||||
slice,
|
||||
cql3::selection::result_set_builder::visitor(builder, *view, *selection));
|
||||
// Wrap the built rows in an untyped result set so that column values
// can be fetched by name below.
auto rs = cql3::untyped_result_set(::make_shared<cql_transport::messages::result_message::rows>(std::move(builder.build())));
|
||||
std::vector<primary_key> primary_keys;
|
||||
primary_keys.reserve(rs.size());
|
||||
for (size_t i = 0; i < rs.size(); i++) {
|
||||
const auto& row = rs.at(i);
|
||||
// Rebuild the base table's partition key from this row's blobs,
// following the base schema's partition key column order.
auto pk_columns = _schema->partition_key_columns() | boost::adaptors::transformed([&] (auto& cdef) {
|
||||
return row.get_blob(cdef.name_as_text());
|
||||
});
|
||||
auto pk = partition_key::from_range(pk_columns);
|
||||
auto dk = dht::global_partitioner().decorate_key(*_schema, pk);
|
||||
// Same for the clustering key, following the base schema's
// clustering key column order.
auto ck_columns = _schema->clustering_key_columns() | boost::adaptors::transformed([&] (auto& cdef) {
|
||||
return row.get_blob(cdef.name_as_text());
|
||||
});
|
||||
auto ck = clustering_key::from_range(ck_columns);
|
||||
primary_keys.emplace_back(primary_key{std::move(dk), std::move(ck)});
|
||||
}
|
||||
return primary_keys;
|
||||
});
|
||||
}
|
||||
|
||||
namespace raw {
|
||||
@@ -657,8 +856,10 @@ select_statement::prepare_restrictions(database& db,
|
||||
bool for_view)
|
||||
{
|
||||
try {
|
||||
// FIXME: this method should take a separate allow_filtering parameter
|
||||
// and pass it on. Currently we pass "for_view" as allow_filtering.
|
||||
return ::make_shared<restrictions::statement_restrictions>(db, schema, statement_type::SELECT, std::move(_where_clause), bound_names,
|
||||
selection->contains_only_static_columns(), selection->contains_a_collection(), for_view);
|
||||
selection->contains_only_static_columns(), selection->contains_a_collection(), for_view, for_view);
|
||||
} catch (const exceptions::unrecognized_entity_exception& e) {
|
||||
if (contains_alias(e.entity)) {
|
||||
throw exceptions::invalid_request_exception(sprint("Aliases aren't allowed in the where clause ('%s')", e.relation->to_string()));
|
||||
@@ -859,15 +1060,22 @@ namespace util {
|
||||
shared_ptr<cql3::statements::raw::select_statement> build_select_statement(
|
||||
const sstring_view& cf_name,
|
||||
const sstring_view& where_clause,
|
||||
std::vector<sstring_view> included_columns) {
|
||||
bool select_all_columns,
|
||||
const std::vector<column_definition>& selected_columns) {
|
||||
std::ostringstream out;
|
||||
out << "SELECT ";
|
||||
if (included_columns.empty()) {
|
||||
if (select_all_columns) {
|
||||
out << "*";
|
||||
} else {
|
||||
out << join(", ", included_columns);
|
||||
// If the column name is not entirely lowercase (or digits or _),
|
||||
// when output to CQL it must be quoted to preserve case as well
|
||||
// as non alphanumeric characters.
|
||||
auto cols = boost::copy_range<std::vector<sstring>>(selected_columns
|
||||
| boost::adaptors::transformed(std::mem_fn(&column_definition::name_as_cql_string)));
|
||||
out << join(", ", cols);
|
||||
}
|
||||
out << " FROM " << cf_name << " WHERE " << where_clause << " ALLOW FILTERING";
|
||||
// Note that cf_name may need to be quoted, just like column names above.
|
||||
out << " FROM " << util::maybe_quote(cf_name.to_string()) << " WHERE " << where_clause << " ALLOW FILTERING";
|
||||
return do_with_parser(out.str(), std::mem_fn(&cql3_parser::CqlParser::selectStatement));
|
||||
}
|
||||
|
||||
|
||||
@@ -124,6 +124,19 @@ public:
|
||||
lw_shared_ptr<query::read_command> cmd, dht::partition_range_vector&& partition_ranges, service::query_state& state,
|
||||
const query_options& options, gc_clock::time_point now);
|
||||
|
||||
// Pairs a decorated partition key with a clustering key prefix.
// NOTE(review): presumably identifies a single row to be read by the
// execute() overload that takes a std::vector<primary_key> - confirm.
struct primary_key {
|
||||
// Partition part of the key.
dht::decorated_key partition;
|
||||
// Clustering part of the key.
clustering_key_prefix clustering;
|
||||
};
|
||||
|
||||
future<::shared_ptr<cql_transport::messages::result_message>> execute(
|
||||
distributed<service::storage_proxy>& proxy,
|
||||
lw_shared_ptr<query::read_command> cmd,
|
||||
std::vector<primary_key>&& primary_keys,
|
||||
service::query_state& state,
|
||||
const query_options& options,
|
||||
gc_clock::time_point now);
|
||||
|
||||
shared_ptr<cql_transport::messages::result_message> process_results(foreign_ptr<lw_shared_ptr<query::result>> results,
|
||||
lw_shared_ptr<query::read_command> cmd, const query_options& options, gc_clock::time_point now);
|
||||
|
||||
@@ -138,6 +151,9 @@ public:
|
||||
protected:
|
||||
int32_t get_limit(const query_options& options) const;
|
||||
bool needs_post_query_ordering() const;
|
||||
// Adds rows_read to the rows_read counter in _stats.
// Virtual so that subclasses can account for additional counters.
virtual void update_stats_rows_read(int64_t rows_read) {
|
||||
_stats.rows_read += rows_read;
|
||||
}
|
||||
};
|
||||
|
||||
class primary_key_select_statement : public select_statement {
|
||||
@@ -189,6 +205,15 @@ private:
|
||||
future<dht::partition_range_vector> find_index_partition_ranges(distributed<service::storage_proxy>& proxy,
|
||||
service::query_state& state,
|
||||
const query_options& options);
|
||||
|
||||
future<std::vector<primary_key>> find_index_clustering_rows(distributed<service::storage_proxy>& proxy,
|
||||
service::query_state& state,
|
||||
const query_options& options);
|
||||
|
||||
// Adds rows_read to both the general rows_read counter and the
// secondary_index_rows_read counter in _stats.
virtual void update_stats_rows_read(int64_t rows_read) override {
|
||||
_stats.rows_read += rows_read;
|
||||
_stats.secondary_index_rows_read += rows_read;
|
||||
}
|
||||
};
|
||||
|
||||
}
|
||||
|
||||
@@ -35,6 +35,12 @@ struct cql_stats {
|
||||
uint64_t batches_pure_logged = 0;
|
||||
uint64_t batches_pure_unlogged = 0;
|
||||
uint64_t batches_unlogged_from_logged = 0;
|
||||
uint64_t rows_read = 0;
|
||||
|
||||
int64_t secondary_index_creates = 0;
|
||||
int64_t secondary_index_drops = 0;
|
||||
int64_t secondary_index_reads = 0;
|
||||
int64_t secondary_index_rows_read = 0;
|
||||
};
|
||||
|
||||
}
|
||||
|
||||
@@ -142,6 +142,7 @@ public:
|
||||
using row = untyped_result_set_row;
|
||||
typedef std::vector<row> rows_type;
|
||||
using const_iterator = rows_type::const_iterator;
|
||||
using iterator = rows_type::const_iterator;
|
||||
|
||||
untyped_result_set(::shared_ptr<cql_transport::messages::result_message>);
|
||||
untyped_result_set(untyped_result_set&&) = default;
|
||||
|
||||
14
cql3/util.hh
14
cql3/util.hh
@@ -72,11 +72,23 @@ inline sstring rename_column_in_where_clause(const sstring_view& where_clause, c
|
||||
return relations_to_where_clause(std::move(new_relations));
|
||||
}
|
||||
|
||||
/// build a CQL "select" statement with the desired parameters.
|
||||
/// If select_all_columns==true, all columns are selected and the value of
|
||||
/// selected_columns is ignored.
|
||||
shared_ptr<cql3::statements::raw::select_statement> build_select_statement(
|
||||
const sstring_view& cf_name,
|
||||
const sstring_view& where_clause,
|
||||
std::vector<sstring_view> included_columns);
|
||||
bool select_all_columns,
|
||||
const std::vector<column_definition>& selected_columns);
|
||||
|
||||
/// maybe_quote() takes an identifier - the name of a column, table or
|
||||
/// keyspace name - and transforms it to a string which can be used in CQL
|
||||
/// commands. Namely, if the identifier is not entirely lower-case (including
|
||||
/// digits and underscores), it needs to be quoted to be represented in CQL.
|
||||
/// Without this quoting, CQL folds uppercase letters to lower case, and
|
||||
/// forbids non-alpha-numeric characters in identifier names.
|
||||
/// Quoting involves wrapping the string in double-quotes ("). A double-quote
|
||||
/// character itself is quoted by doubling it.
|
||||
sstring maybe_quote(const sstring& s);
|
||||
|
||||
} // namespace util
|
||||
|
||||
246
database.cc
246
database.cc
@@ -1053,30 +1053,31 @@ column_family::try_flush_memtable_to_sstable(lw_shared_ptr<memtable> old, sstabl
|
||||
database_sstable_write_monitor monitor(std::move(permit), newtab, _compaction_manager, _compaction_strategy);
|
||||
return do_with(std::move(monitor), [this, old, newtab] (auto& monitor) {
|
||||
auto&& priority = service::get_local_memtable_flush_priority();
|
||||
return write_memtable_to_sstable(*old, newtab, monitor, incremental_backups_enabled(), priority, false).then([this, newtab, old, &monitor] {
|
||||
// Switch back to default scheduling group for post-flush actions, to avoid them being starved by the memtable flush
|
||||
// controller. Cache update does not affect the input of the memtable cpu controller, so it can be subject to
|
||||
// priority inversion.
|
||||
return with_scheduling_group(default_scheduling_group(), [this, &monitor, old = std::move(old), newtab = std::move(newtab)] () mutable {
|
||||
return newtab->open_data().then([this, old, newtab] () {
|
||||
dblog.debug("Flushing to {} done", newtab->get_filename());
|
||||
return with_scheduling_group(_config.memtable_to_cache_scheduling_group, [this, old, newtab] {
|
||||
return update_cache(old, newtab);
|
||||
auto f = write_memtable_to_sstable(*old, newtab, monitor, incremental_backups_enabled(), priority, false);
|
||||
// Switch back to default scheduling group for post-flush actions, to avoid them being starved by the memtable flush
|
||||
// controller. Cache update does not affect the input of the memtable cpu controller, so it can be subject to
|
||||
// priority inversion.
|
||||
return with_scheduling_group(default_scheduling_group(), [this, &monitor, old = std::move(old), newtab = std::move(newtab), f = std::move(f)] () mutable {
|
||||
return f.then([this, newtab, old, &monitor] {
|
||||
return newtab->open_data().then([this, old, newtab] () {
|
||||
dblog.debug("Flushing to {} done", newtab->get_filename());
|
||||
return with_scheduling_group(_config.memtable_to_cache_scheduling_group, [this, old, newtab] {
|
||||
return update_cache(old, newtab);
|
||||
});
|
||||
}).then([this, old, newtab] () noexcept {
|
||||
_memtables->erase(old);
|
||||
dblog.debug("Memtable for {} replaced", newtab->get_filename());
|
||||
return stop_iteration::yes;
|
||||
});
|
||||
}).handle_exception([this, old, newtab, &monitor] (auto e) {
|
||||
monitor.write_failed();
|
||||
newtab->mark_for_deletion();
|
||||
dblog.error("failed to write sstable {}: {}", newtab->get_filename(), e);
|
||||
// If we failed this write we will try the write again and that will create a new flush reader
|
||||
// that will decrease dirty memory again. So we need to reset the accounting.
|
||||
old->revert_flushed_memory();
|
||||
return stop_iteration(_async_gate.is_closed());
|
||||
});
|
||||
}).then([this, old, newtab] () noexcept {
|
||||
_memtables->erase(old);
|
||||
dblog.debug("Memtable for {} replaced", newtab->get_filename());
|
||||
return stop_iteration::yes;
|
||||
}).handle_exception([this, old, newtab, &monitor] (auto e) {
|
||||
monitor.write_failed();
|
||||
newtab->mark_for_deletion();
|
||||
dblog.error("failed to write sstable {}: {}", newtab->get_filename(), e);
|
||||
// If we failed this write we will try the write again and that will create a new flush reader
|
||||
// that will decrease dirty memory again. So we need to reset the accounting.
|
||||
old->revert_flushed_memory();
|
||||
return stop_iteration::no;
|
||||
});
|
||||
});
|
||||
});
|
||||
});
|
||||
});
|
||||
@@ -1246,6 +1247,17 @@ void column_family::set_metrics() {
|
||||
ms::make_gauge("live_sstable", ms::description("Live sstable count"), _stats.live_sstable_count)(cf)(ks),
|
||||
ms::make_gauge("pending_compaction", ms::description("Estimated number of compactions pending for this column family"), _stats.pending_compactions)(cf)(ks)
|
||||
});
|
||||
|
||||
// View metrics are created only for base tables, so there's no point in adding them to views (which cannot act as base tables for other views)
|
||||
if (!_schema->is_view()) {
|
||||
_metrics.add_group("column_family", {
|
||||
ms::make_total_operations("view_updates_pushed_remote", _view_stats.view_updates_pushed_remote, ms::description("Number of updates (mutations) pushed to remote view replicas"))(cf)(ks),
|
||||
ms::make_total_operations("view_updates_failed_remote", _view_stats.view_updates_failed_remote, ms::description("Number of updates (mutations) that failed to be pushed to remote view replicas"))(cf)(ks),
|
||||
ms::make_total_operations("view_updates_pushed_local", _view_stats.view_updates_pushed_local, ms::description("Number of updates (mutations) pushed to local view replicas"))(cf)(ks),
|
||||
ms::make_total_operations("view_updates_failed_local", _view_stats.view_updates_failed_local, ms::description("Number of updates (mutations) that failed to be pushed to local view replicas"))(cf)(ks),
|
||||
});
|
||||
}
|
||||
|
||||
if (_schema->ks_name() != db::system_keyspace::NAME && _schema->ks_name() != db::schema_tables::v3::NAME && _schema->ks_name() != "system_traces") {
|
||||
_metrics.add_group("column_family", {
|
||||
ms::make_histogram("read_latency", ms::description("Read latency histogram"), [this] {return _stats.estimated_read.get_histogram(std::chrono::microseconds(100));})(cf)(ks),
|
||||
@@ -1614,7 +1626,7 @@ inline bool column_family::manifest_json_filter(const lister::path&, const direc
|
||||
|
||||
// TODO: possibly move it to seastar
|
||||
template <typename Service, typename PtrType, typename Func>
|
||||
static future<> invoke_shards_with_ptr(std::vector<shard_id> shards, distributed<Service>& s, PtrType ptr, Func&& func) {
|
||||
static future<> invoke_shards_with_ptr(std::unordered_set<shard_id> shards, distributed<Service>& s, PtrType ptr, Func&& func) {
|
||||
return parallel_for_each(std::move(shards), [&s, &func, ptr] (shard_id id) {
|
||||
return s.invoke_on(id, [func, foreign = make_foreign(ptr)] (Service& s) mutable {
|
||||
return func(s, std::move(foreign));
|
||||
@@ -1641,7 +1653,14 @@ future<> distributed_loader::open_sstable(distributed<database>& db, sstables::e
|
||||
return f.then([&db, comps = std::move(comps), func = std::move(func)] (sstables::sstable_open_info info) {
|
||||
// shared components loaded, now opening sstable in all shards that own it with shared components
|
||||
return do_with(std::move(info), [&db, comps = std::move(comps), func = std::move(func)] (auto& info) {
|
||||
return invoke_shards_with_ptr(info.owners, db, std::move(info.components),
|
||||
// All shards that own the sstable are interested in it, in addition to the shard that
|
||||
// is responsible for its generation. We may need to add manually this shard
|
||||
// because sstable may not contain data that belong to it.
|
||||
auto shards_interested_in_this_sstable = boost::copy_range<std::unordered_set<shard_id>>(info.owners);
|
||||
shard_id shard_responsible_for_generation = column_family::calculate_shard_from_sstable_generation(comps.generation);
|
||||
shards_interested_in_this_sstable.insert(shard_responsible_for_generation);
|
||||
|
||||
return invoke_shards_with_ptr(std::move(shards_interested_in_this_sstable), db, std::move(info.components),
|
||||
[owners = info.owners, data = info.data.dup(), index = info.index.dup(), comps, func] (database& db, auto components) {
|
||||
auto& cf = db.find_column_family(comps.ks, comps.cf);
|
||||
return func(cf, sstables::foreign_sstable_open_info{std::move(components), owners, data, index});
|
||||
@@ -1962,31 +1981,31 @@ future<> distributed_loader::populate_column_family(distributed<database>& db, s
|
||||
// case is still an invalid case, but it is way easier for us to treat it
|
||||
// by waiting for all files to be loaded, and then checking if we saw a
|
||||
// file during scan_dir, without its corresponding TOC.
|
||||
enum class status {
|
||||
enum class component_status {
|
||||
has_some_file,
|
||||
has_toc_file,
|
||||
has_temporary_toc_file,
|
||||
};
|
||||
|
||||
struct sstable_descriptor {
|
||||
std::experimental::optional<sstables::sstable::version_types> version;
|
||||
std::experimental::optional<sstables::sstable::format_types> format;
|
||||
component_status status;
|
||||
sstables::sstable::version_types version;
|
||||
sstables::sstable::format_types format;
|
||||
};
|
||||
|
||||
auto verifier = make_lw_shared<std::unordered_map<unsigned long, status>>();
|
||||
auto descriptor = make_lw_shared<sstable_descriptor>();
|
||||
auto verifier = make_lw_shared<std::unordered_map<unsigned long, sstable_descriptor>>();
|
||||
|
||||
return do_with(std::vector<future<>>(), [&db, sstdir = std::move(sstdir), verifier, descriptor, ks, cf] (std::vector<future<>>& futures) {
|
||||
return lister::scan_dir(sstdir, { directory_entry_type::regular }, [&db, verifier, descriptor, &futures] (lister::path sstdir, directory_entry de) {
|
||||
return do_with(std::vector<future<>>(), [&db, sstdir = std::move(sstdir), verifier, ks, cf] (std::vector<future<>>& futures) {
|
||||
return lister::scan_dir(sstdir, { directory_entry_type::regular }, [&db, verifier, &futures] (lister::path sstdir, directory_entry de) {
|
||||
// FIXME: The secondary indexes are in this level, but with a directory type, (starting with ".")
|
||||
auto f = distributed_loader::probe_file(db, sstdir.native(), de.name).then([verifier, descriptor, sstdir, de] (auto entry) {
|
||||
auto f = distributed_loader::probe_file(db, sstdir.native(), de.name).then([verifier, sstdir, de] (auto entry) {
|
||||
if (entry.component == sstables::sstable::component_type::TemporaryStatistics) {
|
||||
return remove_file(sstables::sstable::filename(sstdir.native(), entry.ks, entry.cf, entry.version, entry.generation,
|
||||
entry.format, sstables::sstable::component_type::TemporaryStatistics));
|
||||
}
|
||||
|
||||
if (verifier->count(entry.generation)) {
|
||||
if (verifier->at(entry.generation) == status::has_toc_file) {
|
||||
if (verifier->at(entry.generation).status == component_status::has_toc_file) {
|
||||
lister::path file_path(sstdir / de.name.c_str());
|
||||
if (entry.component == sstables::sstable::component_type::TOC) {
|
||||
throw sstables::malformed_sstable_exception("Invalid State encountered. TOC file already processed", file_path.native());
|
||||
@@ -1994,27 +2013,19 @@ future<> distributed_loader::populate_column_family(distributed<database>& db, s
|
||||
throw sstables::malformed_sstable_exception("Invalid State encountered. Temporary TOC file found after TOC file was processed", file_path.native());
|
||||
}
|
||||
} else if (entry.component == sstables::sstable::component_type::TOC) {
|
||||
verifier->at(entry.generation) = status::has_toc_file;
|
||||
verifier->at(entry.generation).status = component_status::has_toc_file;
|
||||
} else if (entry.component == sstables::sstable::component_type::TemporaryTOC) {
|
||||
verifier->at(entry.generation) = status::has_temporary_toc_file;
|
||||
verifier->at(entry.generation).status = component_status::has_temporary_toc_file;
|
||||
}
|
||||
} else {
|
||||
if (entry.component == sstables::sstable::component_type::TOC) {
|
||||
verifier->emplace(entry.generation, status::has_toc_file);
|
||||
verifier->emplace(entry.generation, sstable_descriptor{component_status::has_toc_file, entry.version, entry.format});
|
||||
} else if (entry.component == sstables::sstable::component_type::TemporaryTOC) {
|
||||
verifier->emplace(entry.generation, status::has_temporary_toc_file);
|
||||
verifier->emplace(entry.generation, sstable_descriptor{component_status::has_temporary_toc_file, entry.version, entry.format});
|
||||
} else {
|
||||
verifier->emplace(entry.generation, status::has_some_file);
|
||||
verifier->emplace(entry.generation, sstable_descriptor{component_status::has_some_file, entry.version, entry.format});
|
||||
}
|
||||
}
|
||||
|
||||
// Retrieve both version and format used for this column family.
|
||||
if (!descriptor->version) {
|
||||
descriptor->version = entry.version;
|
||||
}
|
||||
if (!descriptor->format) {
|
||||
descriptor->format = entry.format;
|
||||
}
|
||||
return make_ready_future<>();
|
||||
});
|
||||
|
||||
@@ -2045,14 +2056,12 @@ future<> distributed_loader::populate_column_family(distributed<database>& db, s
|
||||
}
|
||||
return make_ready_future<>();
|
||||
});
|
||||
}).then([verifier, sstdir, descriptor, ks = std::move(ks), cf = std::move(cf)] {
|
||||
return do_for_each(*verifier, [sstdir = std::move(sstdir), ks = std::move(ks), cf = std::move(cf), descriptor, verifier] (auto v) {
|
||||
if (v.second == status::has_temporary_toc_file) {
|
||||
}).then([verifier, sstdir, ks = std::move(ks), cf = std::move(cf)] {
|
||||
return do_for_each(*verifier, [sstdir = std::move(sstdir), ks = std::move(ks), cf = std::move(cf), verifier] (auto v) {
|
||||
if (v.second.status == component_status::has_temporary_toc_file) {
|
||||
unsigned long gen = v.first;
|
||||
assert(descriptor->version);
|
||||
sstables::sstable::version_types version = descriptor->version.value();
|
||||
assert(descriptor->format);
|
||||
sstables::sstable::format_types format = descriptor->format.value();
|
||||
sstables::sstable::version_types version = v.second.version;
|
||||
sstables::sstable::format_types format = v.second.format;
|
||||
|
||||
if (engine().cpu_id() != 0) {
|
||||
dblog.debug("At directory: {}, partial SSTable with generation {} not relevant for this shard, ignoring", sstdir, v.first);
|
||||
@@ -2060,7 +2069,7 @@ future<> distributed_loader::populate_column_family(distributed<database>& db, s
|
||||
}
|
||||
// shard 0 is responsible for removing a partial sstable.
|
||||
return sstables::sstable::remove_sstable_with_temp_toc(ks, cf, sstdir, gen, version, format);
|
||||
} else if (v.second != status::has_toc_file) {
|
||||
} else if (v.second.status != component_status::has_toc_file) {
|
||||
throw sstables::malformed_sstable_exception(sprint("At directory: %s: no TOC found for SSTable with generation %d!. Refusing to boot", sstdir, v.first));
|
||||
}
|
||||
return make_ready_future<>();
|
||||
@@ -2650,6 +2659,7 @@ bool database::update_column_family(schema_ptr new_schema) {
|
||||
void database::remove(const column_family& cf) {
|
||||
auto s = cf.schema();
|
||||
auto& ks = find_keyspace(s->ks_name());
|
||||
_querier_cache.evict_all_for_table(s->id());
|
||||
_column_families.erase(s->id());
|
||||
ks.metadata()->remove_column_family(s);
|
||||
_ks_cf_to_uuid.erase(std::make_pair(s->ks_name(), s->cf_name()));
|
||||
@@ -2666,6 +2676,7 @@ future<> database::drop_column_family(const sstring& ks_name, const sstring& cf_
|
||||
auto uuid = find_uuid(ks_name, cf_name);
|
||||
auto cf = _column_families.at(uuid);
|
||||
remove(*cf);
|
||||
cf->clear_views();
|
||||
auto& ks = find_keyspace(ks_name);
|
||||
return truncate(ks, *cf, std::move(tsf), snapshot).finally([this, cf] {
|
||||
return cf->stop();
|
||||
@@ -2813,6 +2824,7 @@ keyspace::make_column_family_config(const schema& s, const db::config& db_config
|
||||
cfg.commitlog_scheduling_group = _config.commitlog_scheduling_group;
|
||||
cfg.enable_metrics_reporting = db_config.enable_keyspace_column_family_metrics();
|
||||
cfg.large_partition_warning_threshold_bytes = db_config.compaction_large_partition_warning_threshold_mb()*1024*1024;
|
||||
cfg.view_update_concurrency_semaphore = _config.view_update_concurrency_semaphore;
|
||||
|
||||
return cfg;
|
||||
}
|
||||
@@ -2916,6 +2928,11 @@ bool database::has_schema(const sstring& ks_name, const sstring& cf_name) const
|
||||
return _ks_cf_to_uuid.count(std::make_pair(ks_name, cf_name)) > 0;
|
||||
}
|
||||
|
||||
std::vector<view_ptr> database::get_views() const {
|
||||
return boost::copy_range<std::vector<view_ptr>>(get_non_system_column_families()
|
||||
| boost::adaptors::filtered([] (auto& cf) { return cf->schema()->is_view(); })
|
||||
| boost::adaptors::transformed([] (auto& cf) { return view_ptr(cf->schema()); }));
|
||||
}
|
||||
|
||||
void database::create_in_memory_keyspace(const lw_shared_ptr<keyspace_metadata>& ksm) {
|
||||
keyspace ks(ksm, std::move(make_keyspace_config(*ksm)));
|
||||
@@ -3261,7 +3278,7 @@ future<mutation> database::do_apply_counter_update(column_family& cf, const froz
|
||||
std::move(regular_columns), { }, { }, cql_serialization_format::internal(), query::max_rows);
|
||||
|
||||
return do_with(std::move(slice), std::move(m), std::vector<locked_cell>(),
|
||||
[this, &cf, timeout, trace_state = std::move(trace_state)] (const query::partition_slice& slice, mutation& m, std::vector<locked_cell>& locks) mutable {
|
||||
[this, &cf, timeout, trace_state = std::move(trace_state), op = cf.write_in_progress()] (const query::partition_slice& slice, mutation& m, std::vector<locked_cell>& locks) mutable {
|
||||
tracing::trace(trace_state, "Acquiring counter locks");
|
||||
return cf.lock_counter_cells(m, timeout).then([&, m_schema = cf.schema(), trace_state = std::move(trace_state), timeout, this] (std::vector<locked_cell> lcs) mutable {
|
||||
locks = std::move(lcs);
|
||||
@@ -3494,16 +3511,19 @@ future<> database::do_apply(schema_ptr s, const frozen_mutation& m, db::timeout_
|
||||
throw std::runtime_error(sprint("attempted to mutate using not synced schema of %s.%s, version=%s",
|
||||
s->ks_name(), s->cf_name(), s->version()));
|
||||
}
|
||||
|
||||
// Signal to view building code that a write is in progress,
|
||||
// so it knows when new writes start being sent to a new view.
|
||||
auto op = cf.write_in_progress();
|
||||
if (cf.views().empty()) {
|
||||
return apply_with_commitlog(std::move(s), cf, std::move(uuid), m, timeout);
|
||||
return apply_with_commitlog(std::move(s), cf, std::move(uuid), m, timeout).finally([op = std::move(op)] { });
|
||||
}
|
||||
future<row_locker::lock_holder> f = cf.push_view_replica_updates(s, m);
|
||||
return f.then([this, s = std::move(s), uuid = std::move(uuid), &m, timeout] (row_locker::lock_holder lock) {
|
||||
auto& cf = find_column_family(uuid);
|
||||
future<row_locker::lock_holder> f = cf.push_view_replica_updates(s, m, timeout);
|
||||
return f.then([this, s = std::move(s), uuid = std::move(uuid), &m, timeout, &cf, op = std::move(op)] (row_locker::lock_holder lock) mutable {
|
||||
return apply_with_commitlog(std::move(s), cf, std::move(uuid), m, timeout).finally(
|
||||
// Hold the local lock on the base-table partition or row
|
||||
// taken before the read, until the update is done.
|
||||
[lock = std::move(lock)] { });
|
||||
[lock = std::move(lock), op = std::move(op)] { });
|
||||
});
|
||||
}
|
||||
|
||||
@@ -3581,6 +3601,7 @@ database::make_keyspace_config(const keyspace_metadata& ksm) {
|
||||
cfg.query_scheduling_group = _dbcfg.query_scheduling_group;
|
||||
cfg.commitlog_scheduling_group = _dbcfg.commitlog_scheduling_group;
|
||||
cfg.enable_metrics_reporting = _cfg->enable_keyspace_column_family_metrics();
|
||||
cfg.view_update_concurrency_semaphore = &_view_update_concurrency_sem;
|
||||
return cfg;
|
||||
}
|
||||
|
||||
@@ -3757,7 +3778,10 @@ future<> database::truncate(const keyspace& ks, column_family& cf, timestamp_fun
|
||||
return f.then([this, &cf, truncated_at, low_mark, should_flush] {
|
||||
return cf.discard_sstables(truncated_at).then([this, &cf, truncated_at, low_mark, should_flush](db::replay_position rp) {
|
||||
// TODO: indexes.
|
||||
assert(low_mark <= rp);
|
||||
// Note: since discard_sstables was changed to only count tables owned by this shard,
|
||||
// we can get zero rp back. Changed assert, and ensure we save at least low_mark.
|
||||
assert(low_mark <= rp || rp == db::replay_position());
|
||||
rp = std::max(low_mark, rp);
|
||||
return truncate_views(cf, truncated_at, should_flush).then([&cf, truncated_at, rp] {
|
||||
return db::system_keyspace::save_truncation_record(cf, truncated_at, rp);
|
||||
});
|
||||
@@ -4165,8 +4189,11 @@ future<db::replay_position> column_family::discard_sstables(db_clock::time_point
|
||||
|
||||
for (auto& p : *cf._sstables->all()) {
|
||||
if (p->max_data_age() <= gc_trunc) {
|
||||
rp = std::max(p->get_stats_metadata().position, rp);
|
||||
remove.emplace_back(p);
|
||||
// Only one of the shards that own the sstable will submit it for deletion, to avoid a race condition in the delete procedure.
|
||||
if (*boost::min_element(p->get_shards_for_this_sstable()) == engine().cpu_id()) {
|
||||
rp = std::max(p->get_stats_metadata().position, rp);
|
||||
remove.emplace_back(p);
|
||||
}
|
||||
continue;
|
||||
}
|
||||
pruned->insert(p);
|
||||
@@ -4266,10 +4293,12 @@ void column_family::set_schema(schema_ptr s) {
|
||||
|
||||
static std::vector<view_ptr>::iterator find_view(std::vector<view_ptr>& views, const view_ptr& v) {
|
||||
return std::find_if(views.begin(), views.end(), [&v] (auto&& e) {
|
||||
return e->cf_name() == v->cf_name();
|
||||
return e->id() == v->id();
|
||||
});
|
||||
}
|
||||
|
||||
void column_family::add_or_update_view(view_ptr v) {
|
||||
v->view_info()->initialize_base_dependent_fields(*schema());
|
||||
auto existing = find_view(_views, v);
|
||||
if (existing != _views.end()) {
|
||||
*existing = std::move(v);
|
||||
@@ -4285,6 +4314,10 @@ void column_family::remove_view(view_ptr v) {
|
||||
}
|
||||
}
|
||||
|
||||
void column_family::clear_views() {
|
||||
_views.clear();
|
||||
}
|
||||
|
||||
const std::vector<view_ptr>& column_family::views() const {
|
||||
return _views;
|
||||
}
|
||||
@@ -4312,13 +4345,17 @@ std::vector<view_ptr> column_family::affected_views(const schema_ptr& base, cons
|
||||
future<> column_family::generate_and_propagate_view_updates(const schema_ptr& base,
|
||||
std::vector<view_ptr>&& views,
|
||||
mutation&& m,
|
||||
flat_mutation_reader_opt existings) const {
|
||||
flat_mutation_reader_opt existings,
|
||||
db::timeout_clock::time_point timeout) const {
|
||||
auto base_token = m.token();
|
||||
return db::view::generate_view_updates(base,
|
||||
std::move(views),
|
||||
flat_mutation_reader_from_mutations({std::move(m)}),
|
||||
std::move(existings)).then([base_token = std::move(base_token)] (auto&& updates) {
|
||||
db::view::mutate_MV(std::move(base_token), std::move(updates));
|
||||
std::move(existings)).then([this, timeout, base_token = std::move(base_token)] (auto&& updates) mutable {
|
||||
return seastar::get_units(*_config.view_update_concurrency_semaphore, 1, timeout).then(
|
||||
[this, base_token = std::move(base_token), updates = std::move(updates)] (auto units) mutable {
|
||||
db::view::mutate_MV(std::move(base_token), std::move(updates), _view_stats).handle_exception([units = std::move(units)] (auto ignored) { });
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
@@ -4326,7 +4363,7 @@ future<> column_family::generate_and_propagate_view_updates(const schema_ptr& ba
|
||||
* Given an update for the base table, calculates the set of potentially affected views,
|
||||
* generates the relevant updates, and sends them to the paired view replicas.
|
||||
*/
|
||||
future<row_locker::lock_holder> column_family::push_view_replica_updates(const schema_ptr& s, const frozen_mutation& fm) const {
|
||||
future<row_locker::lock_holder> column_family::push_view_replica_updates(const schema_ptr& s, const frozen_mutation& fm, db::timeout_clock::time_point timeout) const {
|
||||
//FIXME: Avoid unfreezing here.
|
||||
auto m = fm.unfreeze(s);
|
||||
auto& base = schema();
|
||||
@@ -4337,7 +4374,7 @@ future<row_locker::lock_holder> column_family::push_view_replica_updates(const s
|
||||
}
|
||||
auto cr_ranges = db::view::calculate_affected_clustering_ranges(*base, m.decorated_key(), m.partition(), views);
|
||||
if (cr_ranges.empty()) {
|
||||
return generate_and_propagate_view_updates(base, std::move(views), std::move(m), { }).then([] {
|
||||
return generate_and_propagate_view_updates(base, std::move(views), std::move(m), { }, timeout).then([] {
|
||||
// In this case we are not doing a read-before-write, just a
|
||||
// write, so no lock is needed.
|
||||
return make_ready_future<row_locker::lock_holder>();
|
||||
@@ -4359,18 +4396,18 @@ future<row_locker::lock_holder> column_family::push_view_replica_updates(const s
|
||||
// We'll return this lock to the caller, which will release it after
|
||||
// writing the base-table update.
|
||||
future<row_locker::lock_holder> lockf = local_base_lock(base, m.decorated_key(), slice.default_row_ranges());
|
||||
return lockf.then([m = std::move(m), slice = std::move(slice), views = std::move(views), base, this] (row_locker::lock_holder lock) {
|
||||
return lockf.then([m = std::move(m), slice = std::move(slice), views = std::move(views), base, this, timeout] (row_locker::lock_holder lock) {
|
||||
return do_with(
|
||||
dht::partition_range::make_singular(m.decorated_key()),
|
||||
std::move(slice),
|
||||
std::move(m),
|
||||
[base, views = std::move(views), lock = std::move(lock), this] (auto& pk, auto& slice, auto& m) mutable {
|
||||
auto reader = this->as_mutation_source().make_reader(
|
||||
[base, views = std::move(views), lock = std::move(lock), this, timeout] (auto& pk, auto& slice, auto& m) mutable {
|
||||
auto reader = this->make_reader(
|
||||
base,
|
||||
pk,
|
||||
slice,
|
||||
service::get_local_sstable_query_read_priority());
|
||||
return this->generate_and_propagate_view_updates(base, std::move(views), std::move(m), std::move(reader)).then([lock = std::move(lock)] () mutable {
|
||||
return this->generate_and_propagate_view_updates(base, std::move(views), std::move(m), std::move(reader), timeout).then([lock = std::move(lock)] () mutable {
|
||||
// return the local partition/row lock we have taken so it
|
||||
// remains locked until the caller is done modifying this
|
||||
// partition/row and destroys the lock object.
|
||||
@@ -4457,6 +4494,31 @@ column_family::local_base_lock(const schema_ptr& s, const dht::decorated_key& pk
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Given some updates on the base table and assuming there are no pre-existing, overlapping updates,
|
||||
* generates the mutations to be applied to the base table's views, and sends them to the paired
|
||||
* view replicas. The future resolves when the updates have been acknowledged by the replicas, i.e.,
|
||||
* propagating the view updates to the view replicas happens synchronously.
|
||||
*
|
||||
* @param views the affected views which need to be updated.
|
||||
* @param base_token The token to use to match the base replica with the paired replicas.
|
||||
* @param reader the base table updates being applied, which all correspond to the base token.
|
||||
* @return a future that resolves when the updates have been acknowledged by the view replicas
|
||||
*/
|
||||
future<> column_family::populate_views(
|
||||
std::vector<view_ptr> views,
|
||||
dht::token base_token,
|
||||
flat_mutation_reader&& reader) {
|
||||
auto& schema = reader.schema();
|
||||
return db::view::generate_view_updates(
|
||||
schema,
|
||||
std::move(views),
|
||||
std::move(reader),
|
||||
{ }).then([base_token = std::move(base_token), this] (auto&& updates) {
|
||||
return db::view::mutate_MV(std::move(base_token), std::move(updates), _view_stats);
|
||||
});
|
||||
}
|
||||
|
||||
void column_family::set_hit_rate(gms::inet_address addr, cache_temperature rate) {
|
||||
auto& e = _cluster_cache_hit_rates[addr];
|
||||
e.rate = rate;
|
||||
@@ -4516,16 +4578,14 @@ flat_mutation_reader make_local_shard_sstable_reader(schema_ptr s,
|
||||
}
|
||||
return reader;
|
||||
};
|
||||
return make_combined_reader(s, std::make_unique<incremental_reader_selector>(s,
|
||||
std::move(sstables),
|
||||
pr,
|
||||
slice,
|
||||
pc,
|
||||
std::move(resource_tracker),
|
||||
std::move(trace_state),
|
||||
fwd,
|
||||
fwd_mr,
|
||||
std::move(reader_factory_fn)),
|
||||
auto all_readers = boost::copy_range<std::vector<flat_mutation_reader>>(
|
||||
*sstables->all()
|
||||
| boost::adaptors::transformed([&] (sstables::shared_sstable sst) -> flat_mutation_reader {
|
||||
return reader_factory_fn(sst, pr);
|
||||
})
|
||||
);
|
||||
return make_combined_reader(s,
|
||||
std::move(all_readers),
|
||||
fwd,
|
||||
fwd_mr);
|
||||
}
|
||||
@@ -4544,16 +4604,14 @@ flat_mutation_reader make_range_sstable_reader(schema_ptr s,
|
||||
auto reader_factory_fn = [s, &slice, &pc, resource_tracker, fwd, fwd_mr, &monitor_generator] (sstables::shared_sstable& sst, const dht::partition_range& pr) {
|
||||
return sst->read_range_rows_flat(s, pr, slice, pc, resource_tracker, fwd, fwd_mr, monitor_generator(sst));
|
||||
};
|
||||
return make_combined_reader(s, std::make_unique<incremental_reader_selector>(s,
|
||||
std::move(sstables),
|
||||
pr,
|
||||
slice,
|
||||
pc,
|
||||
std::move(resource_tracker),
|
||||
std::move(trace_state),
|
||||
fwd,
|
||||
fwd_mr,
|
||||
std::move(reader_factory_fn)),
|
||||
auto sstable_readers = boost::copy_range<std::vector<flat_mutation_reader>>(
|
||||
*sstables->all()
|
||||
| boost::adaptors::transformed([&] (sstables::shared_sstable sst) {
|
||||
return reader_factory_fn(sst, pr);
|
||||
})
|
||||
);
|
||||
return make_combined_reader(s,
|
||||
std::move(sstable_readers),
|
||||
fwd,
|
||||
fwd_mr);
|
||||
}
|
||||
|
||||
32
database.hh
32
database.hh
@@ -310,6 +310,7 @@ public:
|
||||
seastar::scheduling_group streaming_scheduling_group;
|
||||
bool enable_metrics_reporting = false;
|
||||
uint64_t large_partition_warning_threshold_bytes = std::numeric_limits<uint64_t>::max();
|
||||
db::timeout_semaphore* view_update_concurrency_semaphore;
|
||||
};
|
||||
struct no_commitlog {};
|
||||
struct stats {
|
||||
@@ -344,6 +345,7 @@ private:
|
||||
schema_ptr _schema;
|
||||
config _config;
|
||||
mutable stats _stats;
|
||||
mutable db::view::stats _view_stats;
|
||||
|
||||
uint64_t _failed_counter_applies_to_memtable = 0;
|
||||
|
||||
@@ -461,6 +463,11 @@ private:
|
||||
double _cached_percentile = -1;
|
||||
lowres_clock::time_point _percentile_cache_timestamp;
|
||||
std::chrono::milliseconds _percentile_cache_value;
|
||||
|
||||
// Phaser used to synchronize with in-progress writes. This is useful for code that,
|
||||
// after some modification, needs to ensure that new writes will see it before
|
||||
// it can proceed, such as the view building code.
|
||||
utils::phased_barrier _pending_writes_phaser;
|
||||
private:
|
||||
void update_stats_for_new_sstable(uint64_t disk_space_used_by_sstable, const std::vector<unsigned>& shards_for_the_sstable) noexcept;
|
||||
// Adds new sstable to the set of sstables
|
||||
@@ -784,10 +791,19 @@ public:
|
||||
|
||||
future<> run_with_compaction_disabled(std::function<future<> ()> func);
|
||||
|
||||
utils::phased_barrier::operation write_in_progress() {
|
||||
return _pending_writes_phaser.start();
|
||||
}
|
||||
|
||||
future<> await_pending_writes() {
|
||||
return _pending_writes_phaser.advance_and_await();
|
||||
}
|
||||
|
||||
void add_or_update_view(view_ptr v);
|
||||
void remove_view(view_ptr v);
|
||||
void clear_views();
|
||||
const std::vector<view_ptr>& views() const;
|
||||
future<row_locker::lock_holder> push_view_replica_updates(const schema_ptr& s, const frozen_mutation& fm) const;
|
||||
future<row_locker::lock_holder> push_view_replica_updates(const schema_ptr& s, const frozen_mutation& fm, db::timeout_clock::time_point timeout) const;
|
||||
void add_coordinator_read_latency(utils::estimated_histogram::duration latency);
|
||||
std::chrono::milliseconds get_coordinator_read_latency_percentile(double percentile);
|
||||
|
||||
@@ -798,12 +814,19 @@ public:
|
||||
uint64_t large_partition_warning_threshold_bytes() const {
|
||||
return _config.large_partition_warning_threshold_bytes;
|
||||
}
|
||||
|
||||
future<> populate_views(
|
||||
std::vector<view_ptr>,
|
||||
dht::token base_token,
|
||||
flat_mutation_reader&&);
|
||||
|
||||
private:
|
||||
std::vector<view_ptr> affected_views(const schema_ptr& base, const mutation& update) const;
|
||||
future<> generate_and_propagate_view_updates(const schema_ptr& base,
|
||||
std::vector<view_ptr>&& views,
|
||||
mutation&& m,
|
||||
flat_mutation_reader_opt existings) const;
|
||||
flat_mutation_reader_opt existings,
|
||||
db::timeout_clock::time_point timeout) const;
|
||||
|
||||
mutable row_locker _row_locker;
|
||||
future<row_locker::lock_holder> local_base_lock(const schema_ptr& s, const dht::decorated_key& pk, const query::clustering_row_ranges& rows) const;
|
||||
@@ -989,6 +1012,7 @@ public:
|
||||
seastar::scheduling_group query_scheduling_group;
|
||||
seastar::scheduling_group streaming_scheduling_group;
|
||||
bool enable_metrics_reporting = false;
|
||||
db::timeout_semaphore* view_update_concurrency_semaphore = nullptr;
|
||||
};
|
||||
private:
|
||||
std::unique_ptr<locator::abstract_replication_strategy> _replication_strategy;
|
||||
@@ -1119,6 +1143,8 @@ private:
|
||||
|
||||
semaphore _sstable_load_concurrency_sem{max_concurrent_sstable_loads()};
|
||||
|
||||
db::timeout_semaphore _view_update_concurrency_sem{100}; // Stand-in hack for #2538
|
||||
|
||||
concrete_execution_stage<future<lw_shared_ptr<query::result>>,
|
||||
column_family*,
|
||||
schema_ptr,
|
||||
@@ -1273,6 +1299,8 @@ public:
|
||||
|
||||
std::vector<lw_shared_ptr<column_family>> get_non_system_column_families() const;
|
||||
|
||||
std::vector<view_ptr> get_views() const;
|
||||
|
||||
const std::unordered_map<std::pair<sstring, sstring>, utils::UUID, utils::tuple_hash>&
|
||||
get_column_families_mapping() const {
|
||||
return _ks_cf_to_uuid;
|
||||
|
||||
@@ -723,7 +723,7 @@ public:
|
||||
*/
|
||||
auto me = shared_from_this();
|
||||
auto fp = _file_pos;
|
||||
return _pending_ops.wait_for_pending(timeout).then([me = std::move(me), fp, timeout] {
|
||||
return _pending_ops.wait_for_pending(timeout).then([me, fp, timeout] {
|
||||
if (fp != me->_file_pos) {
|
||||
// some other request already wrote this buffer.
|
||||
// If so, wait for the operation at our intended file offset
|
||||
|
||||
@@ -735,6 +735,7 @@ public:
|
||||
val(enable_sstable_data_integrity_check, bool, false, Used, "Enable interposer which checks for integrity of every sstable write." \
|
||||
" Performance is affected to some extent as a result. Useful to help debugging problems that may arise at another layers.") \
|
||||
val(cpu_scheduler, bool, true, Used, "Enable cpu scheduling") \
|
||||
val(view_building, bool, true, Used, "Enable view building; should only be set to false when the node is experience issues due to view building") \
|
||||
/* done! */
|
||||
|
||||
#define _make_value_member(name, type, deflt, status, desc, ...) \
|
||||
|
||||
@@ -42,10 +42,8 @@ const std::string manager::FILENAME_PREFIX("HintsLog" + commitlog::descriptor::S
|
||||
const std::chrono::seconds manager::hint_file_write_timeout = std::chrono::seconds(2);
|
||||
const std::chrono::seconds manager::hints_flush_period = std::chrono::seconds(10);
|
||||
const std::chrono::seconds manager::space_watchdog::_watchdog_period = std::chrono::seconds(1);
|
||||
// TODO: remove this when we switch to C++17
|
||||
constexpr size_t manager::_max_hints_send_queue_length;
|
||||
|
||||
size_t db::hints::manager::max_shard_disk_space_size;
|
||||
size_t db::hints::resource_manager::max_shard_disk_space_size;
|
||||
|
||||
manager::manager(sstring hints_directory, std::vector<sstring> hinted_dcs, int64_t max_hint_window_ms, distributed<database>& db)
|
||||
: _hints_dir(boost::filesystem::path(hints_directory) / format("{:d}", engine().cpu_id()).c_str())
|
||||
@@ -53,8 +51,8 @@ manager::manager(sstring hints_directory, std::vector<sstring> hinted_dcs, int64
|
||||
, _local_snitch_ptr(locator::i_endpoint_snitch::get_local_snitch_ptr())
|
||||
, _max_hint_window_us(max_hint_window_ms * 1000)
|
||||
, _local_db(db.local())
|
||||
, _max_send_in_flight_memory(std::max(memory::stats().total_memory() / 10, _max_hints_send_queue_length))
|
||||
, _min_send_hint_budget(_max_send_in_flight_memory / _max_hints_send_queue_length)
|
||||
, _max_send_in_flight_memory(std::max(memory::stats().total_memory() / 10, resource_manager::max_hints_send_queue_length))
|
||||
, _min_send_hint_budget(_max_send_in_flight_memory / resource_manager::max_hints_send_queue_length)
|
||||
, _send_limiter(_max_send_in_flight_memory)
|
||||
, _space_watchdog(*this)
|
||||
{
|
||||
@@ -256,8 +254,8 @@ future<db::commitlog> manager::end_point_hints_manager::add_store() noexcept {
|
||||
commitlog::config cfg;
|
||||
|
||||
cfg.commit_log_location = _hints_dir.c_str();
|
||||
cfg.commitlog_segment_size_in_mb = _hint_segment_size_in_mb;
|
||||
cfg.commitlog_total_space_in_mb = _max_hints_per_ep_size_mb;
|
||||
cfg.commitlog_segment_size_in_mb = resource_manager::hint_segment_size_in_mb;
|
||||
cfg.commitlog_total_space_in_mb = resource_manager::max_hints_per_ep_size_mb;
|
||||
cfg.fname_prefix = manager::FILENAME_PREFIX;
|
||||
|
||||
return commitlog::create_commitlog(std::move(cfg)).then([this] (commitlog l) {
|
||||
@@ -335,7 +333,7 @@ future<> manager::end_point_hints_manager::sender::do_send_one_mutation(mutation
|
||||
// to be generated as a result of hints sending.
|
||||
if (boost::range::find(natural_endpoints, end_point_key()) != natural_endpoints.end()) {
|
||||
manager_logger.trace("Sending directly to {}", end_point_key());
|
||||
return _proxy.send_to_endpoint(std::move(m), end_point_key(), write_type::SIMPLE);
|
||||
return _proxy.send_to_endpoint(std::move(m), end_point_key(), { }, write_type::SIMPLE);
|
||||
} else {
|
||||
manager_logger.trace("Endpoints set has changed and {} is no longer a replica. Mutating from scratch...", end_point_key());
|
||||
return _proxy.mutate({std::move(m)}, consistency_level::ALL, nullptr);
|
||||
@@ -470,9 +468,9 @@ void manager::space_watchdog::on_timer() {
|
||||
}).then([this] {
|
||||
// Adjust the quota to take into account the space we guarantee to every end point manager
|
||||
size_t adjusted_quota = 0;
|
||||
size_t delta = _shard_manager._ep_managers.size() * _hint_segment_size_in_mb * 1024 * 1024;
|
||||
if (max_shard_disk_space_size > delta) {
|
||||
adjusted_quota = max_shard_disk_space_size - delta;
|
||||
size_t delta = _shard_manager._ep_managers.size() * resource_manager::hint_segment_size_in_mb * 1024 * 1024;
|
||||
if (resource_manager::max_shard_disk_space_size > delta) {
|
||||
adjusted_quota = resource_manager::max_shard_disk_space_size - delta;
|
||||
}
|
||||
|
||||
bool can_hint = _total_size < adjusted_quota;
|
||||
@@ -510,7 +508,7 @@ void manager::space_watchdog::on_timer() {
|
||||
bool manager::too_many_in_flight_hints_for(ep_key_type ep) const noexcept {
|
||||
// There is no need to check the DC here because if there is an in-flight hint for this end point then this means that
|
||||
// its DC has already been checked and found to be ok.
|
||||
return _stats.size_of_hints_in_progress > _max_size_of_hints_in_progress && !utils::fb_utilities::is_me(ep) && hints_in_progress_for(ep) > 0 && local_gossiper().get_endpoint_downtime(ep) <= _max_hint_window_us;
|
||||
return _stats.size_of_hints_in_progress > resource_manager::max_size_of_hints_in_progress && !utils::fb_utilities::is_me(ep) && hints_in_progress_for(ep) > 0 && local_gossiper().get_endpoint_downtime(ep) <= _max_hint_window_us;
|
||||
}
|
||||
|
||||
bool manager::can_hint_for(ep_key_type ep) const noexcept {
|
||||
@@ -527,7 +525,7 @@ bool manager::can_hint_for(ep_key_type ep) const noexcept {
|
||||
// hints is more than the maximum allowed value.
|
||||
//
|
||||
// In the worst case there's going to be (_max_size_of_hints_in_progress + N - 1) in-flight hints, where N is the total number Nodes in the cluster.
|
||||
if (_stats.size_of_hints_in_progress > _max_size_of_hints_in_progress && hints_in_progress_for(ep) > 0) {
|
||||
if (_stats.size_of_hints_in_progress > resource_manager::max_size_of_hints_in_progress && hints_in_progress_for(ep) > 0) {
|
||||
manager_logger.trace("size_of_hints_in_progress {} hints_in_progress_for({}) {}", _stats.size_of_hints_in_progress, ep, hints_in_progress_for(ep));
|
||||
return false;
|
||||
}
|
||||
|
||||
@@ -34,6 +34,7 @@
|
||||
#include "gms/gossiper.hh"
|
||||
#include "db/commitlog/commitlog.hh"
|
||||
#include "utils/loading_shared_values.hh"
|
||||
#include "db/hints/resource_manager.hh"
|
||||
|
||||
namespace db {
|
||||
namespace hints {
|
||||
@@ -378,13 +379,8 @@ public:
|
||||
static const std::string FILENAME_PREFIX;
|
||||
static const std::chrono::seconds hints_flush_period;
|
||||
static const std::chrono::seconds hint_file_write_timeout;
|
||||
static size_t max_shard_disk_space_size;
|
||||
|
||||
private:
|
||||
static constexpr uint64_t _max_size_of_hints_in_progress = 10 * 1024 * 1024; // 10MB
|
||||
static constexpr size_t _hint_segment_size_in_mb = 32;
|
||||
static constexpr size_t _max_hints_per_ep_size_mb = 128; // 4 files 32MB each
|
||||
static constexpr size_t _max_hints_send_queue_length = 128;
|
||||
const boost::filesystem::path _hints_dir;
|
||||
|
||||
node_to_hint_store_factory_type _store_factory;
|
||||
|
||||
47
db/hints/resource_manager.hh
Normal file
47
db/hints/resource_manager.hh
Normal file
@@ -0,0 +1,47 @@
|
||||
/*
|
||||
* Copyright (C) 2018 ScyllaDB
|
||||
*/
|
||||
|
||||
/*
|
||||
* This file is part of Scylla.
|
||||
*
|
||||
* Scylla is free software: you can redistribute it and/or modify
|
||||
* it under the terms of the GNU Affero General Public License as published by
|
||||
* the Free Software Foundation, either version 3 of the License, or
|
||||
* (at your option) any later version.
|
||||
*
|
||||
* Scylla is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU General Public License
|
||||
* along with Scylla. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#pragma once
|
||||
|
||||
#include <cstdint>
|
||||
#include <seastar/core/semaphore.hh>
|
||||
#include <seastar/core/gate.hh>
|
||||
#include <seastar/core/memory.hh>
|
||||
#include <seastar/core/future.hh>
|
||||
#include "seastarx.hh"
|
||||
#include <unordered_set>
|
||||
#include <boost/filesystem.hpp>
|
||||
#include <gms/inet_address.hh>
|
||||
|
||||
namespace db {
|
||||
namespace hints {
|
||||
|
||||
// Holds the limits used by the hints code as static members, so that they can
// be referenced as resource_manager::<name> from any translation unit that
// includes this header.
class resource_manager {
public:
    // Threshold for the total size of hints in progress: 10MB.
    static constexpr uint64_t max_size_of_hints_in_progress{10 * 1024 * 1024};

    // Size of a single hints segment, expressed in megabytes.
    static constexpr size_t hint_segment_size_in_mb{32};

    // Per-endpoint hints size limit in megabytes: 4 files 32MB each.
    static constexpr size_t max_hints_per_ep_size_mb{128};

    // Upper bound on the hints send queue length.
    static constexpr size_t max_hints_send_queue_length{128};

    // Mutable (non-constexpr) setting; only declared here, its definition
    // and initial value live outside this header.
    static size_t max_shard_disk_space_size;
};
|
||||
|
||||
}
|
||||
}
|
||||
@@ -1,389 +0,0 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
/*
|
||||
* Copyright (C) 2015 ScyllaDB
|
||||
*
|
||||
* Modified by ScyllaDB
|
||||
*/
|
||||
|
||||
/*
|
||||
* This file is part of Scylla.
|
||||
*
|
||||
* Scylla is free software: you can redistribute it and/or modify
|
||||
* it under the terms of the GNU Affero General Public License as published by
|
||||
* the Free Software Foundation, either version 3 of the License, or
|
||||
* (at your option) any later version.
|
||||
*
|
||||
* Scylla is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU General Public License
|
||||
* along with Scylla. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#pragma once
|
||||
|
||||
#include "core/sstring.hh"
|
||||
#include "seastarx.hh"
|
||||
|
||||
namespace db {
|
||||
namespace index {
|
||||
|
||||
/**
|
||||
* Abstract base class for different types of secondary indexes.
|
||||
*
|
||||
* Do not extend this directly, please pick from PerColumnSecondaryIndex or PerRowSecondaryIndex
|
||||
*/
|
||||
class secondary_index {
|
||||
public:
|
||||
static const sstring custom_index_option_name;
|
||||
|
||||
/**
|
||||
* The name of the option used to specify that the index is on the collection keys.
|
||||
*/
|
||||
static const sstring index_keys_option_name;
|
||||
|
||||
/**
|
||||
* The name of the option used to specify that the index is on the collection values.
|
||||
*/
|
||||
static const sstring index_values_option_name;
|
||||
|
||||
/**
|
||||
* The name of the option used to specify that the index is on the collection (map) entries.
|
||||
*/
|
||||
static const sstring index_entries_option_name;
|
||||
|
||||
#if 0 // TODO:
|
||||
|
||||
public static final AbstractType<?> keyComparator = StorageService.getPartitioner().preservesOrder()
|
||||
? BytesType.instance
|
||||
: new LocalByPartionerType(StorageService.getPartitioner());
|
||||
|
||||
/**
|
||||
* Base CF that has many indexes
|
||||
*/
|
||||
protected ColumnFamilyStore baseCfs;
|
||||
|
||||
|
||||
/**
|
||||
* The column definitions which this index is responsible for
|
||||
*/
|
||||
protected final Set<ColumnDefinition> columnDefs = Collections.newSetFromMap(new ConcurrentHashMap<ColumnDefinition,Boolean>());
|
||||
|
||||
/**
|
||||
* Perform any initialization work
|
||||
*/
|
||||
public abstract void init();
|
||||
|
||||
/**
|
||||
* Reload an existing index following a change to its configuration,
|
||||
* or that of the indexed column(s). Differs from init() in that we expect
|
||||
* new resources (such as CFS for a KEYS index) to be created by
|
||||
* init() but not here
|
||||
*/
|
||||
public abstract void reload();
|
||||
|
||||
/**
|
||||
* Validates the index_options passed in the ColumnDef
|
||||
* @throws ConfigurationException
|
||||
*/
|
||||
public abstract void validateOptions() throws ConfigurationException;
|
||||
|
||||
/**
|
||||
* @return The name of the index
|
||||
*/
|
||||
abstract public String getIndexName();
|
||||
|
||||
/**
|
||||
* All internal 2ndary indexes will return "_internal_" for this. Custom
|
||||
* 2ndary indexes will return their class name. This only matter for
|
||||
* SecondaryIndexManager.groupByIndexType.
|
||||
*/
|
||||
String indexTypeForGrouping()
|
||||
{
|
||||
// Our internal indexes overwrite this
|
||||
return getClass().getCanonicalName();
|
||||
}
|
||||
|
||||
/**
|
||||
* Return the unique name for this index and column
|
||||
* to be stored in the SystemKeyspace that tracks if each column is built
|
||||
*
|
||||
* @param columnName the name of the column
|
||||
* @return the unique name
|
||||
*/
|
||||
abstract public String getNameForSystemKeyspace(ByteBuffer columnName);
|
||||
|
||||
/**
|
||||
* Checks if the index for specified column is fully built
|
||||
*
|
||||
* @param columnName the column
|
||||
* @return true if the index is fully built
|
||||
*/
|
||||
public boolean isIndexBuilt(ByteBuffer columnName)
|
||||
{
|
||||
return SystemKeyspace.isIndexBuilt(baseCfs.keyspace.getName(), getNameForSystemKeyspace(columnName));
|
||||
}
|
||||
|
||||
public void setIndexBuilt()
|
||||
{
|
||||
for (ColumnDefinition columnDef : columnDefs)
|
||||
SystemKeyspace.setIndexBuilt(baseCfs.keyspace.getName(), getNameForSystemKeyspace(columnDef.name.bytes));
|
||||
}
|
||||
|
||||
public void setIndexRemoved()
|
||||
{
|
||||
for (ColumnDefinition columnDef : columnDefs)
|
||||
SystemKeyspace.setIndexRemoved(baseCfs.keyspace.getName(), getNameForSystemKeyspace(columnDef.name.bytes));
|
||||
}
|
||||
|
||||
/**
|
||||
* Called at query time
|
||||
* Creates a implementation specific searcher instance for this index type
|
||||
* @param columns the list of columns which belong to this index type
|
||||
* @return the secondary index search impl
|
||||
*/
|
||||
protected abstract SecondaryIndexSearcher createSecondaryIndexSearcher(Set<ByteBuffer> columns);
|
||||
|
||||
/**
|
||||
* Forces this indexes' in memory data to disk
|
||||
*/
|
||||
public abstract void forceBlockingFlush();
|
||||
|
||||
/**
|
||||
* Allow access to the underlying column family store if there is one
|
||||
* @return the underlying column family store or null
|
||||
*/
|
||||
public abstract ColumnFamilyStore getIndexCfs();
|
||||
|
||||
|
||||
/**
|
||||
* Delete all files and references to this index
|
||||
* @param columnName the indexed column to remove
|
||||
*/
|
||||
public abstract void removeIndex(ByteBuffer columnName);
|
||||
|
||||
/**
|
||||
* Remove the index and unregisters this index's mbean if one exists
|
||||
*/
|
||||
public abstract void invalidate();
|
||||
|
||||
/**
|
||||
* Truncate all the data from the current index
|
||||
*
|
||||
* @param truncatedAt The truncation timestamp, all data before that timestamp should be rejected.
|
||||
*/
|
||||
public abstract void truncateBlocking(long truncatedAt);
|
||||
|
||||
/**
|
||||
* Builds the index using the data in the underlying CFS
|
||||
* Blocks till it's complete
|
||||
*/
|
||||
protected void buildIndexBlocking()
|
||||
{
|
||||
logger.info(String.format("Submitting index build of %s for data in %s",
|
||||
getIndexName(), StringUtils.join(baseCfs.getSSTables(), ", ")));
|
||||
|
||||
try (Refs<SSTableReader> sstables = baseCfs.selectAndReference(ColumnFamilyStore.CANONICAL_SSTABLES).refs)
|
||||
{
|
||||
SecondaryIndexBuilder builder = new SecondaryIndexBuilder(baseCfs,
|
||||
Collections.singleton(getIndexName()),
|
||||
new ReducingKeyIterator(sstables));
|
||||
Future<?> future = CompactionManager.instance.submitIndexBuild(builder);
|
||||
FBUtilities.waitOnFuture(future);
|
||||
forceBlockingFlush();
|
||||
setIndexBuilt();
|
||||
}
|
||||
logger.info("Index build of {} complete", getIndexName());
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Builds the index using the data in the underlying CF, non blocking
|
||||
*
|
||||
*
|
||||
* @return A future object which the caller can block on (optional)
|
||||
*/
|
||||
public Future<?> buildIndexAsync()
|
||||
{
|
||||
// if we're just linking in the index to indexedColumns on an already-built index post-restart, we're done
|
||||
boolean allAreBuilt = true;
|
||||
for (ColumnDefinition cdef : columnDefs)
|
||||
{
|
||||
if (!SystemKeyspace.isIndexBuilt(baseCfs.keyspace.getName(), getNameForSystemKeyspace(cdef.name.bytes)))
|
||||
{
|
||||
allAreBuilt = false;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if (allAreBuilt)
|
||||
return null;
|
||||
|
||||
// build it asynchronously; addIndex gets called by CFS open and schema update, neither of which
|
||||
// we want to block for a long period. (actual build is serialized on CompactionManager.)
|
||||
Runnable runnable = new Runnable()
|
||||
{
|
||||
public void run()
|
||||
{
|
||||
baseCfs.forceBlockingFlush();
|
||||
buildIndexBlocking();
|
||||
}
|
||||
};
|
||||
FutureTask<?> f = new FutureTask<Object>(runnable, null);
|
||||
|
||||
new Thread(f, "Creating index: " + getIndexName()).start();
|
||||
return f;
|
||||
}
|
||||
|
||||
public ColumnFamilyStore getBaseCfs()
|
||||
{
|
||||
return baseCfs;
|
||||
}
|
||||
|
||||
private void setBaseCfs(ColumnFamilyStore baseCfs)
|
||||
{
|
||||
this.baseCfs = baseCfs;
|
||||
}
|
||||
|
||||
public Set<ColumnDefinition> getColumnDefs()
|
||||
{
|
||||
return columnDefs;
|
||||
}
|
||||
|
||||
void addColumnDef(ColumnDefinition columnDef)
|
||||
{
|
||||
columnDefs.add(columnDef);
|
||||
}
|
||||
|
||||
void removeColumnDef(ByteBuffer name)
|
||||
{
|
||||
Iterator<ColumnDefinition> it = columnDefs.iterator();
|
||||
while (it.hasNext())
|
||||
{
|
||||
if (it.next().name.bytes.equals(name))
|
||||
it.remove();
|
||||
}
|
||||
}
|
||||
|
||||
/** Returns true if the index supports lookups for the given operator, false otherwise. */
|
||||
public boolean supportsOperator(Operator operator)
|
||||
{
|
||||
return operator == Operator.EQ;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the decoratedKey for a column value. Assumes an index CFS is present.
|
||||
* @param value column value
|
||||
* @return decorated key
|
||||
*/
|
||||
public DecoratedKey getIndexKeyFor(ByteBuffer value)
|
||||
{
|
||||
return getIndexCfs().partitioner.decorateKey(value);
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns true if the provided cell name is indexed by this secondary index.
|
||||
*/
|
||||
public abstract boolean indexes(CellName name);
|
||||
|
||||
/**
|
||||
* This is the primary way to create a secondary index instance for a CF column.
|
||||
* It will validate the index_options before initializing.
|
||||
*
|
||||
* @param baseCfs the source of data for the Index
|
||||
* @param cdef the meta information about this column (index_type, index_options, name, etc...)
|
||||
*
|
||||
* @return The secondary index instance for this column
|
||||
* @throws ConfigurationException
|
||||
*/
|
||||
public static SecondaryIndex createInstance(ColumnFamilyStore baseCfs, ColumnDefinition cdef) throws ConfigurationException
|
||||
{
|
||||
SecondaryIndex index;
|
||||
|
||||
switch (cdef.getIndexType())
|
||||
{
|
||||
case KEYS:
|
||||
index = new KeysIndex();
|
||||
break;
|
||||
case COMPOSITES:
|
||||
index = CompositesIndex.create(cdef);
|
||||
break;
|
||||
case CUSTOM:
|
||||
assert cdef.getIndexOptions() != null;
|
||||
String class_name = cdef.getIndexOptions().get(CUSTOM_INDEX_OPTION_NAME);
|
||||
assert class_name != null;
|
||||
try
|
||||
{
|
||||
index = (SecondaryIndex) Class.forName(class_name).newInstance();
|
||||
}
|
||||
catch (Exception e)
|
||||
{
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
break;
|
||||
default:
|
||||
throw new RuntimeException("Unknown index type: " + cdef.getIndexName());
|
||||
}
|
||||
|
||||
index.addColumnDef(cdef);
|
||||
index.validateOptions();
|
||||
index.setBaseCfs(baseCfs);
|
||||
|
||||
return index;
|
||||
}
|
||||
|
||||
public abstract boolean validate(ByteBuffer rowKey, Cell cell);
|
||||
|
||||
public abstract long estimateResultRows();
|
||||
|
||||
/**
|
||||
* Returns the index comparator for index backed by CFS, or null.
|
||||
*
|
||||
* Note: it would be cleaner to have this be a member method. However we need this when opening indexes
|
||||
* sstables, but by then the CFS won't be fully initiated, so the SecondaryIndex object won't be accessible.
|
||||
*/
|
||||
public static CellNameType getIndexComparator(CFMetaData baseMetadata, ColumnDefinition cdef)
|
||||
{
|
||||
switch (cdef.getIndexType())
|
||||
{
|
||||
case KEYS:
|
||||
return new SimpleDenseCellNameType(keyComparator);
|
||||
case COMPOSITES:
|
||||
return CompositesIndex.getIndexComparator(baseMetadata, cdef);
|
||||
case CUSTOM:
|
||||
return null;
|
||||
}
|
||||
throw new AssertionError();
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString()
|
||||
{
|
||||
return Objects.toStringHelper(this).add("columnDefs", columnDefs).toString();
|
||||
}
|
||||
|
||||
#endif
|
||||
|
||||
};
|
||||
|
||||
}
|
||||
}
|
||||
@@ -827,15 +827,6 @@ static future<> do_merge_schema(distributed<service::storage_proxy>& proxy, std:
|
||||
/*auto& old_aggregates = */read_schema_for_keyspaces(proxy, AGGREGATES, keyspaces).get0();
|
||||
#endif
|
||||
|
||||
// Incoming mutations have the version field deleted. Delete here as well so that
|
||||
// schemas which are otherwise equal don't appear as differing.
|
||||
for (auto&& e : old_column_families) {
|
||||
schema_mutations& sm = e.second;
|
||||
if (sm.scylla_tables()) {
|
||||
delete_schema_version(*sm.scylla_tables());
|
||||
}
|
||||
}
|
||||
|
||||
proxy.local().mutate_locally(std::move(mutations)).get0();
|
||||
|
||||
if (do_flush) {
|
||||
|
||||
143
db/system_distributed_keyspace.cc
Normal file
143
db/system_distributed_keyspace.cc
Normal file
@@ -0,0 +1,143 @@
|
||||
/*
|
||||
* Copyright (C) 2018 ScyllaDB
|
||||
*/
|
||||
|
||||
/*
|
||||
* This file is part of Scylla.
|
||||
*
|
||||
* Scylla is free software: you can redistribute it and/or modify
|
||||
* it under the terms of the GNU Affero General Public License as published by
|
||||
* the Free Software Foundation, either version 3 of the License, or
|
||||
* (at your option) any later version.
|
||||
*
|
||||
* Scylla is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU General Public License
|
||||
* along with Scylla. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#include "db/system_distributed_keyspace.hh"
|
||||
|
||||
#include "cql3/untyped_result_set.hh"
|
||||
#include "database.hh"
|
||||
#include "db/consistency_level_type.hh"
|
||||
#include "db/system_keyspace.hh"
|
||||
#include "schema_builder.hh"
|
||||
#include "types.hh"
|
||||
|
||||
#include <seastar/core/reactor.hh>
|
||||
#include <seastar/core/shared_ptr.hh>
|
||||
|
||||
#include <boost/range/adaptor/transformed.hpp>
|
||||
|
||||
#include <optional>
|
||||
#include <vector>
|
||||
#include <experimental/optional>
|
||||
|
||||
namespace db {
|
||||
|
||||
// Returns the schema of the system_distributed.view_build_status table.
//
// The schema object is built on first use and kept in a thread_local static,
// so every later call on the same thread returns the same schema_ptr.
//
// Layout: partition key (keyspace_name, view_name), clustering key host_id,
// regular column status.
schema_ptr view_build_status() {
    static thread_local auto schema = [] {
        const auto table_id = generate_legacy_id(system_distributed_keyspace::NAME, system_distributed_keyspace::VIEW_BUILD_STATUS);
        schema_builder builder(system_distributed_keyspace::NAME, system_distributed_keyspace::VIEW_BUILD_STATUS, std::experimental::make_optional(table_id));
        builder.with_column("keyspace_name", utf8_type, column_kind::partition_key);
        builder.with_column("view_name", utf8_type, column_kind::partition_key);
        builder.with_column("host_id", uuid_type, column_kind::clustering_key);
        builder.with_column("status", utf8_type);
        // The schema version is derived from the table id.
        builder.with_version(system_keyspace::generate_schema_version(table_id));
        return builder.build();
    }();
    return schema;
}
|
||||
|
||||
static std::vector<schema_ptr> all_tables() {
|
||||
return {
|
||||
view_build_status(),
|
||||
};
|
||||
}
|
||||
|
||||
// Stores references to the query processor and the migration manager; both
// must outlive this object since only references are kept.
system_distributed_keyspace::system_distributed_keyspace(cql3::query_processor& qp, service::migration_manager& mm)
    : _qp(qp), _mm(mm)
{}
|
||||
|
||||
// Announces the system_distributed keyspace and its tables.
//
// Only the instance running on cpu 0 does any work; on every other shard the
// returned future is immediately ready. The keyspace is announced first, then
// each schema from all_tables() in turn. An already_exists_exception raised by
// either kind of announcement is swallowed, so calling this when the keyspace
// or tables are already present is not an error.
future<> system_distributed_keyspace::start() {
    const bool on_first_shard = engine().cpu_id() == 0;
    if (!on_first_shard) {
        return make_ready_future<>();
    }

    // Runs `action` and turns an already_exists_exception into success.
    static auto ignore_existing = [] (seastar::noncopyable_function<future<>()> action) {
        return futurize_apply(std::move(action)).handle_exception_type([] (exceptions::already_exists_exception&) { });
    };

    // We use min_timestamp so that the default keyspace metadata will lose with any manual adjustments.
    // See issue #2129.
    auto announce_keyspace = [this] {
        auto ksm = keyspace_metadata::new_keyspace(
                NAME,
                "org.apache.cassandra.locator.SimpleStrategy",
                {{"replication_factor", "3"}},
                true);
        return _mm.announce_new_keyspace(ksm, api::min_timestamp, false);
    };

    // The vector of schemas is kept alive by do_with until all tables are processed.
    auto announce_tables = [this] {
        return do_with(all_tables(), [this] (std::vector<schema_ptr>& tables) {
            return do_for_each(tables, [this] (schema_ptr table) {
                return ignore_existing([this, table = std::move(table)] {
                    return _mm.announce_new_column_family(std::move(table), false);
                });
            });
        });
    };

    return ignore_existing(std::move(announce_keyspace)).then(std::move(announce_tables));
}
|
||||
|
||||
// Nothing to tear down: returns a future that is already resolved.
future<> system_distributed_keyspace::stop() {
    auto done = make_ready_future<>();
    return done;
}
|
||||
|
||||
// Reads the build status of a view as recorded by each host.
//
// Queries view_build_status at consistency level ONE for the partition
// (ks_name, view_name) and returns a map from host_id to the status string
// stored for that host.
future<std::unordered_map<utils::UUID, sstring>> system_distributed_keyspace::view_status(sstring ks_name, sstring view_name) const {
    using status_map = std::unordered_map<utils::UUID, sstring>;

    auto select_stmt = sprint("SELECT host_id, status FROM %s.%s WHERE keyspace_name = ? AND view_name = ?", NAME, VIEW_BUILD_STATUS);
    auto rows = _qp.process(
            std::move(select_stmt),
            db::consistency_level::ONE,
            { std::move(ks_name), std::move(view_name) },
            false);

    return rows.then([this] (::shared_ptr<cql3::untyped_result_set> cql_result) {
        // Turns one result row into a (host_id, status) map entry.
        auto to_entry = [] (const cql3::untyped_result_set::row& row) {
            auto host_id = row.get_as<utils::UUID>("host_id");
            auto status = row.get_as<sstring>("status");
            return std::pair(std::move(host_id), std::move(status));
        };
        return boost::copy_range<status_map>(*cql_result | boost::adaptors::transformed(to_entry));
    });
}
|
||||
|
||||
// Records that the local host started building the given view.
//
// Looks up the local host id, then inserts a row into view_build_status with
// status "STARTED" for (ks_name, view_name, host_id) at consistency level ONE.
// The query result is discarded.
future<> system_distributed_keyspace::start_view_build(sstring ks_name, sstring view_name) const {
    auto record_started = [this, ks_name = std::move(ks_name), view_name = std::move(view_name)] (utils::UUID host_id) {
        auto insert_stmt = sprint("INSERT INTO %s.%s (keyspace_name, view_name, host_id, status) VALUES (?, ?, ?, ?)", NAME, VIEW_BUILD_STATUS);
        return _qp.process(
                std::move(insert_stmt),
                db::consistency_level::ONE,
                { std::move(ks_name), std::move(view_name), std::move(host_id), "STARTED" },
                false).discard_result();
    };
    return db::system_keyspace::get_local_host_id().then(std::move(record_started));
}
|
||||
|
||||
// Records that the local host finished building the given view.
//
// Looks up the local host id, then updates the status column of the
// (ks_name, view_name, host_id) row in view_build_status to "SUCCESS" at
// consistency level ONE. The query result is discarded.
future<> system_distributed_keyspace::finish_view_build(sstring ks_name, sstring view_name) const {
    auto record_success = [this, ks_name = std::move(ks_name), view_name = std::move(view_name)] (utils::UUID host_id) {
        auto update_stmt = sprint("UPDATE %s.%s SET status = ? WHERE keyspace_name = ? AND view_name = ? AND host_id = ?", NAME, VIEW_BUILD_STATUS);
        return _qp.process(
                std::move(update_stmt),
                db::consistency_level::ONE,
                { "SUCCESS", std::move(ks_name), std::move(view_name), std::move(host_id) },
                false).discard_result();
    };
    return db::system_keyspace::get_local_host_id().then(std::move(record_success));
}
|
||||
|
||||
// Deletes the whole (ks_name, view_name) partition from view_build_status,
// i.e. the status rows of all hosts for that view. Runs at consistency level
// ONE and discards the query result.
future<> system_distributed_keyspace::remove_view(sstring ks_name, sstring view_name) const {
    auto delete_stmt = sprint("DELETE FROM %s.%s WHERE keyspace_name = ? AND view_name = ?", NAME, VIEW_BUILD_STATUS);
    auto outcome = _qp.process(
            std::move(delete_stmt),
            db::consistency_level::ONE,
            { std::move(ks_name), std::move(view_name) },
            false);
    return outcome.discard_result();
}
|
||||
|
||||
}
|
||||
58
db/system_distributed_keyspace.hh
Normal file
58
db/system_distributed_keyspace.hh
Normal file
@@ -0,0 +1,58 @@
|
||||
/*
|
||||
* Copyright (C) 2018 ScyllaDB
|
||||
*/
|
||||
|
||||
/*
|
||||
* This file is part of Scylla.
|
||||
*
|
||||
* Scylla is free software: you can redistribute it and/or modify
|
||||
* it under the terms of the GNU Affero General Public License as published by
|
||||
* the Free Software Foundation, either version 3 of the License, or
|
||||
* (at your option) any later version.
|
||||
*
|
||||
* Scylla is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU General Public License
|
||||
* along with Scylla. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#pragma once
|
||||
|
||||
#include "bytes.hh"
|
||||
#include "cql3/query_processor.hh"
|
||||
#include "schema.hh"
|
||||
#include "service/migration_manager.hh"
|
||||
#include "utils/UUID.hh"
|
||||
|
||||
#include <seastar/core/future.hh>
|
||||
#include <seastar/core/sstring.hh>
|
||||
|
||||
#include <unordered_map>
|
||||
|
||||
namespace db {
|
||||
|
||||
class system_distributed_keyspace {
|
||||
public:
|
||||
static constexpr auto NAME = "system_distributed";
|
||||
static constexpr auto VIEW_BUILD_STATUS = "view_build_status";
|
||||
|
||||
private:
|
||||
cql3::query_processor& _qp;
|
||||
service::migration_manager& _mm;
|
||||
|
||||
public:
|
||||
system_distributed_keyspace(cql3::query_processor&, service::migration_manager&);
|
||||
|
||||
future<> start();
|
||||
future<> stop();
|
||||
|
||||
future<std::unordered_map<utils::UUID, sstring>> view_status(sstring ks_name, sstring view_name) const;
|
||||
future<> start_view_build(sstring ks_name, sstring view_name) const;
|
||||
future<> finish_view_build(sstring ks_name, sstring view_name) const;
|
||||
future<> remove_view(sstring ks_name, sstring view_name) const;
|
||||
};
|
||||
|
||||
}
|
||||
@@ -74,6 +74,7 @@
|
||||
#include "db/size_estimates_virtual_reader.hh"
|
||||
#include "db/timeout_clock.hh"
|
||||
#include "sstables/sstables.hh"
|
||||
#include "db/view/build_progress_virtual_reader.hh"
|
||||
#include "db/schema_tables.hh"
|
||||
|
||||
using days = std::chrono::duration<int, std::ratio<24 * 3600>>;
|
||||
@@ -642,6 +643,22 @@ schema_ptr built_views() {
|
||||
return schema;
|
||||
}
|
||||
|
||||
// Returns the schema of the scylla_views_builds_in_progress table.
//
// The schema object is built on first use and kept in a thread_local static,
// so every later call on the same thread returns the same schema_ptr.
//
// Layout: partition key keyspace_name; clustering key (view_name, cpu_id);
// regular columns next_token, generation_number and first_token.
schema_ptr scylla_views_builds_in_progress() {
    static thread_local auto schema = [] {
        const auto table_id = generate_legacy_id(NAME, SCYLLA_VIEWS_BUILDS_IN_PROGRESS);
        schema_builder builder(NAME, SCYLLA_VIEWS_BUILDS_IN_PROGRESS, stdx::make_optional(table_id));
        builder.with_column("keyspace_name", utf8_type, column_kind::partition_key);
        builder.with_column("view_name", utf8_type, column_kind::clustering_key);
        builder.with_column("cpu_id", int32_type, column_kind::clustering_key);
        builder.with_column("next_token", utf8_type);
        builder.with_column("generation_number", int32_type);
        builder.with_column("first_token", utf8_type);
        // The schema version is derived from the table id.
        builder.with_version(generate_schema_version(table_id));
        return builder.build();
    }();
    return schema;
}
|
||||
|
||||
} //</v3>
|
||||
|
||||
namespace legacy {
|
||||
@@ -1541,7 +1558,8 @@ std::vector<schema_ptr> all_tables() {
|
||||
r.insert(r.end(), { built_indexes(), hints(), batchlog(), paxos(), local(),
|
||||
peers(), peer_events(), range_xfers(),
|
||||
compactions_in_progress(), compaction_history(),
|
||||
sstable_activity(), size_estimates(),
|
||||
sstable_activity(), size_estimates(), v3::views_builds_in_progress(), v3::built_views(),
|
||||
v3::scylla_views_builds_in_progress(),
|
||||
});
|
||||
// legacy schema
|
||||
r.insert(r.end(), {
|
||||
@@ -1558,10 +1576,14 @@ static void maybe_add_virtual_reader(schema_ptr s, database& db) {
|
||||
if (s.get() == size_estimates().get()) {
|
||||
db.find_column_family(s).set_virtual_reader(mutation_source(db::size_estimates::virtual_reader()));
|
||||
}
|
||||
if (s.get() == v3::views_builds_in_progress().get()) {
|
||||
db.find_column_family(s).set_virtual_reader(mutation_source(db::view::build_progress_virtual_reader(db)));
|
||||
}
|
||||
}
|
||||
|
||||
// Tells whether `s` is one of the schemas singled out by this predicate:
// the batchlog table or the scylla_views_builds_in_progress table.
//
// A single return expression covers both tables; an earlier unconditional
// return that checked only batchlog would make the second comparison
// unreachable.
//
// `db` is not used by the check.
// NOTE(review): going by the name, a true result presumably makes writes to
// the table be accounted as user memory -- confirm against the caller.
static bool maybe_write_in_user_memory(schema_ptr s, database& db) {
    return (s.get() == batchlog().get())
            || s == v3::scylla_views_builds_in_progress();
}
|
||||
|
||||
void make(database& db, bool durable, bool volatile_testing_only) {
|
||||
@@ -1783,6 +1805,85 @@ mutation make_size_estimates_mutation(const sstring& ks, std::vector<range_estim
|
||||
return m_to_apply;
|
||||
}
|
||||
|
||||
// Inserts a row for (ks_name, view_name, current cpu) into
// scylla_views_builds_in_progress, storing `token` (in its string form) as
// first_token and 0 as generation_number. The query result is discarded.
future<> register_view_for_building(sstring ks_name, sstring view_name, const dht::token& token) {
    return execute_cql(
            sprint("INSERT INTO system.%s (keyspace_name, view_name, generation_number, cpu_id, first_token) VALUES (?, ?, ?, ?, ?)",
                    v3::SCYLLA_VIEWS_BUILDS_IN_PROGRESS),
            std::move(ks_name),
            std::move(view_name),
            0,
            int32_t(engine().cpu_id()),
            dht::global_partitioner().to_sstring(token)).discard_result();
}
|
||||
|
||||
// Writes `token` (in its string form) as next_token for the row
// (ks_name, view_name, current cpu) of scylla_views_builds_in_progress.
// The query result is discarded.
future<> update_view_build_progress(sstring ks_name, sstring view_name, const dht::token& token) {
    return execute_cql(
            sprint("INSERT INTO system.%s (keyspace_name, view_name, next_token, cpu_id) VALUES (?, ?, ?, ?)",
                    v3::SCYLLA_VIEWS_BUILDS_IN_PROGRESS),
            std::move(ks_name),
            std::move(view_name),
            dht::global_partitioner().to_sstring(token),
            int32_t(engine().cpu_id())).discard_result();
}
|
||||
|
||||
// Deletes every scylla_views_builds_in_progress row of the given view. The
// statement does not restrict cpu_id, so the rows of all cpus are removed.
// The query result is discarded.
future<> remove_view_build_progress_across_all_shards(sstring ks_name, sstring view_name) {
    sstring req = sprint("DELETE FROM system.%s WHERE keyspace_name = ? AND view_name = ?", v3::SCYLLA_VIEWS_BUILDS_IN_PROGRESS);
    return execute_cql(
            std::move(req),
            std::move(ks_name),
            std::move(view_name)).discard_result();
}
|
||||
|
||||
// Inserts (ks_name, view_name) into the built_views table. The query result
// is discarded.
future<> mark_view_as_built(sstring ks_name, sstring view_name) {
    sstring req = sprint("INSERT INTO system.%s (keyspace_name, view_name) VALUES (?, ?)", v3::BUILT_VIEWS);
    return execute_cql(
            std::move(req),
            std::move(ks_name),
            std::move(view_name)).discard_result();
}
|
||||
|
||||
// Deletes (ks_name, view_name) from the built_views table. The query result
// is discarded.
future<> remove_built_view(sstring ks_name, sstring view_name) {
    sstring req = sprint("DELETE FROM system.%s WHERE keyspace_name = ? AND view_name = ?", v3::BUILT_VIEWS);
    return execute_cql(
            std::move(req),
            std::move(ks_name),
            std::move(view_name)).discard_result();
}
|
||||
|
||||
// Reads all rows of the built_views table and returns them as
// (keyspace_name, view_name) pairs, in the order the rows come back.
future<std::vector<view_name>> load_built_views() {
    return execute_cql(sprint("SELECT * FROM system.%s", v3::BUILT_VIEWS)).then([] (::shared_ptr<cql3::untyped_result_set> cql_result) {
        std::vector<view_name> built;
        for (const cql3::untyped_result_set::row& row : *cql_result) {
            auto ks_name = row.get_as<sstring>("keyspace_name");
            auto cf_name = row.get_as<sstring>("view_name");
            built.emplace_back(std::move(ks_name), std::move(cf_name));
        }
        return built;
    });
}
|
||||
|
||||
// Reads all rows of scylla_views_builds_in_progress and converts each one to
// a view_build_progress entry.
//
// first_token is read as a mandatory value; next_token is optional and stays
// empty in the result when the column is not set. Token columns are stored as
// strings and parsed back with the global partitioner.
future<std::vector<view_build_progress>> load_view_build_progress() {
    auto select_stmt = sprint("SELECT keyspace_name, view_name, first_token, next_token, cpu_id FROM system.%s",
            v3::SCYLLA_VIEWS_BUILDS_IN_PROGRESS);
    return execute_cql(std::move(select_stmt)).then([] (::shared_ptr<cql3::untyped_result_set> cql_result) {
        std::vector<view_build_progress> progress;
        for (auto& row : *cql_result) {
            auto keyspace = row.get_as<sstring>("keyspace_name");
            auto view = row.get_as<sstring>("view_name");
            auto first = dht::global_partitioner().from_sstring(row.get_as<sstring>("first_token"));

            // next_token may be unset: only parse it when a value is present.
            std::optional<dht::token> next;
            if (auto raw_next = row.get_opt<sstring>("next_token")) {
                next = dht::global_partitioner().from_sstring(std::move(*raw_next));
            }

            auto cpu = row.get_as<int32_t>("cpu_id");
            progress.push_back(view_build_progress{
                    view_name(std::move(keyspace), std::move(view)),
                    std::move(first),
                    std::move(next),
                    static_cast<shard_id>(cpu)});
        }
        return progress;
    });
}
|
||||
|
||||
} // namespace system_keyspace
|
||||
|
||||
sstring system_keyspace_name() {
|
||||
|
||||
@@ -40,8 +40,10 @@
|
||||
|
||||
#pragma once
|
||||
|
||||
#include <optional>
|
||||
#include <unordered_map>
|
||||
#include <utility>
|
||||
#include <vector>
|
||||
#include "schema.hh"
|
||||
#include "utils/UUID.hh"
|
||||
#include "gms/inet_address.hh"
|
||||
@@ -99,6 +101,7 @@ static constexpr auto SIZE_ESTIMATES = "size_estimates";
|
||||
static constexpr auto AVAILABLE_RANGES = "available_ranges";
|
||||
static constexpr auto VIEWS_BUILDS_IN_PROGRESS = "views_builds_in_progress";
|
||||
static constexpr auto BUILT_VIEWS = "built_views";
|
||||
static constexpr auto SCYLLA_VIEWS_BUILDS_IN_PROGRESS = "scylla_views_builds_in_progress";
|
||||
}
|
||||
|
||||
namespace legacy {
|
||||
@@ -122,6 +125,14 @@ struct range_estimates {
|
||||
int64_t mean_partition_size;
|
||||
};
|
||||
|
||||
using view_name = std::pair<sstring, sstring>;
|
||||
struct view_build_progress {
|
||||
view_name view;
|
||||
dht::token first_token;
|
||||
std::optional<dht::token> next_token;
|
||||
shard_id cpu_id;
|
||||
};
|
||||
|
||||
extern schema_ptr hints();
|
||||
extern schema_ptr batchlog();
|
||||
extern schema_ptr built_indexes(); // TODO (from Cassandra): make private
|
||||
@@ -651,5 +662,13 @@ future<> set_bootstrap_state(bootstrap_state state);
|
||||
*/
|
||||
mutation make_size_estimates_mutation(const sstring& ks, std::vector<range_estimates> estimates);
|
||||
|
||||
future<> register_view_for_building(sstring ks_name, sstring view_name, const dht::token& token);
|
||||
future<> update_view_build_progress(sstring ks_name, sstring view_name, const dht::token& token);
|
||||
future<> remove_view_build_progress_across_all_shards(sstring ks_name, sstring view_name);
|
||||
future<> mark_view_as_built(sstring ks_name, sstring view_name);
|
||||
future<> remove_built_view(sstring ks_name, sstring view_name);
|
||||
future<std::vector<view_name>> load_built_views();
|
||||
future<std::vector<view_build_progress>> load_view_build_progress();
|
||||
|
||||
} // namespace system_keyspace
|
||||
} // namespace db
|
||||
|
||||
@@ -27,6 +27,6 @@
|
||||
|
||||
namespace db {
|
||||
using timeout_clock = seastar::lowres_clock;
|
||||
using timeout_semaphore = basic_semaphore<default_timeout_exception_factory, timeout_clock>;
|
||||
using timeout_semaphore = seastar::basic_semaphore<seastar::default_timeout_exception_factory, timeout_clock>;
|
||||
static constexpr timeout_clock::time_point no_timeout = timeout_clock::time_point::max();
|
||||
}
|
||||
|
||||
195
db/view/build_progress_virtual_reader.hh
Normal file
195
db/view/build_progress_virtual_reader.hh
Normal file
@@ -0,0 +1,195 @@
|
||||
/*
|
||||
* Copyright (C) 2018 ScyllaDB
|
||||
*/
|
||||
|
||||
/*
|
||||
* This file is part of Scylla.
|
||||
*
|
||||
* Scylla is free software: you can redistribute it and/or modify
|
||||
* it under the terms of the GNU Affero General Public License as published by
|
||||
* the Free Software Foundation, either version 3 of the License, or
|
||||
* (at your option) any later version.
|
||||
*
|
||||
* Scylla is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU General Public License
|
||||
* along with Scylla. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#include "database.hh"
|
||||
#include "db/system_keyspace.hh"
|
||||
#include "db/timeout_clock.hh"
|
||||
#include "dht/i_partitioner.hh"
|
||||
#include "flat_mutation_reader.hh"
|
||||
#include "mutation_fragment.hh"
|
||||
#include "mutation_reader.hh"
|
||||
#include "query-request.hh"
|
||||
#include "schema.hh"
|
||||
#include "tracing/tracing.hh"
|
||||
|
||||
#include <boost/range/iterator_range.hpp>
|
||||
|
||||
#include <iterator>
|
||||
#include <memory>
|
||||
|
||||
namespace db::view {
|
||||
|
||||
// Allows a user to query the views_builds_in_progress system table
|
||||
// in terms of the scylla_views_builds_in_progress one, which is
|
||||
// a superset of the former. When querying, we don't have to adjust
|
||||
// the clustering key, but we have to adjust the requested regular
|
||||
// columns. When reading the results from the scylla_views_builds_in_progress
|
||||
// table, we adjust the clustering key (we shed the cpu_id column) and map
|
||||
// back the regular columns.
|
||||
class build_progress_virtual_reader {
|
||||
database& _db;
|
||||
|
||||
struct build_progress_reader : flat_mutation_reader::impl {
|
||||
column_id _scylla_next_token_col;
|
||||
column_id _scylla_generation_number_col;
|
||||
column_id _legacy_last_token_col;
|
||||
column_id _legacy_generation_number_col;
|
||||
const query::partition_slice& _legacy_slice;
|
||||
query::partition_slice _slice;
|
||||
flat_mutation_reader _underlying;
|
||||
|
||||
build_progress_reader(
|
||||
schema_ptr legacy_schema,
|
||||
column_family& scylla_views_build_progress,
|
||||
const dht::partition_range& range,
|
||||
const query::partition_slice& slice,
|
||||
const io_priority_class& pc,
|
||||
tracing::trace_state_ptr trace_state,
|
||||
streamed_mutation::forwarding fwd,
|
||||
mutation_reader::forwarding fwd_mr)
|
||||
: flat_mutation_reader::impl(std::move(legacy_schema))
|
||||
, _scylla_next_token_col(scylla_views_build_progress.schema()->get_column_definition("next_token")->id)
|
||||
, _scylla_generation_number_col(scylla_views_build_progress.schema()->get_column_definition("generation_number")->id)
|
||||
, _legacy_last_token_col(_schema->get_column_definition("last_token")->id)
|
||||
, _legacy_generation_number_col(_schema->get_column_definition("generation_number")->id)
|
||||
, _legacy_slice(slice)
|
||||
, _slice(adjust_partition_slice())
|
||||
, _underlying(scylla_views_build_progress.make_reader(
|
||||
scylla_views_build_progress.schema(),
|
||||
range,
|
||||
slice,
|
||||
pc,
|
||||
std::move(trace_state),
|
||||
fwd,
|
||||
fwd_mr)) {
|
||||
}
|
||||
|
||||
const schema& underlying_schema() const {
|
||||
return *_underlying.schema();
|
||||
}
|
||||
|
||||
query::partition_slice adjust_partition_slice() {
|
||||
auto slice = _legacy_slice;
|
||||
std::vector<column_id> adjusted_columns;
|
||||
for (auto col_id : slice.regular_columns) {
|
||||
if (col_id == _legacy_last_token_col) {
|
||||
adjusted_columns.push_back(_scylla_next_token_col);
|
||||
} else if (col_id == _legacy_generation_number_col) {
|
||||
adjusted_columns.push_back(_scylla_generation_number_col);
|
||||
}
|
||||
}
|
||||
slice.regular_columns = std::move(adjusted_columns);
|
||||
return slice;
|
||||
}
|
||||
|
||||
clustering_key adjust_ckey(clustering_key& ck) {
|
||||
if (ck.size(underlying_schema()) < 3) {
|
||||
return std::move(ck);
|
||||
}
|
||||
// Drop the cpu_id from the clustering key
|
||||
auto end = ck.begin(*_schema);
|
||||
std::advance(end, 1);
|
||||
auto r = boost::make_iterator_range(ck.begin(*_schema), std::move(end));
|
||||
return clustering_key_prefix::from_exploded(r);
|
||||
}
|
||||
|
||||
virtual future<> fill_buffer(db::timeout_clock::time_point timeout) override {
|
||||
return _underlying.fill_buffer(timeout).then([this] {
|
||||
_end_of_stream = _underlying.is_end_of_stream();
|
||||
while (!_underlying.is_buffer_empty()) {
|
||||
auto mf = _underlying.pop_mutation_fragment();
|
||||
if (mf.is_clustering_row()) {
|
||||
auto scylla_in_progress_row = std::move(mf).as_clustering_row();
|
||||
auto legacy_in_progress_row = row();
|
||||
// Drop the first_token from the regular columns
|
||||
scylla_in_progress_row.cells().for_each_cell([&, this] (column_id id, atomic_cell_or_collection& c) {
|
||||
if (id == _scylla_next_token_col) {
|
||||
legacy_in_progress_row.append_cell(_legacy_last_token_col, std::move(c));
|
||||
} else if (id == _scylla_generation_number_col) {
|
||||
legacy_in_progress_row.append_cell(_legacy_generation_number_col, std::move(c));
|
||||
}
|
||||
});
|
||||
mf = clustering_row(
|
||||
adjust_ckey(scylla_in_progress_row.key()),
|
||||
std::move(scylla_in_progress_row.tomb()),
|
||||
std::move(scylla_in_progress_row.marker()),
|
||||
std::move(legacy_in_progress_row));
|
||||
} else if (mf.is_range_tombstone()) {
|
||||
auto scylla_in_progress_rt = std::move(mf).as_range_tombstone();
|
||||
mf = range_tombstone(
|
||||
adjust_ckey(scylla_in_progress_rt.start),
|
||||
scylla_in_progress_rt.start_kind,
|
||||
scylla_in_progress_rt.end,
|
||||
scylla_in_progress_rt.end_kind,
|
||||
scylla_in_progress_rt.tomb);
|
||||
}
|
||||
push_mutation_fragment(std::move(mf));
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
virtual void next_partition() override {
|
||||
_end_of_stream = false;
|
||||
clear_buffer_to_next_partition();
|
||||
if (is_buffer_empty()) {
|
||||
_underlying.next_partition();
|
||||
}
|
||||
}
|
||||
|
||||
virtual future<> fast_forward_to(const dht::partition_range& pr, db::timeout_clock::time_point timeout) override {
|
||||
clear_buffer();
|
||||
_end_of_stream = false;
|
||||
return _underlying.fast_forward_to(pr, timeout);
|
||||
}
|
||||
|
||||
virtual future<> fast_forward_to(position_range range, db::timeout_clock::time_point timeout) override {
|
||||
forward_buffer_to(range.start());
|
||||
_end_of_stream = false;
|
||||
return _underlying.fast_forward_to(std::move(range), timeout);
|
||||
}
|
||||
};
|
||||
|
||||
public:
|
||||
build_progress_virtual_reader(database& db)
|
||||
: _db(db) {
|
||||
}
|
||||
|
||||
flat_mutation_reader operator()(
|
||||
schema_ptr s,
|
||||
const dht::partition_range& range,
|
||||
const query::partition_slice& slice,
|
||||
const io_priority_class& pc,
|
||||
tracing::trace_state_ptr trace_state,
|
||||
streamed_mutation::forwarding fwd,
|
||||
mutation_reader::forwarding fwd_mr) {
|
||||
return flat_mutation_reader(std::make_unique<build_progress_reader>(
|
||||
std::move(s),
|
||||
_db.find_column_family(s->ks_name(), system_keyspace::v3::SCYLLA_VIEWS_BUILDS_IN_PROGRESS),
|
||||
range,
|
||||
slice,
|
||||
pc,
|
||||
std::move(trace_state),
|
||||
fwd,
|
||||
fwd_mr));
|
||||
}
|
||||
};
|
||||
|
||||
}
|
||||
1030
db/view/view.cc
1030
db/view/view.cc
File diff suppressed because it is too large
Load Diff
@@ -33,6 +33,13 @@ namespace db {
|
||||
|
||||
namespace view {
|
||||
|
||||
// Counters related to view updates; every counter starts at zero.
// An instance is passed by reference to mutate_MV().
// NOTE(review): the pushed/failed and local/remote meanings are inferred from the
// field names only — confirm against the code that increments them.
struct stats {
|
||||
int64_t view_updates_pushed_local = 0;
|
||||
int64_t view_updates_pushed_remote = 0;
|
||||
int64_t view_updates_failed_local = 0;
|
||||
int64_t view_updates_failed_remote = 0;
|
||||
};
|
||||
|
||||
/**
|
||||
* Whether the view filter considers the specified partition key.
|
||||
*
|
||||
@@ -92,8 +99,7 @@ query::clustering_row_ranges calculate_affected_clustering_ranges(
|
||||
const mutation_partition& mp,
|
||||
const std::vector<view_ptr>& views);
|
||||
|
||||
void mutate_MV(const dht::token& base_token,
|
||||
std::vector<mutation> mutations);
|
||||
future<> mutate_MV(const dht::token& base_token, std::vector<mutation> mutations, db::view::stats& stats);
|
||||
|
||||
}
|
||||
|
||||
|
||||
197
db/view/view_builder.hh
Normal file
197
db/view/view_builder.hh
Normal file
@@ -0,0 +1,197 @@
|
||||
/*
|
||||
* Copyright (C) 2018 ScyllaDB
|
||||
*/
|
||||
|
||||
/*
|
||||
* This file is part of Scylla.
|
||||
*
|
||||
* Scylla is free software: you can redistribute it and/or modify
|
||||
* it under the terms of the GNU Affero General Public License as published by
|
||||
* the Free Software Foundation, either version 3 of the License, or
|
||||
* (at your option) any later version.
|
||||
*
|
||||
* Scylla is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU General Public License
|
||||
* along with Scylla. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#pragma once
|
||||
|
||||
#include "database_fwd.hh"
|
||||
#include "db/system_keyspace.hh"
|
||||
#include "db/system_distributed_keyspace.hh"
|
||||
#include "dht/i_partitioner.hh"
|
||||
#include "keys.hh"
|
||||
#include "query-request.hh"
|
||||
#include "service/migration_listener.hh"
|
||||
#include "service/migration_manager.hh"
|
||||
#include "sstables/sstable_set.hh"
|
||||
#include "utils/exponential_backoff_retry.hh"
|
||||
#include "utils/serialized_action.hh"
|
||||
#include "utils/UUID.hh"
|
||||
|
||||
#include <seastar/core/abort_source.hh>
|
||||
#include <seastar/core/future.hh>
|
||||
#include <seastar/core/lowres_clock.hh>
|
||||
#include <seastar/core/semaphore.hh>
|
||||
#include <seastar/core/sharded.hh>
|
||||
#include <seastar/core/shared_future.hh>
|
||||
#include <seastar/core/shared_ptr.hh>
|
||||
|
||||
#include <optional>
#include <unordered_map>
#include <unordered_set>
#include <vector>
|
||||
|
||||
namespace db::view {
|
||||
|
||||
/**
|
||||
* The view_builder is a sharded service responsible for building all defined materialized views.
|
||||
* This process entails walking over the existing data in a given base table, and using it to
|
||||
* calculate and insert the respective entries for one or more views.
|
||||
*
|
||||
* We employ a flat_mutation_reader for each base table for which we're building views.
|
||||
*
|
||||
* We aim to be resource-conscious. On a given shard, at any given moment, we consume at most
|
||||
* from one reader. We also strive for fairness, in that each build step inserts entries for
|
||||
* the views of a different base. Each build step reads and generates updates for batch_size rows.
|
||||
*
|
||||
* We lack a controller, which could potentially allow us to go faster (to execute multiple steps at
|
||||
* the same time, or consume more rows per batch), and also which would apply backpressure, so we
|
||||
* could, for example, delay executing a build step.
|
||||
*
|
||||
* View building is necessarily a sharded process. That means that on restart, if the number of shards
|
||||
* has changed, we need to calculate the most conservative token range that has been built, and build
|
||||
* the remainder.
|
||||
*
|
||||
* Interaction with the system tables:
|
||||
* - When we start building a view, we add an entry to the scylla_views_builds_in_progress
|
||||
* system table. If the node restarts at this point, we'll consider these newly inserted
|
||||
* views as having made no progress, and we'll treat them as new views;
|
||||
* - When we finish a build step, we update the progress of the views that we built during
|
||||
* this step by writing the next token to the scylla_views_builds_in_progress table. If
|
||||
* the node restarts here, we'll start building the views at the token in the next_token column.
|
||||
* - When we finish building a view, we mark it as completed in the built views system table, and
|
||||
* remove it from the in-progress system table. Under failure, the following can happen:
|
||||
* * When we fail to mark the view as built, we'll redo the last step upon node reboot;
|
||||
* * When we fail to delete the in-progress record, upon reboot we'll remove this record.
|
||||
* A view is marked as completed only when all shards have finished their share of the work; that is,
|
||||
* if a view is not built, then all shards will still have an entry in the in-progress system table.
|
||||
* - A view that a shard finished building, but not all other shards, remains in the in-progress system
|
||||
* table, with first_token == next_token.
|
||||
* Interaction with the distributed system table (view_build_status):
|
||||
* - When we start building a view, we mark the view build as being in-progress;
|
||||
* - When we finish building a view, we mark the view as being built. Upon failure,
|
||||
* we ensure that if the view is in the in-progress system table, then it may not
|
||||
* have been written to this table. We don't load the built views from this table
|
||||
* when starting. When starting, the following happens:
|
||||
* * If the view is in the system.built_views table and not the in-progress
|
||||
* system table, then it will be in view_build_status;
|
||||
* * If the view is in the system.built_views table and not in this one, it
|
||||
* will still be in the in-progress system table - we detect this and mark
|
||||
* it as built in this table too, keeping the invariant;
|
||||
* * If the view is in this table but not in system.built_views, then it will
|
||||
* also be in the in-progress system table - we don't detect this and will
|
||||
* redo the missing step, for simplicity.
|
||||
*/
|
||||
class view_builder final : public service::migration_listener::only_view_notifications, public seastar::peering_sharded_service<view_builder> {
    /**
     * Keeps track of the build progress for a particular view.
     * When the view is built, next_token == first_token.
     */
    struct view_build_status final {
        view_ptr view;
        dht::token first_token;
        std::optional<dht::token> next_token;
    };

    /**
     * Keeps track of the build progress for all the views of a particular
     * base table. Each execution of the build step comprises a query of
     * the base table for the selected range.
     *
     * We pin the set of sstables that potentially contain data that should be added to a
     * view (they are pinned by the flat_mutation_reader). Adding a view v' overwrites the
     * set of pinned sstables, regardless of there being another view v'' being built. The
     * new set will potentially contain new data already in v'', written as part of the write
     * path. We assume this case is rare and optimize for less disk space at the expense of
     * network bandwidth.
     */
    struct build_step final {
        // Ensure we pin the column_family. It may happen that all views are removed,
        // and that the base table is too before we can detect it.
        lw_shared_ptr<column_family> base;
        query::partition_slice pslice;
        dht::partition_range prange;
        flat_mutation_reader reader{nullptr};
        dht::decorated_key current_key{dht::minimum_token(), partition_key::make_empty()};
        std::vector<view_build_status> build_status;

        // Token of the partition key this step is currently positioned at.
        const dht::token& current_token() const {
            return current_key.token();
        }
    };

    // Build steps, keyed by a UUID.
    // NOTE(review): the alias name suggests the key is the base table's id — confirm in view.cc.
    using base_to_build_step_type = std::unordered_map<utils::UUID, build_step>;

    database& _db;
    db::system_distributed_keyspace& _sys_dist_ks;
    service::migration_manager& _mm;
    base_to_build_step_type _base_to_build_step;
    // Must be declared after _base_to_build_step: it is initialised from that map's end().
    base_to_build_step_type::iterator _current_step = _base_to_build_step.end();
    // Runs do_build_step() through a serialized_action.
    serialized_action _build_step{std::bind(&view_builder::do_build_step, this)};
    // Ensures bookkeeping operations are serialized, meaning that while we execute
    // a build step we don't consider newly added or removed views. This simplifies
    // the algorithms. Also synchronizes an operation wrt. a call to stop().
    seastar::semaphore _sem{1};
    seastar::abort_source _as;
    future<> _started = make_ready_future<>();
    // Used to coordinate between shards the conclusion of the build process for a particular view.
    std::unordered_set<utils::UUID> _built_views;
    // Used for testing.
    std::unordered_map<std::pair<sstring, sstring>, seastar::shared_promise<>, utils::tuple_hash> _build_notifiers;

public:
    static constexpr size_t batch_size = 128;

    view_builder(database&, db::system_distributed_keyspace&, service::migration_manager&);
    view_builder(view_builder&&) = delete;

    /**
     * Loads the state stored in the system tables to resume building the existing views.
     * Requires that all views have been loaded from the system tables and are accessible
     * through the database, and that the commitlog has been replayed.
     */
    future<> start();

    /**
     * Stops the view building process.
     */
    future<> stop();

    virtual void on_create_view(const sstring& ks_name, const sstring& view_name) override;
    virtual void on_update_view(const sstring& ks_name, const sstring& view_name, bool columns_changed) override;
    virtual void on_drop_view(const sstring& ks_name, const sstring& view_name) override;

    // For tests
    future<> wait_until_built(const sstring& ks_name, const sstring& view_name, lowres_clock::time_point timeout);

private:
    build_step& get_or_create_build_step(utils::UUID);
    void initialize_reader_at_current_token(build_step&);
    void load_view_status(view_build_status, std::unordered_set<utils::UUID>&);
    void reshard(std::vector<std::vector<view_build_status>>, std::unordered_set<utils::UUID>&);
    future<> calculate_shard_build_step(std::vector<system_keyspace::view_name>, std::vector<system_keyspace::view_build_progress>);
    future<> add_new_view(view_ptr, build_step&);
    future<> do_build_step();
    void execute(build_step&, exponential_backoff_retry);
    future<> maybe_mark_view_as_built(view_ptr, dht::token);

    struct consumer;
};
|
||||
|
||||
}
|
||||
2
dist/ami/files/scylla-ami
vendored
2
dist/ami/files/scylla-ami
vendored
Submodule dist/ami/files/scylla-ami updated: 5170011fbc...6ed71a3126
1
dist/common/modprobe.d/scylla-raid0.conf
vendored
1
dist/common/modprobe.d/scylla-raid0.conf
vendored
@@ -1 +0,0 @@
|
||||
options raid0 devices_discard_performance=Y
|
||||
4
dist/common/scripts/scylla_raid_setup
vendored
4
dist/common/scripts/scylla_raid_setup
vendored
@@ -96,7 +96,9 @@ elif is_gentoo_variant; then
|
||||
emerge -uq sys-fs/mdadm sys-fs/xfsprogs
|
||||
fi
|
||||
if [ "$ID" = "ubuntu" ] && [ "$VERSION_ID" = "14.04" ]; then
|
||||
udevadm settle
|
||||
mdadm --create --verbose --force --run $RAID --level=0 -c1024 --raid-devices=$NR_DISK $DISKS
|
||||
udevadm settle
|
||||
mkfs.xfs $RAID -f
|
||||
else
|
||||
for dsk in $DISKS; do
|
||||
@@ -107,7 +109,9 @@ else
|
||||
fi
|
||||
done
|
||||
wait
|
||||
udevadm settle
|
||||
mdadm --create --verbose --force --run $RAID --level=0 -c1024 --raid-devices=$NR_DISK $DISKS
|
||||
udevadm settle
|
||||
mkfs.xfs $RAID -f -K
|
||||
fi
|
||||
if is_debian_variant; then
|
||||
|
||||
38
dist/debian/build_deb.sh
vendored
38
dist/debian/build_deb.sh
vendored
@@ -2,10 +2,11 @@
|
||||
|
||||
. /etc/os-release
|
||||
print_usage() {
|
||||
echo "build_deb.sh -target <codename> --dist --rebuild-dep"
|
||||
echo "build_deb.sh -target <codename> --dist --rebuild-dep --jobs 2"
|
||||
echo " --target target distribution codename"
|
||||
echo " --dist create a public distribution package"
|
||||
echo " --no-clean don't rebuild pbuilder tgz"
|
||||
echo " --jobs specify number of jobs"
|
||||
exit 1
|
||||
}
|
||||
install_deps() {
|
||||
@@ -19,6 +20,7 @@ install_deps() {
|
||||
DIST=0
|
||||
TARGET=
|
||||
NO_CLEAN=0
|
||||
JOBS=0
|
||||
while [ $# -gt 0 ]; do
|
||||
case "$1" in
|
||||
"--dist")
|
||||
@@ -33,6 +35,10 @@ while [ $# -gt 0 ]; do
|
||||
NO_CLEAN=1
|
||||
shift 1
|
||||
;;
|
||||
"--jobs")
|
||||
JOBS=$2
|
||||
shift 2
|
||||
;;
|
||||
*)
|
||||
print_usage
|
||||
;;
|
||||
@@ -131,7 +137,7 @@ if [ "$TARGET" = "jessie" ]; then
|
||||
sed -i -e "s/@@INSTALL_FSTRIM@@/dh_installinit --no-start --name scylla-fstrim/g" debian/rules
|
||||
sed -i -e "s/@@INSTALL_NODE_EXPORTER@@/dh_installinit --no-start --name node-exporter/g" debian/rules
|
||||
sed -i -e "s#@@COMPILER@@#/opt/scylladb/bin/g++-7#g" debian/rules
|
||||
sed -i -e "s/@@BUILD_DEPENDS@@/libsystemd-dev, scylla-gcc72-g++-7, libunwind-dev, scylla-antlr35, scylla-libthrift010-dev, scylla-antlr35-c++-dev, scylla-libboost-program-options163-dev, scylla-libboost-filesystem163-dev, scylla-libboost-system163-dev, scylla-libboost-thread163-dev, scylla-libboost-test163-dev/g" debian/control
|
||||
sed -i -e "s/@@BUILD_DEPENDS@@/libsystemd-dev, scylla-gcc73-g++-7, libunwind-dev, scylla-antlr35, scylla-libthrift010-dev, scylla-antlr35-c++-dev, scylla-libboost-program-options165-dev, scylla-libboost-filesystem165-dev, scylla-libboost-system165-dev, scylla-libboost-thread165-dev, scylla-libboost-test165-dev/g" debian/control
|
||||
sed -i -e "s/@@DEPENDS@@//g" debian/control
|
||||
sed -i -e "s#@@INSTALL@@##g" debian/scylla-server.install
|
||||
sed -i -e "s#@@HKDOTTIMER_D@@#dist/common/systemd/scylla-housekeeping-daily.timer /lib/systemd/system#g" debian/scylla-server.install
|
||||
@@ -148,7 +154,7 @@ elif [ "$TARGET" = "stretch" ]; then
|
||||
sed -i -e "s/@@INSTALL_FSTRIM@@/dh_installinit --no-start --name scylla-fstrim/g" debian/rules
|
||||
sed -i -e "s/@@INSTALL_NODE_EXPORTER@@/dh_installinit --no-start --name node-exporter/g" debian/rules
|
||||
sed -i -e "s#@@COMPILER@@#/opt/scylladb/bin/g++-7#g" debian/rules
|
||||
sed -i -e "s/@@BUILD_DEPENDS@@/libsystemd-dev, scylla-gcc72-g++-7, libunwind-dev, antlr3, scylla-libthrift010-dev, scylla-antlr35-c++-dev, libboost-program-options1.62-dev, libboost-filesystem1.62-dev, libboost-system1.62-dev, libboost-thread1.62-dev, libboost-test1.62-dev/g" debian/control
|
||||
sed -i -e "s/@@BUILD_DEPENDS@@/libsystemd-dev, scylla-gcc73-g++-7, libunwind-dev, antlr3, scylla-libthrift010-dev, scylla-antlr35-c++-dev, libboost-program-options1.62-dev, libboost-filesystem1.62-dev, libboost-system1.62-dev, libboost-thread1.62-dev, libboost-test1.62-dev/g" debian/control
|
||||
sed -i -e "s/@@DEPENDS@@//g" debian/control
|
||||
sed -i -e "s#@@INSTALL@@##g" debian/scylla-server.install
|
||||
sed -i -e "s#@@HKDOTTIMER_D@@#dist/common/systemd/scylla-housekeeping-daily.timer /lib/systemd/system#g" debian/scylla-server.install
|
||||
@@ -166,7 +172,7 @@ elif [ "$TARGET" = "trusty" ]; then
|
||||
sed -i -e "s/@@INSTALL_FSTRIM@@//g" debian/rules
|
||||
sed -i -e "s/@@INSTALL_NODE_EXPORTER@@//g" debian/rules
|
||||
sed -i -e "s#@@COMPILER@@#/opt/scylladb/bin/g++-7#g" debian/rules
|
||||
sed -i -e "s/@@BUILD_DEPENDS@@/scylla-gcc72-g++-7, libunwind8-dev, scylla-antlr35, scylla-libthrift010-dev, scylla-antlr35-c++-dev, scylla-libboost-program-options163-dev, scylla-libboost-filesystem163-dev, scylla-libboost-system163-dev, scylla-libboost-thread163-dev, scylla-libboost-test163-dev/g" debian/control
|
||||
sed -i -e "s/@@BUILD_DEPENDS@@/scylla-gcc73-g++-7, libunwind8-dev, scylla-antlr35, scylla-libthrift010-dev, scylla-antlr35-c++-dev, scylla-libboost-program-options165-dev, scylla-libboost-filesystem165-dev, scylla-libboost-system165-dev, scylla-libboost-thread165-dev, scylla-libboost-test165-dev/g" debian/control
|
||||
sed -i -e "s/@@DEPENDS@@/hugepages, num-utils/g" debian/control
|
||||
sed -i -e "s#@@INSTALL@@#dist/debian/sudoers.d/scylla etc/sudoers.d#g" debian/scylla-server.install
|
||||
sed -i -e "s#@@HKDOTTIMER_D@@##g" debian/scylla-server.install
|
||||
@@ -183,7 +189,7 @@ elif [ "$TARGET" = "xenial" ]; then
|
||||
sed -i -e "s/@@INSTALL_FSTRIM@@/dh_installinit --no-start --name scylla-fstrim/g" debian/rules
|
||||
sed -i -e "s/@@INSTALL_NODE_EXPORTER@@/dh_installinit --no-start --name node-exporter/g" debian/rules
|
||||
sed -i -e "s#@@COMPILER@@#/opt/scylladb/bin/g++-7#g" debian/rules
|
||||
sed -i -e "s/@@BUILD_DEPENDS@@/libsystemd-dev, scylla-gcc72-g++-7, libunwind-dev, antlr3, scylla-libthrift010-dev, scylla-antlr35-c++-dev, scylla-libboost-program-options163-dev, scylla-libboost-filesystem163-dev, scylla-libboost-system163-dev, scylla-libboost-thread163-dev, scylla-libboost-test163-dev/g" debian/control
|
||||
sed -i -e "s/@@BUILD_DEPENDS@@/libsystemd-dev, scylla-gcc73-g++-7, libunwind-dev, antlr3, scylla-libthrift010-dev, scylla-antlr35-c++-dev, scylla-libboost-program-options165-dev, scylla-libboost-filesystem165-dev, scylla-libboost-system165-dev, scylla-libboost-thread165-dev, scylla-libboost-test165-dev/g" debian/control
|
||||
sed -i -e "s/@@DEPENDS@@/hugepages, /g" debian/control
|
||||
sed -i -e "s#@@INSTALL@@##g" debian/scylla-server.install
|
||||
sed -i -e "s#@@HKDOTTIMER_D@@#dist/common/systemd/scylla-housekeeping-daily.timer /lib/systemd/system#g" debian/scylla-server.install
|
||||
@@ -200,7 +206,7 @@ elif [ "$TARGET" = "bionic" ]; then
|
||||
sed -i -e "s/@@INSTALL_FSTRIM@@/dh_installinit --no-start --name scylla-fstrim/g" debian/rules
|
||||
sed -i -e "s/@@INSTALL_NODE_EXPORTER@@/dh_installinit --no-start --name node-exporter/g" debian/rules
|
||||
sed -i -e "s#@@COMPILER@@#g++-7#g" debian/rules
|
||||
sed -i -e "s/@@BUILD_DEPENDS@@/libsystemd-dev, g++, libunwind-dev, antlr3, scylla-libthrift010-dev, scylla-antlr35-c++-dev, libboost-program-options-dev, libboost-filesystem-dev, libboost-system-dev, libboost-thread-dev, libboost-test-dev/g" debian/control
|
||||
sed -i -e "s/@@BUILD_DEPENDS@@/libsystemd-dev, scylla-gcc73-g++-7, libunwind-dev, antlr3, scylla-libthrift010-dev, scylla-antlr35-c++-dev, scylla-libboost-program-options165-dev, scylla-libboost-filesystem165-dev, scylla-libboost-system165-dev, scylla-libboost-thread165-dev, scylla-libboost-test165-dev/g" debian/control
|
||||
sed -i -e "s/@@DEPENDS@@/hugepages, /g" debian/control
|
||||
sed -i -e "s#@@INSTALL@@##g" debian/scylla-server.install
|
||||
sed -i -e "s#@@HKDOTTIMER_D@@#dist/common/systemd/scylla-housekeeping-daily.timer /lib/systemd/system#g" debian/scylla-server.install
|
||||
@@ -237,6 +243,9 @@ fi
|
||||
if [ "$TARGET" != "trusty" ]; then
|
||||
cp dist/common/systemd/scylla-server.service.in debian/scylla-server.service
|
||||
sed -i -e "s#@@SYSCONFDIR@@#/etc/default#g" debian/scylla-server.service
|
||||
if [ "$TARGET" = "jessie" ]; then
|
||||
sed -i -e "s#AmbientCapabilities=CAP_SYS_NICE##g" debian/scylla-server.service
|
||||
fi
|
||||
cp dist/common/systemd/scylla-housekeeping-daily.service.in debian/scylla-server.scylla-housekeeping-daily.service
|
||||
sed -i -e "s#@@REPOFILES@@#'/etc/apt/sources.list.d/scylla*.list'#g" debian/scylla-server.scylla-housekeeping-daily.service
|
||||
cp dist/common/systemd/scylla-housekeeping-restart.service.in debian/scylla-server.scylla-housekeeping-restart.service
|
||||
@@ -245,16 +254,19 @@ if [ "$TARGET" != "trusty" ]; then
|
||||
cp dist/common/systemd/node-exporter.service debian/scylla-server.node-exporter.service
|
||||
fi
|
||||
|
||||
cp ./dist/debian/pbuilderrc ~/.pbuilderrc
|
||||
sudo cp ./dist/debian/pbuilderrc ~root/.pbuilderrc
|
||||
if [ $NO_CLEAN -eq 0 ]; then
|
||||
sudo rm -fv /var/cache/pbuilder/scylla-server-$TARGET.tgz
|
||||
sudo -E DIST=$TARGET /usr/sbin/pbuilder clean
|
||||
sudo -E DIST=$TARGET /usr/sbin/pbuilder create --allow-untrusted
|
||||
sudo -H DIST=$TARGET /usr/sbin/pbuilder clean
|
||||
sudo -H DIST=$TARGET /usr/sbin/pbuilder create --allow-untrusted
|
||||
fi
|
||||
sudo -E DIST=$TARGET /usr/sbin/pbuilder update --allow-untrusted
|
||||
if [ $JOBS -ne 0 ]; then
|
||||
DEB_BUILD_OPTIONS="parallel=$JOBS"
|
||||
fi
|
||||
sudo -H DIST=$TARGET /usr/sbin/pbuilder update --allow-untrusted
|
||||
if [ "$TARGET" = "trusty" ] || [ "$TARGET" = "xenial" ] || [ "$TARGET" = "yakkety" ] || [ "$TARGET" = "zesty" ] || [ "$TARGET" = "artful" ] || [ "$TARGET" = "bionic" ]; then
|
||||
sudo -E DIST=$TARGET /usr/sbin/pbuilder execute --save-after-exec dist/debian/ubuntu_enable_ppa.sh
|
||||
sudo -H DIST=$TARGET /usr/sbin/pbuilder execute --save-after-exec dist/debian/ubuntu_enable_ppa.sh
|
||||
elif [ "$TARGET" = "jessie" ] || [ "$TARGET" = "stretch" ]; then
|
||||
sudo -E DIST=$TARGET /usr/sbin/pbuilder execute --save-after-exec dist/debian/debian_install_gpgkey.sh
|
||||
sudo -H DIST=$TARGET /usr/sbin/pbuilder execute --save-after-exec dist/debian/debian_install_gpgkey.sh
|
||||
fi
|
||||
sudo -E DIST=$TARGET pdebuild --buildresult build/debs
|
||||
sudo -H DIST=$TARGET DEB_BUILD_OPTIONS=$DEB_BUILD_OPTIONS pdebuild --buildresult build/debs
|
||||
|
||||
5
dist/debian/rules.in
vendored
5
dist/debian/rules.in
vendored
@@ -1,12 +1,13 @@
|
||||
#!/usr/bin/make -f
|
||||
|
||||
export PYBUILD_DISABLE=1
|
||||
jobs := $(shell echo $$DEB_BUILD_OPTIONS | sed -r "s/.*parallel=([0-9]+).*/-j\1/")
|
||||
|
||||
override_dh_auto_configure:
|
||||
./configure.py --enable-dpdk --mode=release --static-thrift --static-boost --static-yaml-cpp --compiler=@@COMPILER@@ --cflags="-I/opt/scylladb/include -L/opt/scylladb/lib/x86-linux-gnu/" --ldflags="-Wl,-rpath=/opt/scylladb/lib"
|
||||
./configure.py --with=scylla --with=iotune --enable-dpdk --mode=release --static-thrift --static-boost --static-yaml-cpp --compiler=@@COMPILER@@ --cflags="-I/opt/scylladb/include -L/opt/scylladb/lib/x86-linux-gnu/" --ldflags="-Wl,-rpath=/opt/scylladb/lib"
|
||||
|
||||
override_dh_auto_build:
|
||||
PATH="/opt/scylladb/bin:$$PATH" ninja
|
||||
PATH="/opt/scylladb/bin:$$PATH" ninja $(jobs)
|
||||
|
||||
override_dh_auto_clean:
|
||||
rm -rf build/release seastar/build
|
||||
|
||||
2
dist/docker/redhat/Dockerfile
vendored
2
dist/docker/redhat/Dockerfile
vendored
@@ -26,7 +26,7 @@ ADD commandlineparser.py /commandlineparser.py
|
||||
ADD docker-entrypoint.py /docker-entrypoint.py
|
||||
|
||||
# Install Scylla:
|
||||
RUN curl http://downloads.scylladb.com/rpm/unstable/centos/master/latest/scylla.repo -o /etc/yum.repos.d/scylla.repo && \
|
||||
RUN curl http://downloads.scylladb.com/rpm/centos/scylla-2.2.repo -o /etc/yum.repos.d/scylla.repo && \
|
||||
yum -y install epel-release && \
|
||||
yum -y clean expire-cache && \
|
||||
yum -y update && \
|
||||
|
||||
3
dist/docker/redhat/commandlineparser.py
vendored
3
dist/docker/redhat/commandlineparser.py
vendored
@@ -9,7 +9,8 @@ def parse():
|
||||
parser.add_argument('--cpuset', default=None, help="e.g. --cpuset 0-3 for the first four CPUs")
|
||||
parser.add_argument('--smp', default=None, help="e.g --smp 2 to use two CPUs")
|
||||
parser.add_argument('--memory', default=None, help="e.g. --memory 1G to use 1 GB of RAM")
|
||||
parser.add_argument('--overprovisioned', default='0', choices=['0', '1'], help="run in overprovisioned environment")
|
||||
parser.add_argument('--overprovisioned', default=None, choices=['0', '1'],
|
||||
help="run in overprovisioned environment. By default it will run in overprovisioned mode unless --cpuset is specified")
|
||||
parser.add_argument('--listen-address', default=None, dest='listenAddress')
|
||||
parser.add_argument('--broadcast-address', default=None, dest='broadcastAddress')
|
||||
parser.add_argument('--broadcast-rpc-address', default=None, dest='broadcastRpcAddress')
|
||||
|
||||
2
dist/docker/redhat/scyllasetup.py
vendored
2
dist/docker/redhat/scyllasetup.py
vendored
@@ -53,7 +53,7 @@ class ScyllaSetup:
|
||||
args += [ "--memory %s" % self._memory ]
|
||||
if self._smp is not None:
|
||||
args += [ "--smp %s" % self._smp ]
|
||||
if self._overprovisioned == "1":
|
||||
if self._overprovisioned == "1" or (self._overprovisioned is None and self._cpuset is None):
|
||||
args += [ "--overprovisioned" ]
|
||||
|
||||
if self._listenAddress is None:
|
||||
|
||||
27
dist/redhat/scylla.spec.in
vendored
27
dist/redhat/scylla.spec.in
vendored
@@ -7,8 +7,12 @@ Group: Applications/Databases
|
||||
License: AGPLv3
|
||||
URL: http://www.scylladb.com/
|
||||
Source0: %{name}-@@VERSION@@-@@RELEASE@@.tar
|
||||
Requires: scylla-server = @@VERSION@@ scylla-jmx = @@VERSION@@ scylla-tools = @@VERSION@@ scylla-tools-core = @@VERSION@@ scylla-kernel-conf = @@VERSION@@ scylla-libgcc72 scylla-libstdc++72
|
||||
Requires: scylla-server = @@VERSION@@ scylla-jmx = @@VERSION@@ scylla-tools = @@VERSION@@ scylla-tools-core = @@VERSION@@ scylla-kernel-conf = @@VERSION@@ scylla-libgcc73 scylla-libstdc++73
|
||||
Obsoletes: scylla-server < 1.1
|
||||
Obsoletes: scylla-libgcc72
|
||||
Obsoletes: scylla-libstdc++72
|
||||
Provides: scylla-libgcc72
|
||||
Provides: scylla-libstdc++72
|
||||
|
||||
%description
|
||||
Scylla is a highly scalable, eventually consistent, distributed,
|
||||
@@ -52,7 +56,7 @@ License: AGPLv3
|
||||
URL: http://www.scylladb.com/
|
||||
BuildRequires: libaio-devel libstdc++-devel cryptopp-devel hwloc-devel numactl-devel libpciaccess-devel libxml2-devel zlib-devel thrift-devel yaml-cpp-devel yaml-cpp-static lz4-devel snappy-devel jsoncpp-devel systemd-devel xz-devel pcre-devel elfutils-libelf-devel bzip2-devel keyutils-libs-devel xfsprogs-devel make gnutls-devel systemd-devel lksctp-tools-devel protobuf-devel protobuf-compiler libunwind-devel systemtap-sdt-devel ninja-build cmake python ragel grep kernel-headers
|
||||
%{?fedora:BuildRequires: boost-devel antlr3-tool antlr3-C++-devel python3 gcc-c++ libasan libubsan python3-pyparsing dnf-yum}
|
||||
%{?rhel:BuildRequires: scylla-libstdc++72-static scylla-boost163-devel scylla-boost163-static scylla-antlr35-tool scylla-antlr35-C++-devel python34 scylla-gcc72-c++, scylla-python34-pyparsing20}
|
||||
%{?rhel:BuildRequires: scylla-libstdc++73-static scylla-boost163-devel scylla-boost163-static scylla-antlr35-tool scylla-antlr35-C++-devel python34 scylla-gcc73-c++, scylla-python34-pyparsing20}
|
||||
Requires: scylla-conf systemd-libs hwloc collectd PyYAML python-urwid pciutils pyparsing python-requests curl util-linux python-setuptools pciutils python3-pyudev mdadm xfsprogs
|
||||
%{?rhel:Requires: python34 python34-PyYAML kernel >= 3.10.0-514}
|
||||
%{?fedora:Requires: python3 python3-PyYAML}
|
||||
@@ -86,11 +90,11 @@ cflags="--cflags=${defines[*]}"
|
||||
|
||||
%define is_housekeeping_conf %( if @@HOUSEKEEPING_CONF@@; then echo "1" ; else echo "0"; fi )
|
||||
%if 0%{?fedora}
|
||||
./configure.py %{?configure_opt} --mode=release "$cflags"
|
||||
./configure.py %{?configure_opt} --with=scylla --with=iotune --mode=release "$cflags"
|
||||
%endif
|
||||
%if 0%{?rhel}
|
||||
. /etc/profile.d/scylla.sh
|
||||
python3.4 ./configure.py %{?configure_opt} --mode=release "$cflags" --static-boost --static-yaml-cpp --compiler=/opt/scylladb/bin/g++-7.2 --python python3.4 --ldflag=-Wl,-rpath=/opt/scylladb/lib64
|
||||
python3.4 ./configure.py %{?configure_opt} --with=scylla --with=iotune --mode=release "$cflags" --static-boost --static-yaml-cpp --compiler=/opt/scylladb/bin/g++-7.3 --python python3.4 --ldflag=-Wl,-rpath=/opt/scylladb/lib64
|
||||
%endif
|
||||
ninja-build %{?_smp_mflags} build/release/scylla build/release/iotune
|
||||
cp dist/common/systemd/scylla-server.service.in build/scylla-server.service
|
||||
@@ -109,9 +113,6 @@ mkdir -p $RPM_BUILD_ROOT%{_sysconfdir}/security/limits.d/
|
||||
mkdir -p $RPM_BUILD_ROOT%{_sysconfdir}/collectd.d/
|
||||
mkdir -p $RPM_BUILD_ROOT%{_sysconfdir}/scylla/
|
||||
mkdir -p $RPM_BUILD_ROOT%{_sysconfdir}/scylla.d/
|
||||
%if 0%{?rhel}
|
||||
mkdir -p $RPM_BUILD_ROOT%{_sysconfdir}/modprobe.d/
|
||||
%endif
|
||||
mkdir -p $RPM_BUILD_ROOT%{_sysctldir}/
|
||||
mkdir -p $RPM_BUILD_ROOT%{_docdir}/scylla/
|
||||
mkdir -p $RPM_BUILD_ROOT%{_unitdir}
|
||||
@@ -122,9 +123,6 @@ install -m644 dist/common/limits.d/scylla.conf $RPM_BUILD_ROOT%{_sysconfdir}/sec
|
||||
install -m644 dist/common/collectd.d/scylla.conf $RPM_BUILD_ROOT%{_sysconfdir}/collectd.d/
|
||||
install -m644 dist/common/scylla.d/*.conf $RPM_BUILD_ROOT%{_sysconfdir}/scylla.d/
|
||||
install -m644 dist/common/sysctl.d/*.conf $RPM_BUILD_ROOT%{_sysctldir}/
|
||||
%if 0%{?rhel}
|
||||
install -m644 dist/common/modprobe.d/*.conf $RPM_BUILD_ROOT%{_sysconfdir}/modprobe.d/
|
||||
%endif
|
||||
install -d -m755 $RPM_BUILD_ROOT%{_sysconfdir}/scylla
|
||||
install -m644 conf/scylla.yaml $RPM_BUILD_ROOT%{_sysconfdir}/scylla/
|
||||
install -m644 conf/cassandra-rackdc.properties $RPM_BUILD_ROOT%{_sysconfdir}/scylla/
|
||||
@@ -317,18 +315,9 @@ if Scylla is the main application on your server and you wish to optimize its la
|
||||
# We cannot use the sysctl_apply rpm macro because it is not present in 7.0
|
||||
# following is a "manual" expansion
|
||||
/usr/lib/systemd/systemd-sysctl 99-scylla-sched.conf >/dev/null 2>&1 || :
|
||||
# Write modprobe.d params when module already loaded
|
||||
%if 0%{?rhel}
|
||||
if [ -e /sys/module/raid0/parameters/devices_discard_performance ]; then
|
||||
echo Y > /sys/module/raid0/parameters/devices_discard_performance
|
||||
fi
|
||||
%endif
|
||||
|
||||
%files kernel-conf
|
||||
%defattr(-,root,root)
|
||||
%if 0%{?rhel}
|
||||
%config(noreplace) %{_sysconfdir}/modprobe.d/*.conf
|
||||
%endif
|
||||
%{_sysctldir}/*.conf
|
||||
|
||||
%changelog
|
||||
|
||||
@@ -77,10 +77,9 @@ $ docker run --name some-scylla --volume /var/lib/scylla:/var/lib/scylla -d scyl
|
||||
|
||||
## Configuring resource limits
|
||||
|
||||
Scylla utilizes all CPUs and all memory by default.
|
||||
To configure resource limits for your Docker container, you can use the `--smp`, `--memory`, and `--cpuset` command line options documented in the section "Command-line options".
|
||||
|
||||
If you run multiple Scylla instances on the same machine, it is highly recommended that you enable the `--overprovisioned` command line option, which enables certain optimizations for Scylla to run efficiently in an overprovisioned environment.
|
||||
The Scylla docker image defaults to running in overprovisioned mode and won't apply any CPU pinning optimizations, which it normally does in non-containerized environments.
|
||||
For better performance, it is recommended to configure resource limits for your Docker container using the `--smp`, `--memory`, and `--cpuset` command line options, as well as
|
||||
disabling the overprovisioned flag as documented in the section "Command-line options".
|
||||
|
||||
## Restart Scylla
|
||||
|
||||
@@ -163,12 +162,13 @@ $ docker run --name some-scylla -d scylladb/scylla --memory 4G
|
||||
### `--overprovisioned ENABLE`
|
||||
|
||||
The `--overprovisioned` command line option enables or disables optimizations for running Scylla in an overprovisioned environment.
|
||||
If no `--overprovisioned` option is specified, Scylla defaults to running with optimizations *disabled*.
|
||||
If no `--overprovisioned` option is specified, Scylla defaults to running with optimizations *enabled*. If `--overprovisioned` is
|
||||
not specified and is left at its default, specifying `--cpuset` will automatically disable `--overprovisioned`.
|
||||
|
||||
For example, to enable optimizations for running in an overprovisioned environment:
|
||||
For example, to enable optimizations for running in a statically partitioned environment:
|
||||
|
||||
```console
|
||||
$ docker run --name some-scylla -d scylladb/scylla --overprovisioned 1
|
||||
$ docker run --name some-scylla -d scylladb/scylla --overprovisioned 0
|
||||
```
|
||||
|
||||
### `--cpuset CPUSET`
|
||||
|
||||
@@ -621,3 +621,37 @@ make_flat_multi_range_reader(schema_ptr s, mutation_source source, const dht::pa
|
||||
return make_flat_mutation_reader<flat_multi_range_mutation_reader>(std::move(s), std::move(source), ranges,
|
||||
slice, pc, std::move(trace_state), fwd_mr);
|
||||
}
|
||||
|
||||
// Builds a flat_mutation_reader that emits a pre-built sequence of mutation
// fragments, taking them from the front of the deque in order.
// The returned reader cannot be fast-forwarded: both fast_forward_to()
// overloads throw std::runtime_error.
flat_mutation_reader
|
||||
make_flat_mutation_reader_from_fragments(schema_ptr schema, std::deque<mutation_fragment> fragments) {
|
||||
class reader : public flat_mutation_reader::impl {
|
||||
// Fragments not yet pushed to the buffer; the front is the next one to emit.
std::deque<mutation_fragment> _fragments;
|
||||
public:
|
||||
// Takes ownership of both the schema pointer and the fragment deque.
reader(schema_ptr schema, std::deque<mutation_fragment> fragments)
|
||||
: flat_mutation_reader::impl(std::move(schema))
|
||||
, _fragments(std::move(fragments)) {
|
||||
}
|
||||
// Moves fragments from the front of _fragments into the buffer until the
// buffer is full or no fragments remain. _end_of_stream is re-assigned on
// every loop check and is true only when _fragments is empty.
// The timeout argument is unused; a ready future is always returned.
virtual future<> fill_buffer(db::timeout_clock::time_point) override {
|
||||
while (!(_end_of_stream = _fragments.empty()) && !is_buffer_full()) {
|
||||
push_mutation_fragment(std::move(_fragments.front()));
|
||||
_fragments.pop_front();
|
||||
}
|
||||
return make_ready_future<>();
|
||||
}
|
||||
// Calls clear_buffer_to_next_partition(); if the buffer is empty afterwards,
// also discards pending fragments from _fragments until the front one is a
// partition start or none remain, updating _end_of_stream along the way.
virtual void next_partition() override {
|
||||
clear_buffer_to_next_partition();
|
||||
if (is_buffer_empty()) {
|
||||
while (!(_end_of_stream = _fragments.empty()) && !_fragments.front().is_partition_start()) {
|
||||
_fragments.pop_front();
|
||||
}
|
||||
}
|
||||
}
|
||||
// Fast-forwarding is not supported; always throws.
// NOTE(review): this overload takes a position_range but its message says
// "range", while the partition_range overload below says "position" — the
// two messages look swapped; confirm the intended wording.
virtual future<> fast_forward_to(position_range pr, db::timeout_clock::time_point timeout) override {
|
||||
throw std::runtime_error("This reader can't be fast forwarded to another range.");
|
||||
}
|
||||
// Fast-forwarding is not supported; always throws.
virtual future<> fast_forward_to(const dht::partition_range& pr, db::timeout_clock::time_point timeout) override {
|
||||
throw std::runtime_error("This reader can't be fast forwarded to another position.");
|
||||
}
|
||||
};
|
||||
// Both arguments are moved into the newly constructed reader.
return make_flat_mutation_reader<reader>(std::move(schema), std::move(fragments));
|
||||
}
|
||||
|
||||
@@ -32,6 +32,8 @@
|
||||
#include <seastar/util/gcc6-concepts.hh>
|
||||
#include "db/timeout_clock.hh"
|
||||
|
||||
#include <deque>
|
||||
|
||||
using seastar::future;
|
||||
|
||||
class mutation_source;
|
||||
@@ -487,7 +489,9 @@ flat_mutation_reader transform(flat_mutation_reader r, T t) {
|
||||
return _reader.fast_forward_to(pr);
|
||||
}
|
||||
virtual future<> fast_forward_to(position_range pr, db::timeout_clock::time_point timeout) override {
|
||||
throw std::bad_function_call();
|
||||
forward_buffer_to(pr.start());
|
||||
_end_of_stream = false;
|
||||
return _reader.fast_forward_to(std::move(pr), timeout);
|
||||
}
|
||||
virtual size_t buffer_size() const override {
|
||||
return flat_mutation_reader::impl::buffer_size() + _reader.buffer_size();
|
||||
@@ -553,6 +557,9 @@ make_flat_multi_range_reader(schema_ptr s, mutation_source source, const dht::pa
|
||||
tracing::trace_state_ptr trace_state = nullptr,
|
||||
flat_mutation_reader::partition_range_forwarding fwd_mr = flat_mutation_reader::partition_range_forwarding::yes);
|
||||
|
||||
flat_mutation_reader
|
||||
make_flat_mutation_reader_from_fragments(schema_ptr, std::deque<mutation_fragment>);
|
||||
|
||||
// Calls the consumer for each element of the reader's stream until end of stream
|
||||
// is reached or the consumer requests iteration to stop by returning stop_iteration::yes.
|
||||
// The consumer should accept mutation as the argument and return stop_iteration.
|
||||
|
||||
@@ -478,7 +478,8 @@ future<> gossiper::apply_state_locally(std::map<inet_address, endpoint_state> ma
|
||||
int local_generation = local_ep_state_ptr.get_heart_beat_state().get_generation();
|
||||
int remote_generation = remote_state.get_heart_beat_state().get_generation();
|
||||
logger.trace("{} local generation {}, remote generation {}", ep, local_generation, remote_generation);
|
||||
if (local_generation != 0 && remote_generation > local_generation + MAX_GENERATION_DIFFERENCE) {
|
||||
// A node that was removed with nodetool removenode can have a generation of 2
|
||||
if (local_generation > 2 && remote_generation > local_generation + MAX_GENERATION_DIFFERENCE) {
|
||||
// assume some peer has corrupted memory and is broadcasting an unbelievable generation about another peer (or itself)
|
||||
logger.warn("received an invalid gossip generation for peer {}; local generation = {}, received generation = {}",
|
||||
ep, local_generation, remote_generation);
|
||||
@@ -853,6 +854,7 @@ int gossiper::get_max_endpoint_state_version(endpoint_state state) {
|
||||
|
||||
// Runs inside seastar::async context
|
||||
void gossiper::evict_from_membership(inet_address endpoint) {
|
||||
auto permit = lock_endpoint(endpoint).get0();
|
||||
_unreachable_endpoints.erase(endpoint);
|
||||
container().invoke_on_all([endpoint] (auto& g) {
|
||||
g.endpoint_state_map.erase(endpoint);
|
||||
|
||||
77
index/secondary_index.hh
Normal file
77
index/secondary_index.hh
Normal file
@@ -0,0 +1,77 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
/*
|
||||
* Copyright (C) 2015 ScyllaDB
|
||||
*
|
||||
* Modified by ScyllaDB
|
||||
*/
|
||||
|
||||
/*
|
||||
* This file is part of Scylla.
|
||||
*
|
||||
* Scylla is free software: you can redistribute it and/or modify
|
||||
* it under the terms of the GNU Affero General Public License as published by
|
||||
* the Free Software Foundation, either version 3 of the License, or
|
||||
* (at your option) any later version.
|
||||
*
|
||||
* Scylla is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU General Public License
|
||||
* along with Scylla. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#pragma once
|
||||
|
||||
#include "core/sstring.hh"
|
||||
#include "seastarx.hh"
|
||||
|
||||
namespace db {
|
||||
namespace index {
|
||||
|
||||
/**
|
||||
* Abstract base class for different types of secondary indexes.
|
||||
*
|
||||
* Do not extend this directly, please pick from PerColumnSecondaryIndex or PerRowSecondaryIndex
|
||||
*/
|
||||
class secondary_index {
|
||||
public:
|
||||
// Declared here without an initializer, like the other option-name constants below.
// NOTE(review): presumably the index option that names a custom index implementation — confirm.
static const sstring custom_index_option_name;
|
||||
|
||||
/**
|
||||
* The name of the option used to specify that the index is on the collection keys.
|
||||
*/
|
||||
static const sstring index_keys_option_name;
|
||||
|
||||
/**
|
||||
* The name of the option used to specify that the index is on the collection values.
|
||||
*/
|
||||
static const sstring index_values_option_name;
|
||||
|
||||
/**
|
||||
* The name of the option used to specify that the index is on the collection (map) entries.
|
||||
*/
|
||||
static const sstring index_entries_option_name;
|
||||
|
||||
};
|
||||
|
||||
}
|
||||
}
|
||||
@@ -42,6 +42,7 @@
|
||||
#include "index/secondary_index_manager.hh"
|
||||
|
||||
#include "cql3/statements/index_target.hh"
|
||||
#include "cql3/util.hh"
|
||||
#include "index/target_parser.hh"
|
||||
#include "db/query_context.hh"
|
||||
#include "schema_builder.hh"
|
||||
@@ -93,11 +94,14 @@ void secondary_index_manager::add_index(const index_metadata& im) {
|
||||
_indices.emplace(im.name(), index{index_target_name, im});
|
||||
}
|
||||
|
||||
// Returns the table name derived from an index name: the index name with
// the suffix "_index" appended.
static sstring index_table_name(const sstring& index_name) {
|
||||
return sprint("%s_index", index_name);
|
||||
}
|
||||
|
||||
view_ptr secondary_index_manager::create_view_for_index(const index_metadata& im) const {
|
||||
auto schema = _cf.schema();
|
||||
sstring index_table_name = sprint("%s_index", im.name());
|
||||
sstring index_target_name = im.options().at(cql3::statements::index_target::target_option_name);
|
||||
schema_builder builder{schema->ks_name(), index_table_name};
|
||||
schema_builder builder{schema->ks_name(), index_table_name(im.name())};
|
||||
auto target = target_parser::parse(schema, im);
|
||||
const auto* index_target = std::get<const column_definition*>(target);
|
||||
auto target_type = std::get<cql3::statements::index_target::target_type>(target);
|
||||
@@ -106,6 +110,9 @@ view_ptr secondary_index_manager::create_view_for_index(const index_metadata& im
|
||||
}
|
||||
builder.with_column(index_target->name(), index_target->type, column_kind::partition_key);
|
||||
for (auto& col : schema->partition_key_columns()) {
|
||||
if (col == *index_target) {
|
||||
continue;
|
||||
}
|
||||
builder.with_column(col.name(), col.type, column_kind::clustering_key);
|
||||
}
|
||||
for (auto& col : schema->clustering_key_columns()) {
|
||||
@@ -114,7 +121,7 @@ view_ptr secondary_index_manager::create_view_for_index(const index_metadata& im
|
||||
}
|
||||
builder.with_column(col.name(), col.type, column_kind::clustering_key);
|
||||
}
|
||||
const sstring where_clause = sprint("%s IS NOT NULL", index_target_name);
|
||||
const sstring where_clause = sprint("%s IS NOT NULL", cql3::util::maybe_quote(index_target_name));
|
||||
builder.with_view_info(*schema, false, where_clause);
|
||||
return view_ptr{builder.build()};
|
||||
}
|
||||
@@ -129,4 +136,14 @@ std::vector<index_metadata> secondary_index_manager::get_dependent_indices(const
|
||||
std::vector<index> secondary_index_manager::list_indexes() const {
|
||||
return boost::copy_range<std::vector<index>>(_indices | boost::adaptors::map_values);
|
||||
}
|
||||
|
||||
// Returns true when the given view's table name equals the table name that
// index_table_name() derives for any index returned by list_indexes();
// returns false otherwise. The match is made on the name only.
bool secondary_index_manager::is_index(view_ptr view) const {
|
||||
for (auto& i : list_indexes()) {
|
||||
if (view->cf_name() == index_table_name(i.metadata().name())) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -70,6 +70,7 @@ public:
|
||||
view_ptr create_view_for_index(const index_metadata& index) const;
|
||||
std::vector<index_metadata> get_dependent_indices(const column_definition& cdef) const;
|
||||
std::vector<index> list_indexes() const;
|
||||
bool is_index(view_ptr) const;
|
||||
private:
|
||||
void add_index(const index_metadata& im);
|
||||
};
|
||||
|
||||
@@ -79,18 +79,6 @@ struct target_parser {
|
||||
target_type = index_target::target_type::values;
|
||||
}
|
||||
|
||||
// in the case of a quoted column name the name in the target string
|
||||
// will be enclosed in quotes, which we need to unwrap. It may also
|
||||
// include quote characters internally, escaped like so:
|
||||
// abc"def -> abc""def.
|
||||
// Because the target string is stored in a CQL compatible form, we
|
||||
// need to un-escape any such quotes to get the actual column name
|
||||
static const sstring quote{"\""};
|
||||
if (boost::starts_with(target, quote)) {
|
||||
column_name = column_name.substr(1, column_name.length()-2);
|
||||
static const std::regex two_quotes("\"\"");
|
||||
column_name = std::regex_replace(std::string{column_name}, two_quotes, std::string{quote});
|
||||
}
|
||||
auto column = schema->get_column_definition(utf8_type->decompose(column_name));
|
||||
if (!column) {
|
||||
return stdx::nullopt;
|
||||
|
||||
4
init.cc
4
init.cc
@@ -34,8 +34,8 @@ logging::logger startlog("init");
|
||||
// duplicated in cql_test_env.cc
|
||||
// until proper shutdown is done.
|
||||
|
||||
void init_storage_service(distributed<database>& db, sharded<auth::service>& auth_service) {
|
||||
service::init_storage_service(db, auth_service).get();
|
||||
void init_storage_service(distributed<database>& db, sharded<auth::service>& auth_service, sharded<db::system_distributed_keyspace>& sys_dist_ks) {
|
||||
service::init_storage_service(db, auth_service, sys_dist_ks).get();
|
||||
// #293 - do not stop anything
|
||||
//engine().at_exit([] { return service::deinit_storage_service(); });
|
||||
}
|
||||
|
||||
3
init.hh
3
init.hh
@@ -25,6 +25,7 @@
|
||||
#include <seastar/core/distributed.hh>
|
||||
#include "auth/service.hh"
|
||||
#include "db/config.hh"
|
||||
#include "db/system_distributed_keyspace.hh"
|
||||
#include "database.hh"
|
||||
#include "log.hh"
|
||||
|
||||
@@ -36,7 +37,7 @@ extern logging::logger startlog;
|
||||
|
||||
class bad_configuration_error : public std::exception {};
|
||||
|
||||
void init_storage_service(distributed<database>& db, sharded<auth::service>&);
|
||||
void init_storage_service(distributed<database>& db, sharded<auth::service>&, sharded<db::system_distributed_keyspace>&);
|
||||
void init_ms_fd_gossiper(sstring listen_address
|
||||
, uint16_t storage_port
|
||||
, uint16_t ssl_storage_port
|
||||
|
||||
36
keys.hh
36
keys.hh
@@ -146,6 +146,19 @@ public:
|
||||
auto components(const schema& s) const {
|
||||
return components();
|
||||
}
|
||||
|
||||
// True when the underlying byte representation (_bytes) is empty.
bool is_empty() const {
|
||||
return _bytes.empty();
|
||||
}
|
||||
|
||||
// Explicit boolean conversion: true when is_empty() is false.
explicit operator bool() const {
|
||||
return !is_empty();
|
||||
}
|
||||
|
||||
// For backward compatibility with existing code.
// The schema argument is ignored; this simply forwards to is_empty().
|
||||
bool is_empty(const schema& s) const {
|
||||
return is_empty();
|
||||
}
|
||||
};
|
||||
|
||||
template <typename TopLevel, typename TopLevelView>
|
||||
@@ -304,6 +317,19 @@ public:
|
||||
return get_compound_type(s)->end(_bytes);
|
||||
}
|
||||
|
||||
// True when the underlying byte representation (_bytes) is empty.
bool is_empty() const {
|
||||
return _bytes.empty();
|
||||
}
|
||||
|
||||
// Explicit boolean conversion: true when is_empty() is false.
explicit operator bool() const {
|
||||
return !is_empty();
|
||||
}
|
||||
|
||||
// For backward compatibility with existing code.
// The schema argument is ignored; this simply forwards to is_empty().
|
||||
bool is_empty(const schema& s) const {
|
||||
return is_empty();
|
||||
}
|
||||
|
||||
// Returns a range of bytes_view
|
||||
auto components() const {
|
||||
return TopLevelView::compound::element_type::components(representation());
|
||||
@@ -516,10 +542,6 @@ public:
|
||||
bool is_full(const schema& s) const {
|
||||
return TopLevel::get_compound_type(s)->is_full(base::_bytes);
|
||||
}
|
||||
|
||||
bool is_empty(const schema& s) const {
|
||||
return TopLevel::get_compound_type(s)->is_empty(base::_bytes);
|
||||
}
|
||||
};
|
||||
|
||||
template <typename TopLevel, typename TopLevelView, typename FullTopLevel>
|
||||
@@ -538,10 +560,6 @@ public:
|
||||
return TopLevel::get_compound_type(s)->is_full(base::_bytes);
|
||||
}
|
||||
|
||||
bool is_empty(const schema& s) const {
|
||||
return TopLevel::get_compound_type(s)->is_empty(base::_bytes);
|
||||
}
|
||||
|
||||
// Can be called only if is_full()
|
||||
FullTopLevel to_full(const schema& s) const {
|
||||
return FullTopLevel::from_exploded(s, base::explode(s));
|
||||
@@ -799,4 +817,4 @@ struct appending_hash<clustering_key_prefix> {
|
||||
void operator()(Hasher& h, const clustering_key_prefix& ck, const schema& s) const {
|
||||
appending_hash<clustering_key_prefix_view>()(h, ck.view(), s);
|
||||
}
|
||||
};
|
||||
};
|
||||
|
||||
Some files were not shown because too many files have changed in this diff Show More
Reference in New Issue
Block a user