This commit is contained in:
Avi Kivity
2016-03-01 09:52:04 +02:00
8 changed files with 95 additions and 18 deletions

View File

@@ -516,9 +516,9 @@ future<sstables::entry_descriptor> column_family::probe_file(sstring sstdir, sst
});
}
void column_family::update_stats_for_new_sstable(uint64_t new_sstable_data_size) {
_stats.live_disk_space_used += new_sstable_data_size;
_stats.total_disk_space_used += new_sstable_data_size;
void column_family::update_stats_for_new_sstable(uint64_t disk_space_used_by_sstable) {
_stats.live_disk_space_used += disk_space_used_by_sstable;
_stats.total_disk_space_used += disk_space_used_by_sstable;
_stats.live_sstable_count++;
}
@@ -530,7 +530,7 @@ void column_family::add_sstable(lw_shared_ptr<sstables::sstable> sstable) {
auto generation = sstable->generation();
// allow in-progress reads to continue using old list
_sstables = make_lw_shared<sstable_list>(*_sstables);
update_stats_for_new_sstable(sstable->data_size());
update_stats_for_new_sstable(sstable->bytes_on_disk());
_sstables->emplace(generation, std::move(sstable));
}

View File

@@ -172,7 +172,7 @@ private:
class memtable_flush_queue;
std::unique_ptr<memtable_flush_queue> _flush_queue;
private:
void update_stats_for_new_sstable(uint64_t new_sstable_data_size);
void update_stats_for_new_sstable(uint64_t disk_space_used_by_sstable);
void add_sstable(sstables::sstable&& sstable);
void add_sstable(lw_shared_ptr<sstables::sstable> sstable);
void add_memtable();

View File

@@ -44,6 +44,22 @@ commitlog_entry::commitlog_entry(stdx::optional<column_mapping> mapping, const f
, _mutation(mutation)
{ }
commitlog_entry::commitlog_entry(commitlog_entry&& ce)
: _mapping(std::move(ce._mapping))
, _mutation_storage(std::move(ce._mutation_storage))
, _mutation(_mutation_storage ? *_mutation_storage : ce._mutation)
{
}
commitlog_entry& commitlog_entry::operator=(commitlog_entry&& ce)
{
if (this != &ce) {
this->~commitlog_entry();
new (this) commitlog_entry(std::move(ce));
}
return *this;
}
commitlog_entry commitlog_entry_writer::get_entry() const {
if (_with_schema) {
return commitlog_entry(_schema->get_column_mapping(), _mutation);

View File

@@ -35,6 +35,10 @@ class commitlog_entry {
public:
commitlog_entry(stdx::optional<column_mapping> mapping, frozen_mutation&& mutation);
commitlog_entry(stdx::optional<column_mapping> mapping, const frozen_mutation& mutation);
commitlog_entry(commitlog_entry&&);
commitlog_entry(const commitlog_entry&) = delete;
commitlog_entry& operator=(commitlog_entry&&);
commitlog_entry& operator=(const commitlog_entry&) = delete;
const stdx::optional<column_mapping>& mapping() const { return _mapping; }
const frozen_mutation& mutation() const { return _mutation; }
};

View File

@@ -64,4 +64,9 @@ struct writable_variants stub [[writable]] {
boost::variant<writable_vector, simple_compound, writable_final_simple_compound> first;
boost::variant<writable_vector, simple_compound, writable_final_simple_compound> second;
boost::variant<writable_vector, simple_compound, writable_final_simple_compound> third;
};
};
struct compound_with_optional {
std::experimental::optional<simple_compound> first;
simple_compound second;
};

View File

@@ -931,6 +931,14 @@ future<> sstable::open_data() {
return _index_file.size().then([this] (auto size) {
_index_file_size = size;
});
}).then([this] {
// Get disk usage for this sstable (includes all components).
_bytes_on_disk = 0;
return do_for_each(_components, [this] (component_type c) {
return engine().file_size(this->filename(c)).then([this] (uint64_t bytes) {
_bytes_on_disk += bytes;
});
});
});
});
@@ -1446,17 +1454,9 @@ uint64_t sstable::data_size() const {
return _data_file_size;
}
future<uint64_t> sstable::bytes_on_disk() {
if (_bytes_on_disk) {
return make_ready_future<uint64_t>(_bytes_on_disk);
}
return do_for_each(_components, [this] (component_type c) {
return engine().file_size(filename(c)).then([this] (uint64_t bytes) {
_bytes_on_disk += bytes;
});
}).then([this] {
return make_ready_future<uint64_t>(_bytes_on_disk);
});
uint64_t sstable::bytes_on_disk() {
assert(_bytes_on_disk > 0);
return _bytes_on_disk;
}
const bool sstable::has_component(component_type f) const {

View File

@@ -275,7 +275,7 @@ public:
}
// Returns the total bytes of all components.
future<uint64_t> bytes_on_disk();
uint64_t bytes_on_disk();
partition_key get_first_partition_key(const schema& s) const;
partition_key get_last_partition_key(const schema& s) const;

View File

@@ -28,11 +28,14 @@
#include <map>
#include <vector>
#include <experimental/optional>
#include "bytes.hh"
#include "bytes_ostream.hh"
#include "serializer.hh"
namespace stdx = std::experimental;
struct simple_compound {
// TODO: change this to test for #905
uint32_t foo;
@@ -48,6 +51,27 @@ std::ostream& operator<<(std::ostream& os, const simple_compound& sc)
return os << " { foo: " << sc.foo << ", bar: " << sc.bar << " }";
}
struct compound_with_optional {
stdx::optional<simple_compound> first;
simple_compound second;
bool operator==(const compound_with_optional& other) const {
return first == other.first && second == other.second;
}
};
std::ostream& operator<<(std::ostream& os, const compound_with_optional& v)
{
os << " { first: ";
if (v.first) {
os << *v.first;
} else {
os << "<disengaged>";
}
os << ", second: " << v.second << " }";
return os;
}
struct wrapped_vector {
std::vector<simple_compound> vector;
@@ -238,4 +262,32 @@ BOOST_AUTO_TEST_CASE(test_variant)
auto v3 = wv_view.third();
auto&& compound2 = boost::apply_visitor(expect_writable_compound(), v3);
BOOST_REQUIRE_EQUAL(compound2, sc2);
}
BOOST_AUTO_TEST_CASE(test_compound_with_optional)
{
simple_compound foo = { 0xdeadbeef, 0xbadc0ffe };
simple_compound bar = { 0x12345678, 0x87654321 };
compound_with_optional one = { foo, bar };
bytes_ostream buf1;
ser::serialize(buf1, one);
BOOST_REQUIRE_EQUAL(buf1.size(), 29);
auto bv1 = buf1.linearize();
seastar::simple_input_stream in1(reinterpret_cast<const char*>(bv1.data()), bv1.size());
auto deser_one = ser::deserialize(in1, boost::type<compound_with_optional>());
BOOST_REQUIRE_EQUAL(one, deser_one);
compound_with_optional two = { {}, foo };
bytes_ostream buf2;
ser::serialize(buf2, two);
BOOST_REQUIRE_EQUAL(buf2.size(), 17);
auto bv2 = buf2.linearize();
seastar::simple_input_stream in2(reinterpret_cast<const char*>(bv2.data()), bv2.size());
auto deser_two = ser::deserialize(in2, boost::type<compound_with_optional>());
BOOST_REQUIRE_EQUAL(two, deser_two);
}