diff --git a/weed/mq/kafka/protocol/joingroup.go b/weed/mq/kafka/protocol/joingroup.go index c35e74b06..52131167d 100644 --- a/weed/mq/kafka/protocol/joingroup.go +++ b/weed/mq/kafka/protocol/joingroup.go @@ -167,8 +167,15 @@ func (h *Handler) handleJoinGroup(connContext *ConnectionContext, correlationID if isNewMember { group.State = consumer.GroupStatePreparingRebalance group.Generation++ + // Clear prior-generation assignments. The non-leader SyncGroup + // path in handleSyncGroup only returns REBALANCE_IN_PROGRESS + // when `member.Assignment` is empty; without clearing, a + // member rejoining at the new generation would be served its + // stale old-generation partitions from the pre-rebalance + // state. for _, m := range group.Members { m.State = consumer.MemberStatePending + m.Assignment = nil } } case consumer.GroupStateDead: @@ -946,8 +953,14 @@ func (h *Handler) handleSyncGroup(correlationID uint32, apiVersion uint16, reque request.MemberID, mid) group.State = consumer.GroupStatePreparingRebalance group.Generation++ + // Clear prior-generation assignments so the non-leader + // SyncGroup path (which only returns REBALANCE_IN_PROGRESS + // when member.Assignment is empty) doesn't serve stale + // partitions between now and the next successful + // leader SyncGroup. for _, m := range group.Members { m.State = consumer.MemberStatePending + m.Assignment = nil } return h.buildSyncGroupErrorResponse(correlationID, ErrorCodeRebalanceInProgress, apiVersion), nil }