mirror of
https://github.com/scylladb/scylladb.git
synced 2026-05-31 20:16:43 +00:00
Consider: - n1 and n2 are in the cluster - n3 bootstraps to join - n1 does not hear the gossip update from n3 due to a network issue - n1 removes n3 from gossip and from the pending node list - the stream between n1 and n3 fails - the network issue between n1 and n3 is fixed - n3 retries the stream with n1 - n3 finishes the stream with n1 - n3 advertises normal status to join the cluster. The problem is that, once n1 has removed n3, n1 no longer treats n3 as a pending node, so writes are not routed to n3. Another problem occurs when n1 gets the normal gossip status update from n3: the gossip listener fails because n1 has removed n3 and cannot find the host id for n3, which causes n1 to abort. To fix this, disable the retry logic in range_streamer so that once a stream with an existing node fails, the bootstrap fails. The downside is that we lose the ability to restream after a temporary network issue, but since we have repair-based node operations, we can use them to resume previously failed node operations. Fixes: #9805 Closes #9806
165 lines
6.3 KiB
C++
165 lines
6.3 KiB
C++
/*
|
|
*
|
|
* Modified by ScyllaDB
|
|
* Copyright (C) 2015-present ScyllaDB
|
|
*/
|
|
|
|
/*
|
|
* SPDX-License-Identifier: (AGPL-3.0-or-later and Apache-2.0)
|
|
*/
|
|
|
|
#pragma once
|
|
|
|
#include "locator/token_metadata.hh"
#include "locator/snitch_base.hh"
#include "streaming/stream_plan.hh"
#include "streaming/stream_state.hh"
#include "streaming/stream_reason.hh"
#include "gms/inet_address.hh"
#include "range.hh"
#include <seastar/core/distributed.hh>
#include <seastar/core/abort_source.hh>
#include <memory>
#include <set>
#include <unordered_map>
#include <unordered_set>
#include <utility>
|
|
|
|
// Forward declarations: this header refers to these types only by
// reference/pointer, so their full definitions are not needed here.
namespace replica { class database; }

namespace gms {
class gossiper;
}
|
|
|
|
namespace dht {
|
|
/**
|
|
* Assists in streaming ranges to a node.
|
|
*/
|
|
class range_streamer {
|
|
public:
|
|
using inet_address = gms::inet_address;
|
|
using token_metadata = locator::token_metadata;
|
|
using token_metadata_ptr = locator::token_metadata_ptr;
|
|
using stream_plan = streaming::stream_plan;
|
|
using stream_state = streaming::stream_state;
|
|
public:
|
|
/**
|
|
* A filter applied to sources to stream from when constructing a fetch map.
|
|
*/
|
|
class i_source_filter {
|
|
public:
|
|
virtual bool should_include(inet_address endpoint) = 0;
|
|
virtual ~i_source_filter() {}
|
|
};
|
|
|
|
/**
|
|
* Source filter which excludes any endpoints that are not alive according to a
|
|
* failure detector.
|
|
*/
|
|
class failure_detector_source_filter : public i_source_filter {
|
|
private:
|
|
std::set<gms::inet_address> _down_nodes;
|
|
public:
|
|
failure_detector_source_filter(std::set<gms::inet_address> down_nodes) : _down_nodes(std::move(down_nodes)) { }
|
|
virtual bool should_include(inet_address endpoint) override { return !_down_nodes.contains(endpoint); }
|
|
};
|
|
|
|
/**
|
|
* Source filter which excludes any endpoints that are not in a specific data center.
|
|
*/
|
|
class single_datacenter_filter : public i_source_filter {
|
|
private:
|
|
sstring _source_dc;
|
|
public:
|
|
single_datacenter_filter(const sstring& source_dc)
|
|
: _source_dc(source_dc) {
|
|
}
|
|
virtual bool should_include(inet_address endpoint) override {
|
|
auto& snitch_ptr = locator::i_endpoint_snitch::get_local_snitch_ptr();
|
|
return snitch_ptr->get_datacenter(endpoint) == _source_dc;
|
|
}
|
|
};
|
|
|
|
range_streamer(distributed<replica::database>& db, sharded<streaming::stream_manager>& sm, const token_metadata_ptr tmptr, abort_source& abort_source, std::unordered_set<token> tokens, inet_address address, sstring description, streaming::stream_reason reason)
|
|
: _db(db)
|
|
, _stream_manager(sm)
|
|
, _token_metadata_ptr(std::move(tmptr))
|
|
, _abort_source(abort_source)
|
|
, _tokens(std::move(tokens))
|
|
, _address(address)
|
|
, _description(std::move(description))
|
|
, _reason(reason)
|
|
{
|
|
_abort_source.check();
|
|
}
|
|
|
|
range_streamer(distributed<replica::database>& db, sharded<streaming::stream_manager>& sm, const token_metadata_ptr tmptr, abort_source& abort_source, inet_address address, sstring description, streaming::stream_reason reason)
|
|
: range_streamer(db, sm, std::move(tmptr), abort_source, std::unordered_set<token>(), address, description, reason) {
|
|
}
|
|
|
|
void add_source_filter(std::unique_ptr<i_source_filter> filter) {
|
|
_source_filters.emplace(std::move(filter));
|
|
}
|
|
|
|
future<> add_ranges(const sstring& keyspace_name, dht::token_range_vector ranges, gms::gossiper& gossiper, bool is_replacing);
|
|
void add_tx_ranges(const sstring& keyspace_name, std::unordered_map<inet_address, dht::token_range_vector> ranges_per_endpoint);
|
|
void add_rx_ranges(const sstring& keyspace_name, std::unordered_map<inet_address, dht::token_range_vector> ranges_per_endpoint);
|
|
private:
|
|
bool use_strict_sources_for_ranges(const sstring& keyspace_name);
|
|
/**
|
|
* Get a map of all ranges and their respective sources that are candidates for streaming the given ranges
|
|
* to us. For each range, the list of sources is sorted by proximity relative to the given destAddress.
|
|
*/
|
|
std::unordered_map<dht::token_range, std::vector<inet_address>>
|
|
get_all_ranges_with_sources_for(const sstring& keyspace_name, dht::token_range_vector desired_ranges);
|
|
/**
|
|
* Get a map of all ranges and the source that will be cleaned up once this bootstrapped node is added for the given ranges.
|
|
* For each range, the list should only contain a single source. This allows us to consistently migrate data without violating
|
|
* consistency.
|
|
*/
|
|
std::unordered_map<dht::token_range, std::vector<inet_address>>
|
|
get_all_ranges_with_strict_sources_for(const sstring& keyspace_name, dht::token_range_vector desired_ranges, gms::gossiper& gossiper);
|
|
private:
|
|
/**
|
|
* @param rangesWithSources The ranges we want to fetch (key) and their potential sources (value)
|
|
* @param sourceFilters A (possibly empty) collection of source filters to apply. In addition to any filters given
|
|
* here, we always exclude ourselves.
|
|
* @return
|
|
*/
|
|
std::unordered_map<inet_address, dht::token_range_vector>
|
|
get_range_fetch_map(const std::unordered_map<dht::token_range, std::vector<inet_address>>& ranges_with_sources,
|
|
const std::unordered_set<std::unique_ptr<i_source_filter>>& source_filters,
|
|
const sstring& keyspace);
|
|
|
|
#if 0
|
|
|
|
// For testing purposes
|
|
Multimap<String, Map.Entry<InetAddress, Collection<Range<Token>>>> toFetch()
|
|
{
|
|
return toFetch;
|
|
}
|
|
#endif
|
|
|
|
const token_metadata& get_token_metadata() {
|
|
return *_token_metadata_ptr;
|
|
}
|
|
public:
|
|
future<> stream_async();
|
|
size_t nr_ranges_to_stream();
|
|
private:
|
|
distributed<replica::database>& _db;
|
|
sharded<streaming::stream_manager>& _stream_manager;
|
|
const token_metadata_ptr _token_metadata_ptr;
|
|
abort_source& _abort_source;
|
|
std::unordered_set<token> _tokens;
|
|
inet_address _address;
|
|
sstring _description;
|
|
streaming::stream_reason _reason;
|
|
std::unordered_multimap<sstring, std::unordered_map<inet_address, dht::token_range_vector>> _to_stream;
|
|
std::unordered_set<std::unique_ptr<i_source_filter>> _source_filters;
|
|
// Number of tx and rx ranges added
|
|
unsigned _nr_tx_added = 0;
|
|
unsigned _nr_rx_added = 0;
|
|
// Limit the number of nodes to stream in parallel to reduce memory pressure with large cluster.
|
|
seastar::semaphore _limiter{16};
|
|
};
|
|
|
|
} // dht
|