Merge "Fixes and improvements for sstables write path"

From Glauber:

"Here are some fixes for the sstables write path. The code is made
simpler, mainly by:

- taking advantage of the fact that we don't have to chain futures
  that will do nothing more than write a value to the output stream.
  The compiler will do that for us if we use the recursive template
  interface,

- moving most of the composite logic to sstable/key.cc.

The last one has the interesting side effect of making the code correct.
A nice bonus."
This commit is contained in:
Avi Kivity
2015-05-26 09:50:06 +03:00
4 changed files with 51 additions and 63 deletions

View File

@@ -119,10 +119,21 @@ key key::from_partition_key(const schema& s, const partition_key& pk) {
return from_components(s, pk.begin(s), pk.end(s));
}
bytes composite_from_clustering_key(const schema& s, const clustering_key& ck) {
// Builds a composite from a clustering key.
//
// Hands the key's components (ck.begin(s) .. ck.end(s)) together with the
// type list of the schema's clustering key type to from_components(); the
// result is returned as a composite.
// NOTE(review): the meaning of the trailing `true` flag is defined by
// from_components(), which is not visible here -- confirm against that helper.
composite composite::from_clustering_key(const schema& s, const clustering_key& ck) {
return from_components(ck.begin(s), ck.end(s), s.clustering_key_type()->types(), true);
}
// Builds a composite out of already-exploded components.
//
// Every entry of `v` is paired with bytes_type, i.e. the components are
// passed to from_components() as raw byte blobs rather than with
// schema-specific types.
composite composite::from_exploded(const std::vector<bytes_view>& v) {
    // One bytes_type entry per component.
    std::vector<data_type> component_types(v.size(), bytes_type);
    return from_components(v.begin(), v.end(), std::move(component_types), true);
}
// Returns the prefix that precedes a static column name: a two-byte
// 0xff 0xff marker followed by a composite made of
// s.clustering_key_size() default-constructed (empty) components.
composite composite::static_prefix(const schema& s) {
    // Two bytes, each 0xff; constructed once and reused on later calls.
    static bytes static_marker(size_t(2), bytes::value_type(0xff));

    // Placeholder components: all empty views, each typed as bytes_type.
    std::vector<bytes_view> empty_components(s.clustering_key_size());
    std::vector<data_type> component_types(empty_components.size(), bytes_type);

    return static_marker + from_components(empty_components.begin(), empty_components.end(),
                                           std::move(component_types), true);
}
inline
std::vector<bytes> explode_composite(bytes_view _bytes) {
std::vector<bytes> ret;

View File

@@ -96,8 +96,6 @@ inline key maximum_key() {
return key(key::kind::after_all_keys);
};
bytes composite_from_clustering_key(const schema& s, const clustering_key& ck);
class composite_view {
bytes_view _bytes;
public:
@@ -109,4 +107,20 @@ public:
return _bytes;
}
};
// Owning counterpart of composite_view: holds the bytes of a composite and
// exposes them as non-owning views.
class composite {
// The stored buffer; the views returned below point into it.
bytes _bytes;
public:
// Takes ownership of the given buffer by moving it in.
// NOTE(review): this constructor is not explicit; the factory functions
// appear to rely on the implicit bytes -> composite conversion when
// returning -- confirm before making it explicit.
composite (bytes&& b) : _bytes(std::move(b)) {}
// Wraps the given buffer as a composite without modifying it.
static composite from_bytes(bytes b) { return composite(std::move(b)); }
// Builds a composite from a clustering key of schema `s`.
static composite from_clustering_key(const schema& s, const clustering_key& ck);
// Builds a composite from a list of individual component values.
static composite from_exploded(const std::vector<bytes_view>& v);
// Builds the prefix used for static column names of schema `s`.
static composite static_prefix(const schema& s);
// Explicit conversion: callers must spell bytes_view(c) to get at the
// underlying bytes. The view is only valid while this object is alive.
explicit operator bytes_view() const {
return _bytes;
}
// Implicit conversion to the non-owning view type; the view is only valid
// while this object is alive.
operator composite_view() const {
return composite_view(_bytes);
}
};
}

View File

@@ -878,59 +878,23 @@ future<> sstable::store() {
});
}
// Writes a single composite component to `out`: the given column name
// followed by the end-of-component byte.
//
// @out: captured by reference in the continuation, so the caller must keep
//       it alive until the returned future resolves.
// @column_name: moved into do_with(), which keeps it alive for the duration
//       of the write (Seastar do_with semantics).
// @end_of_component: byte written after the name; defaults to 0.
static future<> write_composite_component(file_writer& out, disk_string<uint16_t>&& column_name,
int8_t end_of_component = 0) {
// NOTE: end of component can also be -1 and 1 to represent ranges.
return do_with(std::move(column_name), [&out, end_of_component] (auto& column_name) {
// Both values go out through a single write() call, name first.
return write(out, column_name, end_of_component);
});
}
// @clustering_key: it's expected that clustering key is already in its composite form.
// NOTE: empty clustering key means that there is no clustering key.
static future<> write_column_name(file_writer& out, bytes& clustering_key, const bytes& column_name) {
static future<> write_column_name(file_writer& out, const composite& clustering_key, const std::vector<bytes_view>& column_names) {
// FIXME: This code assumes name is always composite, but it wouldn't if "WITH COMPACT STORAGE"
// was defined in the schema, for example.
composite c = composite::from_exploded(column_names);
uint16_t sz = bytes_view(clustering_key).size() + bytes_view(c).size();
// uint16_t and int8_t are about the 16-bit length and end of component, respectively.
uint16_t size = clustering_key.size() + column_name.size() + sizeof(uint16_t) + sizeof(int8_t);
return write(out, size).then([&out, &clustering_key] {
if (!clustering_key.size()) {
return make_ready_future<>();
}
return write(out, clustering_key);
}).then([&out, &column_name] {
// if size of column_name is zero, then column_name is a row marker.
disk_string<uint16_t> c_name;
c_name.value = column_name;
return write_composite_component(out, std::move(c_name));
});
return write(out, sz, bytes_view(clustering_key), bytes_view(c));
}
// magic value for identifying a static column.
static constexpr uint16_t STATIC_MARKER = 0xffff;
static future<> write_static_column_name(file_writer& out, const schema& schema, const std::vector<bytes_view>& column_names) {
composite sp = composite::static_prefix(schema);
composite c = composite::from_exploded(column_names);
uint16_t sz = bytes_view(sp).size() + bytes_view(c).size();
static future<> write_static_column_name(file_writer& out, const bytes& column_name) {
// first component of static column name is composed of the 16-bit magic value ff ff,
// followed by a null composite, i.e. three null bytes.
uint16_t first_component_size = sizeof(uint16_t) + sizeof(uint16_t) + sizeof(int8_t);
uint16_t second_component_size = column_name.size() + sizeof(uint16_t) + sizeof(int8_t);
uint16_t total_size = first_component_size + second_component_size;
return write(out, total_size).then([&out] {
return write(out, STATIC_MARKER).then([&out] {
return write_composite_component(out, {});
});
}).then([&out, &column_name] {
disk_string<uint16_t> c_name;
c_name.value = column_name;
return write_composite_component(out, std::move(c_name));
});
return write(out, sz, bytes_view(sp), bytes_view(c));
}
// Intended to write all cell components that follow column name.
@@ -970,14 +934,14 @@ static future<> write_cell(file_writer& out, atomic_cell_view cell) {
}
}
static future<> write_row_marker(file_writer& out, const rows_entry& clustered_row, bytes& clustering_key) {
static future<> write_row_marker(file_writer& out, const rows_entry& clustered_row, const composite& clustering_key) {
// Missing created_at (api::missing_timestamp) means no row marker.
if (clustered_row.row().created_at() == api::missing_timestamp) {
return make_ready_future<>();
}
// Write row mark cell to the beginning of clustered row.
return write_column_name(out, clustering_key, {}).then([&out, &clustered_row] {
return write_column_name(out, clustering_key, { bytes_view() }).then([&out, &clustered_row] {
column_mask mask = column_mask::none;
uint64_t timestamp = clustered_row.row().created_at();
uint32_t value_length = 0;
@@ -989,7 +953,7 @@ static future<> write_row_marker(file_writer& out, const rows_entry& clustered_r
// write_clustered_row() writes a clustered_row to the data file according to the SSTables format.
// clustered_row contains a set of cells sharing the same clustering key.
static future<> write_clustered_row(file_writer& out, schema_ptr schema, const rows_entry& clustered_row) {
bytes clustering_key = composite_from_clustering_key(*schema, clustered_row.key());
auto clustering_key = composite::from_clustering_key(*schema, clustered_row.key());
return do_with(std::move(clustering_key), [&out, schema, &clustered_row] (auto& clustering_key) {
return write_row_marker(out, clustered_row, clustering_key).then(
@@ -1010,7 +974,7 @@ static future<> write_clustered_row(file_writer& out, schema_ptr schema, const r
atomic_cell_view cell = value.second.as_atomic_cell();
const bytes& column_name = column_definition.name();
return write_column_name(out, clustering_key, column_name).then([&out, cell] {
return write_column_name(out, clustering_key, { bytes_view(column_name) }).then([&out, cell] {
return write_cell(out, cell);
});
});
@@ -1027,9 +991,7 @@ static future<> write_static_row(file_writer& out, schema_ptr schema, const row&
}
assert(column_definition.is_static());
atomic_cell_view cell = value.second.as_atomic_cell();
const bytes& column_name = column_definition.name();
return write_static_column_name(out, column_name).then([&out, cell] {
return write_static_column_name(out, *schema, { bytes_view(column_definition.name()) }).then([&out, cell] {
return write_cell(out, cell);
});
});

View File

@@ -32,6 +32,7 @@ static inline future<> remove_files(sstring dir, unsigned long generation) {
return when_all(remove_file(sstable::filename(dir, la, generation, big, sstable::component_type::Data)),
remove_file(sstable::filename(dir, la, generation, big, sstable::component_type::Index)),
remove_file(sstable::filename(dir, la, generation, big, sstable::component_type::Summary)),
remove_file(sstable::filename(dir, la, generation, big, sstable::component_type::Filter)),
remove_file(sstable::filename(dir, la, generation, big, sstable::component_type::TOC))).then([] (auto t) {});
}
@@ -98,7 +99,7 @@ SEASTAR_TEST_CASE(datafile_generation_01) {
offset += end_of_row.size();
BOOST_REQUIRE(size == offset);
});
}).then([] {
}).finally([] {
return remove_files("tests/urchin/sstables", 1);
});
});
@@ -169,7 +170,7 @@ SEASTAR_TEST_CASE(datafile_generation_02) {
offset += end_of_row.size();
BOOST_REQUIRE(size == offset);
});
}).then([] {
}).finally([] {
return remove_files("tests/urchin/sstables", 2);
});
});
@@ -240,7 +241,7 @@ SEASTAR_TEST_CASE(datafile_generation_03) {
offset += end_of_row.size();
BOOST_REQUIRE(size == offset);
});
}).then([] {
}).finally([] {
return remove_files("tests/urchin/sstables", 3);
});
});
@@ -317,7 +318,7 @@ SEASTAR_TEST_CASE(datafile_generation_04) {
offset += end_of_row.size();
BOOST_REQUIRE(size == offset);
});
}).then([] {
}).finally([] {
return remove_files("tests/urchin/sstables", 4);
});
});
@@ -384,7 +385,7 @@ SEASTAR_TEST_CASE(datafile_generation_05) {
offset += end_of_row.size();
BOOST_REQUIRE(size == offset);
});
}).then([] {
}).finally([] {
return remove_files("tests/urchin/sstables", 5);
});
});
@@ -458,7 +459,7 @@ SEASTAR_TEST_CASE(datafile_generation_06) {
offset += end_of_row.size();
BOOST_REQUIRE(size == offset);
});
}).then([] {
}).finally([] {
return remove_files("tests/urchin/sstables", 6);
});
});
@@ -521,7 +522,7 @@ SEASTAR_TEST_CASE(datafile_generation_07) {
offset += key2.size();
BOOST_REQUIRE(size == offset);
});
}).then([] {
}).finally([] {
return remove_files("tests/urchin/sstables", 7);
});
});
@@ -598,7 +599,7 @@ SEASTAR_TEST_CASE(datafile_generation_08) {
BOOST_REQUIRE(size == offset);
});
}).then([] {
}).finally([] {
return remove_files("tests/urchin/sstables", 8);
});
});