fix(kafka): clear stale Assignment when restarting a rebalance round

Review spot: the two restart paths added in the previous commit bumped
group.Generation and reset each member's State to Pending but left
member.Assignment populated with the prior generation's partitions.

The non-leader SyncGroup path only returns REBALANCE_IN_PROGRESS when
`member.Assignment` is empty (handleSyncGroup ~line 982). Leaving the
stale assignment in place means a member rejoining at the new
generation — before the leader's SyncGroup has published fresh
assignments — falls through that guard and is served its old
partitions from the pre-rebalance state.

Clear m.Assignment alongside m.State in both restart sites so the
guard fires and the member correctly re-enters the join/sync cycle.

Verified with a fresh-broker TestConsumerGroups run: 50.99s total,
MultipleConsumersJoin 15.25s, all four consumers each get exactly one
partition.
This commit is contained in:
Chris Lu
2026-04-20 15:25:51 -07:00
parent 6eb3bc46bd
commit 8ae77f92ec

View File

@@ -167,8 +167,15 @@ func (h *Handler) handleJoinGroup(connContext *ConnectionContext, correlationID
if isNewMember {
group.State = consumer.GroupStatePreparingRebalance
group.Generation++
// Clear prior-generation assignments. The non-leader SyncGroup
// path in handleSyncGroup only returns REBALANCE_IN_PROGRESS
// when `member.Assignment` is empty; without clearing, a
// member rejoining at the new generation would be served its
// stale old-generation partitions from the pre-rebalance
// state.
for _, m := range group.Members {
m.State = consumer.MemberStatePending
m.Assignment = nil
}
}
case consumer.GroupStateDead:
@@ -946,8 +953,14 @@ func (h *Handler) handleSyncGroup(correlationID uint32, apiVersion uint16, reque
request.MemberID, mid)
group.State = consumer.GroupStatePreparingRebalance
group.Generation++
// Clear prior-generation assignments so the non-leader
// SyncGroup path (which only returns REBALANCE_IN_PROGRESS
// when member.Assignment is empty) doesn't serve stale
// partitions between now and the next successful
// leader SyncGroup.
for _, m := range group.Members {
m.State = consumer.MemberStatePending
m.Assignment = nil
}
return h.buildSyncGroupErrorResponse(correlationID, ErrorCodeRebalanceInProgress, apiVersion), nil
}