seaweedfs/.github/workflows/kafka-tests.yml
Chris Lu 86c5e815d2 fix(kafka): make consumer-group rebalancing work end-to-end (#9143)
* fix(kafka): make consumer-group rebalancing work end-to-end

TestConsumerGroups had been failing on every run since the job was added
(2026-04-17), but the failures were masked by a `|| echo ...` trailer on
the go test invocation, so CI reported green. Removing the mask
exposes several real bugs in the gateway's group-coordinator code:

1. JoinGroup deduplicated members by ClientID, which collapsed two
   Sarama consumers that share the default ClientID ("sarama") into a
   single member slot and broke rebalancing. Key deduplication on the TCP
   ConnectionID instead; keep ClientID on the member for DescribeGroup
   fidelity.

2. Every JoinGroup replaced the *GroupMember struct, wiping the
   Assignment the leader had just published in its SyncGroup and leaving
   non-leader consumers with 0 partitions after a rebalance. Update the
   existing member in place on rejoin.

3. Non-leader SyncGroup returned an empty assignment while the leader
   was mid-rebalance, so consumers silently came up with no partitions.
   Return REBALANCE_IN_PROGRESS when the group is not Stable so Sarama
   retries the join/sync cycle (4 retries x 2s backoff by default).

4. Heartbeat returned ILLEGAL_GENERATION on a gen mismatch even when
   the group was in PreparingRebalance/CompletingRebalance. Return
   REBALANCE_IN_PROGRESS in that case so the heartbeat loop cleanly
   cancels the session instead of tearing it down on a fatal error.

5. The LeaveGroup parser only handled v0-v2. Sarama at V2_8_0_0 sends v3
   (Members array) by default, so the gateway silently rejected the
   request as InvalidGroupID and dead consumers stayed in the group as
   phantom leaders. Added v3 (Members array) and v4+ (flexible/compact/
   tagged-fields) parsing.
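The coordinator fixes in items 1 to 4 can be sketched as a small model. This is a minimal, hypothetical sketch, not the gateway's code. The `group` and `member` types, the `join`, `syncNonLeader` and `heartbeat` methods, and the state names are illustrative assumptions. Only the error-code values (22 ILLEGAL_GENERATION, 25 UNKNOWN_MEMBER_ID, 27 REBALANCE_IN_PROGRESS) come from the Kafka protocol.

```go
package main

import "fmt"

// Kafka protocol error codes used in this sketch.
const (
	errNone                int16 = 0
	errIllegalGeneration   int16 = 22
	errUnknownMemberID     int16 = 25
	errRebalanceInProgress int16 = 27
)

type groupState int

const (
	stateStable groupState = iota
	statePreparingRebalance
	stateCompletingRebalance
)

// member is a hypothetical, trimmed-down stand-in for *GroupMember.
type member struct {
	ConnectionID string  // dedup key: one TCP connection is one member
	ClientID     string  // informational only (e.g. for DescribeGroup)
	Assignment   []int32 // partitions published by the leader's SyncGroup
}

// group is a hypothetical, trimmed-down coordinator state.
type group struct {
	state      groupState
	generation int32
	members    map[string]*member // keyed by ConnectionID, not ClientID
}

// join adds a member or refreshes an existing one in place, so a rejoin
// does not wipe the Assignment the leader already published.
func (g *group) join(connID, clientID string) *member {
	if m, ok := g.members[connID]; ok {
		m.ClientID = clientID
		return m
	}
	m := &member{ConnectionID: connID, ClientID: clientID}
	g.members[connID] = m
	return m
}

// syncNonLeader hands out an assignment only once the group is Stable;
// otherwise it tells the client to retry the join/sync cycle.
func (g *group) syncNonLeader(connID string) ([]int32, int16) {
	m, ok := g.members[connID]
	if !ok {
		return nil, errUnknownMemberID
	}
	if g.state != stateStable {
		return nil, errRebalanceInProgress
	}
	return m.Assignment, errNone
}

// heartbeat maps a generation mismatch during a rebalance to
// REBALANCE_IN_PROGRESS instead of the fatal ILLEGAL_GENERATION.
// Handling of a matching generation is simplified here.
func (g *group) heartbeat(gen int32) int16 {
	if gen == g.generation {
		return errNone
	}
	if g.state == statePreparingRebalance || g.state == stateCompletingRebalance {
		return errRebalanceInProgress
	}
	return errIllegalGeneration
}

func main() {
	g := &group{members: map[string]*member{}}
	g.join("conn-1", "sarama") // two consumers sharing the default ClientID
	g.join("conn-2", "sarama")
	fmt.Println(len(g.members)) // 2

	g.members["conn-2"].Assignment = []int32{1}
	g.join("conn-2", "sarama")                  // rejoin keeps the published assignment
	fmt.Println(g.members["conn-2"].Assignment) // [1]
}
```

Keying the map by connection rather than ClientID keeps the two default-named Sarama consumers distinct. Returning the existing pointer from `join` is what preserves the leader's assignment across rejoins.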

The rebalancing integration tests called Consume() once per consumer,
which cannot survive a rebalance: a REBALANCE_IN_PROGRESS heartbeat
response cancels the session and Consume() returns. This is documented
Sarama behaviour, and callers are expected to loop. Added a
runConsumeLoop helper and used it in the four affected sub-tests.
RebalanceTestHandler.Setup now overwrites stale entries in its
assignments channel so the test observes the settled post-rebalance
snapshot rather than whatever arrived first.

* fix(kafka): address PR review feedback

- JoinGroup now snapshots existing members before mutating and restores
  the snapshot on INCONSISTENT_GROUP_PROTOCOL rollback. Previously the
  rollback path always deleted the entry, corrupting group state when
  an existing member rejoined with an incompatible protocol.

- handleLeaveGroup iterates request.Members instead of processing only
  the first entry, so v3+ batch departures (KIP-345 style) correctly
  remove every listed member and build a per-member response. A single
  group-state transition runs after the loop, with leader election
  only triggered if the actual group leader was among the departures.

- Added buildLeaveGroupFlexibleResponse for v4+ clients. The parser
  already decoded flexible versions, but the response still went out in
  non-flexible encoding (4-byte array lengths, 2-byte strings, no
  tagged fields), which v4+ clients could not parse. Route flexible
  versions through the new builder; v1-v3 keep buildLeaveGroupFullResponse.

- BasicFunctionality gives each consumer its own
  ConsumerGroupHandler/ready channel. The previous shared handler
  closed ready once, so readyCount advanced to numConsumers from a
  single signal; the test could proceed without the other consumers
  actually reaching Setup.

- RebalanceTestHandler.assignments is now a size-1 channel; combined with
  the overwrite in Setup, readers always observe the most recent
  rebalance snapshot instead of an intermediate one from an earlier round.
2026-04-20 10:11:45 -07:00

826 lines
28 KiB
YAML

name: "Kafka Gateway Tests"

on:
  push:
    branches: [ master ]
  pull_request:
    branches: [ master ]

concurrency:
  group: ${{ github.head_ref }}/kafka-tests
  cancel-in-progress: true

# Force different runners for better isolation
env:
  FORCE_RUNNER_SEPARATION: true

permissions:
  contents: read

jobs:
  kafka-unit-tests:
    name: Kafka Unit Tests
    runs-on: ubuntu-latest
    timeout-minutes: 5
    strategy:
      fail-fast: false
      matrix:
        container-id: [unit-tests-1]
    container:
      image: golang:1.24-alpine
      options: --cpus 1.0 --memory 1g --hostname kafka-unit-${{ matrix.container-id }}
    env:
      GOMAXPROCS: 1
      CGO_ENABLED: 0
      CONTAINER_ID: ${{ matrix.container-id }}
    steps:
      - name: Set up Go 1.x
        uses: actions/setup-go@v6
        with:
          go-version: ^1.25
        id: go

      - name: Check out code
        uses: actions/checkout@v6

      - name: Setup Container Environment
        run: |
          apk add --no-cache git
          ulimit -n 1024 || echo "Warning: Could not set file descriptor limit"

      - name: Get dependencies
        run: |
          cd test/kafka
          go mod download
          go mod tidy

      - name: Run Kafka Gateway Unit Tests
        run: |
          cd test/kafka
          # Set process limits for container isolation
          ulimit -n 512 || echo "Warning: Could not set file descriptor limit"
          ulimit -u 100 || echo "Warning: Could not set process limit"
          go test -v -timeout 10s ./unit/...

  kafka-integration-tests:
    name: Kafka Integration Tests (Critical)
    runs-on: ubuntu-latest
    timeout-minutes: 5
    strategy:
      fail-fast: false
      matrix:
        container-id: [integration-1]
    container:
      image: golang:1.24-alpine
      options: --cpus 2.0 --memory 2g --ulimit nofile=1024:1024 --hostname kafka-integration-${{ matrix.container-id }}
    env:
      GOMAXPROCS: 2
      CGO_ENABLED: 0
      KAFKA_TEST_ISOLATION: "true"
      CONTAINER_ID: ${{ matrix.container-id }}
    steps:
      - name: Set up Go 1.x
        uses: actions/setup-go@v6
        with:
          go-version: ^1.25
        id: go

      - name: Check out code
        uses: actions/checkout@v6

      - name: Setup Integration Container Environment
        run: |
          apk add --no-cache git procps
          ulimit -n 2048 || echo "Warning: Could not set file descriptor limit"

      - name: Get dependencies
        run: |
          cd test/kafka
          go mod download
          go mod tidy

      - name: Run Integration Tests
        run: |
          cd test/kafka
          # Higher limits for integration tests
          ulimit -n 1024 || echo "Warning: Could not set file descriptor limit"
          ulimit -u 200 || echo "Warning: Could not set process limit"
          go test -v -timeout 90s ./integration/...
        env:
          GOMAXPROCS: 2

  kafka-e2e-tests:
    name: Kafka End-to-End Tests (with SMQ)
    runs-on: ubuntu-latest
    timeout-minutes: 20
    strategy:
      fail-fast: false
      matrix:
        container-id: [e2e-1]
    container:
      image: golang:1.24-alpine
      options: --cpus 2.0 --memory 2g --hostname kafka-e2e-${{ matrix.container-id }}
    env:
      GOMAXPROCS: 2
      CGO_ENABLED: 0
      KAFKA_E2E_ISOLATION: "true"
      CONTAINER_ID: ${{ matrix.container-id }}
    steps:
      - name: Check out code
        uses: actions/checkout@v6

      - name: Set up Go 1.x
        uses: actions/setup-go@v6
        with:
          go-version: ^1.25
          cache: true
          cache-dependency-path: |
            **/go.sum
        id: go

      - name: Setup E2E Container Environment
        run: |
          apk add --no-cache git procps curl netcat-openbsd
          ulimit -n 2048 || echo "Warning: Could not set file descriptor limit"

      - name: Warm Go module cache
        run: |
          # Warm cache for root module
          go mod download || true
          # Warm cache for kafka test module
          cd test/kafka
          go mod download || true

      - name: Get dependencies
        run: |
          cd test/kafka
          # Use go mod download with timeout to prevent hanging
          timeout 90s go mod download || echo "Warning: Dependency download timed out, continuing with cached modules"
          go mod tidy

      - name: Build and start SeaweedFS MQ
        run: |
          set -e
          cd $GITHUB_WORKSPACE
          # Build weed binary
          go build -o /usr/local/bin/weed ./weed
          # Start SeaweedFS components with MQ brokers
          export WEED_DATA_DIR=/tmp/seaweedfs-e2e-$RANDOM
          mkdir -p "$WEED_DATA_DIR"
          # Start SeaweedFS server (master, volume, filer) with consistent IP advertising
          nohup weed -v 1 server \
            -ip="127.0.0.1" \
            -ip.bind="0.0.0.0" \
            -dir="$WEED_DATA_DIR" \
            -master.raftHashicorp \
            -master.port=9333 \
            -volume.port=8081 \
            -filer.port=8888 \
            -filer=true \
            -metricsPort=9325 \
            -master.peers=none \
            > /tmp/weed-server.log 2>&1 &
          # Wait for master to be ready
          for i in $(seq 1 30); do
            if curl -s http://127.0.0.1:9333/cluster/status >/dev/null; then
              echo "SeaweedFS master HTTP is up"; break
            fi
            echo "Waiting for SeaweedFS master HTTP... ($i/30)"; sleep 1
          done
          # Wait for master gRPC to be ready (this is what broker discovery uses)
          echo "Waiting for master gRPC port..."
          for i in $(seq 1 30); do
            if nc -z 127.0.0.1 19333; then
              echo "✓ SeaweedFS master gRPC is up (port 19333)"
              break
            fi
            echo " Waiting for master gRPC... ($i/30)"; sleep 1
          done
          # Give server time to initialize all components including gRPC services
          echo "Waiting for SeaweedFS components to initialize..."
          sleep 15
          # Additional wait specifically for gRPC services to be ready for streaming
          echo "Allowing extra time for master gRPC streaming services to initialize..."
          sleep 10
          # Start MQ broker with maximum verbosity for debugging
          echo "Starting MQ broker..."
          nohup weed -v 3 mq.broker \
            -master="127.0.0.1:9333" \
            -ip="127.0.0.1" \
            -port=17777 \
            -logFlushInterval=0 \
            > /tmp/weed-mq-broker.log 2>&1 &
          # Wait for broker to be ready with better error reporting
          sleep 15
          broker_ready=false
          for i in $(seq 1 20); do
            if nc -z 127.0.0.1 17777; then
              echo "SeaweedFS MQ broker is up"
              broker_ready=true
              break
            fi
            echo "Waiting for MQ broker... ($i/20)"; sleep 1
          done
          # Give broker additional time to register with master
          if [ "$broker_ready" = true ]; then
            echo "Allowing broker to register with master..."
            sleep 30
            # Check if broker is properly registered by querying cluster nodes
            echo "Cluster status after broker registration:"
            curl -s "http://127.0.0.1:9333/cluster/status" || echo "Could not check cluster status"
            echo "Checking cluster topology (includes registered components):"
            curl -s "http://127.0.0.1:9333/dir/status" | head -20 || echo "Could not check dir status"
            echo "Verifying broker discovery via master client debug:"
            echo "If broker registration is successful, it should appear in dir status"
            echo "Testing gRPC connectivity with weed binary:"
            echo "This simulates what the gateway does during broker discovery..."
            timeout 10s weed shell -master=127.0.0.1:9333 -filer=127.0.0.1:8888 > /tmp/shell-test.log 2>&1 || echo "weed shell test completed or timed out - checking logs..."
            echo "Shell test results:"
            cat /tmp/shell-test.log 2>/dev/null | head -10 || echo "No shell test logs"
          fi
          # Check if broker failed to start and show logs
          if [ "$broker_ready" = false ]; then
            echo "ERROR: MQ broker failed to start. Broker logs:"
            cat /tmp/weed-mq-broker.log || echo "No broker logs found"
            echo "Server logs:"
            tail -20 /tmp/weed-server.log || echo "No server logs found"
            exit 1
          fi

      - name: Run End-to-End Tests
        run: |
          cd test/kafka
          # Higher limits for E2E tests
          ulimit -n 1024 || echo "Warning: Could not set file descriptor limit"
          ulimit -u 200 || echo "Warning: Could not set process limit"
          # Allow additional time for all background processes to settle
          echo "Allowing additional settlement time for SeaweedFS ecosystem..."
          sleep 15
          # Run tests and capture result
          if ! go test -v -timeout 180s ./e2e/...; then
            echo "========================================="
            echo "Tests failed! Showing debug information:"
            echo "========================================="
            echo "Server logs (last 50 lines):"
            tail -50 /tmp/weed-server.log || echo "No server logs"
            echo "========================================="
            echo "Broker logs (last 50 lines):"
            tail -50 /tmp/weed-mq-broker.log || echo "No broker logs"
            echo "========================================="
            exit 1
          fi
        env:
          GOMAXPROCS: 2
          SEAWEEDFS_MASTERS: 127.0.0.1:9333

  kafka-consumer-group-tests:
    name: Kafka Consumer Group Tests (Highly Isolated)
    runs-on: ubuntu-latest
    timeout-minutes: 20
    strategy:
      fail-fast: false
      matrix:
        container-id: [consumer-group-1]
    container:
      image: golang:1.24-alpine
      options: --cpus 1.0 --memory 2g --ulimit nofile=512:512 --hostname kafka-consumer-${{ matrix.container-id }}
    env:
      GOMAXPROCS: 1
      CGO_ENABLED: 0
      KAFKA_CONSUMER_ISOLATION: "true"
      CONTAINER_ID: ${{ matrix.container-id }}
    steps:
      - name: Check out code
        uses: actions/checkout@v6

      - name: Set up Go 1.x
        uses: actions/setup-go@v6
        with:
          go-version: ^1.25
          cache: true
          cache-dependency-path: |
            **/go.sum
        id: go

      - name: Setup Consumer Group Container Environment
        run: |
          apk add --no-cache git procps curl netcat-openbsd
          ulimit -n 256 || echo "Warning: Could not set file descriptor limit"

      - name: Warm Go module cache
        run: |
          # Warm cache for root module
          go mod download || true
          # Warm cache for kafka test module
          cd test/kafka
          go mod download || true

      - name: Get dependencies
        run: |
          cd test/kafka
          # Use go mod download with timeout to prevent hanging
          timeout 90s go mod download || echo "Warning: Dependency download timed out, continuing with cached modules"
          go mod tidy

      - name: Build and start SeaweedFS MQ
        run: |
          set -e
          cd $GITHUB_WORKSPACE
          # Build weed binary
          go build -o /usr/local/bin/weed ./weed
          # Start SeaweedFS components with MQ brokers
          export WEED_DATA_DIR=/tmp/seaweedfs-mq-$RANDOM
          mkdir -p "$WEED_DATA_DIR"
          # Start SeaweedFS server (master, volume, filer) with consistent IP advertising
          nohup weed -v 1 server \
            -ip="127.0.0.1" \
            -ip.bind="0.0.0.0" \
            -dir="$WEED_DATA_DIR" \
            -master.raftHashicorp \
            -master.port=9333 \
            -volume.port=8081 \
            -filer.port=8888 \
            -filer=true \
            -metricsPort=9325 \
            -master.peers=none \
            > /tmp/weed-server.log 2>&1 &
          # Wait for master to be ready
          for i in $(seq 1 30); do
            if curl -s http://127.0.0.1:9333/cluster/status >/dev/null; then
              echo "SeaweedFS master HTTP is up"; break
            fi
            echo "Waiting for SeaweedFS master HTTP... ($i/30)"; sleep 1
          done
          # Wait for master gRPC to be ready (this is what broker discovery uses)
          echo "Waiting for master gRPC port..."
          for i in $(seq 1 30); do
            if nc -z 127.0.0.1 19333; then
              echo "✓ SeaweedFS master gRPC is up (port 19333)"
              break
            fi
            echo " Waiting for master gRPC... ($i/30)"; sleep 1
          done
          # Give server time to initialize all components including gRPC services
          echo "Waiting for SeaweedFS components to initialize..."
          sleep 15
          # Additional wait specifically for gRPC services to be ready for streaming
          echo "Allowing extra time for master gRPC streaming services to initialize..."
          sleep 10
          # Start MQ broker with maximum verbosity for debugging
          echo "Starting MQ broker..."
          nohup weed -v 3 mq.broker \
            -master="127.0.0.1:9333" \
            -ip="127.0.0.1" \
            -port=17777 \
            -logFlushInterval=0 \
            > /tmp/weed-mq-broker.log 2>&1 &
          # Wait for broker to be ready with better error reporting
          sleep 15
          broker_ready=false
          for i in $(seq 1 20); do
            if nc -z 127.0.0.1 17777; then
              echo "SeaweedFS MQ broker is up"
              broker_ready=true
              break
            fi
            echo "Waiting for MQ broker... ($i/20)"; sleep 1
          done
          # Give broker additional time to register with master
          if [ "$broker_ready" = true ]; then
            echo "Allowing broker to register with master..."
            sleep 30
            # Check if broker is properly registered by querying cluster nodes
            echo "Cluster status after broker registration:"
            curl -s "http://127.0.0.1:9333/cluster/status" || echo "Could not check cluster status"
            echo "Checking cluster topology (includes registered components):"
            curl -s "http://127.0.0.1:9333/dir/status" | head -20 || echo "Could not check dir status"
            echo "Verifying broker discovery via master client debug:"
            echo "If broker registration is successful, it should appear in dir status"
            echo "Testing gRPC connectivity with weed binary:"
            echo "This simulates what the gateway does during broker discovery..."
            timeout 10s weed shell -master=127.0.0.1:9333 -filer=127.0.0.1:8888 > /tmp/shell-test.log 2>&1 || echo "weed shell test completed or timed out - checking logs..."
            echo "Shell test results:"
            cat /tmp/shell-test.log 2>/dev/null | head -10 || echo "No shell test logs"
          fi
          # Check if broker failed to start and show logs
          if [ "$broker_ready" = false ]; then
            echo "ERROR: MQ broker failed to start. Broker logs:"
            cat /tmp/weed-mq-broker.log || echo "No broker logs found"
            echo "Server logs:"
            tail -20 /tmp/weed-server.log || echo "No server logs found"
            exit 1
          fi

      - name: Run Consumer Group Tests
        run: |
          cd test/kafka
          # Test consumer group functionality with explicit timeout
          ulimit -n 512 || echo "Warning: Could not set file descriptor limit"
          ulimit -u 100 || echo "Warning: Could not set process limit"
          timeout 240s go test -v -run "^TestConsumerGroups" -timeout 180s ./integration/...
        env:
          GOMAXPROCS: 1
          SEAWEEDFS_MASTERS: 127.0.0.1:9333

  kafka-client-compatibility:
    name: Kafka Client Compatibility (with SMQ)
    runs-on: ubuntu-latest
    timeout-minutes: 25
    strategy:
      fail-fast: false
      matrix:
        container-id: [client-compat-1]
    container:
      image: golang:1.24-alpine
      options: --cpus 1.0 --memory 1.5g --shm-size 256m --hostname kafka-client-${{ matrix.container-id }}
    env:
      GOMAXPROCS: 1
      CGO_ENABLED: 0
      KAFKA_CLIENT_ISOLATION: "true"
      CONTAINER_ID: ${{ matrix.container-id }}
    steps:
      - name: Check out code
        uses: actions/checkout@v6

      - name: Set up Go 1.x
        uses: actions/setup-go@v6
        with:
          go-version: ^1.25
          cache: true
          cache-dependency-path: |
            **/go.sum
        id: go

      - name: Setup Client Container Environment
        run: |
          apk add --no-cache git procps curl netcat-openbsd
          ulimit -n 1024 || echo "Warning: Could not set file descriptor limit"

      - name: Warm Go module cache
        run: |
          # Warm cache for root module
          go mod download || true
          # Warm cache for kafka test module
          cd test/kafka
          go mod download || true

      - name: Get dependencies
        run: |
          cd test/kafka
          timeout 90s go mod download || echo "Warning: Dependency download timed out, continuing with cached modules"
          go mod tidy

      - name: Build and start SeaweedFS MQ
        run: |
          set -e
          cd $GITHUB_WORKSPACE
          # Build weed binary
          go build -o /usr/local/bin/weed ./weed
          # Start SeaweedFS components with MQ brokers
          export WEED_DATA_DIR=/tmp/seaweedfs-client-$RANDOM
          mkdir -p "$WEED_DATA_DIR"
          # Start SeaweedFS server (master, volume, filer) with consistent IP advertising
          nohup weed -v 1 server \
            -ip="127.0.0.1" \
            -ip.bind="0.0.0.0" \
            -dir="$WEED_DATA_DIR" \
            -master.raftHashicorp \
            -master.port=9333 \
            -volume.port=8081 \
            -filer.port=8888 \
            -filer=true \
            -metricsPort=9325 \
            -master.peers=none \
            > /tmp/weed-server.log 2>&1 &
          # Wait for master to be ready
          for i in $(seq 1 30); do
            if curl -s http://127.0.0.1:9333/cluster/status >/dev/null; then
              echo "SeaweedFS master HTTP is up"; break
            fi
            echo "Waiting for SeaweedFS master HTTP... ($i/30)"; sleep 1
          done
          # Wait for master gRPC to be ready (this is what broker discovery uses)
          echo "Waiting for master gRPC port..."
          for i in $(seq 1 30); do
            if nc -z 127.0.0.1 19333; then
              echo "✓ SeaweedFS master gRPC is up (port 19333)"
              break
            fi
            echo " Waiting for master gRPC... ($i/30)"; sleep 1
          done
          # Give server time to initialize all components including gRPC services
          echo "Waiting for SeaweedFS components to initialize..."
          sleep 15
          # Additional wait specifically for gRPC services to be ready for streaming
          echo "Allowing extra time for master gRPC streaming services to initialize..."
          sleep 10
          # Start MQ broker with maximum verbosity for debugging
          echo "Starting MQ broker..."
          nohup weed -v 3 mq.broker \
            -master="127.0.0.1:9333" \
            -ip="127.0.0.1" \
            -port=17777 \
            -logFlushInterval=0 \
            > /tmp/weed-mq-broker.log 2>&1 &
          # Wait for broker to be ready with better error reporting
          sleep 15
          broker_ready=false
          for i in $(seq 1 20); do
            if nc -z 127.0.0.1 17777; then
              echo "SeaweedFS MQ broker is up"
              broker_ready=true
              break
            fi
            echo "Waiting for MQ broker... ($i/20)"; sleep 1
          done
          # Give broker additional time to register with master
          if [ "$broker_ready" = true ]; then
            echo "Allowing broker to register with master..."
            sleep 30
            # Check if broker is properly registered by querying cluster nodes
            echo "Cluster status after broker registration:"
            curl -s "http://127.0.0.1:9333/cluster/status" || echo "Could not check cluster status"
            echo "Checking cluster topology (includes registered components):"
            curl -s "http://127.0.0.1:9333/dir/status" | head -20 || echo "Could not check dir status"
            echo "Verifying broker discovery via master client debug:"
            echo "If broker registration is successful, it should appear in dir status"
            echo "Testing gRPC connectivity with weed binary:"
            echo "This simulates what the gateway does during broker discovery..."
            timeout 10s weed shell -master=127.0.0.1:9333 -filer=127.0.0.1:8888 > /tmp/shell-test.log 2>&1 || echo "weed shell test completed or timed out - checking logs..."
            echo "Shell test results:"
            cat /tmp/shell-test.log 2>/dev/null | head -10 || echo "No shell test logs"
          fi
          # Check if broker failed to start and show logs
          if [ "$broker_ready" = false ]; then
            echo "ERROR: MQ broker failed to start. Broker logs:"
            cat /tmp/weed-mq-broker.log || echo "No broker logs found"
            echo "Server logs:"
            tail -20 /tmp/weed-server.log || echo "No server logs found"
            exit 1
          fi

      - name: Run Client Compatibility Tests
        run: |
          cd test/kafka
          go test -v -run "^TestClientCompatibility" -timeout 180s ./integration/...
        env:
          GOMAXPROCS: 1
          SEAWEEDFS_MASTERS: 127.0.0.1:9333

  kafka-smq-integration-tests:
    name: Kafka SMQ Integration Tests (Full Stack)
    runs-on: ubuntu-latest
    timeout-minutes: 20
    strategy:
      fail-fast: false
      matrix:
        container-id: [smq-integration-1]
    container:
      image: golang:1.24-alpine
      options: --cpus 1.0 --memory 2g --hostname kafka-smq-${{ matrix.container-id }}
    env:
      GOMAXPROCS: 1
      CGO_ENABLED: 0
      KAFKA_SMQ_INTEGRATION: "true"
      CONTAINER_ID: ${{ matrix.container-id }}
    steps:
      - name: Check out code
        uses: actions/checkout@v6

      - name: Set up Go 1.x
        uses: actions/setup-go@v6
        with:
          go-version: ^1.25
          cache: true
          cache-dependency-path: |
            **/go.sum
        id: go

      - name: Setup SMQ Integration Container Environment
        run: |
          apk add --no-cache git procps curl netcat-openbsd
          ulimit -n 1024 || echo "Warning: Could not set file descriptor limit"

      - name: Warm Go module cache
        run: |
          # Warm cache for root module
          go mod download || true
          # Warm cache for kafka test module
          cd test/kafka
          go mod download || true

      - name: Get dependencies
        run: |
          cd test/kafka
          timeout 90s go mod download || echo "Warning: Dependency download timed out, continuing with cached modules"
          go mod tidy

      - name: Build and start SeaweedFS MQ
        run: |
          set -e
          cd $GITHUB_WORKSPACE
          # Build weed binary
          go build -o /usr/local/bin/weed ./weed
          # Start SeaweedFS components with MQ brokers
          export WEED_DATA_DIR=/tmp/seaweedfs-smq-$RANDOM
          mkdir -p "$WEED_DATA_DIR"
          # Start SeaweedFS server (master, volume, filer) with consistent IP advertising
          nohup weed -v 1 server \
            -ip="127.0.0.1" \
            -ip.bind="0.0.0.0" \
            -dir="$WEED_DATA_DIR" \
            -master.raftHashicorp \
            -master.port=9333 \
            -volume.port=8081 \
            -filer.port=8888 \
            -filer=true \
            -metricsPort=9325 \
            -master.peers=none \
            > /tmp/weed-server.log 2>&1 &
          # Wait for master to be ready
          for i in $(seq 1 30); do
            if curl -s http://127.0.0.1:9333/cluster/status >/dev/null; then
              echo "SeaweedFS master HTTP is up"; break
            fi
            echo "Waiting for SeaweedFS master HTTP... ($i/30)"; sleep 1
          done
          # Wait for master gRPC to be ready (this is what broker discovery uses)
          echo "Waiting for master gRPC port..."
          for i in $(seq 1 30); do
            if nc -z 127.0.0.1 19333; then
              echo "✓ SeaweedFS master gRPC is up (port 19333)"
              break
            fi
            echo " Waiting for master gRPC... ($i/30)"; sleep 1
          done
          # Give server time to initialize all components including gRPC services
          echo "Waiting for SeaweedFS components to initialize..."
          sleep 15
          # Additional wait specifically for gRPC services to be ready for streaming
          echo "Allowing extra time for master gRPC streaming services to initialize..."
          sleep 10
          # Start MQ broker with maximum verbosity for debugging
          echo "Starting MQ broker..."
          nohup weed -v 3 mq.broker \
            -master="127.0.0.1:9333" \
            -ip="127.0.0.1" \
            -port=17777 \
            -logFlushInterval=0 \
            > /tmp/weed-mq-broker.log 2>&1 &
          # Wait for broker to be ready with better error reporting
          sleep 15
          broker_ready=false
          for i in $(seq 1 20); do
            if nc -z 127.0.0.1 17777; then
              echo "SeaweedFS MQ broker is up"
              broker_ready=true
              break
            fi
            echo "Waiting for MQ broker... ($i/20)"; sleep 1
          done
          # Give broker additional time to register with master
          if [ "$broker_ready" = true ]; then
            echo "Allowing broker to register with master..."
            sleep 30
            # Check if broker is properly registered by querying cluster nodes
            echo "Cluster status after broker registration:"
            curl -s "http://127.0.0.1:9333/cluster/status" || echo "Could not check cluster status"
            echo "Checking cluster topology (includes registered components):"
            curl -s "http://127.0.0.1:9333/dir/status" | head -20 || echo "Could not check dir status"
            echo "Verifying broker discovery via master client debug:"
            echo "If broker registration is successful, it should appear in dir status"
            echo "Testing gRPC connectivity with weed binary:"
            echo "This simulates what the gateway does during broker discovery..."
            timeout 10s weed shell -master=127.0.0.1:9333 -filer=127.0.0.1:8888 > /tmp/shell-test.log 2>&1 || echo "weed shell test completed or timed out - checking logs..."
            echo "Shell test results:"
            cat /tmp/shell-test.log 2>/dev/null | head -10 || echo "No shell test logs"
          fi
          # Check if broker failed to start and show logs
          if [ "$broker_ready" = false ]; then
            echo "ERROR: MQ broker failed to start. Broker logs:"
            cat /tmp/weed-mq-broker.log || echo "No broker logs found"
            echo "Server logs:"
            tail -20 /tmp/weed-server.log || echo "No server logs found"
            exit 1
          fi

      - name: Run SMQ Integration Tests
        run: |
          cd test/kafka
          ulimit -n 512 || echo "Warning: Could not set file descriptor limit"
          ulimit -u 100 || echo "Warning: Could not set process limit"
          # Run the dedicated SMQ integration tests
          go test -v -run "^TestSMQIntegration" -timeout 180s ./integration/...
        env:
          GOMAXPROCS: 1
          SEAWEEDFS_MASTERS: 127.0.0.1:9333

  kafka-protocol-tests:
    name: Kafka Protocol Tests (Isolated)
    runs-on: ubuntu-latest
    timeout-minutes: 5
    strategy:
      fail-fast: false
      matrix:
        container-id: [protocol-1]
    container:
      image: golang:1.24-alpine
      options: --cpus 1.0 --memory 1g --tmpfs /tmp:exec --hostname kafka-protocol-${{ matrix.container-id }}
    env:
      GOMAXPROCS: 1
      CGO_ENABLED: 0
      KAFKA_PROTOCOL_ISOLATION: "true"
      CONTAINER_ID: ${{ matrix.container-id }}
    steps:
      - name: Set up Go 1.x
        uses: actions/setup-go@v6
        with:
          go-version: ^1.25
        id: go

      - name: Check out code
        uses: actions/checkout@v6

      - name: Setup Protocol Container Environment
        run: |
          apk add --no-cache git procps
          # Ensure proper permissions for test execution
          chmod -R 755 /tmp || true
          export TMPDIR=/tmp
          export GOCACHE=/tmp/go-cache
          mkdir -p $GOCACHE
          chmod 755 $GOCACHE

      - name: Get dependencies
        run: |
          cd test/kafka
          go mod download
          go mod tidy

      - name: Run Protocol Tests
        run: |
          cd test/kafka
          export TMPDIR=/tmp
          export GOCACHE=/tmp/go-cache
          # Run protocol tests from the weed/mq/kafka directory since they test the protocol implementation
          cd ../../weed/mq/kafka
          go test -v -run "^Test.*" -timeout 10s ./...
        env:
          GOMAXPROCS: 1
          TMPDIR: /tmp
          GOCACHE: /tmp/go-cache