diff --git a/alternator/ttl.cc b/alternator/ttl.cc index 6aa5fcad7f..39b90c5233 100644 --- a/alternator/ttl.cc +++ b/alternator/ttl.cc @@ -767,7 +767,7 @@ static future scan_table( // by tasking another node to take over scanning of the dead node's primary // ranges. What we do here is that this node will also check expiration // on its *secondary* ranges - but only those whose primary owner is down. - auto tablet_secondary_replica = tablet_map.get_secondary_replica(*tablet); // throws if no secondary replica + auto tablet_secondary_replica = tablet_map.get_secondary_replica(*tablet, erm->get_topology()); // throws if no secondary replica if (tablet_secondary_replica.host == my_host_id && tablet_secondary_replica.shard == this_shard_id()) { if (!gossiper.is_alive(tablet_primary_replica.host)) { co_await scan_tablet(*tablet, proxy, abort_source, page_sem, expiration_stats, scan_ctx, tablet_map); diff --git a/locator/tablets.cc b/locator/tablets.cc index be1d247a73..a2db5161ef 100644 --- a/locator/tablets.cc +++ b/locator/tablets.cc @@ -616,12 +616,16 @@ tablet_replica tablet_map::get_primary_replica(tablet_id id, const locator::topo return maybe_get_primary_replica(id, replicas, topo, [&] (const auto& _) { return true; }).value(); } -tablet_replica tablet_map::get_secondary_replica(tablet_id id) const { - if (get_tablet_info(id).replicas.size() < 2) { +tablet_replica tablet_map::get_secondary_replica(tablet_id id, const locator::topology& topo) const { + const auto& orig_replicas = get_tablet_info(id).replicas; + if (orig_replicas.size() < 2) { throw std::runtime_error(format("No secondary replica for tablet id {}", id)); } - const auto& replicas = get_tablet_info(id).replicas; - return replicas.at((size_t(id)+1) % replicas.size()); + tablet_replica_set replicas = orig_replicas; + std::ranges::sort(replicas, tablet_replica_comparator(topo)); + // This formula must match the one in get_primary_replica(), + // just with + 1. + return replicas.at((size_t(id) + size_t(id) / replicas.size() + 1) % replicas.size()); } std::optional tablet_map::maybe_get_selected_replica(tablet_id id, const topology& topo, const tablet_task_info& tablet_task_info) const { diff --git a/locator/tablets.hh b/locator/tablets.hh index 5f5a8c4ded..e38637bdf1 100644 --- a/locator/tablets.hh +++ b/locator/tablets.hh @@ -648,9 +648,10 @@ public: /// Returns the primary replica for the tablet tablet_replica get_primary_replica(tablet_id id, const locator::topology& topo) const; - /// Returns the secondary replica for the tablet, which is assumed to be directly following the primary replica in the replicas vector + /// Returns the secondary replica for the tablet: the replica that immediately follows the primary + /// replica in the topology-sorted replica list. /// \throws std::runtime_error if the tablet has less than 2 replicas. - tablet_replica get_secondary_replica(tablet_id id) const; + tablet_replica get_secondary_replica(tablet_id id, const locator::topology& topo) const; // Returns the replica that matches hosts and dcs filters for tablet_task_info. std::optional maybe_get_selected_replica(tablet_id id, const topology& topo, const tablet_task_info& tablet_task_info) const; diff --git a/test/boost/tablets_test.cc b/test/boost/tablets_test.cc index 364b20226a..ae0e7ad689 100644 --- a/test/boost/tablets_test.cc +++ b/test/boost/tablets_test.cc @@ -6096,4 +6096,83 @@ SEASTAR_THREAD_TEST_CASE(test_tablet_manual_repair_rf1_auto_repair_on) { do_with_cql_env_thread(run_tablet_manual_repair_rf1, std::move(cfg_in)).get(); } +// Test for tablet_map::get_secondary_replica() and specifically how it +// relates to get_primary_replica(). +// We never officially documented given a list of replicas, which replica +// is to be considered the "primary" - it's not simply the first replica in +// the list but the first in some reshuffling of the list, reshuffling whose +// details changed in commits like 817fdad and d88036d. So this patch doesn't +// enshrine what get_primary_replica() or get_secondary_replica() should +// return. It just verifies that get_secondary_replica() returns a *different* +// replica than get_primary_replica() if there are 2 or more replicas, or +// throws an error when there's just one replica. +// Reproduces SCYLLADB-777. +SEASTAR_THREAD_TEST_CASE(test_get_secondary_replica) { + auto h1 = host_id(utils::UUID_gen::get_time_UUID()); + auto h2 = host_id(utils::UUID_gen::get_time_UUID()); + auto h3 = host_id(utils::UUID_gen::get_time_UUID()); + + locator::topology::config cfg = { + .this_endpoint = inet_address("127.0.0.1"), + .this_host_id = h1, + .local_dc_rack = endpoint_dc_rack::default_location, + }; + auto topo = locator::topology(cfg); + topo.add_or_update_endpoint(h1, endpoint_dc_rack::default_location, node::state::normal); + topo.add_or_update_endpoint(h2, endpoint_dc_rack::default_location, node::state::normal); + topo.add_or_update_endpoint(h3, endpoint_dc_rack::default_location, node::state::normal); + + // With 1 replica, get_secondary_replica should throw. + { + tablet_map tmap(1); + auto tid = tmap.first_tablet(); + tmap.set_tablet(tid, tablet_info { + tablet_replica_set { + tablet_replica {h1, 0}, + } + }); + BOOST_REQUIRE_THROW(tmap.get_secondary_replica(tid, topo), std::runtime_error); + } + + // With 2 replicas, get_secondary_replica should return a different replica + // than get_primary_replica for every tablet. + { + tablet_map tmap(4); + for (auto tid : tmap.tablet_ids()) { + tmap.set_tablet(tid, tablet_info { + tablet_replica_set { + tablet_replica {h1, 0}, + tablet_replica {h2, 0}, + } + }); + } + for (auto tid : tmap.tablet_ids()) { + auto primary = tmap.get_primary_replica(tid, topo); + auto secondary = tmap.get_secondary_replica(tid, topo); + BOOST_REQUIRE(primary != secondary); + } + } + + // With 3 replicas, same check. + { + tablet_map tmap(4); + for (auto tid : tmap.tablet_ids()) { + tmap.set_tablet(tid, tablet_info { + tablet_replica_set { + tablet_replica {h1, 0}, + tablet_replica {h2, 0}, + tablet_replica {h3, 0}, + } + }); + } + for (auto tid : tmap.tablet_ids()) { + auto primary = tmap.get_primary_replica(tid, topo); + auto secondary = tmap.get_secondary_replica(tid, topo); + BOOST_REQUIRE(primary != secondary); + } + } + + topo.clear_gently().get(); +} + BOOST_AUTO_TEST_SUITE_END() diff --git a/test/cluster/test_alternator.py b/test/cluster/test_alternator.py index 4c684c024e..2cb92518aa 100644 --- a/test/cluster/test_alternator.py +++ b/test/cluster/test_alternator.py @@ -183,6 +183,72 @@ async def test_alternator_ttl_scheduling_group(manager: ManagerClient): table.delete() +@pytest.mark.parametrize("with_down_node", [False, True], ids=["all_nodes_up", "one_node_down"]) +async def test_alternator_ttl_multinode_expiration(manager: ManagerClient, with_down_node): + """When the cluster has multiple nodes, different nodes are responsible + for checking expiration in different token ranges - each node is + responsible for its "primary ranges". Let's check that this expiration + really does happen - for the entire token range - by writing many + partitions that will span the entire token range, and seeing that they + all expire. We don't check that nodes don't do more work than they + should - an inefficient implementation where every node scans the + entire data set will also pass this test. + When the test is run a second time with with_down_node=True, we verify + that TTL expiration works correctly even when one of the nodes is + brought down. This node's TTL scanner is responsible for scanning part + of the token range, so when this node is down, part of the data might + not get expired. At that point - other node(s) should take over + expiring data in that range - and this test verifies that this indeed + happens. Reproduces issue #9787 and SCYLLADB-777. + """ + servers = await manager.servers_add(3, config=alternator_config, auto_rack_dc='dc1') + alternator = get_alternator(servers[0].ip_addr) + + if with_down_node: + # Bring down one of nodes. Everything we do below, like creating a + # table, reading and writing, should continue to work with one node + # down. + await manager.server_stop_gracefully(servers[2].server_id) + + table = alternator.create_table(TableName=unique_table_name(), + BillingMode='PAY_PER_REQUEST', + KeySchema=[ + {'AttributeName': 'p', 'KeyType': 'HASH' }, + ], + AttributeDefinitions=[ + {'AttributeName': 'p', 'AttributeType': 'N' }, + ]) + # Set the "expiration" column to mark item's expiration time + table.meta.client.update_time_to_live(TableName=table.name, TimeToLiveSpecification={'AttributeName': 'expiration', 'Enabled': True}) + + # Insert 50 rows, in different partitions, so the murmur3 hash maps them + # all over the token space so different nodes would be responsible for + # expiring them. All items are marked to expire 10 seconds in the past, + # so should all expire as soon as possible, during this test. + expiration = int(time.time()) - 10 + with table.batch_writer() as batch: + for p in range(50): + batch.put_item({'p': p, 'expiration': expiration}) + # Expect that after a short delay, all items in the table will have + # expired - so a scan should return no responses. This should happen + # even though one of the nodes is down and not doing its usual + # expiration-scanning work. + timeout = time.time() + 60 + items = -1 + while items != 0 and time.time() < timeout: + response = table.scan(ConsistentRead=True) + items = len(response['Items']) + # In theory (though probably not in practice in this test), a scan() + # can return zero items but have more pages - so we need to be more + # diligent and scan all pages to check it's completely empty. + while items == 0 and 'LastEvaluatedKey' in response: + response = table.scan(ExclusiveStartKey=response['LastEvaluatedKey'], ConsistentRead=True) + items += len(response['Items']) + if items == 0: + break + time.sleep(0.1) + assert items == 0 + @pytest.mark.asyncio async def test_localnodes_broadcast_rpc_address(manager: ManagerClient): """Test that if the "broadcast_rpc_address" of a node is set, the