From 6eb3bc46bd1e5320d09e55ca01ca9b364f6eaefa Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Mon, 20 Apr 2026 14:39:37 -0700 Subject: [PATCH] fix(kafka): close late-joiner orphan race in consumer-group rebalance MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The CI-observed orphan (one consumer with empty assignment after `TestConsumerGroups/Rebalancing/MultipleConsumersJoin`) came from a race in the group coordinator: once the leader had taken its member- list snapshot in its JoinGroup response, a new member could still arrive before the leader's SyncGroup landed. The gateway accepted the stale SyncGroup, moved to Stable, and the late joiner's own SyncGroup then served an empty Assignment from the Stable-state path — leaving it silently unassigned with no further rebalance to fix it. Three changes in `handleJoinGroup` / `handleSyncGroup` close the race: - Late join during `CompletingRebalance` bumps the generation and resets to `PreparingRebalance`, so the leader's in-flight SyncGroup fails its generation check and the round restarts with the new member in the snapshot. - SyncGroup generation-mismatch returns `REBALANCE_IN_PROGRESS` (not `ILLEGAL_GENERATION`) while the group is rebalancing, mirroring the existing heartbeat fix — otherwise Sarama's `Consume()` tears down on the stale SyncGroup instead of retrying. - Leader SyncGroup verifies its assignment covers every current member and rejects with `REBALANCE_IN_PROGRESS` otherwise, as a belt-and-suspenders catch for joins that slip in between the leader's JoinGroup reply and its SyncGroup without going through `CompletingRebalance` state. Verified: baseline reliably reproduces the orphan locally; with the fix `TestConsumerGroups` passes end-to-end (53s total, `MultipleConsumersJoin` 15-17s) and a 10-iteration stress loop against the same gateway is 10/10 green with every consumer getting exactly one partition. --- weed/mq/kafka/protocol/joingroup.go | 71 +++++++++++++++++++++++++++-- 1 file changed, 67 insertions(+), 4 deletions(-) diff --git a/weed/mq/kafka/protocol/joingroup.go b/weed/mq/kafka/protocol/joingroup.go index 7a939ea19..c35e74b06 100644 --- a/weed/mq/kafka/protocol/joingroup.go +++ b/weed/mq/kafka/protocol/joingroup.go @@ -152,7 +152,25 @@ func (h *Handler) handleJoinGroup(connContext *ConnectionContext, correlationID group.State = consumer.GroupStateCompletingRebalance } case consumer.GroupStateCompletingRebalance: - // Allow join but don't change generation until SyncGroup + // A new member joining here — after the leader has already taken its + // member-list snapshot in its JoinGroup response — means the + // leader's upcoming SyncGroup will omit this member. That leaves it + // with an empty Assignment when the group goes Stable, and its own + // SyncGroup then silently serves the empty assignment (the + // CI-observed orphan). Pre-empt that: bump the generation so the + // leader's in-flight SyncGroup fails its generation check and the + // join cycle restarts with the new member in the leader's snapshot. + // (handleSyncGroup also catches this at commit time as a + // belt-and-suspenders check in case a new member slipped in between + // the leader's JoinGroup reply and its SyncGroup without hitting + // CompletingRebalance state here.) + if isNewMember { + group.State = consumer.GroupStatePreparingRebalance + group.Generation++ + for _, m := range group.Members { + m.State = consumer.MemberStatePending + } + } case consumer.GroupStateDead: return h.buildJoinGroupErrorResponse(correlationID, ErrorCodeInvalidGroupID, apiVersion), nil } @@ -879,9 +897,20 @@ func (h *Handler) handleSyncGroup(correlationID uint32, apiVersion uint16, reque return h.buildSyncGroupErrorResponse(correlationID, ErrorCodeUnknownMemberID, apiVersion), nil } - // Validate generation + // Validate generation. When a late joiner forces us back to + // PreparingRebalance mid-round, the in-flight leader's SyncGroup carries + // the stale generation. Returning ILLEGAL_GENERATION here would tear down + // Sarama's Consume() (it does not re-enter on that error); returning + // REBALANCE_IN_PROGRESS while the group is rebalancing lets Sarama's + // newSession retry the join/sync cycle at the new generation, mirroring + // the handling in handleHeartbeat. if request.GenerationID != group.Generation { - return h.buildSyncGroupErrorResponse(correlationID, ErrorCodeIllegalGeneration, apiVersion), nil + switch group.State { + case consumer.GroupStatePreparingRebalance, consumer.GroupStateCompletingRebalance: + return h.buildSyncGroupErrorResponse(correlationID, ErrorCodeRebalanceInProgress, apiVersion), nil + default: + return h.buildSyncGroupErrorResponse(correlationID, ErrorCodeIllegalGeneration, apiVersion), nil + } } // Check if this is the group leader with assignments @@ -889,7 +918,41 @@ func (h *Handler) handleSyncGroup(correlationID uint32, apiVersion uint16, reque request.MemberID, group.Leader, group.State, len(request.GroupAssignments) > 0, len(group.Members), request.GenerationID) if request.MemberID == group.Leader && len(request.GroupAssignments) > 0 { - // Leader is providing assignments - process and store them + // Leader is providing assignments - process and store them. + + // Before committing, verify the leader's assignment covers every + // current member. A late joiner can arrive either during the + // leader's JoinGroup call (added to group.Members but not in the + // response the leader already built) or during the narrow window + // between the leader's JoinGroup reply and this SyncGroup. In both + // cases the missing member's ID won't appear in GroupAssignments; + // committing now would leave it with an empty Assignment, and once + // state goes Stable its own SyncGroup would silently serve that + // empty assignment (the CI-observed orphan). + // + // Reject instead: bump the generation, reset state to + // PreparingRebalance, and return REBALANCE_IN_PROGRESS so Sarama's + // newSession retries the join/sync cycle at the new generation + // with the complete member list in the leader's JoinGroup response. + // The stale SyncGroup that the leader is still trying to deliver + // will hit the generation-mismatch branch above and also retry. + assigned := make(map[string]bool, len(request.GroupAssignments)) + for _, ga := range request.GroupAssignments { + assigned[ga.MemberID] = true + } + for mid := range group.Members { + if !assigned[mid] { + glog.V(1).Infof("[SYNCGROUP] Leader %s assignment omits member %s (late joiner); forcing rebalance", + request.MemberID, mid) + group.State = consumer.GroupStatePreparingRebalance + group.Generation++ + for _, m := range group.Members { + m.State = consumer.MemberStatePending + } + return h.buildSyncGroupErrorResponse(correlationID, ErrorCodeRebalanceInProgress, apiVersion), nil + } + } + glog.V(2).Infof("[SYNCGROUP] Leader %s providing client-side assignments for group %s (%d assignments)", request.MemberID, request.GroupID, len(request.GroupAssignments)) err = h.processGroupAssignments(group, request.GroupAssignments)