mirror of
https://github.com/scylladb/scylladb.git
synced 2026-04-21 09:00:35 +00:00
compression file
Signed-off-by: Glauber Costa <glommer@cloudius-systems.com> Reviewed-by: Nadav Har'El <nyh@cloudius-systems.com>
This commit is contained in:
@@ -207,6 +207,14 @@ future<> parse(file_input_stream& in, disk_hash<Size, Key, Value>& h) {
|
||||
});
|
||||
}
|
||||
|
||||
future<> parse(file_input_stream& in, option& op) {
|
||||
return parse(in, op.key, op.value);
|
||||
}
|
||||
|
||||
future<> parse(file_input_stream& in, compression& c) {
|
||||
return parse(in, c.name, c.options, c.chunk_len, c.data_len, c.offsets);
|
||||
}
|
||||
|
||||
// This is small enough, and well-defined. Easier to just read it all
|
||||
// at once
|
||||
future<> sstable::read_toc() {
|
||||
@@ -269,8 +277,40 @@ future<> sstable::read_toc() {
|
||||
|
||||
}
|
||||
|
||||
template <typename T, sstable::component_type Type, T sstable::* Comptr>
|
||||
future<> sstable::read_simple() {
|
||||
|
||||
auto file_path = filename(Type);
|
||||
sstlog.debug(("Reading " + _component_map[Type] + " file {} ").c_str(), file_path);
|
||||
return engine().open_file_dma(file_path, open_flags::ro).then([this] (file f) {
|
||||
|
||||
auto r = std::make_unique<file_input_stream>(std::move(f), 4096);
|
||||
auto fut = parse(*r, *this.*Comptr);
|
||||
return fut.then([r = std::move(r)] {});
|
||||
}).rescue([this, file_path] (auto get_ex) {
|
||||
try {
|
||||
get_ex();
|
||||
} catch (std::system_error& e) {
|
||||
if (e.code() == std::error_code(ENOENT, std::system_category())) {
|
||||
throw malformed_sstable_exception(file_path + ": file not found");
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
future<> sstable::read_compression() {
|
||||
// FIXME: If there is no compression, we should expect a CRC file to be present.
|
||||
if (!has_component(sstable::component_type::CompressionInfo)) {
|
||||
return make_ready_future<>();
|
||||
}
|
||||
|
||||
return read_simple<compression, component_type::CompressionInfo, &sstable::_compression>();
|
||||
}
|
||||
|
||||
future<> sstable::load() {
|
||||
return read_toc();
|
||||
return read_toc().then([this] {
|
||||
return read_compression();
|
||||
});
|
||||
}
|
||||
|
||||
const bool sstable::has_component(component_type f) {
|
||||
|
||||
@@ -11,6 +11,7 @@
|
||||
#include "core/enum.hh"
|
||||
#include <unordered_set>
|
||||
#include <unordered_map>
|
||||
#include "types.hh"
|
||||
|
||||
namespace sstables {
|
||||
|
||||
@@ -49,6 +50,8 @@ private:
|
||||
|
||||
std::unordered_set<component_type> _components;
|
||||
|
||||
compression _compression;
|
||||
|
||||
sstring _dir;
|
||||
unsigned long _epoch = 0;
|
||||
version_types _version;
|
||||
@@ -59,6 +62,11 @@ private:
|
||||
const sstring filename(component_type f);
|
||||
future<> read_toc();
|
||||
|
||||
template <typename T, sstable::component_type Type, T sstable::* Comptr>
|
||||
future<> read_simple();
|
||||
|
||||
future<> read_compression();
|
||||
|
||||
public:
|
||||
sstable(sstring dir, unsigned long epoch, version_types v, format_types f) : _dir(dir), _epoch(epoch), _version(v), _format(f) {}
|
||||
sstable& operator=(const sstable&) = delete;
|
||||
|
||||
@@ -28,4 +28,17 @@ template <typename Size, typename Key, typename Value>
|
||||
struct disk_hash {
|
||||
std::unordered_map<Key, Value, std::hash<Key>> map;
|
||||
};
|
||||
|
||||
struct option {
|
||||
disk_string<uint16_t> key;
|
||||
disk_string<uint16_t> value;
|
||||
};
|
||||
|
||||
struct compression {
|
||||
disk_string<uint16_t> name;
|
||||
disk_array<uint32_t, option> options;
|
||||
uint32_t chunk_len;
|
||||
uint64_t data_len;
|
||||
disk_array<uint32_t, uint64_t> offsets;
|
||||
};
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user