From 8ae77f92ecb832f17a762ccdc209dad3ded71133 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Mon, 20 Apr 2026 15:25:51 -0700 Subject: [PATCH] fix(kafka): clear stale Assignment when restarting a rebalance round MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Review spot: the two restart paths added in the previous commit bumped group.Generation and reset each member's State to Pending but left member.Assignment populated with the prior generation's partitions. The non-leader SyncGroup path only returns REBALANCE_IN_PROGRESS when `member.Assignment` is empty (handleSyncGroup ~line 982). Leaving the stale assignment in place means a member rejoining at the new generation — before the leader's SyncGroup has published fresh assignments — falls through that guard and is served its old partitions from the pre-rebalance state. Clear m.Assignment alongside m.State in both restart sites so the guard fires and the member correctly re-enters the join/sync cycle. Verified with a fresh-broker TestConsumerGroups run: 50.99s total, MultipleConsumersJoin 15.25s, all four consumers each get exactly one partition. --- weed/mq/kafka/protocol/joingroup.go | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/weed/mq/kafka/protocol/joingroup.go b/weed/mq/kafka/protocol/joingroup.go index c35e74b06..52131167d 100644 --- a/weed/mq/kafka/protocol/joingroup.go +++ b/weed/mq/kafka/protocol/joingroup.go @@ -167,8 +167,15 @@ func (h *Handler) handleJoinGroup(connContext *ConnectionContext, correlationID if isNewMember { group.State = consumer.GroupStatePreparingRebalance group.Generation++ + // Clear prior-generation assignments. The non-leader SyncGroup + // path in handleSyncGroup only returns REBALANCE_IN_PROGRESS + // when `member.Assignment` is empty; without clearing, a + // member rejoining at the new generation would be served its + // stale old-generation partitions from the pre-rebalance + // state. for _, m := range group.Members { m.State = consumer.MemberStatePending + m.Assignment = nil } } case consumer.GroupStateDead: @@ -946,8 +953,14 @@ func (h *Handler) handleSyncGroup(correlationID uint32, apiVersion uint16, reque request.MemberID, mid) group.State = consumer.GroupStatePreparingRebalance group.Generation++ + // Clear prior-generation assignments so the non-leader + // SyncGroup path (which only returns REBALANCE_IN_PROGRESS + // when member.Assignment is empty) doesn't serve stale + // partitions between now and the next successful + // leader SyncGroup. for _, m := range group.Members { m.State = consumer.MemberStatePending + m.Assignment = nil } return h.buildSyncGroupErrorResponse(correlationID, ErrorCodeRebalanceInProgress, apiVersion), nil }