Merge "Support SSTables 3.x in Scylla runtime" from Vladimir and Piotr

"
This patchset makes it possible to use SSTables 'mc' format, commonly
referred to as 'SSTables 3.x', when running a Scylla instance.

Several bugs found along the way are fixed. Also, a configuration option
is introduced to allow running Scylla either with 'mc' or 'la' format
as default.

Tests: unit {release}

+ tested that Scylla works fine with both 'la' and 'mc' formats:

cqlsh> CREATE KEYSPACE test WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1};
cqlsh> USE test;
cqlsh:test> CREATE TABLE cfsst3 (pk int, ck int, rc int, PRIMARY KEY (pk, ck)) WITH compression = {'sstable_compression': ''};
cqlsh:test> INSERT INTO cfsst3 (pk, ck, rc) VALUES ( 4, 7, 8);
    <<flush>>
cqlsh:test> DELETE from cfsst3 WHERE pk = 4 and ck> 3 and ck < 8;
    <<flush>>
cqlsh:test> INSERT INTO cfsst3 (pk, ck) VALUES ( 2, 3);
cqlsh:test> INSERT INTO cfsst3 (pk, ck) VALUES ( 4, 6);
cqlsh:test> SELECT * FROM cfsst3 ;

 pk | ck | rc
----+----+------
  2 |  3 | null
  4 |  6 | null

(2 rows)
    <<Scylla restart>>
cqlsh:test> INSERT INTO cfsst3 (pk, ck) VALUES ( 5, 7);
cqlsh:test> INSERT INTO cfsst3 (pk, ck) VALUES ( 6, 8);
cqlsh:test> INSERT INTO cfsst3 (pk, ck) VALUES ( 7, 9);
cqlsh:test> INSERT INTO cfsst3 (pk, ck) VALUES ( 8, 10);
cqlsh:test> SELECT * from cfsst3 ;

 pk | ck | rc
----+----+------
  5 |  7 | null
  8 | 10 | null
  2 |  3 | null
  4 |  6 | null
  7 |  9 | null
  6 |  8 | null

(6 rows)
"

* 'projects/sstables-30/try-runtime/v8' of https://github.com/argenet/scylla:
  database: Honour enable_sstables_mc_format configuration option.
  sstables: Support SSTables 'mc' format as a feature.
  db: Add configuration option for enabling SSTables 'mc' format.
  tests: Add test for reading a complex column with zero subcolumns (SST3).
  sstables: Fix parsing of complex columns with zero subcolumns.
  sstables: Explicitly cast api::timestamp_type to uint64_t when delta-encoding.
  sstables: Use parser_type instead of abstract_type::parse_type in column_translation.
  bytes: Add helper for turning bytes_view into sstring_view.
  sstables: Only forward the call to fast_forwarding_to in mp_row_consumer_m if filter exists.
  sstables: Fix string formatting for exception messages in m_format_read_helpers.
  sstables: Don't validate timestamps against the max value on parsing.
  sstables: Always store only min bases in serialization_header.
  sstables: Support 'mc' version parsing from filename.
  SST3: Make sure we call consume_partition_end
This commit is contained in:
Avi Kivity
2018-09-26 10:22:37 +03:00
committed by Duarte Nunes
19 changed files with 130 additions and 43 deletions

View File

@@ -35,6 +35,10 @@ using bytes_mutable_view = basic_mutable_view<bytes_view::value_type>;
using bytes_opt = std::experimental::optional<bytes>;
using sstring_view = std::experimental::string_view;
// Reinterprets the bytes referenced by `view` as characters and returns a
// string view over that same storage; nothing is copied.
inline sstring_view to_sstring_view(bytes_view view) {
    const auto* chars = reinterpret_cast<const char*>(view.data());
    return sstring_view(chars, view.size());
}
namespace std {
template <>

View File

@@ -88,7 +88,9 @@ logging::logger dblog("database");
namespace {
sstables::sstable::version_types get_highest_supported_format() {
if (service::get_local_storage_service().cluster_supports_la_sstable()) {
if (service::get_local_storage_service().cluster_supports_mc_sstable()) {
return sstables::sstable::version_types::mc;
} else if (service::get_local_storage_service().cluster_supports_la_sstable()) {
return sstables::sstable::version_types::la;
} else {
return sstables::sstable::version_types::ka;

View File

@@ -739,6 +739,7 @@ public:
" Performance is affected to some extent as a result. Useful to help debugging problems that may arise at another layers.") \
val(cpu_scheduler, bool, true, Used, "Enable cpu scheduling") \
val(view_building, bool, true, Used, "Enable view building; should only be set to false when the node is experience issues due to view building") \
val(enable_sstables_mc_format, bool, false, Used, "Enable SSTables 'mc' format to be used as the default file format; FOR TESTING PURPOSES ONLY - TO BE REMOVED BEFORE RELEASE") \
/* done! */
#define _make_value_member(name, type, deflt, status, desc, ...) \

View File

@@ -100,6 +100,7 @@ static const sstring XXHASH_FEATURE = "XXHASH";
static const sstring ROLES_FEATURE = "ROLES";
static const sstring LA_SSTABLE_FEATURE = "LA_SSTABLE_FORMAT";
static const sstring STREAM_WITH_RPC_STREAM = "STREAM_WITH_RPC_STREAM";
static const sstring MC_SSTABLE_FEATURE = "MC_SSTABLE_FORMAT";
distributed<storage_service> _the_storage_service;
@@ -209,7 +210,11 @@ sstring storage_service::get_config_supported_features() {
LA_SSTABLE_FEATURE,
STREAM_WITH_RPC_STREAM,
};
if (service::get_local_storage_service()._db.local().get_config().experimental()) {
auto& config = service::get_local_storage_service()._db.local().get_config();
if (config.enable_sstables_mc_format()) {
features.push_back(MC_SSTABLE_FEATURE);
}
if (config.experimental()) {
features.push_back(MATERIALIZED_VIEWS_FEATURE);
features.push_back(INDEXES_FEATURE);
}
@@ -426,6 +431,7 @@ void storage_service::register_features() {
_roles_feature = gms::feature(ROLES_FEATURE);
_la_sstable_feature = gms::feature(LA_SSTABLE_FEATURE);
_stream_with_rpc_stream_feature = gms::feature(STREAM_WITH_RPC_STREAM);
_mc_sstable_feature = gms::feature(MC_SSTABLE_FEATURE);
if (_db.local().get_config().experimental()) {
_materialized_views_feature = gms::feature(MATERIALIZED_VIEWS_FEATURE);

View File

@@ -289,6 +289,7 @@ private:
gms::feature _roles_feature;
gms::feature _la_sstable_feature;
gms::feature _stream_with_rpc_stream_feature;
gms::feature _mc_sstable_feature;
public:
void enable_all_features() {
_range_tombstones_feature.enable();
@@ -305,6 +306,7 @@ public:
_roles_feature.enable();
_la_sstable_feature.enable();
_stream_with_rpc_stream_feature.enable();
_mc_sstable_feature.enable();
}
void finish_bootstrapping() {
@@ -2287,6 +2289,10 @@ public:
// Reports the boolean state of the STREAM_WITH_RPC_STREAM feature object.
bool cluster_supports_stream_with_rpc_stream() const {
    return static_cast<bool>(_stream_with_rpc_stream_feature);
}
// Reports the boolean state of the MC_SSTABLE_FORMAT feature object.
bool cluster_supports_mc_sstable() const {
    return static_cast<bool>(_mc_sstable_feature);
}
};
inline future<> init_storage_service(distributed<database>& db, sharded<auth::service>& auth_service, sharded<db::system_distributed_keyspace>& sys_dist_ks) {

View File

@@ -28,6 +28,7 @@
#include "schema.hh"
#include "sstables/types.hh"
#include "utils/UUID.hh"
#include "db/marshal/type_parser.hh"
namespace sstables {
@@ -40,7 +41,7 @@ inline column_values_fixed_lengths get_clustering_values_fixed_lengths(const ser
column_values_fixed_lengths lengths;
lengths.reserve(header.clustering_key_types_names.elements.size());
for (auto&& t : header.clustering_key_types_names.elements) {
auto type = abstract_type::parse_type(sstring(std::cbegin(t.value), std::cend(t.value)));
auto type = db::marshal::type_parser::parse(to_sstring_view(t.value));
lengths.push_back(type->value_length_if_fixed());
}

View File

@@ -125,40 +125,33 @@ inline future<> read_vint(random_access_reader& in, T& value) {
}
}
inline api::timestamp_type parse_timestamp(uint64_t value) {
if (value > api::max_timestamp) {
throw malformed_sstable_exception("Too big timestamp: " + value);
}
return static_cast<api::timestamp_type >(value);
}
inline api::timestamp_type parse_timestamp(const serialization_header& header,
uint64_t delta) {
return parse_timestamp(header.min_timestamp.value + delta);
return static_cast<api::timestamp_type>(header.get_min_timestamp() + delta);
}
inline gc_clock::duration parse_ttl(uint32_t value) {
if (value > std::numeric_limits<gc_clock::duration::rep>::max()) {
throw malformed_sstable_exception("Too big ttl: " + value);
throw malformed_sstable_exception(format("Too big ttl: {}", value));
}
return gc_clock::duration(value);
}
inline gc_clock::duration parse_ttl(const serialization_header& header,
uint32_t delta) {
return parse_ttl(header.min_ttl.value + delta);
return parse_ttl(header.get_min_ttl() + delta);
}
inline gc_clock::time_point parse_expiry(uint32_t value) {
if (value > std::numeric_limits<gc_clock::duration::rep>::max()) {
throw malformed_sstable_exception("Too big expiry: " + value);
throw malformed_sstable_exception(format("Too big expiry: {}", value));
}
return gc_clock::time_point(gc_clock::duration(value));
}
inline gc_clock::time_point parse_expiry(const serialization_header& header,
uint32_t delta) {
return parse_expiry(header.min_local_deletion_time.value + delta);
return parse_expiry(header.get_min_local_deletion_time() + delta);
}
}; // namespace sstables

View File

@@ -298,7 +298,7 @@ void write_missing_columns(file_writer& out, const schema& s, const row& row) {
template <typename T>
void write_unsigned_delta_vint(file_writer& out, T value, T base) {
using unsigned_type = std::make_unsigned_t<T>;
unsigned_type delta = value - base;
unsigned_type delta = static_cast<unsigned_type>(value) - base;
write_vint(out, delta);
}

View File

@@ -1014,6 +1014,10 @@ public:
}
// Forwards the request to the filter when one is set. Without a filter there
// is nothing to forward to, so an empty optional is returned.
// The timeout argument is accepted but not used.
std::optional<position_in_partition_view> fast_forward_to(position_range r, db::timeout_clock::time_point) {
    if (_mf_filter) {
        return _mf_filter->fast_forward_to(std::move(r));
    }
    return std::nullopt;
}

View File

@@ -1173,6 +1173,14 @@ public:
}
case state::COMPLEX_COLUMN_SIZE_2:
_subcolumns_to_read = _u64;
if (_subcolumns_to_read == 0) {
auto id = get_column_id();
move_to_next_column();
if (_consumer.consume_complex_column_end(std::move(id)) != consumer_m::proceed::yes) {
_state = state::COLUMN;
return consumer_m::proceed::no;
}
}
goto column_label;
case state::RANGE_TOMBSTONE_MARKER:
range_tombstone_marker_label:
@@ -1282,6 +1290,13 @@ public:
{ }
void verify_end_state() {
// If reading a partial row (i.e., when we have a clustering row
// filter and using a promoted index), we may be in FLAGS or FLAGS_2
// state instead of PARTITION_START.
if (_state == state::FLAGS || _state == state::FLAGS_2) {
_consumer.consume_partition_end();
return;
}
if (_state != state::PARTITION_START || _prestate != prestate::NONE) {
throw malformed_sstable_exception("end of input, but not end of partition");
}

View File

@@ -1299,11 +1299,7 @@ double sstable::estimate_droppable_tombstone_ratio(gc_clock::time_point gc_befor
}
future<> sstable::read_statistics(const io_priority_class& pc) {
return read_simple<component_type::Statistics>(_components->statistics, pc).then([this] {
if (_version == version_types::mc) {
adjust_serialization_header();
}
});
return read_simple<component_type::Statistics>(_components->statistics, pc);
}
void sstable::write_statistics(const io_priority_class& pc) {
@@ -2118,9 +2114,9 @@ static sstring pk_type_to_string(const schema& s) {
static serialization_header make_serialization_header(const schema& s, const encoding_stats& enc_stats) {
serialization_header header;
header.min_timestamp.value = enc_stats.min_timestamp - encoding_stats::timestamp_epoch;
header.min_local_deletion_time.value = enc_stats.min_local_deletion_time - encoding_stats::deletion_time_epoch;
header.min_ttl.value = enc_stats.min_ttl - encoding_stats::ttl_epoch;
header.min_timestamp_base.value = static_cast<uint64_t>(enc_stats.min_timestamp) - encoding_stats::timestamp_epoch;
header.min_local_deletion_time_base.value = enc_stats.min_local_deletion_time - encoding_stats::deletion_time_epoch;
header.min_ttl_base.value = enc_stats.min_ttl - encoding_stats::ttl_epoch;
header.pk_type_name = to_bytes_array_vint_size(pk_type_to_string(s));
@@ -3759,7 +3755,7 @@ future<> sstable::set_generation(int64_t new_generation) {
}
entry_descriptor entry_descriptor::make_descriptor(sstring sstdir, sstring fname) {
static std::regex la("la-(\\d+)-(\\w+)-(.*)");
static std::regex la_mc("(la|mc)-(\\d+)-(\\w+)-(.*)");
static std::regex ka("(\\w+)-(\\w+)-ka-(\\d+)-(.*)");
static std::regex dir(".*/([^/]*)/(\\w+)-[\\da-fA-F]+(?:/upload|/snapshots/[^/]+)?/?");
@@ -3776,7 +3772,7 @@ entry_descriptor entry_descriptor::make_descriptor(sstring sstdir, sstring fname
sstlog.debug("Make descriptor sstdir: {}; fname: {}", sstdir, fname);
std::string s(fname);
if (std::regex_match(s, match, la)) {
if (std::regex_match(s, match, la_mc)) {
std::string sdir(sstdir);
std::smatch dirmatch;
if (std::regex_match(sdir, dirmatch, dir)) {
@@ -3785,10 +3781,10 @@ entry_descriptor entry_descriptor::make_descriptor(sstring sstdir, sstring fname
} else {
throw malformed_sstable_exception(seastar::sprint("invalid version for file %s with path %s. Path doesn't match known pattern.", fname, sstdir));
}
version = sstable::version_types::la;
generation = match[1].str();
format = sstring(match[2].str());
component = sstring(match[3].str());
version = (match[1].str() == "la") ? sstable::version_types::la : sstable::version_types::mc;
generation = match[2].str();
format = sstring(match[3].str());
component = sstring(match[4].str());
} else if (std::regex_match(s, match, ka)) {
ks = match[1].str();
cf = match[2].str();

View File

@@ -592,9 +592,6 @@ private:
serialization_header& s = *static_cast<serialization_header *>(p.get());
return s;
}
void adjust_serialization_header() {
get_mutable_serialization_header(*_components).adjust();
}
public:
future<> read_toc();

View File

@@ -358,9 +358,9 @@ struct stats_metadata : public metadata_base<stats_metadata> {
using bytes_array_vint_size = disk_string_vint_size;
struct serialization_header : public metadata_base<serialization_header> {
vint<uint64_t> min_timestamp;
vint<uint32_t> min_local_deletion_time;
vint<uint32_t> min_ttl;
vint<uint64_t> min_timestamp_base;
vint<uint32_t> min_local_deletion_time_base;
vint<uint32_t> min_ttl_base;
bytes_array_vint_size pk_type_name;
disk_array_vint_size<bytes_array_vint_size> clustering_key_types_names;
struct column_desc {
@@ -381,9 +381,9 @@ struct serialization_header : public metadata_base<serialization_header> {
switch (v) {
case sstable_version_types::mc:
return f(
min_timestamp,
min_local_deletion_time,
min_ttl,
min_timestamp_base,
min_local_deletion_time_base,
min_ttl_base,
pk_type_name,
clustering_key_types_names,
static_columns,
@@ -397,10 +397,17 @@ struct serialization_header : public metadata_base<serialization_header> {
// Should never reach here - compiler will complain if switch above does not cover all sstable versions
abort();
}
void adjust() {
min_timestamp.value += encoding_stats::timestamp_epoch;
min_local_deletion_time.value += encoding_stats::deletion_time_epoch;
min_ttl.value += encoding_stats::ttl_epoch;
// Returns the stored timestamp base plus encoding_stats::timestamp_epoch.
uint64_t get_min_timestamp() const {
    const auto base = min_timestamp_base.value;
    return base + encoding_stats::timestamp_epoch;
}
// Returns the stored TTL base plus encoding_stats::ttl_epoch.
uint32_t get_min_ttl() const {
    const auto base = min_ttl_base.value;
    return base + encoding_stats::ttl_epoch;
}
// Returns the stored local-deletion-time base plus
// encoding_stats::deletion_time_epoch.
uint32_t get_min_local_deletion_time() const {
    const auto base = min_local_deletion_time_base.value;
    return base + encoding_stats::deletion_time_epoch;
}
};

View File

@@ -46,6 +46,7 @@
#include "types.hh"
#include "partition_slice_builder.hh"
#include "schema.hh"
#include "utils/UUID_gen.hh"
using namespace sstables;
@@ -4408,3 +4409,52 @@ SEASTAR_THREAD_TEST_CASE(test_read_table_empty_clustering_key) {
assert_that(sst.read_rows_flat()).produces(keys);
}
/*
 * Regression test for reading a complex column that has zero subcolumns.
 * The bug was discovered with Scylla's compaction_history tables.
 */
SEASTAR_THREAD_TEST_CASE(test_complex_column_zero_subcolumns_read) {
    using utils::UUID;

    const sstring path =
        "tests/sstables/3.x/uncompressed/complex_column_zero_subcolumns";

    // Schema of the table stored in the fixture; "rows_merged" is the
    // map-typed (complex) column.
    schema_ptr s = schema_builder("test_ks", "test_table")
        .with_column("id", uuid_type, column_kind::partition_key)
        .with_column("bytes_in", long_type)
        .with_column("bytes_out", long_type)
        .with_column("columnfamily_name", utf8_type)
        .with_column("compacted_at", timestamp_type)
        .with_column("keyspace_name", utf8_type)
        .with_column("rows_merged", map_type_impl::get_instance(int32_type, long_type, true))
        .build();

    sstable_assertions sst(s, path);
    sst.load();

    // Turns a UUID into the decorated partition key for the schema above.
    auto to_pkey = [&s] (const UUID& id) {
        auto raw = uuid_type->decompose(id);
        auto pkey = partition_key::from_single_value(*s, raw);
        return dht::global_partitioner().decorate_key(*s, pkey);
    };

    // Partition keys in the order the reader is expected to produce them.
    std::vector<UUID> expected_ids {
        UUID{"09fea990-b320-11e8-83a7-000000000000"},
        UUID{"0a310430-b320-11e8-83a7-000000000000"},
        UUID{"0a214cc0-b320-11e8-83a7-000000000000"},
        UUID{"0a00a560-b320-11e8-83a7-000000000000"},
        UUID{"0a0a6960-b320-11e8-83a7-000000000000"},
        UUID{"0a147b80-b320-11e8-83a7-000000000000"},
        UUID{"0a187320-b320-11e8-83a7-000000000000"},
    };

    auto reader = sst.read_rows_flat();
    // Cap the reader's buffer size at 1.
    // NOTE(review): presumably this makes the reader stop after every
    // fragment, exercising the consumer's resume path — confirm.
    reader.set_max_buffer_size(1);

    auto checker = assert_that(std::move(reader));
    // Every partition must contain exactly one row with an empty clustering key.
    for (const auto& id : expected_ids) {
        checker.produces_partition_start(to_pkey(id))
            .produces_row_with_key(clustering_key::make_empty())
            .produces_partition_end();
    }
    checker.produces_end_of_stream();
}

View File

@@ -0,0 +1,5 @@
Index.db
Statistics.db
TOC.txt
Data.db
Summary.db