Merge branch 'tgrabiec/store-ttl-in-atomic-cell' of github.com:cloudius-systems/seastar-dev into db

Store both ttl and expiry time in atomic_cell, from Tomasz.
This commit is contained in:
Avi Kivity
2015-05-07 10:23:52 +03:00
17 changed files with 229 additions and 92 deletions

View File

@@ -31,25 +31,27 @@ class atomic_cell_or_collection;
*
* Layout:
*
* <live> := <int8_t:flags><int64_t:timestamp><int32_t:ttl>?<value>
* <dead> := <int8_t: 0><int64_t:timestamp><int32_t:ttl>
* <live> := <int8_t:flags><int64_t:timestamp>(<int32_t:expiry><int32_t:ttl>)?<value>
* <dead> := <int8_t: 0><int64_t:timestamp><int32_t:expiry>
*/
class atomic_cell_type final {
private:
static constexpr int8_t DEAD_FLAGS = 0;
static constexpr int8_t LIVE_FLAG = 0x01;
static constexpr int8_t TTL_FLAG = 0x02; // When present, TTL field is present. Set only for live cells
static constexpr int8_t EXPIRY_FLAG = 0x02; // When present, expiry field is present. Set only for live cells
static constexpr unsigned flags_size = 1;
static constexpr unsigned timestamp_offset = flags_size;
static constexpr unsigned timestamp_size = 8;
static constexpr unsigned ttl_offset = timestamp_offset + timestamp_size;
static constexpr unsigned expiry_offset = timestamp_offset + timestamp_size;
static constexpr unsigned expiry_size = 4;
static constexpr unsigned ttl_offset = expiry_offset + expiry_size;
static constexpr unsigned ttl_size = 4;
private:
static bool is_live(const bytes_view& cell) {
return cell[0] != DEAD_FLAGS;
}
static bool is_live_and_has_ttl(const bytes_view& cell) {
return cell[0] & TTL_FLAG;
return cell[0] & EXPIRY_FLAG;
}
static bool is_dead(const bytes_view& cell) {
return cell[0] == DEAD_FLAGS;
@@ -60,35 +62,47 @@ private:
}
// Can be called on live cells only
static bytes_view value(bytes_view cell) {
auto ttl_field_size = bool(cell[0] & TTL_FLAG) * ttl_size;
auto value_offset = flags_size + timestamp_size + ttl_field_size;
auto expiry_field_size = bool(cell[0] & EXPIRY_FLAG) * (expiry_size + ttl_size);
auto value_offset = flags_size + timestamp_size + expiry_field_size;
cell.remove_prefix(value_offset);
return cell;
}
// Can be called on live and dead cells. For dead cells, the result is never empty.
static ttl_opt ttl(const bytes_view& cell) {
static expiry_opt expiry(const bytes_view& cell) {
auto flags = cell[0];
if (flags == DEAD_FLAGS || (flags & TTL_FLAG)) {
auto ttl = get_field<int32_t>(cell, ttl_offset);
return {gc_clock::time_point(gc_clock::duration(ttl))};
if (flags == DEAD_FLAGS || (flags & EXPIRY_FLAG)) {
auto expiry = get_field<int32_t>(cell, expiry_offset);
return {gc_clock::time_point(gc_clock::duration(expiry))};
}
return {};
}
static bytes make_dead(api::timestamp_type timestamp, gc_clock::time_point ttl) {
bytes b(bytes::initialized_later(), flags_size + timestamp_size + ttl_size);
// Can be called only when is_live_and_has_ttl() is true.
static gc_clock::duration ttl(const bytes_view& cell) {
assert(is_live_and_has_ttl(cell));
return gc_clock::duration(get_field<int32_t>(cell, ttl_offset));
}
static bytes make_dead(api::timestamp_type timestamp, gc_clock::time_point expiry) {
bytes b(bytes::initialized_later(), flags_size + timestamp_size + expiry_size);
b[0] = DEAD_FLAGS;
set_field(b, timestamp_offset, timestamp);
set_field(b, ttl_offset, ttl.time_since_epoch().count());
set_field(b, expiry_offset, expiry.time_since_epoch().count());
return b;
}
static bytes make_live(api::timestamp_type timestamp, ttl_opt ttl, bytes_view value) {
auto value_offset = flags_size + timestamp_size + bool(ttl) * ttl_size;
static bytes make_live(api::timestamp_type timestamp, bytes_view value) {
auto value_offset = flags_size + timestamp_size;
bytes b(bytes::initialized_later(), value_offset + value.size());
b[0] = (ttl ? TTL_FLAG : 0) | LIVE_FLAG;
b[0] = LIVE_FLAG;
set_field(b, timestamp_offset, timestamp);
if (ttl) {
set_field(b, ttl_offset, ttl->time_since_epoch().count());
}
std::copy_n(value.begin(), value.size(), b.begin() + value_offset);
return b;
}
static bytes make_live(api::timestamp_type timestamp, bytes_view value, gc_clock::time_point expiry, gc_clock::duration ttl) {
auto value_offset = flags_size + timestamp_size + expiry_size + ttl_size;
bytes b(bytes::initialized_later(), value_offset + value.size());
b[0] = EXPIRY_FLAG | LIVE_FLAG;
set_field(b, timestamp_offset, timestamp);
set_field(b, expiry_offset, expiry.time_since_epoch().count());
set_field(b, ttl_offset, ttl.count());
std::copy_n(value.begin(), value.size(), b.begin() + value_offset);
return b;
}
@@ -123,7 +137,11 @@ public:
return atomic_cell_type::value(_data);
}
// Can be called on live and dead cells. For dead cells, the result is never empty.
ttl_opt ttl() const {
expiry_opt expiry() const {
return atomic_cell_type::expiry(_data);
}
// Can be called only when is_live_and_has_ttl()
gc_clock::duration ttl() const {
return atomic_cell_type::ttl(_data);
}
bytes_view serialize() const {
@@ -164,7 +182,11 @@ public:
return atomic_cell_type::value(_data);
}
// Can be called on live and dead cells. For dead cells, the result is never empty.
ttl_opt ttl() const {
expiry_opt expiry() const {
return atomic_cell_type::expiry(_data);
}
// Can be called only when is_live_and_has_ttl()
gc_clock::duration ttl() const {
return atomic_cell_type::ttl(_data);
}
bytes_view serialize() const {
@@ -173,11 +195,23 @@ public:
operator atomic_cell_view() const {
return atomic_cell_view(_data);
}
static atomic_cell make_dead(api::timestamp_type timestamp, gc_clock::time_point ttl) {
return atomic_cell_type::make_dead(timestamp, ttl);
static atomic_cell make_dead(api::timestamp_type timestamp, gc_clock::time_point expiry) {
return atomic_cell_type::make_dead(timestamp, expiry);
}
static atomic_cell make_live(api::timestamp_type timestamp, ttl_opt ttl, bytes_view value) {
return atomic_cell_type::make_live(timestamp, ttl, value);
static atomic_cell make_live(api::timestamp_type timestamp, bytes_view value) {
return atomic_cell_type::make_live(timestamp, value);
}
static atomic_cell make_live(api::timestamp_type timestamp, bytes_view value,
gc_clock::time_point expiry, gc_clock::duration ttl)
{
return atomic_cell_type::make_live(timestamp, value, expiry, ttl);
}
static atomic_cell make_live(api::timestamp_type timestamp, bytes_view value, ttl_opt ttl) {
if (!ttl) {
return atomic_cell_type::make_live(timestamp, value);
} else {
return atomic_cell_type::make_live(timestamp, value, gc_clock::now() + *ttl, *ttl);
}
}
friend class atomic_cell_or_collection;
friend std::ostream& operator<<(std::ostream& os, const atomic_cell& ac);

View File

@@ -104,9 +104,9 @@ public:
throw exceptions::invalid_request_exception("A TTL must be greater or equal to 0");
}
if (ttl > max_ttl.time_since_epoch().count()) {
if (ttl > max_ttl.count()) {
throw exceptions::invalid_request_exception("ttl is too large. requested (" + std::to_string(ttl) +
") maximum (" + std::to_string(max_ttl.time_since_epoch().count()) + ")");
") maximum (" + std::to_string(max_ttl.count()) + ")");
}
return ttl;

View File

@@ -46,7 +46,7 @@ selection::selection(schema_ptr schema,
query::partition_slice::option_set selection::get_query_options() {
query::partition_slice::option_set opts;
opts.set_if<query::partition_slice::option::send_timestamp_and_ttl>(_collect_timestamps || _collect_TTLs);
opts.set_if<query::partition_slice::option::send_timestamp_and_expiry>(_collect_timestamps || _collect_TTLs);
opts.set_if<query::partition_slice::option::send_partition_key>(
std::any_of(_columns.begin(), _columns.end(),
@@ -261,12 +261,12 @@ void result_set_builder::add(const column_definition& def, const query::result_a
_timestamps[current->size() - 1] = c.timestamp();
}
if (!_ttls.empty()) {
gc_clock::duration ttl(-1);
auto maybe_ttl = c.ttl();
if (maybe_ttl) {
ttl = *maybe_ttl - to_gc_clock(_now);
gc_clock::duration ttl_left(-1);
expiry_opt e = c.expiry();
if (e) {
ttl_left = *e - to_gc_clock(_now);
}
_ttls[current->size() - 1] = ttl.count();
_ttls[current->size() - 1] = ttl_left.count();
}
}

View File

@@ -116,8 +116,11 @@ public:
ttl = _schema->default_time_to_live();
}
return atomic_cell::make_live(_timestamp,
ttl.count() > 0 ? ttl_opt{_local_deletion_time + ttl} : ttl_opt{}, value);
if (ttl.count() > 0) {
return atomic_cell::make_live(_timestamp, value, _local_deletion_time + ttl, ttl);
} else {
return atomic_cell::make_live(_timestamp, value);
}
};
#if 0

View File

@@ -537,7 +537,10 @@ memtable::apply(const mutation& m) {
p.apply(_schema, m.partition());
}
// Based on org.apache.cassandra.db.AbstractCell#reconcile()
// Based on:
// - org.apache.cassandra.db.AbstractCell#reconcile()
// - org.apache.cassandra.db.BufferExpiringCell#reconcile()
// - org.apache.cassandra.db.BufferDeletedCell#reconcile()
int
compare_atomic_cell_for_merge(atomic_cell_view left, atomic_cell_view right) {
if (left.timestamp() != right.timestamp()) {
@@ -547,15 +550,28 @@ compare_atomic_cell_for_merge(atomic_cell_view left, atomic_cell_view right) {
return left.is_live() ? -1 : 1;
}
if (left.is_live()) {
return compare_unsigned(left.value(), right.value());
} else {
if (*left.ttl() != *right.ttl()) {
// Origin compares big-endian serialized TTL
return (uint32_t)left.ttl()->time_since_epoch().count()
< (uint32_t)right.ttl()->time_since_epoch().count() ? -1 : 1;
auto c = compare_unsigned(left.value(), right.value());
if (c != 0) {
return c;
}
if (left.is_live_and_has_ttl()
&& right.is_live_and_has_ttl()
&& *left.expiry() != *right.expiry())
{
return left.expiry() < right.expiry() ? -1 : 1;
}
} else {
// Both are deleted
if (*left.expiry() != *right.expiry()) {
// Origin compares big-endian serialized expiry time. That's because it
// delegates to AbstractCell.reconcile() which compares values after
// comparing timestamps, which in case of deleted cells will hold
// serialized expiry.
return (uint32_t) left.expiry()->time_since_epoch().count()
< (uint32_t) right.expiry()->time_since_epoch().count() ? -1 : 1;
}
return 0;
}
return 0;
}
void
@@ -727,10 +743,10 @@ operator<<(std::ostream& os, const exploded_clustering_prefix& ecp) {
std::ostream&
operator<<(std::ostream& os, const atomic_cell_view& acv) {
return fprint(os, "atomic_cell{%s;ts=%d;ttl=%d}",
return fprint(os, "atomic_cell{%s;ts=%d;expiry=%d}",
(acv.is_live() ? to_hex(acv.value()) : sstring("DEAD")),
acv.timestamp(),
acv.is_live_and_has_ttl() ? acv.ttl()->time_since_epoch().count() : -1);
acv.is_live_and_has_ttl() ? acv.expiry()->time_since_epoch().count() : -1);
}
std::ostream&

View File

@@ -22,8 +22,8 @@ public:
};
using ttl_opt = std::experimental::optional<gc_clock::time_point>;
using expiry_opt = std::experimental::optional<gc_clock::time_point>;
using ttl_opt = std::experimental::optional<gc_clock::duration>;
// 20 years in seconds
static constexpr gc_clock::time_point max_ttl = gc_clock::time_point{
gc_clock::duration{20 * 365 * 24 * 60 * 60}};
static constexpr gc_clock::duration max_ttl = gc_clock::duration{20 * 365 * 24 * 60 * 60};

View File

@@ -31,7 +31,7 @@ void mutation::set_clustered_cell(const clustering_key& key, const bytes& name,
if (!column_def) {
throw std::runtime_error(sprint("no column definition found for '%s'", name));
}
return set_clustered_cell(key, *column_def, atomic_cell::make_live(timestamp, ttl, column_def->type->decompose(value)));
return set_clustered_cell(key, *column_def, atomic_cell::make_live(timestamp, column_def->type->decompose(value), ttl));
}
void mutation::set_clustered_cell(const clustering_key& key, const column_definition& def, atomic_cell_or_collection value) {
@@ -45,7 +45,7 @@ void mutation::set_cell(const exploded_clustering_prefix& prefix, const bytes& n
if (!column_def) {
throw std::runtime_error(sprint("no column definition found for '%s'", name));
}
return set_cell(prefix, *column_def, atomic_cell::make_live(timestamp, ttl, column_def->type->decompose(value)));
return set_cell(prefix, *column_def, atomic_cell::make_live(timestamp, column_def->type->decompose(value), ttl));
}
void mutation::set_cell(const exploded_clustering_prefix& prefix, const column_definition& def, atomic_cell_or_collection value) {

View File

@@ -121,11 +121,11 @@ using clustering_range = range<clustering_key_prefix>;
class partition_slice {
public:
enum class option { send_clustering_key, send_partition_key, send_timestamp_and_ttl };
enum class option { send_clustering_key, send_partition_key, send_timestamp_and_expiry };
using option_set = enum_set<super_enum<option,
option::send_clustering_key,
option::send_partition_key,
option::send_timestamp_and_ttl>>;
option::send_timestamp_and_expiry>>;
public:
std::vector<clustering_range> row_ranges;
std::vector<column_id> static_columns; // TODO: consider using bitmap

View File

@@ -14,18 +14,18 @@ namespace query {
class result_atomic_cell_view {
api::timestamp_type _timestamp;
ttl_opt _ttl;
expiry_opt _expiry;
bytes_view _value;
public:
result_atomic_cell_view(api::timestamp_type timestamp, ttl_opt ttl, bytes_view value)
: _timestamp(timestamp), _ttl(ttl), _value(value) { }
result_atomic_cell_view(api::timestamp_type timestamp, expiry_opt expiry, bytes_view value)
: _timestamp(timestamp), _expiry(expiry), _value(value) { }
api::timestamp_type timestamp() const {
return _timestamp;
}
ttl_opt ttl() const {
return _ttl;
expiry_opt expiry() const {
return _expiry;
}
bytes_view value() const {
@@ -55,16 +55,16 @@ public:
return {};
}
api::timestamp_type timestamp = api::missing_timestamp;
ttl_opt ttl_;
if (_slice.options.contains<partition_slice::option::send_timestamp_and_ttl>()) {
expiry_opt expiry_;
if (_slice.options.contains<partition_slice::option::send_timestamp_and_expiry>()) {
timestamp = _in.read <api::timestamp_type> ();
auto ttl_rep = _in.read<gc_clock::rep>();
if (ttl_rep != std::numeric_limits<gc_clock::rep>::max()) {
ttl_ = gc_clock::time_point(gc_clock::duration(ttl_rep));
auto expiry_rep = _in.read<gc_clock::rep>();
if (expiry_rep != std::numeric_limits<gc_clock::rep>::max()) {
expiry_ = gc_clock::time_point(gc_clock::duration(expiry_rep));
}
}
auto value = _in.read_view_to_blob<uint32_t>();
return {result_atomic_cell_view(timestamp, ttl_, value)};
return {result_atomic_cell_view(timestamp, expiry_, value)};
}
std::experimental::optional<collection_mutation::view> next_collection_cell() {
auto present = _in.read<int8_t>();

View File

@@ -43,10 +43,10 @@ public:
// FIXME: store this in a bitmap
_w.write<int8_t>(true);
assert(c.is_live());
if (_slice.options.contains<partition_slice::option::send_timestamp_and_ttl>()) {
if (_slice.options.contains<partition_slice::option::send_timestamp_and_expiry>()) {
_w.write(c.timestamp());
if (c.ttl()) {
_w.write<gc_clock::rep>(c.ttl()->time_since_epoch().count());
if (c.expiry()) {
_w.write<gc_clock::rep>(c.expiry()->time_since_epoch().count());
} else {
_w.write<gc_clock::rep>(std::numeric_limits<gc_clock::rep>::max());
}

View File

@@ -46,7 +46,7 @@ namespace query {
// client, because they are already specified in the query request, and not
// queried for. The query results hold keys optionally.
//
// Also, meta-data like cell timestamp and ttl is optional. It is only needed
// Also, meta-data like cell timestamp and expiry is optional. It is only needed
// if the query has writetime() or ttl() functions in it, which it typically
// won't have.
//
@@ -62,13 +62,13 @@ namespace query {
// <static-row> ::= <row>
// <row> ::= <row-length> <cell>+
// <cell> ::= <atomic-cell> | <collection-cell>
// <atomic-cell> ::= <present-byte> [ <timestamp> <ttl> ] <value>
// <atomic-cell> ::= <present-byte> [ <timestamp> <expiry> ] <value>
// <collection-cell> ::= <blob>
//
// <value> ::= <blob>
// <blob> ::= <blob-length> <uint8_t>*
// <timestamp> ::= <uint64_t>
// <ttl> ::= <int32_t>
// <expiry> ::= <int32_t>
// <present-byte> ::= <int8_t>
// <row-length> ::= <uint32_t>
// <row-count> ::= <uint32_t>

View File

@@ -145,6 +145,15 @@ public:
}
}
atomic_cell make_atomic_cell(uint64_t timestamp, bytes_view value, uint32_t ttl, uint32_t expiration) {
if (ttl) {
return atomic_cell::make_live(timestamp, value,
gc_clock::time_point(gc_clock::duration(expiration)), gc_clock::duration(ttl));
} else {
return atomic_cell::make_live(timestamp, value);
}
}
virtual void consume_cell(bytes_view col_name, bytes_view value, uint64_t timestamp, uint32_t ttl, uint32_t expiration) override {
static bytes cql_row_marker(3, bytes::value_type(0x0));
@@ -163,21 +172,7 @@ public:
throw malformed_sstable_exception("wrong number of clustering columns");
}
ttl_opt opt;
if (ttl) {
gc_clock::duration secs(expiration);
auto tp = gc_clock::time_point(secs);
if (tp < gc_clock::now()) {
consume_deleted_cell(col, timestamp, tp);
return;
}
opt = ttl_opt(tp);
} else {
opt = {};
}
auto ac = atomic_cell::make_live(timestamp, opt, value);
auto ac = make_atomic_cell(timestamp, value, ttl, expiration);
if (col.is_static) {
mut->set_static_cell(*(col.cdef), ac);

View File

@@ -2,7 +2,7 @@
#include "perf.hh"
static atomic_cell make_atomic_cell(bytes value) {
return atomic_cell::make_live(0, ttl_opt{}, value);
return atomic_cell::make_live(0, value);
};
int main(int argc, char* argv[]) {

View File

@@ -15,7 +15,7 @@ static sstring some_keyspace("ks");
static sstring some_column_family("cf");
static atomic_cell make_atomic_cell(bytes value) {
return atomic_cell::make_live(0, ttl_opt{}, std::move(value));
return atomic_cell::make_live(0, std::move(value));
};
BOOST_AUTO_TEST_CASE(test_mutation_is_applied) {
@@ -248,7 +248,7 @@ BOOST_AUTO_TEST_CASE(test_multiple_memtables_multiple_partitions) {
auto key = partition_key::from_exploded(*s, {int32_type->decompose(p1)});
auto c_key = clustering_key::from_exploded(*s, {int32_type->decompose(c1)});
mutation m(key, s);
m.set_clustered_cell(c_key, r1_col, atomic_cell::make_live(ts++, std::experimental::nullopt, int32_type->decompose(r1)));
m.set_clustered_cell(c_key, r1_col, atomic_cell::make_live(ts++, int32_type->decompose(r1)));
cf.apply(std::move(m));
shadow[p1][c1] = r1;
};
@@ -277,3 +277,89 @@ BOOST_AUTO_TEST_CASE(test_multiple_memtables_multiple_partitions) {
BOOST_REQUIRE(shadow == result);
}
BOOST_AUTO_TEST_CASE(test_cell_ordering) {
auto now = gc_clock::now();
auto ttl_1 = gc_clock::duration(1);
auto ttl_2 = gc_clock::duration(2);
auto expiry_1 = now + ttl_1;
auto expiry_2 = now + ttl_2;
auto assert_order = [] (atomic_cell_view first, atomic_cell_view second) {
if (compare_atomic_cell_for_merge(first, second) >= 0) {
BOOST_FAIL(sprint("Expected %s < %s", first, second));
}
if (compare_atomic_cell_for_merge(second, first) <= 0) {
BOOST_FAIL(sprint("Expected %s < %s", second, first));
}
};
auto assert_equal = [] (atomic_cell_view c1, atomic_cell_view c2) {
BOOST_REQUIRE(compare_atomic_cell_for_merge(c1, c2) == 0);
BOOST_REQUIRE(compare_atomic_cell_for_merge(c2, c1) == 0);
};
assert_equal(
atomic_cell::make_live(0, bytes("value")),
atomic_cell::make_live(0, bytes("value")));
assert_equal(
atomic_cell::make_live(1, bytes("value"), expiry_1, ttl_1),
atomic_cell::make_live(1, bytes("value")));
assert_equal(
atomic_cell::make_dead(1, expiry_1),
atomic_cell::make_dead(1, expiry_1));
// If one cell doesn't have an expiry, Origin considers them equal.
assert_equal(
atomic_cell::make_live(1, bytes(), expiry_2, ttl_2),
atomic_cell::make_live(1, bytes()));
// Origin doesn't compare ttl (is it wise?)
assert_equal(
atomic_cell::make_live(1, bytes("value"), expiry_1, ttl_1),
atomic_cell::make_live(1, bytes("value"), expiry_1, ttl_2));
assert_order(
atomic_cell::make_live(0, bytes("value1")),
atomic_cell::make_live(0, bytes("value2")));
assert_order(
atomic_cell::make_live(0, bytes("value12")),
atomic_cell::make_live(0, bytes("value2")));
// Live cells are ordered first by timestamp...
assert_order(
atomic_cell::make_live(0, bytes("value2")),
atomic_cell::make_live(1, bytes("value1")));
// ..then by value
assert_order(
atomic_cell::make_live(1, bytes("value1"), expiry_2, ttl_2),
atomic_cell::make_live(1, bytes("value2"), expiry_1, ttl_1));
// ..then by expiry
assert_order(
atomic_cell::make_live(1, bytes(), expiry_1, ttl_1),
atomic_cell::make_live(1, bytes(), expiry_2, ttl_1));
// Dead wins
assert_order(
atomic_cell::make_live(1, bytes("value")),
atomic_cell::make_dead(1, expiry_1));
// Dead wins with expiring cell
assert_order(
atomic_cell::make_live(1, bytes("value"), expiry_2, ttl_2),
atomic_cell::make_dead(1, expiry_1));
// Deleted cells are ordered first by timestamp
assert_order(
atomic_cell::make_dead(1, expiry_2),
atomic_cell::make_dead(2, expiry_1));
// ...then by expiry
assert_order(
atomic_cell::make_dead(1, expiry_1),
atomic_cell::make_dead(1, expiry_2));
}

View File

@@ -140,7 +140,7 @@ static sstring some_keyspace("ks");
static sstring some_column_family("cf");
static atomic_cell make_atomic_cell(bytes value) {
return atomic_cell::make_live(0, ttl_opt{}, std::move(value));
return atomic_cell::make_live(0, std::move(value));
}
SEASTAR_TEST_CASE(test_mutation){

View File

@@ -296,7 +296,7 @@ static sstring some_keyspace("ks");
static sstring some_column_family("cf");
static atomic_cell make_atomic_cell(bytes value) {
return atomic_cell::make_live(0, ttl_opt{}, std::move(value));
return atomic_cell::make_live(0, std::move(value));
};
SEASTAR_TEST_CASE(datafile_generation_01) {

View File

@@ -275,9 +275,12 @@ public:
if (ttl.count() <= 0) {
ttl = cf.schema()->default_time_to_live();
}
auto ttl_option = ttl.count() > 0 ? ttl_opt(gc_clock::now() + ttl) : ttl_opt();
ttl_opt maybe_ttl;
if (ttl.count() > 0) {
maybe_ttl = ttl;
}
m_to_apply.set_clustered_cell(empty_clustering_key, *def,
atomic_cell::make_live(col.timestamp, ttl_option, to_bytes(col.value)));
atomic_cell::make_live(col.timestamp, to_bytes(col.value), maybe_ttl));
} else if (cosc.__isset.super_column) {
// FIXME: implement
} else if (cosc.__isset.counter_column) {