diff --git a/weed/mq/kafka/protocol/joingroup.go b/weed/mq/kafka/protocol/joingroup.go index 7a939ea19..c35e74b06 100644 --- a/weed/mq/kafka/protocol/joingroup.go +++ b/weed/mq/kafka/protocol/joingroup.go @@ -152,7 +152,25 @@ func (h *Handler) handleJoinGroup(connContext *ConnectionContext, correlationID group.State = consumer.GroupStateCompletingRebalance } case consumer.GroupStateCompletingRebalance: - // Allow join but don't change generation until SyncGroup + // A new member joining here — after the leader has already taken its + // member-list snapshot in its JoinGroup response — means the + // leader's upcoming SyncGroup will omit this member. That leaves it + // with an empty Assignment when the group goes Stable, and its own + // SyncGroup then silently serves the empty assignment (the + // CI-observed orphan). Pre-empt that: bump the generation so the + // leader's in-flight SyncGroup fails its generation check and the + // join cycle restarts with the new member in the leader's snapshot. + // (handleSyncGroup also catches this at commit time as a + // belt-and-suspenders check in case a new member slipped in between + // the leader's JoinGroup reply and its SyncGroup without hitting + // CompletingRebalance state here.) + if isNewMember { + group.State = consumer.GroupStatePreparingRebalance + group.Generation++ + for _, m := range group.Members { + m.State = consumer.MemberStatePending + } + } case consumer.GroupStateDead: return h.buildJoinGroupErrorResponse(correlationID, ErrorCodeInvalidGroupID, apiVersion), nil } @@ -879,9 +897,20 @@ func (h *Handler) handleSyncGroup(correlationID uint32, apiVersion uint16, reque return h.buildSyncGroupErrorResponse(correlationID, ErrorCodeUnknownMemberID, apiVersion), nil } - // Validate generation + // Validate generation. When a late joiner forces us back to + // PreparingRebalance mid-round, the in-flight leader's SyncGroup carries + // the stale generation. 
Returning ILLEGAL_GENERATION here would tear down + // Sarama's Consume() (it does not re-enter on that error); returning + // REBALANCE_IN_PROGRESS while the group is rebalancing lets Sarama's + // newSession retry the join/sync cycle at the new generation, mirroring + // the handling in handleHeartbeat. if request.GenerationID != group.Generation { - return h.buildSyncGroupErrorResponse(correlationID, ErrorCodeIllegalGeneration, apiVersion), nil + switch group.State { + case consumer.GroupStatePreparingRebalance, consumer.GroupStateCompletingRebalance: + return h.buildSyncGroupErrorResponse(correlationID, ErrorCodeRebalanceInProgress, apiVersion), nil + default: + return h.buildSyncGroupErrorResponse(correlationID, ErrorCodeIllegalGeneration, apiVersion), nil + } } // Check if this is the group leader with assignments @@ -889,7 +918,41 @@ func (h *Handler) handleSyncGroup(correlationID uint32, apiVersion uint16, reque request.MemberID, group.Leader, group.State, len(request.GroupAssignments) > 0, len(group.Members), request.GenerationID) if request.MemberID == group.Leader && len(request.GroupAssignments) > 0 { - // Leader is providing assignments - process and store them + // Leader is providing assignments - process and store them. + + // Before committing, verify the leader's assignment covers every + // current member. A late joiner can arrive either during the + // leader's JoinGroup call (added to group.Members but not in the + // response the leader already built) or during the narrow window + // between the leader's JoinGroup reply and this SyncGroup. In both + // cases the missing member's ID won't appear in GroupAssignments; + // committing now would leave it with an empty Assignment, and once + // state goes Stable its own SyncGroup would silently serve that + // empty assignment (the CI-observed orphan). 
+ // + // Reject instead: bump the generation, reset state to + // PreparingRebalance, and return REBALANCE_IN_PROGRESS so Sarama's + // newSession retries the join/sync cycle at the new generation + // with the complete member list in the leader's JoinGroup response. + // Any other SyncGroup still in flight at the old generation will + // hit the generation-mismatch branch above and retry as well. + assigned := make(map[string]bool, len(request.GroupAssignments)) + for _, ga := range request.GroupAssignments { + assigned[ga.MemberID] = true + } + for mid := range group.Members { + if !assigned[mid] { + glog.V(1).Infof("[SYNCGROUP] Leader %s assignment omits member %s (late joiner); forcing rebalance", + request.MemberID, mid) + group.State = consumer.GroupStatePreparingRebalance + group.Generation++ + for _, m := range group.Members { + m.State = consumer.MemberStatePending + } + return h.buildSyncGroupErrorResponse(correlationID, ErrorCodeRebalanceInProgress, apiVersion), nil + } + } + glog.V(2).Infof("[SYNCGROUP] Leader %s providing client-side assignments for group %s (%d assignments)", request.MemberID, request.GroupID, len(request.GroupAssignments)) err = h.processGroupAssignments(group, request.GroupAssignments)