Merge "rework sstable write functions"

From Glauber:

"Up until now, we were still generating one future per element that we write.
Now that we have new infrastructure, we can avoid that, and generate only the
ones we really need to. This has the added advantage of lifting the need to do
lambda captures and allowing for a more straightforward forwarding of rest...
parameters"
This commit is contained in:
Avi Kivity
2015-06-08 16:13:53 +03:00
5 changed files with 138 additions and 164 deletions

View File

@@ -94,7 +94,7 @@ struct compression {
disk_array<uint32_t, uint64_t> offsets;
template <typename Describer>
future<> describe_type(Describer f) { return f(name, options, chunk_len, data_len, offsets); }
auto describe_type(Describer f) { return f(name, options, chunk_len, data_len, offsets); }
private:
// Variables determined from the above deserialized values, held for convenience:

View File

@@ -114,7 +114,7 @@ class composite {
public:
composite (bytes&& b) : _bytes(std::move(b)) {}
template <typename Describer>
future<> describe_type(Describer f) const { return f(const_cast<bytes&>(_bytes)); }
auto describe_type(Describer f) const { return f(const_cast<bytes&>(_bytes)); }
static composite from_bytes(bytes b) { return composite(std::move(b)); }
template <typename ClusteringElement>

View File

@@ -158,15 +158,12 @@ parse(random_access_reader& in, T& i) {
}
template <typename T>
typename std::enable_if_t<std::is_integral<T>::value, future<>>
inline typename std::enable_if_t<std::is_integral<T>::value, void>
write(file_writer& out, T i) {
auto *nr = reinterpret_cast<const net::packed<T> *>(&i);
i = net::hton(*nr);
auto p = reinterpret_cast<const char*>(&i);
return out.write(p, sizeof(T)).then([&out] (...) -> future<> {
// TODO: handle result
return make_ready_future<>();
});
out.write(p, sizeof(T)).get();
}
template <typename T>
@@ -176,17 +173,17 @@ parse(random_access_reader& in, T& i) {
}
template <typename T>
typename std::enable_if_t<std::is_enum<T>::value, future<>>
inline typename std::enable_if_t<std::is_enum<T>::value, void>
write(file_writer& out, T i) {
return write(out, static_cast<typename std::underlying_type<T>::type>(i));
write(out, static_cast<typename std::underlying_type<T>::type>(i));
}
future<> parse(random_access_reader& in, bool& i) {
return parse(in, reinterpret_cast<uint8_t&>(i));
}
future<> write(file_writer& out, bool i) {
return write(out, static_cast<uint8_t>(i));
inline void write(file_writer& out, bool i) {
write(out, static_cast<uint8_t>(i));
}
template <typename To, typename From>
@@ -211,14 +208,11 @@ future<> parse(random_access_reader& in, double& d) {
});
}
future<> write(file_writer& out, double d) {
inline void write(file_writer& out, double d) {
auto *nr = reinterpret_cast<const net::packed<unsigned long> *>(&d);
auto tmp = net::hton(*nr);
auto p = reinterpret_cast<const char*>(&tmp);
return out.write(p, sizeof(unsigned long)).then([&out] (...) -> future<> {
// TODO: handle result
return make_ready_future<>();
});
out.write(p, sizeof(unsigned long)).get();
}
template <typename T>
@@ -230,15 +224,12 @@ future<> parse(random_access_reader& in, T& len, bytes& s) {
});
}
future<> write(file_writer& out, bytes& s) {
return out.write(s).then([&out, &s] (...) -> future<> {
// TODO: handle result
return make_ready_future<>();
});
inline void write(file_writer& out, bytes& s) {
out.write(s).get();
}
future<> write(file_writer& out, bytes_view s) {
return out.write(reinterpret_cast<const char*>(s.data()), s.size());
inline void write(file_writer& out, bytes_view s) {
out.write(reinterpret_cast<const char*>(s.data()), s.size()).get();
}
// All composite parsers must come after this
@@ -250,10 +241,9 @@ future<> parse(random_access_reader& in, First& first, Rest&&... rest) {
}
template<typename First, typename... Rest>
future<> write(file_writer& out, First& first, Rest&&... rest) {
return write(out, first).then([&out, &rest...] {
return write(out, rest...);
});
inline void write(file_writer& out, First& first, Rest&&... rest) {
write(out, first);
write(out, std::forward<Rest>(rest)...);
}
// Intended to be used for a type that describes itself through describe_type().
@@ -266,10 +256,10 @@ parse(random_access_reader& in, T& t) {
}
template <class T>
typename std::enable_if_t<!std::is_integral<T>::value && !std::is_enum<T>::value, future<>>
inline typename std::enable_if_t<!std::is_integral<T>::value && !std::is_enum<T>::value, void>
write(file_writer& out, T& t) {
return t.describe_type([&out] (auto&&... what) -> future<> {
return write(out, what...);
t.describe_type([&out] (auto&&... what) -> void {
write(out, std::forward<decltype(what)>(what)...);
});
}
@@ -288,19 +278,18 @@ future<> parse(random_access_reader& in, disk_string<Size>& s) {
}
template <typename Size>
future<> write(file_writer& out, disk_string<Size>& s) {
inline void write(file_writer& out, disk_string<Size>& s) {
Size len = 0;
check_truncate_and_assign(len, s.value.size());
return write(out, len).then([&out, &s] {
return write(out, s.value);
});
write(out, len);
write(out, s.value);
}
template <typename Size>
future<> write(file_writer& out, disk_string_view<Size>& s) {
inline void write(file_writer& out, disk_string_view<Size>& s) {
Size len;
check_truncate_and_assign(len, s.value.size());
return write(out, len, s.value);
write(out, len, s.value);
}
// We cannot simply read the whole array at once, because we don't know its
@@ -350,42 +339,34 @@ future<> parse(random_access_reader& in, disk_array<Size, Members>& arr) {
}
template <typename Members>
typename std::enable_if_t<!std::is_integral<Members>::value, future<>>
inline typename std::enable_if_t<!std::is_integral<Members>::value, void>
write(file_writer& out, std::vector<Members>& arr) {
auto count = make_lw_shared<size_t>(0);
auto eoarr = [count, &arr] { return *count == arr.size(); };
return do_until(eoarr, [count, &out, &arr] {
return write(out, arr[(*count)++]);
});
for (auto& a : arr) {
write(out, a);
}
}
template <typename Members>
typename std::enable_if_t<std::is_integral<Members>::value, future<>>
inline typename std::enable_if_t<std::is_integral<Members>::value, void>
write(file_writer& out, std::vector<Members>& arr) {
return do_with(std::vector<Members>(), [&out, &arr] (auto& tmp) {
tmp.resize(arr.size());
// copy arr into tmp converting each entry into big-endian representation.
auto *nr = reinterpret_cast<const net::packed<Members> *>(arr.data());
for (size_t i = 0; i < arr.size(); i++) {
tmp[i] = net::hton(nr[i]);
}
auto p = reinterpret_cast<const char*>(tmp.data());
auto bytes = tmp.size() * sizeof(Members);
return out.write(p, bytes).then([&out] (...) -> future<> {
return make_ready_future<>();
});
});
std::vector<Members> tmp;
tmp.resize(arr.size());
// copy arr into tmp converting each entry into big-endian representation.
auto *nr = reinterpret_cast<const net::packed<Members> *>(arr.data());
for (size_t i = 0; i < arr.size(); i++) {
tmp[i] = net::hton(nr[i]);
}
auto p = reinterpret_cast<const char*>(tmp.data());
auto bytes = tmp.size() * sizeof(Members);
out.write(p, bytes).get();
}
template <typename Size, typename Members>
future<> write(file_writer& out, disk_array<Size, Members>& arr) {
inline void write(file_writer& out, disk_array<Size, Members>& arr) {
Size len = 0;
check_truncate_and_assign(len, arr.elements.size());
return write(out, len).then([&out, &arr] {
return write(out, arr.elements);
});
write(out, len);
write(out, arr.elements);
}
template <typename Size, typename Key, typename Value>
@@ -417,19 +398,18 @@ future<> parse(random_access_reader& in, disk_hash<Size, Key, Value>& h) {
}
template <typename Key, typename Value>
future<> write(file_writer& out, std::unordered_map<Key, Value>& map) {
return do_for_each(map.begin(), map.end(), [&out, &map] (auto& val) {
return write(out, val.first, val.second);
});
inline void write(file_writer& out, std::unordered_map<Key, Value>& map) {
for (auto& val: map) {
write(out, val.first, val.second);
};
}
template <typename Size, typename Key, typename Value>
future<> write(file_writer& out, disk_hash<Size, Key, Value>& h) {
inline void write(file_writer& out, disk_hash<Size, Key, Value>& h) {
Size len = 0;
check_truncate_and_assign(len, h.map.size());
return write(out, len).then([&out, &h] {
return write(out, h.map);
});
write(out, len);
write(out, h.map);
}
future<> parse(random_access_reader& in, summary& s) {
@@ -491,33 +471,28 @@ future<> parse(random_access_reader& in, summary& s) {
});
}
future<> write(file_writer& out, summary_entry& entry)
{
inline void write(file_writer& out, summary_entry& entry) {
// FIXME: summary entry is supposedly written in memory order, but that
// would prevent portability of summary file between machines of different
// endianness. We can treat it as little endian to preserve portability.
return write(out, entry.key).then([&out, &entry] {
auto p = reinterpret_cast<const char*>(&entry.position);
return out.write(p, sizeof(uint64_t));
});
write(out, entry.key);
auto p = reinterpret_cast<const char*>(&entry.position);
out.write(p, sizeof(uint64_t)).get();
}
future<> write(file_writer& out, summary& s) {
inline void write(file_writer& out, summary& s) {
using pos_type = typename decltype(summary::positions)::value_type;
// NOTE: positions and entries must be stored in NATIVE BYTE ORDER, not BIG-ENDIAN.
return write(out, s.header.min_index_interval,
s.header.size,
s.header.memory_size,
s.header.sampling_level,
s.header.size_at_full_sampling).then([&out, &s] {
auto p = reinterpret_cast<const char*>(s.positions.data());
return out.write(p, sizeof(pos_type) * s.positions.size());
}).then([&out, &s] {
return write(out, s.entries);
}).then([&out, &s] {
return write(out, s.first_key, s.last_key);
});
write(out, s.header.min_index_interval,
s.header.size,
s.header.memory_size,
s.header.sampling_level,
s.header.size_at_full_sampling);
auto p = reinterpret_cast<const char*>(s.positions.data());
out.write(p, sizeof(pos_type) * s.positions.size()).get();
write(out, s.entries);
write(out, s.first_key, s.last_key);
}
future<summary_entry&> sstable::read_summary_entry(size_t i) {
@@ -544,8 +519,8 @@ future<> parse(random_access_reader& in, std::unique_ptr<metadata>& p) {
}
template <typename Child>
future<> write(file_writer& out, std::unique_ptr<metadata>& p) {
return write(out, *static_cast<Child *>(p.get()));
inline void write(file_writer& out, std::unique_ptr<metadata>& p) {
write(out, *static_cast<Child *>(p.get()));
}
future<> parse(random_access_reader& in, statistics& s) {
@@ -568,35 +543,37 @@ future<> parse(random_access_reader& in, statistics& s) {
});
}
future<> write(file_writer& out, statistics& s) {
return write(out, s.hash).then([&out, &s] {
struct kv {
metadata_type key;
uint32_t value;
};
// sort map by file offset value and store the result into a vector.
// this is indeed needed because output stream cannot afford random writes.
auto v = make_shared<std::vector<kv>>();
v->reserve(s.hash.map.size());
for (auto val : s.hash.map) {
kv tmp = { val.first, val.second };
v->push_back(tmp);
}
std::sort(v->begin(), v->end(), [] (kv i, kv j) { return i.value < j.value; });
return do_for_each(v->begin(), v->end(), [&out, &s, v] (auto val) mutable {
switch (val.key) {
case metadata_type::Validation:
return write<validation_metadata>(out, s.contents[val.key]);
case metadata_type::Compaction:
return write<compaction_metadata>(out, s.contents[val.key]);
case metadata_type::Stats:
return write<stats_metadata>(out, s.contents[val.key]);
default:
sstlog.warn("Invalid metadata type at Statistics file: {} ", int(val.key));
return make_ready_future<>();
}
});
});
inline void write(file_writer& out, statistics& s) {
write(out, s.hash);
struct kv {
metadata_type key;
uint32_t value;
};
// sort map by file offset value and store the result into a vector.
// this is indeed needed because output stream cannot afford random writes.
auto v = make_shared<std::vector<kv>>();
v->reserve(s.hash.map.size());
for (auto val : s.hash.map) {
kv tmp = { val.first, val.second };
v->push_back(tmp);
}
std::sort(v->begin(), v->end(), [] (kv i, kv j) { return i.value < j.value; });
for (auto& val: *v) {
switch (val.key) {
case metadata_type::Validation:
write<validation_metadata>(out, s.contents[val.key]);
break;
case metadata_type::Compaction:
write<compaction_metadata>(out, s.contents[val.key]);
break;
case metadata_type::Stats:
write<stats_metadata>(out, s.contents[val.key]);
break;
default:
sstlog.warn("Invalid metadata type at Statistics file: {} ", int(val.key));
return; // FIXME: should throw
}
}
}
future<> parse(random_access_reader& in, estimated_histogram& eh) {
@@ -625,31 +602,28 @@ future<> parse(random_access_reader& in, estimated_histogram& eh) {
});
}
future<> write(file_writer& out, estimated_histogram& eh) {
inline void write(file_writer& out, estimated_histogram& eh) {
uint32_t len = 0;
check_truncate_and_assign(len, eh.buckets.size());
return write(out, len).then([&out, &eh] {
struct element {
uint64_t offsets;
uint64_t buckets;
};
std::vector<element> elements;
elements.resize(eh.buckets.size());
write(out, len);
struct element {
uint64_t offsets;
uint64_t buckets;
};
std::vector<element> elements;
elements.resize(eh.buckets.size());
auto *offsets_nr = reinterpret_cast<const net::packed<uint64_t> *>(eh.bucket_offsets.data());
auto *buckets_nr = reinterpret_cast<const net::packed<uint64_t> *>(eh.buckets.data());
for (size_t i = 0; i < eh.buckets.size(); i++) {
elements[i].offsets = net::hton(offsets_nr[i == 0 ? 0 : i - 1]);
elements[i].buckets = net::hton(buckets_nr[i]);
}
auto *offsets_nr = reinterpret_cast<const net::packed<uint64_t> *>(eh.bucket_offsets.data());
auto *buckets_nr = reinterpret_cast<const net::packed<uint64_t> *>(eh.buckets.data());
for (size_t i = 0; i < eh.buckets.size(); i++) {
elements[i].offsets = net::hton(offsets_nr[i == 0 ? 0 : i - 1]);
elements[i].buckets = net::hton(buckets_nr[i]);
}
return do_with(std::move(elements), [&out] (auto& elements) {
auto p = reinterpret_cast<const char*>(elements.data());
auto bytes = elements.size() * sizeof(element);
return out.write(p, bytes);
});
});
auto p = reinterpret_cast<const char*>(elements.data());
auto bytes = elements.size() * sizeof(element);
out.write(p, bytes).get();
}
// This is small enough, and well-defined. Easier to just read it all
@@ -718,7 +692,7 @@ future<> sstable::write_toc() {
// new line character is appended to the end of each component name.
auto value = _component_map[key] + "\n";
bytes b = bytes(reinterpret_cast<const bytes::value_type *>(value.c_str()), value.size());
return write(*w, b);
return seastar::async([w, b = std::move(b)] () mutable { write(*w, b); });
}).then([w] {
return w->flush().then([w] {
return w->close().then([w] {});
@@ -735,7 +709,7 @@ future<> write_crc(const sstring file_path, checksum& c) {
auto out = file_writer(make_lw_shared<file>(std::move(f)), 4096);
auto w = make_shared<file_writer>(std::move(out));
return write(*w, c).then([w] {
return seastar::async([w, &c] () { write(*w, c); }).then([w] {
return w->close().then([w] {});
});
});
@@ -751,7 +725,7 @@ future<> write_digest(const sstring file_path, uint32_t full_checksum) {
auto w = make_shared<file_writer>(std::move(out));
return do_with(to_sstring<bytes>(full_checksum), [w] (bytes& digest) {
return write(*w, digest).then([w] {
return seastar::async([w, &digest] { write(*w, digest); }).then([w] {
return w->close().then([w] {});
});
});
@@ -841,7 +815,7 @@ future<> sstable::write_simple(T& component) {
auto out = file_writer(make_lw_shared<file>(std::move(f)), 4096);
auto w = make_shared<file_writer>(std::move(out));
auto fut = write(*w, component);
auto fut = seastar::async([w, &component] () mutable { write(*w, component); });
return fut.then([w] {
return w->flush().then([w] {
return w->close().then([w] {}); // the underlying file is synced here.
@@ -959,7 +933,7 @@ void sstable::write_column_name(file_writer& out, const composite& clustering_ke
ck_bview.remove_suffix(1);
}
uint16_t sz = ck_bview.size() + c.size();
write(out, sz, ck_bview, c).get();
write(out, sz, ck_bview, c);
}
static inline void update_cell_stats(column_stats& c_stats, uint64_t timestamp) {
@@ -984,7 +958,7 @@ void sstable::write_cell(file_writer& out, atomic_cell_view cell) {
uint32_t expiration = cell.expiry().time_since_epoch().count();
disk_string_view<uint32_t> cell_value { cell.value() };
write(out, mask, ttl, expiration, timestamp, cell_value).get();
write(out, mask, ttl, expiration, timestamp, cell_value);
} else if (cell.is_dead()) {
// tombstone cell
@@ -994,14 +968,14 @@ void sstable::write_cell(file_writer& out, atomic_cell_view cell) {
_c_stats.tombstone_histogram.update(deletion_time);
write(out, mask, timestamp, deletion_time_size, deletion_time).get();
write(out, mask, timestamp, deletion_time_size, deletion_time);
} else {
// regular cell
column_mask mask = column_mask::none;
disk_string_view<uint32_t> cell_value { cell.value() };
write(out, mask, timestamp, cell_value).get();
write(out, mask, timestamp, cell_value);
}
}
@@ -1019,7 +993,7 @@ void sstable::write_row_marker(file_writer& out, const rows_entry& clustered_row
update_cell_stats(_c_stats, timestamp);
write(out, mask, timestamp, value_length).get();
write(out, mask, timestamp, value_length);
}
void sstable::write_range_tombstone(file_writer& out, const composite& clustering_prefix, std::vector<bytes_view> suffix, const tombstone t) {
@@ -1029,12 +1003,12 @@ void sstable::write_range_tombstone(file_writer& out, const composite& clusterin
write_column_name(out, clustering_prefix, suffix, composite_marker::start_range);
column_mask mask = column_mask::range_tombstone;
write(out, mask).get();
write(out, mask);
write_column_name(out, clustering_prefix, suffix, composite_marker::end_range);
uint64_t timestamp = t.timestamp;
uint32_t deletion_time = t.deletion_time.time_since_epoch().count();
write(out, deletion_time, timestamp).get();
write(out, deletion_time, timestamp);
}
void sstable::write_collection(file_writer& out, const composite& clustering_key, const column_definition& cdef, collection_mutation::view collection) {
@@ -1098,7 +1072,7 @@ static void write_index_entry(file_writer& out, disk_string_view<uint16_t>& key,
// FIXME: support promoted indexes.
uint32_t promoted_index_size = 0;
write(out, key, pos, promoted_index_size).get();
write(out, key, pos, promoted_index_size);
}
static constexpr int BASE_SAMPLING_LEVEL = 128;
@@ -1223,7 +1197,7 @@ void sstable::do_write_components(const memtable& mt) {
write_index_entry(*index, p_key, w->offset());
// Write partition key into data file.
write(*w, p_key).get();
write(*w, p_key);
auto tombstone = partition_entry.second.partition_tombstone();
deletion_time d;
@@ -1241,7 +1215,7 @@ void sstable::do_write_components(const memtable& mt) {
d.local_deletion_time = std::numeric_limits<int32_t>::max();
d.marked_for_delete_at = std::numeric_limits<int64_t>::min();
}
write(*w, d).get();
write(*w, d);
auto& partition = partition_entry.second;
auto& static_row = partition.static_row();
@@ -1257,7 +1231,7 @@ void sstable::do_write_components(const memtable& mt) {
write_clustered_row(*w, *mt.schema(), clustered_row);
}
int16_t end_of_row = 0;
write(*w, end_of_row).get();
write(*w, end_of_row);
// compute size of the current row.
_c_stats.row_size = w->offset() - _c_stats.start_offset;

View File

@@ -131,7 +131,7 @@ struct streaming_histogram {
* Function used to describe the type.
*/
template <typename Describer>
future<> describe_type(Describer f) { return f(max_bin_size, bin); }
auto describe_type(Describer f) { return f(max_bin_size, bin); }
// FIXME: convert Java code below.
#if 0

View File

@@ -23,7 +23,7 @@ struct option {
disk_string<uint16_t> value;
template <typename Describer>
future<> describe_type(Describer f) { return f(key, value); }
auto describe_type(Describer f) { return f(key, value); }
};
struct filter {
@@ -31,7 +31,7 @@ struct filter {
disk_array<uint32_t, uint64_t> buckets;
template <typename Describer>
future<> describe_type(Describer f) { return f(hashes, buckets); }
auto describe_type(Describer f) { return f(hashes, buckets); }
// Create an always positive filter if nothing else is specified.
filter() : hashes(0), buckets({}) {}
@@ -119,7 +119,7 @@ struct replay_position {
}
template <typename Describer>
future<> describe_type(Describer f) { return f(segment, position); }
auto describe_type(Describer f) { return f(segment, position); }
};
struct metadata {
@@ -134,7 +134,7 @@ struct validation_metadata : public metadata {
}
template <typename Describer>
future<> describe_type(Describer f) { return f(partitioner, filter_chance); }
auto describe_type(Describer f) { return f(partitioner, filter_chance); }
};
struct compaction_metadata : public metadata {
@@ -142,7 +142,7 @@ struct compaction_metadata : public metadata {
disk_array<uint32_t, uint8_t> cardinality;
template <typename Describer>
future<> describe_type(Describer f) { return f(ancestors, cardinality); }
auto describe_type(Describer f) { return f(ancestors, cardinality); }
};
struct la_stats_metadata : public metadata {
@@ -161,7 +161,7 @@ struct la_stats_metadata : public metadata {
bool has_legacy_counter_shards;
template <typename Describer>
future<> describe_type(Describer f) {
auto describe_type(Describer f) {
return f(
estimated_row_size,
estimated_column_count,
@@ -200,7 +200,7 @@ struct checksum {
std::vector<uint32_t> checksums;
template <typename Describer>
future<> describe_type(Describer f) { return f(chunk_size, checksums); }
auto describe_type(Describer f) { return f(chunk_size, checksums); }
};
}
@@ -228,7 +228,7 @@ struct deletion_time {
int64_t marked_for_delete_at;
template <typename Describer>
future<> describe_type(Describer f) { return f(local_deletion_time, marked_for_delete_at); }
auto describe_type(Describer f) { return f(local_deletion_time, marked_for_delete_at); }
bool live() const {
return (local_deletion_time == std::numeric_limits<int32_t>::max()) &&