Merge "Fixes for the reading of missing Summary" from Glauber
"This patchset contains some fixes spotted during post-merged review
by {Nad,}av{,i}. I don't consider any of them a must for backport to 1.0,
but since we haven't yet even backported the main series, might as well backport
everything.
It also includes some unit tests to make sure that they will be kept working
in the future."
This commit is contained in:
@@ -42,10 +42,14 @@ public:
|
||||
}
|
||||
};
|
||||
|
||||
// IndexConsumer is a concept that implements:
|
||||
//
|
||||
// bool should_continue();
|
||||
// void consume_entry(index_entry&& ie);
|
||||
template <class IndexConsumer>
|
||||
class index_consume_entry_context: public data_consumer::continuous_data_consumer<index_consume_entry_context<IndexConsumer>> {
|
||||
using proceed = data_consumer::proceed;
|
||||
using parent = data_consumer::continuous_data_consumer<index_consume_entry_context<IndexConsumer>>;
|
||||
using continuous_data_consumer = data_consumer::continuous_data_consumer<index_consume_entry_context<IndexConsumer>>;
|
||||
private:
|
||||
IndexConsumer& _consumer;
|
||||
|
||||
@@ -68,7 +72,7 @@ public:
|
||||
|
||||
bool non_consuming() const {
|
||||
return ((_state == state::CONSUME_ENTRY) || (_state == state::START) ||
|
||||
((_state == state::PROMOTED_BYTES) && (parent::_prestate == parent::prestate::NONE)));
|
||||
((_state == state::PROMOTED_BYTES) && (continuous_data_consumer::_prestate == continuous_data_consumer::prestate::NONE)));
|
||||
}
|
||||
|
||||
proceed process_state(temporary_buffer<char>& data) {
|
||||
@@ -81,32 +85,32 @@ public:
|
||||
_state = state::KEY_SIZE;
|
||||
break;
|
||||
case state::KEY_SIZE:
|
||||
if (parent::read_16(data) != parent::read_status::ready) {
|
||||
if (this->read_16(data) != continuous_data_consumer::read_status::ready) {
|
||||
_state = state::KEY_BYTES;
|
||||
break;
|
||||
}
|
||||
case state::KEY_BYTES:
|
||||
if (parent::read_bytes(data, parent::_u16, _key) != parent::read_status::ready) {
|
||||
if (this->read_bytes(data, this->_u16, _key) != continuous_data_consumer::read_status::ready) {
|
||||
_state = state::POSITION;
|
||||
break;
|
||||
}
|
||||
case state::POSITION:
|
||||
if (parent::read_64(data) != parent::read_status::ready) {
|
||||
if (this->read_64(data) != continuous_data_consumer::read_status::ready) {
|
||||
_state = state::PROMOTED_SIZE;
|
||||
break;
|
||||
}
|
||||
case state::PROMOTED_SIZE:
|
||||
if (parent::read_32(data) != parent::read_status::ready) {
|
||||
if (this->read_32(data) != continuous_data_consumer::read_status::ready) {
|
||||
_state = state::PROMOTED_BYTES;
|
||||
break;
|
||||
}
|
||||
case state::PROMOTED_BYTES:
|
||||
if (parent::read_bytes(data, parent::_u32, _promoted) != parent::read_status::ready) {
|
||||
if (this->read_bytes(data, this->_u32, _promoted) != continuous_data_consumer::read_status::ready) {
|
||||
_state = state::CONSUME_ENTRY;
|
||||
break;
|
||||
}
|
||||
case state::CONSUME_ENTRY:
|
||||
_consumer.consume_entry(index_entry(std::move(_key), parent::_u64, std::move(_promoted)));
|
||||
_consumer.consume_entry(index_entry(std::move(_key), this->_u64, std::move(_promoted)));
|
||||
_state = state::START;
|
||||
break;
|
||||
default:
|
||||
@@ -117,7 +121,7 @@ public:
|
||||
|
||||
index_consume_entry_context(IndexConsumer& consumer,
|
||||
input_stream<char>&& input, uint64_t maxlen)
|
||||
: parent(std::move(input), maxlen)
|
||||
: continuous_data_consumer(std::move(input), maxlen)
|
||||
, _consumer(consumer)
|
||||
{}
|
||||
|
||||
|
||||
@@ -730,6 +730,7 @@ future<> sstable::read_toc() {
|
||||
try {
|
||||
_components.insert(reverse_map(c, _component_map));
|
||||
} catch (std::out_of_range& oor) {
|
||||
_components.clear(); // so subsequent read_toc will be forced to fail again
|
||||
throw malformed_sstable_exception("Unrecognized TOC component: " + c);
|
||||
}
|
||||
}
|
||||
@@ -964,10 +965,15 @@ future<> sstable::read_summary(const io_priority_class& pc) {
|
||||
}
|
||||
|
||||
return read_toc().then([this, &pc] {
|
||||
// We'll try to keep the main code path exception free, but if an exception does happen
|
||||
// we can try to regenerate the Summary.
|
||||
if (has_component(sstable::component_type::Summary)) {
|
||||
return read_simple<component_type::Summary>(_summary, pc);
|
||||
return read_simple<component_type::Summary>(_summary, pc).handle_exception([this, &pc] (auto ep) {
|
||||
sstlog.warn("Couldn't read summary file %s: %s. Recreating it.", this->filename(component_type::Summary), ep);
|
||||
return this->generate_summary(pc);
|
||||
});
|
||||
} else {
|
||||
return generate_summary(default_priority_class());
|
||||
return generate_summary(pc);
|
||||
}
|
||||
});
|
||||
}
|
||||
@@ -1560,6 +1566,11 @@ future<> sstable::generate_summary(const io_priority_class& pc) {
|
||||
seal_summary(_summary, std::move(s.first_key), std::move(s.last_key));
|
||||
});
|
||||
});
|
||||
}).then([index_file] () mutable {
|
||||
return index_file.close().handle_exception([] (auto ep) {
|
||||
sstlog.warn("sstable close index_file failed: {}", ep);
|
||||
general_disk_error();
|
||||
});
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
@@ -175,6 +175,33 @@ SEASTAR_TEST_CASE(big_summary_query_32) {
|
||||
return summary_query<32, 0xc4000, 182>("tests/sstables/bigsummary", 76);
|
||||
}
|
||||
|
||||
// The following two files are just a copy of uncompressed's 1. But the Summary
|
||||
// is removed (and removed from the TOC as well). We should reconstruct it
|
||||
// in this case, so the queries should still go through
|
||||
SEASTAR_TEST_CASE(missing_summary_query_ok) {
|
||||
return summary_query<0, 0, 5>("tests/sstables/uncompressed", 2);
|
||||
}
|
||||
|
||||
SEASTAR_TEST_CASE(missing_summary_query_fail) {
|
||||
return summary_query_fail<2, 0, 5>("tests/sstables/uncompressed", 2);
|
||||
}
|
||||
|
||||
SEASTAR_TEST_CASE(missing_summary_query_negative_fail) {
|
||||
return summary_query_fail<-2, 0, 5>("tests/sstables/uncompressed", 2);
|
||||
}
|
||||
|
||||
SEASTAR_TEST_CASE(missing_summary_first_last_sane) {
|
||||
return reusable_sst("tests/sstables/uncompressed", 2).then([] (sstable_ptr ptr) {
|
||||
auto& summary = sstables::test(ptr).get_summary();
|
||||
BOOST_REQUIRE(summary.header.size == 1);
|
||||
BOOST_REQUIRE(summary.positions.size() == 1);
|
||||
BOOST_REQUIRE(summary.entries.size() == 1);
|
||||
BOOST_REQUIRE(bytes_view(summary.first_key) == as_bytes("vinna"));
|
||||
BOOST_REQUIRE(bytes_view(summary.last_key) == as_bytes("finna"));
|
||||
return make_ready_future<>();
|
||||
});
|
||||
}
|
||||
|
||||
static future<sstable_ptr> do_write_sst(sstring load_dir, sstring write_dir, unsigned long generation) {
|
||||
auto sst = make_lw_shared<sstable>("ks", "cf", load_dir, generation, la, big);
|
||||
return sst->load().then([sst, write_dir, generation] {
|
||||
|
||||
BIN
tests/sstables/uncompressed/la-2-big-CRC.db
Normal file
BIN
tests/sstables/uncompressed/la-2-big-CRC.db
Normal file
Binary file not shown.
BIN
tests/sstables/uncompressed/la-2-big-Data.db
Normal file
BIN
tests/sstables/uncompressed/la-2-big-Data.db
Normal file
Binary file not shown.
1
tests/sstables/uncompressed/la-2-big-Digest.sha1
Normal file
1
tests/sstables/uncompressed/la-2-big-Digest.sha1
Normal file
@@ -0,0 +1 @@
|
||||
748507322
|
||||
BIN
tests/sstables/uncompressed/la-2-big-Filter.db
Normal file
BIN
tests/sstables/uncompressed/la-2-big-Filter.db
Normal file
Binary file not shown.
BIN
tests/sstables/uncompressed/la-2-big-Index.db
Normal file
BIN
tests/sstables/uncompressed/la-2-big-Index.db
Normal file
Binary file not shown.
BIN
tests/sstables/uncompressed/la-2-big-Statistics.db
Normal file
BIN
tests/sstables/uncompressed/la-2-big-Statistics.db
Normal file
Binary file not shown.
7
tests/sstables/uncompressed/la-2-big-TOC.txt
Normal file
7
tests/sstables/uncompressed/la-2-big-TOC.txt
Normal file
@@ -0,0 +1,7 @@
|
||||
Data.db
|
||||
Filter.db
|
||||
CRC.db
|
||||
Statistics.db
|
||||
Digest.sha1
|
||||
Index.db
|
||||
TOC.txt
|
||||
Reference in New Issue
Block a user