diff --git a/weed/mq/kafka/protocol/joingroup.go b/weed/mq/kafka/protocol/joingroup.go index 7a939ea19..918d57470 100644 --- a/weed/mq/kafka/protocol/joingroup.go +++ b/weed/mq/kafka/protocol/joingroup.go @@ -152,7 +152,32 @@ func (h *Handler) handleJoinGroup(connContext *ConnectionContext, correlationID group.State = consumer.GroupStateCompletingRebalance } case consumer.GroupStateCompletingRebalance: - // Allow join but don't change generation until SyncGroup + // A new member joining here — after the leader has already taken its + // member-list snapshot in its JoinGroup response — means the + // leader's upcoming SyncGroup will omit this member. That leaves it + // with an empty Assignment when the group goes Stable, and its own + // SyncGroup then silently serves the empty assignment (the + // CI-observed orphan). Pre-empt that: bump the generation so the + // leader's in-flight SyncGroup fails its generation check and the + // join cycle restarts with the new member in the leader's snapshot. + // (handleSyncGroup also catches this at commit time as a + // belt-and-suspenders check in case a new member slipped in between + // the leader's JoinGroup reply and its SyncGroup without hitting + // CompletingRebalance state here.) + if isNewMember { + group.State = consumer.GroupStatePreparingRebalance + group.Generation++ + // Clear prior-generation assignments. The non-leader SyncGroup + // path in handleSyncGroup only returns REBALANCE_IN_PROGRESS + // when `member.Assignment` is empty; without clearing, a + // member rejoining at the new generation would be served its + // stale old-generation partitions from the pre-rebalance + // state. + for _, m := range group.Members { + m.State = consumer.MemberStatePending + m.Assignment = nil + } + } case consumer.GroupStateDead: return h.buildJoinGroupErrorResponse(correlationID, ErrorCodeInvalidGroupID, apiVersion), nil } @@ -829,6 +854,8 @@ type SyncGroupRequest struct { GenerationID int32 MemberID string GroupInstanceID string + ProtocolType string + ProtocolName string GroupAssignments []GroupAssignment // Only from group leader } @@ -879,9 +906,20 @@ func (h *Handler) handleSyncGroup(correlationID uint32, apiVersion uint16, reque return h.buildSyncGroupErrorResponse(correlationID, ErrorCodeUnknownMemberID, apiVersion), nil } - // Validate generation + // Validate generation. When a late joiner forces us back to + // PreparingRebalance mid-round, the in-flight leader's SyncGroup carries + // the stale generation. Returning ILLEGAL_GENERATION here would tear down + // Sarama's Consume() (it does not re-enter on that error); returning + // REBALANCE_IN_PROGRESS while the group is rebalancing lets Sarama's + // newSession retry the join/sync cycle at the new generation, mirroring + // the handling in handleHeartbeat. if request.GenerationID != group.Generation { - return h.buildSyncGroupErrorResponse(correlationID, ErrorCodeIllegalGeneration, apiVersion), nil + switch group.State { + case consumer.GroupStatePreparingRebalance, consumer.GroupStateCompletingRebalance: + return h.buildSyncGroupErrorResponse(correlationID, ErrorCodeRebalanceInProgress, apiVersion), nil + default: + return h.buildSyncGroupErrorResponse(correlationID, ErrorCodeIllegalGeneration, apiVersion), nil + } } // Check if this is the group leader with assignments @@ -889,13 +927,72 @@ func (h *Handler) handleSyncGroup(correlationID uint32, apiVersion uint16, reque request.MemberID, group.Leader, group.State, len(request.GroupAssignments) > 0, len(group.Members), request.GenerationID) if request.MemberID == group.Leader && len(request.GroupAssignments) > 0 { - // Leader is providing assignments - process and store them + // Leader is providing assignments - process and store them. + // Note: the len(...) > 0 gate matters. Schema Registry's + // SchemaRegistryCoordinator uses a server-side-assignment + // protocol and sends leader SyncGroup with an empty + // GroupAssignments array by design; that case has to fall + // through to the server-side-assignment else-branch below, not + // be treated as a missing-member situation. Dropping the gate + // here puts the schema-registry group into an infinite + // REBALANCE_IN_PROGRESS / rejoin loop (observed in the kafka + // loadtest CI job). + + // Before committing, verify the leader's assignment covers every + // current member. A late joiner can arrive either during the + // leader's JoinGroup call (added to group.Members but not in the + // response the leader already built) or during the narrow window + // between the leader's JoinGroup reply and this SyncGroup. In both + // cases the missing member's ID won't appear in GroupAssignments; + // committing now would leave it with an empty Assignment, and once + // state goes Stable its own SyncGroup would silently serve that + // empty assignment (the CI-observed orphan). + // + // Reject instead: bump the generation, reset state to + // PreparingRebalance, and return REBALANCE_IN_PROGRESS so Sarama's + // newSession retries the join/sync cycle at the new generation + // with the complete member list in the leader's JoinGroup response. + // The stale SyncGroup that the leader is still trying to deliver + // will hit the generation-mismatch branch above and also retry. + assigned := make(map[string]bool, len(request.GroupAssignments)) + for _, ga := range request.GroupAssignments { + assigned[ga.MemberID] = true + } + for mid := range group.Members { + if !assigned[mid] { + glog.V(1).Infof("[SYNCGROUP] Leader %s assignment omits member %s (late joiner); forcing rebalance", + request.MemberID, mid) + group.State = consumer.GroupStatePreparingRebalance + group.Generation++ + // Clear prior-generation assignments so the non-leader + // SyncGroup path (which only returns REBALANCE_IN_PROGRESS + // when member.Assignment is empty) doesn't serve stale + // partitions between now and the next successful + // leader SyncGroup. + for _, m := range group.Members { + m.State = consumer.MemberStatePending + m.Assignment = nil + } + return h.buildSyncGroupErrorResponse(correlationID, ErrorCodeRebalanceInProgress, apiVersion), nil + } + } + glog.V(2).Infof("[SYNCGROUP] Leader %s providing client-side assignments for group %s (%d assignments)", request.MemberID, request.GroupID, len(request.GroupAssignments)) - err = h.processGroupAssignments(group, request.GroupAssignments) - if err != nil { - glog.Errorf("[SYNCGROUP] ERROR processing leader assignments: %v", err) - return h.buildSyncGroupErrorResponse(correlationID, ErrorCodeInconsistentGroupProtocol, apiVersion), nil + if request.GroupID == "schema-registry" { + // Schema Registry's group assignment payload is its own leader-election + // data, not ConsumerGroupMemberAssignment partition bytes. We only need + // the group to become stable; serializeSchemaRegistryAssignment builds + // the response from the elected leader's JoinGroup metadata below. + for _, m := range group.Members { + m.Assignment = nil + } + } else { + err = h.processGroupAssignments(group, request.GroupAssignments) + if err != nil { + glog.Errorf("[SYNCGROUP] ERROR processing leader assignments: %v", err) + return h.buildSyncGroupErrorResponse(correlationID, ErrorCodeInconsistentGroupProtocol, apiVersion), nil + } } // Move group to stable state @@ -1079,6 +1176,26 @@ func (h *Handler) parseSyncGroupRequest(data []byte, apiVersion uint16) (*SyncGr // Parse assignments array if present (leader sends assignments) assignments := make([]GroupAssignment, 0) + var protocolType string + var protocolName string + if apiVersion >= 5 { + if isFlexible { + var consumed int + var decodeErr error + protocolType, consumed, decodeErr = DecodeFlexibleString(data[offset:]) + if decodeErr != nil { + return nil, fmt.Errorf("invalid protocol type compact string: %w", decodeErr) + } + offset += consumed + + protocolName, consumed, decodeErr = DecodeFlexibleString(data[offset:]) + if decodeErr != nil { + return nil, fmt.Errorf("invalid protocol name compact string: %w", decodeErr) + } + offset += consumed + } + } + if offset < len(data) { var assignmentsCount uint32 if isFlexible { @@ -1187,6 +1304,8 @@ func (h *Handler) parseSyncGroupRequest(data []byte, apiVersion uint16) (*SyncGr GenerationID: generationID, MemberID: memberID, GroupInstanceID: groupInstanceID, + ProtocolType: protocolType, + ProtocolName: protocolName, GroupAssignments: assignments, }, nil } diff --git a/weed/mq/kafka/protocol/syncgroup_assignment_test.go b/weed/mq/kafka/protocol/syncgroup_assignment_test.go index ed1da3771..1805e078a 100644 --- a/weed/mq/kafka/protocol/syncgroup_assignment_test.go +++ b/weed/mq/kafka/protocol/syncgroup_assignment_test.go @@ -1,7 +1,11 @@ package protocol import ( + "bytes" + "encoding/binary" "testing" + + "github.com/seaweedfs/seaweedfs/weed/mq/kafka/consumer" ) // TestSyncGroup_RaceCondition_BugDocumentation documents the original race condition bug @@ -123,3 +127,136 @@ func TestSyncGroup_FixVerification(t *testing.T) { }) } } + +func TestParseSyncGroupRequestV5SkipsProtocolFields(t *testing.T) { + memberID := "consumer-schema-registry" + requestBody := buildSyncGroupV5Request("schema-registry", 12, memberID, "consumer", "range", []GroupAssignment{ + {MemberID: memberID, Assignment: []byte(`{"version":1,"error":0}`)}, + }) + + request, err := (&Handler{}).parseSyncGroupRequest(requestBody, 5) + if err != nil { + t.Fatalf("parseSyncGroupRequest failed: %v", err) + } + + if request.GroupID != "schema-registry" { + t.Fatalf("GroupID = %q, want schema-registry", request.GroupID) + } + if request.GenerationID != 12 { + t.Fatalf("GenerationID = %d, want 12", request.GenerationID) + } + if request.MemberID != memberID { + t.Fatalf("MemberID = %q, want %q", request.MemberID, memberID) + } + if request.ProtocolType != "consumer" { + t.Fatalf("ProtocolType = %q, want consumer", request.ProtocolType) + } + if request.ProtocolName != "range" { + t.Fatalf("ProtocolName = %q, want range", request.ProtocolName) + } + if len(request.GroupAssignments) != 1 { + t.Fatalf("len(GroupAssignments) = %d, want 1", len(request.GroupAssignments)) + } + if request.GroupAssignments[0].MemberID != memberID { + t.Fatalf("assignment member = %q, want %q", request.GroupAssignments[0].MemberID, memberID) + } + if string(request.GroupAssignments[0].Assignment) != `{"version":1,"error":0}` { + t.Fatalf("assignment payload = %q", string(request.GroupAssignments[0].Assignment)) + } +} + +func TestSyncGroupSchemaRegistryV5LeaderAssignmentDoesNotRebalance(t *testing.T) { + memberID := "consumer-schema-registry" + handler := &Handler{ + groupCoordinator: consumer.NewGroupCoordinator(), + defaultPartitions: 1, + } + t.Cleanup(handler.groupCoordinator.Close) + + group := handler.groupCoordinator.GetOrCreateGroup("schema-registry") + group.Mu.Lock() + group.State = consumer.GroupStateCompletingRebalance + group.Generation = 7 + group.Protocol = consumer.ProtocolNameRange + group.Leader = memberID + group.Members[memberID] = &consumer.GroupMember{ + ID: memberID, + State: consumer.MemberStatePending, + Metadata: []byte(`{ + "host":"schema-registry", + "port":8081, + "scheme":"http", + "version":1, + "master_eligibility":true + }`), + } + group.Mu.Unlock() + + requestBody := buildSyncGroupV5Request("schema-registry", 7, memberID, "consumer", "range", []GroupAssignment{ + {MemberID: memberID, Assignment: []byte(`{"version":1,"error":0}`)}, + }) + + response, err := handler.handleSyncGroup(99, 5, requestBody) + if err != nil { + t.Fatalf("handleSyncGroup failed: %v", err) + } + if code := syncGroupResponseErrorCode(t, response, 5); code != ErrorCodeNone { + t.Fatalf("SyncGroup error code = %d, want %d", code, ErrorCodeNone) + } + if !bytes.Contains(response, []byte(`"master":"`+memberID+`"`)) { + t.Fatalf("Schema Registry assignment response does not name leader %q: %q", memberID, string(response)) + } + + group.Mu.RLock() + defer group.Mu.RUnlock() + if group.Generation != 7 { + t.Fatalf("group generation = %d, want 7", group.Generation) + } + if group.State != consumer.GroupStateStable { + t.Fatalf("group state = %s, want Stable", group.State) + } + if group.Members[memberID].State != consumer.MemberStateStable { + t.Fatalf("member state = %s, want Stable", group.Members[memberID].State) + } +} + +func buildSyncGroupV5Request(groupID string, generation int32, memberID, protocolType, protocolName string, assignments []GroupAssignment) []byte { + var data []byte + data = appendCompactString(data, groupID) + + var generationBytes [4]byte + binary.BigEndian.PutUint32(generationBytes[:], uint32(generation)) + data = append(data, generationBytes[:]...) + + data = appendCompactString(data, memberID) + data = append(data, 0) // group_instance_id = null + data = appendCompactString(data, protocolType) + data = appendCompactString(data, protocolName) + + data = append(data, CompactArrayLength(uint32(len(assignments)))...) + for _, assignment := range assignments { + data = appendCompactString(data, assignment.MemberID) + data = append(data, CompactStringLength(len(assignment.Assignment))...) + data = append(data, assignment.Assignment...) + data = append(data, 0) // assignment tagged fields + } + data = append(data, 0) // request tagged fields + return data +} + +func appendCompactString(data []byte, value string) []byte { + data = append(data, CompactStringLength(len(value))...) + return append(data, value...) +} + +func syncGroupResponseErrorCode(t *testing.T, response []byte, apiVersion uint16) int16 { + t.Helper() + offset := 0 + if apiVersion >= 1 { + offset += 4 + } + if len(response) < offset+2 { + t.Fatalf("SyncGroup response too short: %d bytes", len(response)) + } + return int16(binary.BigEndian.Uint16(response[offset : offset+2])) +}