fix(kafka): close late-joiner orphan race in consumer-group rebalance (#9162)

* fix(kafka): close late-joiner orphan race in consumer-group rebalance

The CI-observed orphan (one consumer with empty assignment after
`TestConsumerGroups/Rebalancing/MultipleConsumersJoin`) came from a
race in the group coordinator: once the leader had taken its member-
list snapshot in its JoinGroup response, a new member could still
arrive before the leader's SyncGroup landed. The gateway accepted the
stale SyncGroup, moved to Stable, and the late joiner's own SyncGroup
then served an empty Assignment from the Stable-state path — leaving
it silently unassigned with no further rebalance to fix it.

Three changes in `handleJoinGroup` / `handleSyncGroup` close the race:

- Late join during `CompletingRebalance` bumps the generation and
  resets to `PreparingRebalance`, so the leader's in-flight SyncGroup
  fails its generation check and the round restarts with the new
  member in the snapshot.
- SyncGroup generation-mismatch returns `REBALANCE_IN_PROGRESS` (not
  `ILLEGAL_GENERATION`) while the group is rebalancing, mirroring the
  existing heartbeat fix — otherwise Sarama's `Consume()` tears down
  on the stale SyncGroup instead of retrying.
- Leader SyncGroup verifies its assignment covers every current
  member and rejects with `REBALANCE_IN_PROGRESS` otherwise, as a
  belt-and-suspenders catch for joins that slip in between the
  leader's JoinGroup reply and its SyncGroup without going through
  `CompletingRebalance` state.

Verified: baseline reliably reproduces the orphan locally; with the
fix `TestConsumerGroups` passes end-to-end (53s total,
`MultipleConsumersJoin` 15-17s) and a 10-iteration stress loop against
the same gateway is 10/10 green with every consumer getting exactly
one partition.

* fix(kafka): clear stale Assignment when restarting a rebalance round

Review spot: the two restart paths added in the previous commit bumped
group.Generation and reset each member's State to Pending but left
member.Assignment populated with the prior generation's partitions.

The non-leader SyncGroup path only returns REBALANCE_IN_PROGRESS when
`member.Assignment` is empty (handleSyncGroup ~line 982). Leaving the
stale assignment in place means a member rejoining at the new
generation — before the leader's SyncGroup has published fresh
assignments — falls through that guard and is served its old
partitions from the pre-rebalance state.

Clear m.Assignment alongside m.State in both restart sites so the
guard fires and the member correctly re-enters the join/sync cycle.

Verified with a fresh-broker TestConsumerGroups run: 50.99s total,
MultipleConsumersJoin 15.25s, all four consumers each get exactly one
partition.

* fix(kafka): don't let empty leader assignments bypass coverage check

Review spot: the leader-assignment branch was gated on
`len(request.GroupAssignments) > 0`, so a leader SyncGroup that omitted
every current member (empty array with a non-empty group) fell through
to the server-side-assignment `else` branch and could move the group
Stable without the intended rebalance retry.

Drop the length guard. Whenever the caller is the leader, build the
assigned-member map and run the coverage check; if the assignment
omits any current member (including the all-empty case against a
non-empty group), bump the generation, reset to PreparingRebalance,
clear each member's Assignment, and return REBALANCE_IN_PROGRESS so
the leader rebuilds its snapshot and sends a complete assignment on
retry. The server-side-assignment branch (documented as "should not
happen with Sarama") is now only reachable for non-leader+non-empty
SyncGroups — a genuinely unexpected case — and keeps its existing
warning.

* revert: keep len(GroupAssignments) > 0 gate on leader-assign branch

The previous commit (797f4f779) dropped the len(request.GroupAssignments)
> 0 guard on the leader-branch so that an empty-assignments-with-
non-empty-members leader SyncGroup would be forced through the coverage
check. Confluent Schema Registry's SchemaRegistryCoordinator, however,
uses a server-side-assignment protocol and by design sends leader
SyncGroup with an empty GroupAssignments array; dropping the gate put
the schema-registry group into a REBALANCE_IN_PROGRESS rejoin storm
(generation 84000+ observed in the Kafka Quick Test / Load Test with
Schema Registry CI job against PR #9162).

Restore the gate and document why it's load-bearing. The original
CodeRabbit concern (empty leader assignment from a client-side protocol
accidentally bypassing the coverage check) is theoretical — no
real client-side-assignment client sends empty leader assignments — and
the server-side-assignment else-branch is how schema-registry is
supposed to be served.

TestConsumerGroups still passes end-to-end (52.97s fresh-broker,
MultipleConsumersJoin 17.26s, all 4 consumers get exactly one
partition).

* fix(kafka): parse SyncGroup v5 protocol fields; skip partition decode for schema-registry

Two issues surfaced after PR #9162's coverage check was re-gated on
non-empty GroupAssignments:

1. parseSyncGroupRequest was stopping after GroupInstanceID even though
   SyncGroup v5+ (the version Confluent Schema Registry uses) inserts
   ProtocolType and ProtocolName strings before the assignments array.
   The old parser read the protocol strings' compact-string length
   prefixes as assignments-array length and either failed or came back
   with bogus assignment entries. Parse v5 flexible protocol fields
   explicitly and add them to SyncGroupRequest.

2. The schema-registry leader's assignment payload is the SR JSON
   leader-identity blob, not ConsumerGroupMemberAssignment partition
   bytes. processGroupAssignments would parse it as partition bytes
   and either fail or corrupt member.Assignment. Special-case the
   schema-registry group in the leader-assign branch: skip
   processGroupAssignments, clear member.Assignment so
   serializeSchemaRegistryAssignment rebuilds the response from the
   elected leader's JoinGroup metadata, and transition to Stable.

Adds two unit tests: one asserts the v5 parser pulls the protocol
fields out correctly, the other drives the full handleSyncGroup path
for a schema-registry leader and asserts the group reaches Stable
without a partition-decode error.
This commit is contained in:
Chris Lu
2026-04-20 18:44:16 -07:00
committed by GitHub
parent 49f62df2cf
commit 0007245c7f
2 changed files with 264 additions and 8 deletions

View File

@@ -152,7 +152,32 @@ func (h *Handler) handleJoinGroup(connContext *ConnectionContext, correlationID
group.State = consumer.GroupStateCompletingRebalance
}
case consumer.GroupStateCompletingRebalance:
// Allow join but don't change generation until SyncGroup
// A new member joining here — after the leader has already taken its
// member-list snapshot in its JoinGroup response — means the
// leader's upcoming SyncGroup will omit this member. That leaves it
// with an empty Assignment when the group goes Stable, and its own
// SyncGroup then silently serves the empty assignment (the
// CI-observed orphan). Pre-empt that: bump the generation so the
// leader's in-flight SyncGroup fails its generation check and the
// join cycle restarts with the new member in the leader's snapshot.
// (handleSyncGroup also catches this at commit time as a
// belt-and-suspenders check in case a new member slipped in between
// the leader's JoinGroup reply and its SyncGroup without hitting
// CompletingRebalance state here.)
if isNewMember {
group.State = consumer.GroupStatePreparingRebalance
group.Generation++
// Clear prior-generation assignments. The non-leader SyncGroup
// path in handleSyncGroup only returns REBALANCE_IN_PROGRESS
// when `member.Assignment` is empty; without clearing, a
// member rejoining at the new generation would be served its
// stale old-generation partitions from the pre-rebalance
// state.
for _, m := range group.Members {
m.State = consumer.MemberStatePending
m.Assignment = nil
}
}
case consumer.GroupStateDead:
return h.buildJoinGroupErrorResponse(correlationID, ErrorCodeInvalidGroupID, apiVersion), nil
}
@@ -829,6 +854,8 @@ type SyncGroupRequest struct {
GenerationID int32
MemberID string
GroupInstanceID string
ProtocolType string
ProtocolName string
GroupAssignments []GroupAssignment // Only from group leader
}
@@ -879,9 +906,20 @@ func (h *Handler) handleSyncGroup(correlationID uint32, apiVersion uint16, reque
return h.buildSyncGroupErrorResponse(correlationID, ErrorCodeUnknownMemberID, apiVersion), nil
}
// Validate generation
// Validate generation. When a late joiner forces us back to
// PreparingRebalance mid-round, the in-flight leader's SyncGroup carries
// the stale generation. Returning ILLEGAL_GENERATION here would tear down
// Sarama's Consume() (it does not re-enter on that error); returning
// REBALANCE_IN_PROGRESS while the group is rebalancing lets Sarama's
// newSession retry the join/sync cycle at the new generation, mirroring
// the handling in handleHeartbeat.
if request.GenerationID != group.Generation {
return h.buildSyncGroupErrorResponse(correlationID, ErrorCodeIllegalGeneration, apiVersion), nil
switch group.State {
case consumer.GroupStatePreparingRebalance, consumer.GroupStateCompletingRebalance:
return h.buildSyncGroupErrorResponse(correlationID, ErrorCodeRebalanceInProgress, apiVersion), nil
default:
return h.buildSyncGroupErrorResponse(correlationID, ErrorCodeIllegalGeneration, apiVersion), nil
}
}
// Check if this is the group leader with assignments
@@ -889,13 +927,72 @@ func (h *Handler) handleSyncGroup(correlationID uint32, apiVersion uint16, reque
request.MemberID, group.Leader, group.State, len(request.GroupAssignments) > 0, len(group.Members), request.GenerationID)
if request.MemberID == group.Leader && len(request.GroupAssignments) > 0 {
// Leader is providing assignments - process and store them
// Leader is providing assignments - process and store them.
// Note: the len(...) > 0 gate matters. Schema Registry's
// SchemaRegistryCoordinator uses a server-side-assignment
// protocol and sends leader SyncGroup with an empty
// GroupAssignments array by design; that case has to fall
// through to the server-side-assignment else-branch below, not
// be treated as a missing-member situation. Dropping the gate
// here puts the schema-registry group into an infinite
// REBALANCE_IN_PROGRESS / rejoin loop (observed in the kafka
// loadtest CI job).
// Before committing, verify the leader's assignment covers every
// current member. A late joiner can arrive either during the
// leader's JoinGroup call (added to group.Members but not in the
// response the leader already built) or during the narrow window
// between the leader's JoinGroup reply and this SyncGroup. In both
// cases the missing member's ID won't appear in GroupAssignments;
// committing now would leave it with an empty Assignment, and once
// state goes Stable its own SyncGroup would silently serve that
// empty assignment (the CI-observed orphan).
//
// Reject instead: bump the generation, reset state to
// PreparingRebalance, and return REBALANCE_IN_PROGRESS so Sarama's
// newSession retries the join/sync cycle at the new generation
// with the complete member list in the leader's JoinGroup response.
// The stale SyncGroup that the leader is still trying to deliver
// will hit the generation-mismatch branch above and also retry.
assigned := make(map[string]bool, len(request.GroupAssignments))
for _, ga := range request.GroupAssignments {
assigned[ga.MemberID] = true
}
for mid := range group.Members {
if !assigned[mid] {
glog.V(1).Infof("[SYNCGROUP] Leader %s assignment omits member %s (late joiner); forcing rebalance",
request.MemberID, mid)
group.State = consumer.GroupStatePreparingRebalance
group.Generation++
// Clear prior-generation assignments so the non-leader
// SyncGroup path (which only returns REBALANCE_IN_PROGRESS
// when member.Assignment is empty) doesn't serve stale
// partitions between now and the next successful
// leader SyncGroup.
for _, m := range group.Members {
m.State = consumer.MemberStatePending
m.Assignment = nil
}
return h.buildSyncGroupErrorResponse(correlationID, ErrorCodeRebalanceInProgress, apiVersion), nil
}
}
glog.V(2).Infof("[SYNCGROUP] Leader %s providing client-side assignments for group %s (%d assignments)",
request.MemberID, request.GroupID, len(request.GroupAssignments))
err = h.processGroupAssignments(group, request.GroupAssignments)
if err != nil {
glog.Errorf("[SYNCGROUP] ERROR processing leader assignments: %v", err)
return h.buildSyncGroupErrorResponse(correlationID, ErrorCodeInconsistentGroupProtocol, apiVersion), nil
if request.GroupID == "schema-registry" {
// Schema Registry's group assignment payload is its own leader-election
// data, not ConsumerGroupMemberAssignment partition bytes. We only need
// the group to become stable; serializeSchemaRegistryAssignment builds
// the response from the elected leader's JoinGroup metadata below.
for _, m := range group.Members {
m.Assignment = nil
}
} else {
err = h.processGroupAssignments(group, request.GroupAssignments)
if err != nil {
glog.Errorf("[SYNCGROUP] ERROR processing leader assignments: %v", err)
return h.buildSyncGroupErrorResponse(correlationID, ErrorCodeInconsistentGroupProtocol, apiVersion), nil
}
}
// Move group to stable state
@@ -1079,6 +1176,26 @@ func (h *Handler) parseSyncGroupRequest(data []byte, apiVersion uint16) (*SyncGr
// Parse assignments array if present (leader sends assignments)
assignments := make([]GroupAssignment, 0)
var protocolType string
var protocolName string
if apiVersion >= 5 {
if isFlexible {
var consumed int
var decodeErr error
protocolType, consumed, decodeErr = DecodeFlexibleString(data[offset:])
if decodeErr != nil {
return nil, fmt.Errorf("invalid protocol type compact string: %w", decodeErr)
}
offset += consumed
protocolName, consumed, decodeErr = DecodeFlexibleString(data[offset:])
if decodeErr != nil {
return nil, fmt.Errorf("invalid protocol name compact string: %w", decodeErr)
}
offset += consumed
}
}
if offset < len(data) {
var assignmentsCount uint32
if isFlexible {
@@ -1187,6 +1304,8 @@ func (h *Handler) parseSyncGroupRequest(data []byte, apiVersion uint16) (*SyncGr
GenerationID: generationID,
MemberID: memberID,
GroupInstanceID: groupInstanceID,
ProtocolType: protocolType,
ProtocolName: protocolName,
GroupAssignments: assignments,
}, nil
}

View File

@@ -1,7 +1,11 @@
package protocol
import (
"bytes"
"encoding/binary"
"testing"
"github.com/seaweedfs/seaweedfs/weed/mq/kafka/consumer"
)
// TestSyncGroup_RaceCondition_BugDocumentation documents the original race condition bug
@@ -123,3 +127,136 @@ func TestSyncGroup_FixVerification(t *testing.T) {
})
}
}
func TestParseSyncGroupRequestV5SkipsProtocolFields(t *testing.T) {
memberID := "consumer-schema-registry"
requestBody := buildSyncGroupV5Request("schema-registry", 12, memberID, "consumer", "range", []GroupAssignment{
{MemberID: memberID, Assignment: []byte(`{"version":1,"error":0}`)},
})
request, err := (&Handler{}).parseSyncGroupRequest(requestBody, 5)
if err != nil {
t.Fatalf("parseSyncGroupRequest failed: %v", err)
}
if request.GroupID != "schema-registry" {
t.Fatalf("GroupID = %q, want schema-registry", request.GroupID)
}
if request.GenerationID != 12 {
t.Fatalf("GenerationID = %d, want 12", request.GenerationID)
}
if request.MemberID != memberID {
t.Fatalf("MemberID = %q, want %q", request.MemberID, memberID)
}
if request.ProtocolType != "consumer" {
t.Fatalf("ProtocolType = %q, want consumer", request.ProtocolType)
}
if request.ProtocolName != "range" {
t.Fatalf("ProtocolName = %q, want range", request.ProtocolName)
}
if len(request.GroupAssignments) != 1 {
t.Fatalf("len(GroupAssignments) = %d, want 1", len(request.GroupAssignments))
}
if request.GroupAssignments[0].MemberID != memberID {
t.Fatalf("assignment member = %q, want %q", request.GroupAssignments[0].MemberID, memberID)
}
if string(request.GroupAssignments[0].Assignment) != `{"version":1,"error":0}` {
t.Fatalf("assignment payload = %q", string(request.GroupAssignments[0].Assignment))
}
}
func TestSyncGroupSchemaRegistryV5LeaderAssignmentDoesNotRebalance(t *testing.T) {
memberID := "consumer-schema-registry"
handler := &Handler{
groupCoordinator: consumer.NewGroupCoordinator(),
defaultPartitions: 1,
}
t.Cleanup(handler.groupCoordinator.Close)
group := handler.groupCoordinator.GetOrCreateGroup("schema-registry")
group.Mu.Lock()
group.State = consumer.GroupStateCompletingRebalance
group.Generation = 7
group.Protocol = consumer.ProtocolNameRange
group.Leader = memberID
group.Members[memberID] = &consumer.GroupMember{
ID: memberID,
State: consumer.MemberStatePending,
Metadata: []byte(`{
"host":"schema-registry",
"port":8081,
"scheme":"http",
"version":1,
"master_eligibility":true
}`),
}
group.Mu.Unlock()
requestBody := buildSyncGroupV5Request("schema-registry", 7, memberID, "consumer", "range", []GroupAssignment{
{MemberID: memberID, Assignment: []byte(`{"version":1,"error":0}`)},
})
response, err := handler.handleSyncGroup(99, 5, requestBody)
if err != nil {
t.Fatalf("handleSyncGroup failed: %v", err)
}
if code := syncGroupResponseErrorCode(t, response, 5); code != ErrorCodeNone {
t.Fatalf("SyncGroup error code = %d, want %d", code, ErrorCodeNone)
}
if !bytes.Contains(response, []byte(`"master":"`+memberID+`"`)) {
t.Fatalf("Schema Registry assignment response does not name leader %q: %q", memberID, string(response))
}
group.Mu.RLock()
defer group.Mu.RUnlock()
if group.Generation != 7 {
t.Fatalf("group generation = %d, want 7", group.Generation)
}
if group.State != consumer.GroupStateStable {
t.Fatalf("group state = %s, want Stable", group.State)
}
if group.Members[memberID].State != consumer.MemberStateStable {
t.Fatalf("member state = %s, want Stable", group.Members[memberID].State)
}
}
func buildSyncGroupV5Request(groupID string, generation int32, memberID, protocolType, protocolName string, assignments []GroupAssignment) []byte {
var data []byte
data = appendCompactString(data, groupID)
var generationBytes [4]byte
binary.BigEndian.PutUint32(generationBytes[:], uint32(generation))
data = append(data, generationBytes[:]...)
data = appendCompactString(data, memberID)
data = append(data, 0) // group_instance_id = null
data = appendCompactString(data, protocolType)
data = appendCompactString(data, protocolName)
data = append(data, CompactArrayLength(uint32(len(assignments)))...)
for _, assignment := range assignments {
data = appendCompactString(data, assignment.MemberID)
data = append(data, CompactStringLength(len(assignment.Assignment))...)
data = append(data, assignment.Assignment...)
data = append(data, 0) // assignment tagged fields
}
data = append(data, 0) // request tagged fields
return data
}
func appendCompactString(data []byte, value string) []byte {
data = append(data, CompactStringLength(len(value))...)
return append(data, value...)
}
func syncGroupResponseErrorCode(t *testing.T, response []byte, apiVersion uint16) int16 {
t.Helper()
offset := 0
if apiVersion >= 1 {
offset += 4
}
if len(response) < offset+2 {
t.Fatalf("SyncGroup response too short: %d bytes", len(response))
}
return int16(binary.BigEndian.Uint16(response[offset : offset+2]))
}