Merge 'repair: Fix rwlock in compaction_state and lock holder lifecycle' from Raphael Raph Carvalho
Consider this: - repair takes the lock holder - tablet merge filber destories the compaction group and the compaction state - repair fails - repair destroy the lock holder This is observed in the test: ``` repair - repair[5d73d094-72ee-4570-a3cc-1cd479b2a036] Repair 1 out of 1 tablets: table=sec_index.users range=(432345564227567615,504403158265495551] replicas=[0e9d51a5-9c99-4d6e-b9db-ad36a148b0ea:15, 498e354c-1254-4d8d-a565-2f5c6523845a:9, 5208598c-84f0-4526-bb7f-573728592172:28] ... repair - repair[5d73d094-72ee-4570-a3cc-1cd479b2a036]: Started to repair 1 out of 1 tables in keyspace=sec_index, table=users, table_id=ea2072d0-ccd9-11f0-8dba-c5ab01bffb77, repair_reason=repair repair - Enable incremental repair for table=sec_index.users range=(432345564227567615,504403158265495551] table - Disabled compaction for range=(432345564227567615,504403158265495551] session_id=a13a72cc-cd2d-11f0-8e9b-76d54580ab09 for incremental repair table - Got unrepaired compaction and repair lock for range=(432345564227567615,504403158265495551] session_id=a13a72cc-cd2d-11f0-8e9b-76d54580ab09 for incremental repair table - Disabled compaction for range=(432345564227567615,504403158265495551] session_id=a13a72cc-cd2d-11f0-8e9b-76d54580ab09 for incremental repair table - Got unrepaired compaction and repair lock for range=(432345564227567615,504403158265495551] session_id=a13a72cc-cd2d-11f0-8e9b-76d54580ab09 for incremental repair repair - repair[5d73d094-72ee-4570-a3cc-1cd479b2a036]: get_sync_boundary: got error from node=0e9d51a5-9c99-4d6e-b9db-ad36a148b0ea, keyspace=sec_index, table=users, range=(432345564227567615,504403158265495551], error=seastar::rpc::remote_verb_error (Compaction state for table [0x60f008fa34c0] not found) compaction_manager - Stopping 1 tasks for 1 ongoing compactions for table sec_index.users compaction_group=238 due to tablet merge compaction_manager - Stopping 1 tasks for 1 ongoing compactions for table sec_index.users compaction_group=238 due to tablet merge .... scylla[10793] Segmentation fault on shard 28, in scheduling group streaming ``` The rwlock in compaction_state could be destroyed before the lock holder of the rwlock is destroyed. This causes user after free when the lock the holder is destroyed. To fix it, users of repair lock will now be waited when a compaction group is being stopped. That way, compaction group - which controls the lifetime of rwlock - cannot be destroyed while the lock is held. Additionally, the merge completion fiber - that might remove groups - is properly serialized with incremental repair. The issue can be reproduced using sanitize build consistently and can not be reproduced after the fix. Fixes #27365 Closes scylladb/scylladb#28823 * github.com:scylladb/scylladb: repair: Fix rwlock in compaction_state and lock holder lifecycle repair: Prevent repair lock holder leakage after table drop
This commit is contained in:
@@ -778,6 +778,7 @@ compaction_manager::get_incremental_repair_read_lock(compaction::compaction_grou
|
||||
cmlog.debug("Get get_incremental_repair_read_lock for {} started", reason);
|
||||
}
|
||||
compaction::compaction_state& cs = get_compaction_state(&t);
|
||||
auto gh = cs.gate.hold();
|
||||
auto ret = co_await cs.incremental_repair_lock.hold_read_lock();
|
||||
if (!reason.empty()) {
|
||||
cmlog.debug("Get get_incremental_repair_read_lock for {} done", reason);
|
||||
@@ -791,6 +792,7 @@ compaction_manager::get_incremental_repair_write_lock(compaction::compaction_gro
|
||||
cmlog.debug("Get get_incremental_repair_write_lock for {} started", reason);
|
||||
}
|
||||
compaction::compaction_state& cs = get_compaction_state(&t);
|
||||
auto gh = cs.gate.hold();
|
||||
auto ret = co_await cs.incremental_repair_lock.hold_write_lock();
|
||||
if (!reason.empty()) {
|
||||
cmlog.debug("Get get_incremental_repair_write_lock for {} done", reason);
|
||||
@@ -2387,6 +2389,8 @@ future<> compaction_manager::remove(compaction_group_view& t, sstring reason) no
|
||||
if (!c_state.gate.is_closed()) {
|
||||
auto close_gate = c_state.gate.close();
|
||||
co_await stop_ongoing_compactions(reason, &t);
|
||||
// Wait for users of incremental repair lock (can be either repair itself or maintenance compactions).
|
||||
co_await c_state.incremental_repair_lock.write_lock();
|
||||
co_await std::move(close_gate);
|
||||
}
|
||||
|
||||
|
||||
@@ -1139,14 +1139,17 @@ future<> schema_applier::finalize_tables_and_views() {
|
||||
// was already dropped (see https://github.com/scylladb/scylla/issues/5614)
|
||||
for (auto& dropped_view : diff.tables_and_views.local().views.dropped) {
|
||||
auto s = dropped_view.get();
|
||||
co_await _ss.local().on_cleanup_for_drop_table(s->id());
|
||||
co_await replica::database::cleanup_drop_table_on_all_shards(sharded_db, _sys_ks, true, diff.table_shards[s->id()]);
|
||||
}
|
||||
for (auto& dropped_table : diff.tables_and_views.local().tables.dropped) {
|
||||
auto s = dropped_table.get();
|
||||
co_await _ss.local().on_cleanup_for_drop_table(s->id());
|
||||
co_await replica::database::cleanup_drop_table_on_all_shards(sharded_db, _sys_ks, true, diff.table_shards[s->id()]);
|
||||
}
|
||||
for (auto& dropped_cdc : diff.tables_and_views.local().cdc.dropped) {
|
||||
auto s = dropped_cdc.get();
|
||||
co_await _ss.local().on_cleanup_for_drop_table(s->id());
|
||||
co_await replica::database::cleanup_drop_table_on_all_shards(sharded_db, _sys_ks, true, diff.table_shards[s->id()]);
|
||||
}
|
||||
|
||||
|
||||
@@ -1211,6 +1211,7 @@ private:
|
||||
}
|
||||
|
||||
co_await utils::get_local_injector().inject("incremental_repair_prepare_wait", utils::wait_for_message(60s));
|
||||
rlogger.debug("Disabling compaction for range={} for incremental repair", _range);
|
||||
auto reenablers_and_holders = co_await table.get_compaction_reenablers_and_lock_holders_for_repair(_db.local(), _frozen_topology_guard, _range);
|
||||
for (auto& lock_holder : reenablers_and_holders.lock_holders) {
|
||||
_rs._repair_compaction_locks[gid].push_back(std::move(lock_holder));
|
||||
@@ -1240,6 +1241,8 @@ private:
|
||||
// compaction.
|
||||
reenablers_and_holders.cres.clear();
|
||||
rlogger.info("Re-enabled compaction for range={} for incremental repair", _range);
|
||||
|
||||
co_await utils::get_local_injector().inject("wait_after_prepare_sstables_for_incremental_repair", utils::wait_for_message(5min));
|
||||
}
|
||||
|
||||
// Read rows from sstable until the size of rows exceeds _max_row_buf_size - current_size
|
||||
@@ -3953,3 +3956,19 @@ future<std::optional<repair_task_progress>> repair_service::get_tablet_repair_ta
|
||||
task_uuid, tid, requested, finished, progress.progress(), finished_nomerge);
|
||||
co_return progress;
|
||||
}
|
||||
|
||||
void repair_service::on_cleanup_for_drop_table(const table_id& id) {
|
||||
// Prevent repair lock from being leaked in repair_service when table is dropped midway.
|
||||
// The RPC verb that removes the lock on success path will not be called by coordinator after table was dropped.
|
||||
// We also cannot move the lock from repair_service to repair_meta, since the lock must outlive the latter.
|
||||
// Since tablet metadata has been erased at this point, we can simply erase all instances for the dropped table.
|
||||
rlogger.debug("Cleaning up state for dropped table {}", id);
|
||||
for (auto it = _repair_compaction_locks.begin(); it != _repair_compaction_locks.end();) {
|
||||
auto& [global_tid, _] = *it;
|
||||
if (global_tid.table == id) {
|
||||
it = _repair_compaction_locks.erase(it);
|
||||
} else {
|
||||
it++;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -318,6 +318,8 @@ public:
|
||||
|
||||
future<uint32_t> get_next_repair_meta_id();
|
||||
|
||||
void on_cleanup_for_drop_table(const table_id& id);
|
||||
|
||||
friend class repair::user_requested_repair_task_impl;
|
||||
friend class repair::data_sync_repair_task_impl;
|
||||
friend class repair::tablet_repair_task_impl;
|
||||
|
||||
@@ -448,6 +448,7 @@ public:
|
||||
virtual future<> maybe_split_compaction_group_of(size_t idx) = 0;
|
||||
virtual future<std::vector<sstables::shared_sstable>> maybe_split_new_sstable(const sstables::shared_sstable& sst) = 0;
|
||||
virtual dht::token_range get_token_range_after_split(const dht::token&) const noexcept = 0;
|
||||
virtual future<> wait_for_background_tablet_resize_work() = 0;
|
||||
|
||||
virtual lw_shared_ptr<sstables::sstable_set> make_sstable_set() const = 0;
|
||||
};
|
||||
|
||||
@@ -1370,8 +1370,6 @@ public:
|
||||
future<compaction_reenablers_and_lock_holders> get_compaction_reenablers_and_lock_holders_for_repair(replica::database& db,
|
||||
const service::frozen_topology_guard& guard, dht::token_range range);
|
||||
future<uint64_t> estimated_partitions_in_range(dht::token_range tr) const;
|
||||
private:
|
||||
future<std::vector<compaction::compaction_group_view*>> get_compaction_group_views_for_repair(dht::token_range range);
|
||||
};
|
||||
|
||||
lw_shared_ptr<sstables::sstable_set> make_tablet_sstable_set(schema_ptr, const storage_group_manager& sgm, const locator::tablet_map&);
|
||||
|
||||
@@ -750,6 +750,7 @@ public:
|
||||
return make_ready_future<std::vector<sstables::shared_sstable>>(std::vector<sstables::shared_sstable>{sst});
|
||||
}
|
||||
dht::token_range get_token_range_after_split(const dht::token&) const noexcept override { return dht::token_range(); }
|
||||
future<> wait_for_background_tablet_resize_work() override { return make_ready_future<>(); }
|
||||
|
||||
lw_shared_ptr<sstables::sstable_set> make_sstable_set() const override {
|
||||
return get_compaction_group().make_sstable_set();
|
||||
@@ -768,6 +769,13 @@ class tablet_storage_group_manager final : public storage_group_manager {
|
||||
locator::resize_decision::seq_number_t _split_ready_seq_number = std::numeric_limits<locator::resize_decision::seq_number_t>::min();
|
||||
future<> _merge_completion_fiber;
|
||||
condition_variable _merge_completion_event;
|
||||
// Ensures that processes such as incremental repair will wait for pending work from
|
||||
// merge fiber before proceeding. This guarantees stability on the compaction groups.
|
||||
// NOTE: it's important that we don't await on the barrier with any compaction group
|
||||
// gate held, since merge fiber will stop groups that in turn await on gate,
|
||||
// potentially causing an ABBA deadlock.
|
||||
utils::phased_barrier _merge_fiber_barrier;
|
||||
std::optional<utils::phased_barrier::operation> _pending_merge_fiber_work;
|
||||
// Holds compaction reenabler which disables compaction temporarily during tablet merge
|
||||
std::vector<compaction::compaction_reenabler> _compaction_reenablers_for_merging;
|
||||
private:
|
||||
@@ -856,6 +864,7 @@ public:
|
||||
, _my_host_id(erm.get_token_metadata().get_my_id())
|
||||
, _tablet_map(&erm.get_token_metadata().tablets().get_tablet_map(schema()->id()))
|
||||
, _merge_completion_fiber(merge_completion_fiber())
|
||||
, _merge_fiber_barrier(format("[table {}.{}] merge_fiber_barrier", _t.schema()->ks_name(), _t.schema()->cf_name()))
|
||||
{
|
||||
storage_group_map ret;
|
||||
|
||||
@@ -908,6 +917,10 @@ public:
|
||||
dht::token_range get_token_range_after_split(const dht::token& token) const noexcept override {
|
||||
return tablet_map().get_token_range_after_split(token);
|
||||
}
|
||||
future<> wait_for_background_tablet_resize_work() override {
|
||||
co_await _merge_fiber_barrier.advance_and_await();
|
||||
co_return;
|
||||
}
|
||||
|
||||
lw_shared_ptr<sstables::sstable_set> make_sstable_set() const override {
|
||||
// FIXME: avoid recreation of compound_set for groups which had no change. usually, only one group will be changed at a time.
|
||||
@@ -2117,33 +2130,31 @@ compaction_group::update_repaired_at_for_merge() {
|
||||
});
|
||||
}
|
||||
|
||||
future<std::vector<compaction::compaction_group_view*>> table::get_compaction_group_views_for_repair(dht::token_range range) {
|
||||
std::vector<compaction::compaction_group_view*> ret;
|
||||
auto sgs = storage_groups_for_token_range(range);
|
||||
for (auto& sg : sgs) {
|
||||
co_await coroutine::maybe_yield();
|
||||
sg->for_each_compaction_group([&ret] (const compaction_group_ptr& cg) {
|
||||
ret.push_back(&cg->view_for_unrepaired_data());
|
||||
});
|
||||
}
|
||||
co_return ret;
|
||||
}
|
||||
|
||||
future<compaction_reenablers_and_lock_holders> table::get_compaction_reenablers_and_lock_holders_for_repair(replica::database& db,
|
||||
const service::frozen_topology_guard& guard, dht::token_range range) {
|
||||
auto ret = compaction_reenablers_and_lock_holders();
|
||||
auto views = co_await get_compaction_group_views_for_repair(range);
|
||||
for (auto view : views) {
|
||||
auto cre = co_await db.get_compaction_manager().await_and_disable_compaction(*view);
|
||||
// Waits for background tablet resize work like merge that might destroy compaction groups,
|
||||
// providing stability. Essentially, serializes tablet merge completion handling with
|
||||
// the start of incremental repair, from the replica side.
|
||||
co_await _sg_manager->wait_for_background_tablet_resize_work();
|
||||
|
||||
for (auto sg : storage_groups_for_token_range(range)) {
|
||||
// FIXME: indentation
|
||||
auto cgs = sg->compaction_groups_immediate();
|
||||
for (auto& cg : cgs) {
|
||||
auto gate_holder = cg->async_gate().hold();
|
||||
auto& view = cg->view_for_unrepaired_data();
|
||||
auto cre = co_await db.get_compaction_manager().await_and_disable_compaction(view);
|
||||
tlogger.info("Disabled compaction for range={} session_id={} for incremental repair", range, guard);
|
||||
ret.cres.push_back(std::make_unique<compaction::compaction_reenabler>(std::move(cre)));
|
||||
|
||||
// This lock prevents the unrepaired compaction started by major compaction to run in parallel with repair.
|
||||
// The unrepaired compaction started by minor compaction does not need to take the lock since it ignores
|
||||
// sstables being repaired, so it can run in parallel with repair.
|
||||
auto lock_holder = co_await db.get_compaction_manager().get_incremental_repair_write_lock(*view, "row_level_repair");
|
||||
auto lock_holder = co_await db.get_compaction_manager().get_incremental_repair_write_lock(view, "row_level_repair");
|
||||
tlogger.info("Got unrepaired compaction and repair lock for range={} session_id={} for incremental repair", range, guard);
|
||||
ret.lock_holders.push_back(std::move(lock_holder));
|
||||
}
|
||||
}
|
||||
co_return ret;
|
||||
}
|
||||
@@ -3017,7 +3028,7 @@ future<> tablet_storage_group_manager::merge_completion_fiber() {
|
||||
|
||||
while (!_t.async_gate().is_closed()) {
|
||||
try {
|
||||
co_await utils::get_local_injector().inject("merge_completion_fiber", utils::wait_for_message(60s));
|
||||
co_await utils::get_local_injector().inject("merge_completion_fiber", utils::wait_for_message(5min));
|
||||
auto ks_name = schema()->ks_name();
|
||||
auto cf_name = schema()->cf_name();
|
||||
// Enable compaction after merge is done.
|
||||
@@ -3051,6 +3062,7 @@ future<> tablet_storage_group_manager::merge_completion_fiber() {
|
||||
utils::get_local_injector().inject("replica_merge_completion_wait", [] () {
|
||||
tlogger.info("Merge completion fiber finished, about to sleep");
|
||||
});
|
||||
_pending_merge_fiber_work.reset();
|
||||
co_await _merge_completion_event.wait();
|
||||
tlogger.debug("Merge completion fiber woke up for {}.{}", schema()->ks_name(), schema()->cf_name());
|
||||
}
|
||||
@@ -3109,6 +3121,7 @@ void tablet_storage_group_manager::handle_tablet_merge_completion(const locator:
|
||||
new_storage_groups[new_tid] = std::move(new_sg);
|
||||
}
|
||||
_storage_groups = std::move(new_storage_groups);
|
||||
_pending_merge_fiber_work = _merge_fiber_barrier.start();
|
||||
_merge_completion_event.signal();
|
||||
}
|
||||
|
||||
@@ -3125,6 +3138,9 @@ void tablet_storage_group_manager::update_effective_replication_map(const locato
|
||||
} else if (new_tablet_count < old_tablet_count) {
|
||||
tlogger.info0("Detected tablet merge for table {}.{}, decreasing from {} to {} tablets",
|
||||
schema()->ks_name(), schema()->cf_name(), old_tablet_count, new_tablet_count);
|
||||
if (utils::get_local_injector().is_enabled("tablet_force_tablet_count_decrease_once")) {
|
||||
utils::get_local_injector().disable("tablet_force_tablet_count_decrease");
|
||||
}
|
||||
handle_tablet_merge_completion(*old_tablet_map, *new_tablet_map);
|
||||
}
|
||||
|
||||
|
||||
@@ -6647,4 +6647,13 @@ future<> storage_service::query_cdc_streams(table_id table, noncopyable_function
|
||||
return _cdc_gens.local().query_cdc_streams(table, std::move(f));
|
||||
}
|
||||
|
||||
future<> storage_service::on_cleanup_for_drop_table(const table_id& id) {
|
||||
co_await container().invoke_on_all([id] (storage_service& ss) {
|
||||
if (ss._repair.local_is_initialized()) {
|
||||
ss._repair.local().on_cleanup_for_drop_table(id);
|
||||
}
|
||||
});
|
||||
co_return;
|
||||
}
|
||||
|
||||
} // namespace service
|
||||
|
||||
@@ -591,6 +591,8 @@ public:
|
||||
virtual void on_drop_function(const sstring& ks_name, const sstring& function_name) override {}
|
||||
virtual void on_drop_aggregate(const sstring& ks_name, const sstring& aggregate_name) override {}
|
||||
virtual void on_drop_view(const sstring& ks_name, const sstring& view_name) override {}
|
||||
|
||||
future<> on_cleanup_for_drop_table(const table_id& id);
|
||||
private:
|
||||
std::optional<db::system_keyspace::peer_info> get_peer_info_for_update(locator::host_id endpoint);
|
||||
// return an engaged value iff app_state_map has changes to the peer info
|
||||
|
||||
@@ -807,3 +807,80 @@ async def test_repair_sigsegv_with_diff_shard_count(manager: ManagerClient, use_
|
||||
else:
|
||||
logger.info("Starting vnode repair")
|
||||
await manager.api.repair(servers[1].ip_addr, ks, "test")
|
||||
|
||||
# Reproducer for https://github.com/scylladb/scylladb/issues/27365
|
||||
# Incremental repair vs tablet merge
|
||||
@pytest.mark.asyncio
|
||||
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
|
||||
async def test_tablet_incremental_repair_tablet_merge_compaction_group_gone(manager: ManagerClient):
|
||||
cmdline = ['--logger-log-level', 'repair=debug']
|
||||
servers, cql, hosts, ks, table_id, logs, _, _, _, _ = await preapre_cluster_for_incremental_repair(manager, cmdline=cmdline)
|
||||
|
||||
coord = await get_topology_coordinator(manager)
|
||||
coord_serv = await find_server_by_host_id(manager, servers, coord)
|
||||
coord_log = await manager.server_open_log(coord_serv.server_id)
|
||||
|
||||
# Trigger merge and wait until the merge fiber starts
|
||||
s1_mark = await coord_log.mark()
|
||||
await inject_error_on(manager, "merge_completion_fiber", servers)
|
||||
await inject_error_on(manager, "tablet_force_tablet_count_decrease_once", servers)
|
||||
await inject_error_on(manager, "tablet_force_tablet_count_decrease", servers)
|
||||
await coord_log.wait_for(f'Detected tablet merge for table', from_mark=s1_mark)
|
||||
await inject_error_off(manager, "tablet_force_tablet_count_decrease", servers)
|
||||
await coord_log.wait_for(f'merge_completion_fiber: waiting for message', from_mark=s1_mark)
|
||||
|
||||
# Trigger repair and wait for the inc repair prepare preparation to start
|
||||
s1_mark = await coord_log.mark()
|
||||
await inject_error_on(manager, "wait_after_prepare_sstables_for_incremental_repair", servers)
|
||||
await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token=-1, await_completion=False, incremental_mode='incremental')
|
||||
# Wait for preparation to start.
|
||||
await coord_log.wait_for('Disabling compaction for range', from_mark=s1_mark)
|
||||
# Without the serialization, sleep to increase chances of preparation finishing before merge fiber.
|
||||
# With the serialization, preparation will wait for merge fiber to finish.
|
||||
await asyncio.sleep(0.1)
|
||||
|
||||
# Continue to execute the merge fiber so that the compaction group is removed
|
||||
await inject_error_on(manager, "replica_merge_completion_wait", servers)
|
||||
for s in servers:
|
||||
await manager.api.message_injection(s.ip_addr, "merge_completion_fiber")
|
||||
|
||||
await coord_log.wait_for(f'Merge completion fiber finished', from_mark=s1_mark)
|
||||
|
||||
# Continue the repair to trigger use-after-free
|
||||
for s in servers:
|
||||
await manager.api.message_injection(s.ip_addr, "wait_after_prepare_sstables_for_incremental_repair")
|
||||
|
||||
await coord_log.wait_for(f'Finished tablet repair', from_mark=s1_mark)
|
||||
|
||||
# Reproducer for https://github.com/scylladb/scylladb/issues/27365
|
||||
# Incremental repair vs table drop
|
||||
@pytest.mark.asyncio
|
||||
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
|
||||
async def test_tablet_incremental_repair_table_drop_compaction_group_gone(manager: ManagerClient):
|
||||
cmdline = ['--logger-log-level', 'repair=debug']
|
||||
servers, cql, hosts, ks, table_id, logs, _, _, _, _ = await preapre_cluster_for_incremental_repair(manager, cmdline=cmdline)
|
||||
|
||||
coord = await get_topology_coordinator(manager)
|
||||
coord_serv = await find_server_by_host_id(manager, servers, coord)
|
||||
coord_log = await manager.server_open_log(coord_serv.server_id)
|
||||
|
||||
# Trigger merge and wait until the merge fiber starts
|
||||
s1_mark = await coord_log.mark()
|
||||
|
||||
# Trigger repair and wait for the inc repair prepare preparation to start
|
||||
s1_mark = await coord_log.mark()
|
||||
await inject_error_on(manager, "wait_after_prepare_sstables_for_incremental_repair", servers)
|
||||
await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token=-1, await_completion=False, incremental_mode='incremental')
|
||||
# Wait for preparation to finish.
|
||||
await coord_log.wait_for('Re-enabled compaction for range', from_mark=s1_mark)
|
||||
|
||||
s1_mark = await coord_log.mark()
|
||||
drop_future = cql.run_async(f"DROP TABLE {ks}.test;")
|
||||
await coord_log.wait_for(f'Stopping.*ongoing compactions for table {ks}.test', from_mark=s1_mark)
|
||||
await asyncio.sleep(0.2)
|
||||
|
||||
# Continue the repair to trigger use-after-free
|
||||
for s in servers:
|
||||
await manager.api.message_injection(s.ip_addr, "wait_after_prepare_sstables_for_incremental_repair")
|
||||
|
||||
await drop_future
|
||||
|
||||
Reference in New Issue
Block a user