fix(kafka): close late-joiner orphan race in consumer-group rebalance

The CI-observed orphan (one consumer with empty assignment after
`TestConsumerGroups/Rebalancing/MultipleConsumersJoin`) came from a
race in the group coordinator: once the leader had taken its member-
list snapshot in its JoinGroup response, a new member could still
arrive before the leader's SyncGroup landed. The gateway accepted the
stale SyncGroup, moved to Stable, and the late joiner's own SyncGroup
then served an empty Assignment from the Stable-state path — leaving
it silently unassigned with no further rebalance to fix it.

Three changes in `handleJoinGroup` / `handleSyncGroup` close the race:

- Late join during `CompletingRebalance` bumps the generation and
  resets to `PreparingRebalance`, so the leader's in-flight SyncGroup
  fails its generation check and the round restarts with the new
  member in the snapshot.
- SyncGroup generation-mismatch returns `REBALANCE_IN_PROGRESS` (not
  `ILLEGAL_GENERATION`) while the group is rebalancing, mirroring the
  existing heartbeat fix — otherwise Sarama's `Consume()` tears down
  on the stale SyncGroup instead of retrying.
- Leader SyncGroup verifies its assignment covers every current
  member and rejects with `REBALANCE_IN_PROGRESS` otherwise, as a
  belt-and-suspenders catch for joins that slip in between the
  leader's JoinGroup reply and its SyncGroup without going through
  `CompletingRebalance` state.

Verified: baseline reliably reproduces the orphan locally; with the
fix `TestConsumerGroups` passes end-to-end (53s total,
`MultipleConsumersJoin` 15-17s) and a 10-iteration stress loop against
the same gateway is 10/10 green with every consumer getting exactly
one partition.
This commit is contained in:
Chris Lu
2026-04-20 14:39:37 -07:00
parent 86c5e815d2
commit 6eb3bc46bd

View File

@@ -152,7 +152,25 @@ func (h *Handler) handleJoinGroup(connContext *ConnectionContext, correlationID
group.State = consumer.GroupStateCompletingRebalance
}
case consumer.GroupStateCompletingRebalance:
// Allow join but don't change generation until SyncGroup
// A new member joining here — after the leader has already taken its
// member-list snapshot in its JoinGroup response — means the
// leader's upcoming SyncGroup will omit this member. That leaves it
// with an empty Assignment when the group goes Stable, and its own
// SyncGroup then silently serves the empty assignment (the
// CI-observed orphan). Pre-empt that: bump the generation so the
// leader's in-flight SyncGroup fails its generation check and the
// join cycle restarts with the new member in the leader's snapshot.
// (handleSyncGroup also catches this at commit time as a
// belt-and-suspenders check in case a new member slipped in between
// the leader's JoinGroup reply and its SyncGroup without hitting
// CompletingRebalance state here.)
if isNewMember {
group.State = consumer.GroupStatePreparingRebalance
group.Generation++
for _, m := range group.Members {
m.State = consumer.MemberStatePending
}
}
case consumer.GroupStateDead:
return h.buildJoinGroupErrorResponse(correlationID, ErrorCodeInvalidGroupID, apiVersion), nil
}
@@ -879,9 +897,20 @@ func (h *Handler) handleSyncGroup(correlationID uint32, apiVersion uint16, reque
return h.buildSyncGroupErrorResponse(correlationID, ErrorCodeUnknownMemberID, apiVersion), nil
}
// Validate generation
// Validate generation. When a late joiner forces us back to
// PreparingRebalance mid-round, the in-flight leader's SyncGroup carries
// the stale generation. Returning ILLEGAL_GENERATION here would tear down
// Sarama's Consume() (it does not re-enter on that error); returning
// REBALANCE_IN_PROGRESS while the group is rebalancing lets Sarama's
// newSession retry the join/sync cycle at the new generation, mirroring
// the handling in handleHeartbeat.
if request.GenerationID != group.Generation {
return h.buildSyncGroupErrorResponse(correlationID, ErrorCodeIllegalGeneration, apiVersion), nil
switch group.State {
case consumer.GroupStatePreparingRebalance, consumer.GroupStateCompletingRebalance:
return h.buildSyncGroupErrorResponse(correlationID, ErrorCodeRebalanceInProgress, apiVersion), nil
default:
return h.buildSyncGroupErrorResponse(correlationID, ErrorCodeIllegalGeneration, apiVersion), nil
}
}
// Check if this is the group leader with assignments
@@ -889,7 +918,41 @@ func (h *Handler) handleSyncGroup(correlationID uint32, apiVersion uint16, reque
request.MemberID, group.Leader, group.State, len(request.GroupAssignments) > 0, len(group.Members), request.GenerationID)
if request.MemberID == group.Leader && len(request.GroupAssignments) > 0 {
// Leader is providing assignments - process and store them
// Leader is providing assignments - process and store them.
// Before committing, verify the leader's assignment covers every
// current member. A late joiner can arrive either during the
// leader's JoinGroup call (added to group.Members but not in the
// response the leader already built) or during the narrow window
// between the leader's JoinGroup reply and this SyncGroup. In both
// cases the missing member's ID won't appear in GroupAssignments;
// committing now would leave it with an empty Assignment, and once
// state goes Stable its own SyncGroup would silently serve that
// empty assignment (the CI-observed orphan).
//
// Reject instead: bump the generation, reset state to
// PreparingRebalance, and return REBALANCE_IN_PROGRESS so Sarama's
// newSession retries the join/sync cycle at the new generation
// with the complete member list in the leader's JoinGroup response.
// The stale SyncGroup that the leader is still trying to deliver
// will hit the generation-mismatch branch above and also retry.
assigned := make(map[string]bool, len(request.GroupAssignments))
for _, ga := range request.GroupAssignments {
assigned[ga.MemberID] = true
}
for mid := range group.Members {
if !assigned[mid] {
glog.V(1).Infof("[SYNCGROUP] Leader %s assignment omits member %s (late joiner); forcing rebalance",
request.MemberID, mid)
group.State = consumer.GroupStatePreparingRebalance
group.Generation++
for _, m := range group.Members {
m.State = consumer.MemberStatePending
}
return h.buildSyncGroupErrorResponse(correlationID, ErrorCodeRebalanceInProgress, apiVersion), nil
}
}
glog.V(2).Infof("[SYNCGROUP] Leader %s providing client-side assignments for group %s (%d assignments)",
request.MemberID, request.GroupID, len(request.GroupAssignments))
err = h.processGroupAssignments(group, request.GroupAssignments)