mirror of https://github.com/seaweedfs/seaweedfs.git
synced 2026-05-28 12:41:15 +00:00
* fix(kafka): make consumer-group rebalancing work end-to-end
TestConsumerGroups had failed on every run since the job was added
(2026-04-17), but a `|| echo ...` trailer on the go test invocation
masked the failures, so CI reported green. Removing the mask exposes
several real bugs in the gateway's group-coordinator code:
1. JoinGroup deduplicated members by ClientID, which collapsed two
Sarama consumers that share the default ClientID ("sarama") into a
single member slot and broke rebalancing. Key dedup off the TCP
ConnectionID instead; keep ClientID on the member for DescribeGroup
fidelity.
2. Every JoinGroup replaced the *GroupMember struct, wiping the
Assignment the leader had just published in its SyncGroup and leaving
non-leader consumers with 0 partitions after a rebalance. Update the
existing member in place on rejoin.
3. Non-leader SyncGroup returned an empty assignment while the leader
was mid-rebalance, so consumers silently came up with no partitions.
Return REBALANCE_IN_PROGRESS when the group is not Stable so Sarama
retries the join/sync cycle (4 retries x 2s backoff by default).
4. Heartbeat returned ILLEGAL_GENERATION on a gen mismatch even when
the group was in PreparingRebalance/CompletingRebalance. Return
REBALANCE_IN_PROGRESS in that case so the heartbeat loop cleanly
cancels the session instead of tearing it down on a fatal error.
5. LeaveGroup parser only handled v0-v2. Sarama at V2_8_0_0 sends v3
(Members array) by default, so the gateway silently rejected the
request as InvalidGroupID and dead consumers stayed in the group as
phantom leaders. Added v3 (Members array) and v4+ (flexible/compact/
tagged-fields) parsing.
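The membership fixes in items 1, 2 and 4 can be illustrated with a minimal, self-contained sketch. This is not the gateway's code: the `Group`, `GroupMember`, `Join` and `HeartbeatError` names and fields are hypothetical stand-ins chosen for illustration. Only the numeric error codes (22 for ILLEGAL_GENERATION, 27 for REBALANCE_IN_PROGRESS) come from the Kafka protocol.

```go
package main

import "fmt"

// Kafka protocol error codes.
const (
	ErrNone                int16 = 0
	ErrIllegalGeneration   int16 = 22
	ErrRebalanceInProgress int16 = 27
)

// GroupState is a simplified view of the coordinator state machine.
type GroupState int

const (
	Stable GroupState = iota
	PreparingRebalance
	CompletingRebalance
)

// GroupMember is a hypothetical stand-in for the gateway's member struct.
type GroupMember struct {
	ConnectionID string
	ClientID     string  // kept for DescribeGroup-style reporting only
	Assignment   []int32 // partitions handed out by the leader
}

// Group is a hypothetical, single-goroutine group model (no locking).
type Group struct {
	State      GroupState
	Generation int32
	Members    map[string]*GroupMember // keyed by ConnectionID, not ClientID
}

// Join registers a member, or updates the existing one in place so that a
// rejoin does not wipe the Assignment already stored on it.
func (g *Group) Join(connID, clientID string) *GroupMember {
	if m, ok := g.Members[connID]; ok {
		m.ClientID = clientID
		return m
	}
	m := &GroupMember{ConnectionID: connID, ClientID: clientID}
	g.Members[connID] = m
	return m
}

// HeartbeatError picks the heartbeat error code: a generation mismatch during
// a rebalance is retriable, while a mismatch in any other state is fatal.
func (g *Group) HeartbeatError(generation int32) int16 {
	if generation == g.Generation {
		return ErrNone
	}
	if g.State == PreparingRebalance || g.State == CompletingRebalance {
		return ErrRebalanceInProgress
	}
	return ErrIllegalGeneration
}

func main() {
	g := &Group{State: Stable, Generation: 1, Members: map[string]*GroupMember{}}

	// Two consumers share the default ClientID but use separate connections.
	a := g.Join("conn-1", "sarama")
	g.Join("conn-2", "sarama")
	a.Assignment = []int32{0, 1}

	// A rejoin on the same connection keeps the stored assignment.
	rejoined := g.Join("conn-1", "sarama")
	fmt.Println(len(g.Members), rejoined.Assignment)

	// A stale generation during a rebalance yields the retriable code.
	g.State = PreparingRebalance
	g.Generation = 2
	fmt.Println(g.HeartbeatError(1))
}
```

Keying the map by connection rather than by ClientID is what keeps the two consumers distinct in this sketch; the real coordinator may track additional state that is not modelled here.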
The rebalancing integration tests called Consume() once per consumer,
which cannot survive a rebalance: the heartbeat's REBALANCE_IN_PROGRESS
response cancels the session and Consume() returns. This is documented
Sarama behaviour; callers are expected to loop. Added a runConsumeLoop
helper and used it in the four affected sub-tests.
RebalanceTestHandler.Setup now overwrites stale entries in its
assignments channel so the test observes the settled post-rebalance
snapshot rather than whatever arrived first.
* fix(kafka): address PR review feedback
- JoinGroup now snapshots existing members before mutating and restores
the snapshot on INCONSISTENT_GROUP_PROTOCOL rollback. Previously the
rollback path always deleted the entry, corrupting group state when
an existing member rejoined with an incompatible protocol.
- handleLeaveGroup iterates request.Members instead of processing only
the first entry, so v3+ batch departures (KIP-345 style) correctly
remove every listed member and build a per-member response. A single
group-state transition runs after the loop, with leader election
only triggered if the actual group leader was among the departures.
- Added buildLeaveGroupFlexibleResponse for v4+ clients. The parser
already decoded flexible versions, but the response still went out in
non-flexible encoding (4-byte array lengths, 2-byte strings, no
tagged fields), which v4+ clients could not parse. Route flexible
versions through the new builder; v1-v3 keep buildLeaveGroupFullResponse.
- BasicFunctionality gives each consumer its own
ConsumerGroupHandler/ready channel. The previous shared handler
closed ready once, so readyCount advanced to numConsumers from a
single signal; the test could proceed without the other consumers
actually reaching Setup.
- RebalanceTestHandler.assignments is now a size-1 channel, so readers
always observe the most recent rebalance snapshot instead of an
intermediate one from an earlier round.
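The size-1 "latest snapshot" channel mentioned in the last item can be sketched as follows. `publishLatest` is a hypothetical helper name, and the `[]int32` payload is an assumption standing in for whatever the handler records about its assignment.

```go
package main

import "fmt"

// publishLatest stores v in a capacity-1 channel, discarding any unread older
// value first, so a reader always sees the most recent snapshot.
func publishLatest(ch chan []int32, v []int32) {
	for {
		select {
		case ch <- v:
			return // slot was free, value stored
		default:
		}
		select {
		case <-ch: // drop the stale value and retry the send
		default:
		}
	}
}

func main() {
	assignments := make(chan []int32, 1)

	// First rebalance round: one consumer briefly owns every partition.
	publishLatest(assignments, []int32{0, 1, 2, 3})
	// Second round: a peer joined, so the settled assignment is smaller.
	publishLatest(assignments, []int32{0, 1})

	fmt.Println(<-assignments)
}
```

With a larger buffer and a plain send, the reader would have received the intermediate four-partition snapshot first.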
826 lines
28 KiB
YAML
name: "Kafka Gateway Tests"

on:
  push:
    branches: [ master ]
  pull_request:
    branches: [ master ]

concurrency:
  group: ${{ github.head_ref }}/kafka-tests
  cancel-in-progress: true

# Force different runners for better isolation
env:
  FORCE_RUNNER_SEPARATION: true

permissions:
  contents: read

jobs:
  kafka-unit-tests:
    name: Kafka Unit Tests
    runs-on: ubuntu-latest
    timeout-minutes: 5
    strategy:
      fail-fast: false
      matrix:
        container-id: [unit-tests-1]
    container:
      image: golang:1.24-alpine
      options: --cpus 1.0 --memory 1g --hostname kafka-unit-${{ matrix.container-id }}
    env:
      GOMAXPROCS: 1
      CGO_ENABLED: 0
      CONTAINER_ID: ${{ matrix.container-id }}
    steps:
      - name: Set up Go 1.x
        uses: actions/setup-go@v6
        with:
          go-version: ^1.25
        id: go

      - name: Check out code
        uses: actions/checkout@v6

      - name: Setup Container Environment
        run: |
          apk add --no-cache git
          ulimit -n 1024 || echo "Warning: Could not set file descriptor limit"

      - name: Get dependencies
        run: |
          cd test/kafka
          go mod download
          go mod tidy

      - name: Run Kafka Gateway Unit Tests
        run: |
          cd test/kafka
          # Set process limits for container isolation
          ulimit -n 512 || echo "Warning: Could not set file descriptor limit"
          ulimit -u 100 || echo "Warning: Could not set process limit"
          go test -v -timeout 10s ./unit/...

  kafka-integration-tests:
    name: Kafka Integration Tests (Critical)
    runs-on: ubuntu-latest
    timeout-minutes: 5
    strategy:
      fail-fast: false
      matrix:
        container-id: [integration-1]
    container:
      image: golang:1.24-alpine
      options: --cpus 2.0 --memory 2g --ulimit nofile=1024:1024 --hostname kafka-integration-${{ matrix.container-id }}
    env:
      GOMAXPROCS: 2
      CGO_ENABLED: 0
      KAFKA_TEST_ISOLATION: "true"
      CONTAINER_ID: ${{ matrix.container-id }}
    steps:
      - name: Set up Go 1.x
        uses: actions/setup-go@v6
        with:
          go-version: ^1.25
        id: go

      - name: Check out code
        uses: actions/checkout@v6

      - name: Setup Integration Container Environment
        run: |
          apk add --no-cache git procps
          ulimit -n 2048 || echo "Warning: Could not set file descriptor limit"

      - name: Get dependencies
        run: |
          cd test/kafka
          go mod download
          go mod tidy

      - name: Run Integration Tests
        run: |
          cd test/kafka
          # Higher limits for integration tests
          ulimit -n 1024 || echo "Warning: Could not set file descriptor limit"
          ulimit -u 200 || echo "Warning: Could not set process limit"
          go test -v -timeout 90s ./integration/...
        env:
          GOMAXPROCS: 2

  kafka-e2e-tests:
    name: Kafka End-to-End Tests (with SMQ)
    runs-on: ubuntu-latest
    timeout-minutes: 20
    strategy:
      fail-fast: false
      matrix:
        container-id: [e2e-1]
    container:
      image: golang:1.24-alpine
      options: --cpus 2.0 --memory 2g --hostname kafka-e2e-${{ matrix.container-id }}
    env:
      GOMAXPROCS: 2
      CGO_ENABLED: 0
      KAFKA_E2E_ISOLATION: "true"
      CONTAINER_ID: ${{ matrix.container-id }}
    steps:
      - name: Check out code
        uses: actions/checkout@v6

      - name: Set up Go 1.x
        uses: actions/setup-go@v6
        with:
          go-version: ^1.25
          cache: true
          cache-dependency-path: |
            **/go.sum
        id: go

      - name: Setup E2E Container Environment
        run: |
          apk add --no-cache git procps curl netcat-openbsd
          ulimit -n 2048 || echo "Warning: Could not set file descriptor limit"

      - name: Warm Go module cache
        run: |
          # Warm cache for root module
          go mod download || true
          # Warm cache for kafka test module
          cd test/kafka
          go mod download || true

      - name: Get dependencies
        run: |
          cd test/kafka
          # Use go mod download with timeout to prevent hanging
          timeout 90s go mod download || echo "Warning: Dependency download timed out, continuing with cached modules"
          go mod tidy

      - name: Build and start SeaweedFS MQ
        run: |
          set -e
          cd $GITHUB_WORKSPACE
          # Build weed binary
          go build -o /usr/local/bin/weed ./weed
          # Start SeaweedFS components with MQ brokers
          export WEED_DATA_DIR=/tmp/seaweedfs-e2e-$RANDOM
          mkdir -p "$WEED_DATA_DIR"

          # Start SeaweedFS server (master, volume, filer) with consistent IP advertising
          nohup weed -v 1 server \
            -ip="127.0.0.1" \
            -ip.bind="0.0.0.0" \
            -dir="$WEED_DATA_DIR" \
            -master.raftHashicorp \
            -master.port=9333 \
            -volume.port=8081 \
            -filer.port=8888 \
            -filer=true \
            -metricsPort=9325 \
            -master.peers=none \
            > /tmp/weed-server.log 2>&1 &

          # Wait for master to be ready
          for i in $(seq 1 30); do
            if curl -s http://127.0.0.1:9333/cluster/status >/dev/null; then
              echo "SeaweedFS master HTTP is up"; break
            fi
            echo "Waiting for SeaweedFS master HTTP... ($i/30)"; sleep 1
          done

          # Wait for master gRPC to be ready (this is what broker discovery uses)
          echo "Waiting for master gRPC port..."
          for i in $(seq 1 30); do
            if nc -z 127.0.0.1 19333; then
              echo "✓ SeaweedFS master gRPC is up (port 19333)"
              break
            fi
            echo " Waiting for master gRPC... ($i/30)"; sleep 1
          done

          # Give server time to initialize all components including gRPC services
          echo "Waiting for SeaweedFS components to initialize..."
          sleep 15

          # Additional wait specifically for gRPC services to be ready for streaming
          echo "Allowing extra time for master gRPC streaming services to initialize..."
          sleep 10

          # Start MQ broker with maximum verbosity for debugging
          echo "Starting MQ broker..."
          nohup weed -v 3 mq.broker \
            -master="127.0.0.1:9333" \
            -ip="127.0.0.1" \
            -port=17777 \
            -logFlushInterval=0 \
            > /tmp/weed-mq-broker.log 2>&1 &

          # Wait for broker to be ready with better error reporting
          sleep 15
          broker_ready=false
          for i in $(seq 1 20); do
            if nc -z 127.0.0.1 17777; then
              echo "SeaweedFS MQ broker is up"
              broker_ready=true
              break
            fi
            echo "Waiting for MQ broker... ($i/20)"; sleep 1
          done

          # Give broker additional time to register with master
          if [ "$broker_ready" = true ]; then
            echo "Allowing broker to register with master..."
            sleep 30

            # Check if broker is properly registered by querying cluster nodes
            echo "Cluster status after broker registration:"
            curl -s "http://127.0.0.1:9333/cluster/status" || echo "Could not check cluster status"

            echo "Checking cluster topology (includes registered components):"
            curl -s "http://127.0.0.1:9333/dir/status" | head -20 || echo "Could not check dir status"

            echo "Verifying broker discovery via master client debug:"
            echo "If broker registration is successful, it should appear in dir status"

            echo "Testing gRPC connectivity with weed binary:"
            echo "This simulates what the gateway does during broker discovery..."
            timeout 10s weed shell -master=127.0.0.1:9333 -filer=127.0.0.1:8888 > /tmp/shell-test.log 2>&1 || echo "weed shell test completed or timed out - checking logs..."
            echo "Shell test results:"
            cat /tmp/shell-test.log 2>/dev/null | head -10 || echo "No shell test logs"
          fi

          # Check if broker failed to start and show logs
          if [ "$broker_ready" = false ]; then
            echo "ERROR: MQ broker failed to start. Broker logs:"
            cat /tmp/weed-mq-broker.log || echo "No broker logs found"
            echo "Server logs:"
            tail -20 /tmp/weed-server.log || echo "No server logs found"
            exit 1
          fi

      - name: Run End-to-End Tests
        run: |
          cd test/kafka
          # Higher limits for E2E tests
          ulimit -n 1024 || echo "Warning: Could not set file descriptor limit"
          ulimit -u 200 || echo "Warning: Could not set process limit"

          # Allow additional time for all background processes to settle
          echo "Allowing additional settlement time for SeaweedFS ecosystem..."
          sleep 15

          # Run tests and capture result
          if ! go test -v -timeout 180s ./e2e/...; then
            echo "========================================="
            echo "Tests failed! Showing debug information:"
            echo "========================================="
            echo "Server logs (last 50 lines):"
            tail -50 /tmp/weed-server.log || echo "No server logs"
            echo "========================================="
            echo "Broker logs (last 50 lines):"
            tail -50 /tmp/weed-mq-broker.log || echo "No broker logs"
            echo "========================================="
            exit 1
          fi
        env:
          GOMAXPROCS: 2
          SEAWEEDFS_MASTERS: 127.0.0.1:9333

  kafka-consumer-group-tests:
    name: Kafka Consumer Group Tests (Highly Isolated)
    runs-on: ubuntu-latest
    timeout-minutes: 20
    strategy:
      fail-fast: false
      matrix:
        container-id: [consumer-group-1]
    container:
      image: golang:1.24-alpine
      options: --cpus 1.0 --memory 2g --ulimit nofile=512:512 --hostname kafka-consumer-${{ matrix.container-id }}
    env:
      GOMAXPROCS: 1
      CGO_ENABLED: 0
      KAFKA_CONSUMER_ISOLATION: "true"
      CONTAINER_ID: ${{ matrix.container-id }}
    steps:
      - name: Check out code
        uses: actions/checkout@v6

      - name: Set up Go 1.x
        uses: actions/setup-go@v6
        with:
          go-version: ^1.25
          cache: true
          cache-dependency-path: |
            **/go.sum
        id: go

      - name: Setup Consumer Group Container Environment
        run: |
          apk add --no-cache git procps curl netcat-openbsd
          ulimit -n 256 || echo "Warning: Could not set file descriptor limit"

      - name: Warm Go module cache
        run: |
          # Warm cache for root module
          go mod download || true
          # Warm cache for kafka test module
          cd test/kafka
          go mod download || true

      - name: Get dependencies
        run: |
          cd test/kafka
          # Use go mod download with timeout to prevent hanging
          timeout 90s go mod download || echo "Warning: Dependency download timed out, continuing with cached modules"
          go mod tidy

      - name: Build and start SeaweedFS MQ
        run: |
          set -e
          cd $GITHUB_WORKSPACE
          # Build weed binary
          go build -o /usr/local/bin/weed ./weed
          # Start SeaweedFS components with MQ brokers
          export WEED_DATA_DIR=/tmp/seaweedfs-mq-$RANDOM
          mkdir -p "$WEED_DATA_DIR"

          # Start SeaweedFS server (master, volume, filer) with consistent IP advertising
          nohup weed -v 1 server \
            -ip="127.0.0.1" \
            -ip.bind="0.0.0.0" \
            -dir="$WEED_DATA_DIR" \
            -master.raftHashicorp \
            -master.port=9333 \
            -volume.port=8081 \
            -filer.port=8888 \
            -filer=true \
            -metricsPort=9325 \
            -master.peers=none \
            > /tmp/weed-server.log 2>&1 &

          # Wait for master to be ready
          for i in $(seq 1 30); do
            if curl -s http://127.0.0.1:9333/cluster/status >/dev/null; then
              echo "SeaweedFS master HTTP is up"; break
            fi
            echo "Waiting for SeaweedFS master HTTP... ($i/30)"; sleep 1
          done

          # Wait for master gRPC to be ready (this is what broker discovery uses)
          echo "Waiting for master gRPC port..."
          for i in $(seq 1 30); do
            if nc -z 127.0.0.1 19333; then
              echo "✓ SeaweedFS master gRPC is up (port 19333)"
              break
            fi
            echo " Waiting for master gRPC... ($i/30)"; sleep 1
          done

          # Give server time to initialize all components including gRPC services
          echo "Waiting for SeaweedFS components to initialize..."
          sleep 15

          # Additional wait specifically for gRPC services to be ready for streaming
          echo "Allowing extra time for master gRPC streaming services to initialize..."
          sleep 10

          # Start MQ broker with maximum verbosity for debugging
          echo "Starting MQ broker..."
          nohup weed -v 3 mq.broker \
            -master="127.0.0.1:9333" \
            -ip="127.0.0.1" \
            -port=17777 \
            -logFlushInterval=0 \
            > /tmp/weed-mq-broker.log 2>&1 &

          # Wait for broker to be ready with better error reporting
          sleep 15
          broker_ready=false
          for i in $(seq 1 20); do
            if nc -z 127.0.0.1 17777; then
              echo "SeaweedFS MQ broker is up"
              broker_ready=true
              break
            fi
            echo "Waiting for MQ broker... ($i/20)"; sleep 1
          done

          # Give broker additional time to register with master
          if [ "$broker_ready" = true ]; then
            echo "Allowing broker to register with master..."
            sleep 30

            # Check if broker is properly registered by querying cluster nodes
            echo "Cluster status after broker registration:"
            curl -s "http://127.0.0.1:9333/cluster/status" || echo "Could not check cluster status"

            echo "Checking cluster topology (includes registered components):"
            curl -s "http://127.0.0.1:9333/dir/status" | head -20 || echo "Could not check dir status"

            echo "Verifying broker discovery via master client debug:"
            echo "If broker registration is successful, it should appear in dir status"

            echo "Testing gRPC connectivity with weed binary:"
            echo "This simulates what the gateway does during broker discovery..."
            timeout 10s weed shell -master=127.0.0.1:9333 -filer=127.0.0.1:8888 > /tmp/shell-test.log 2>&1 || echo "weed shell test completed or timed out - checking logs..."
            echo "Shell test results:"
            cat /tmp/shell-test.log 2>/dev/null | head -10 || echo "No shell test logs"
          fi

          # Check if broker failed to start and show logs
          if [ "$broker_ready" = false ]; then
            echo "ERROR: MQ broker failed to start. Broker logs:"
            cat /tmp/weed-mq-broker.log || echo "No broker logs found"
            echo "Server logs:"
            tail -20 /tmp/weed-server.log || echo "No server logs found"
            exit 1
          fi

      - name: Run Consumer Group Tests
        run: |
          cd test/kafka
          # Test consumer group functionality with explicit timeout
          ulimit -n 512 || echo "Warning: Could not set file descriptor limit"
          ulimit -u 100 || echo "Warning: Could not set process limit"
          timeout 240s go test -v -run "^TestConsumerGroups" -timeout 180s ./integration/...
        env:
          GOMAXPROCS: 1
          SEAWEEDFS_MASTERS: 127.0.0.1:9333

  kafka-client-compatibility:
    name: Kafka Client Compatibility (with SMQ)
    runs-on: ubuntu-latest
    timeout-minutes: 25
    strategy:
      fail-fast: false
      matrix:
        container-id: [client-compat-1]
    container:
      image: golang:1.24-alpine
      options: --cpus 1.0 --memory 1.5g --shm-size 256m --hostname kafka-client-${{ matrix.container-id }}
    env:
      GOMAXPROCS: 1
      CGO_ENABLED: 0
      KAFKA_CLIENT_ISOLATION: "true"
      CONTAINER_ID: ${{ matrix.container-id }}
    steps:
      - name: Check out code
        uses: actions/checkout@v6

      - name: Set up Go 1.x
        uses: actions/setup-go@v6
        with:
          go-version: ^1.25
          cache: true
          cache-dependency-path: |
            **/go.sum
        id: go

      - name: Setup Client Container Environment
        run: |
          apk add --no-cache git procps curl netcat-openbsd
          ulimit -n 1024 || echo "Warning: Could not set file descriptor limit"

      - name: Warm Go module cache
        run: |
          # Warm cache for root module
          go mod download || true
          # Warm cache for kafka test module
          cd test/kafka
          go mod download || true

      - name: Get dependencies
        run: |
          cd test/kafka
          timeout 90s go mod download || echo "Warning: Dependency download timed out, continuing with cached modules"
          go mod tidy

      - name: Build and start SeaweedFS MQ
        run: |
          set -e
          cd $GITHUB_WORKSPACE
          # Build weed binary
          go build -o /usr/local/bin/weed ./weed
          # Start SeaweedFS components with MQ brokers
          export WEED_DATA_DIR=/tmp/seaweedfs-client-$RANDOM
          mkdir -p "$WEED_DATA_DIR"

          # Start SeaweedFS server (master, volume, filer) with consistent IP advertising
          nohup weed -v 1 server \
            -ip="127.0.0.1" \
            -ip.bind="0.0.0.0" \
            -dir="$WEED_DATA_DIR" \
            -master.raftHashicorp \
            -master.port=9333 \
            -volume.port=8081 \
            -filer.port=8888 \
            -filer=true \
            -metricsPort=9325 \
            -master.peers=none \
            > /tmp/weed-server.log 2>&1 &

          # Wait for master to be ready
          for i in $(seq 1 30); do
            if curl -s http://127.0.0.1:9333/cluster/status >/dev/null; then
              echo "SeaweedFS master HTTP is up"; break
            fi
            echo "Waiting for SeaweedFS master HTTP... ($i/30)"; sleep 1
          done

          # Wait for master gRPC to be ready (this is what broker discovery uses)
          echo "Waiting for master gRPC port..."
          for i in $(seq 1 30); do
            if nc -z 127.0.0.1 19333; then
              echo "✓ SeaweedFS master gRPC is up (port 19333)"
              break
            fi
            echo " Waiting for master gRPC... ($i/30)"; sleep 1
          done

          # Give server time to initialize all components including gRPC services
          echo "Waiting for SeaweedFS components to initialize..."
          sleep 15

          # Additional wait specifically for gRPC services to be ready for streaming
          echo "Allowing extra time for master gRPC streaming services to initialize..."
          sleep 10

          # Start MQ broker with maximum verbosity for debugging
          echo "Starting MQ broker..."
          nohup weed -v 3 mq.broker \
            -master="127.0.0.1:9333" \
            -ip="127.0.0.1" \
            -port=17777 \
            -logFlushInterval=0 \
            > /tmp/weed-mq-broker.log 2>&1 &

          # Wait for broker to be ready with better error reporting
          sleep 15
          broker_ready=false
          for i in $(seq 1 20); do
            if nc -z 127.0.0.1 17777; then
              echo "SeaweedFS MQ broker is up"
              broker_ready=true
              break
            fi
            echo "Waiting for MQ broker... ($i/20)"; sleep 1
          done

          # Give broker additional time to register with master
          if [ "$broker_ready" = true ]; then
            echo "Allowing broker to register with master..."
            sleep 30

            # Check if broker is properly registered by querying cluster nodes
            echo "Cluster status after broker registration:"
            curl -s "http://127.0.0.1:9333/cluster/status" || echo "Could not check cluster status"

            echo "Checking cluster topology (includes registered components):"
            curl -s "http://127.0.0.1:9333/dir/status" | head -20 || echo "Could not check dir status"

            echo "Verifying broker discovery via master client debug:"
            echo "If broker registration is successful, it should appear in dir status"

            echo "Testing gRPC connectivity with weed binary:"
            echo "This simulates what the gateway does during broker discovery..."
            timeout 10s weed shell -master=127.0.0.1:9333 -filer=127.0.0.1:8888 > /tmp/shell-test.log 2>&1 || echo "weed shell test completed or timed out - checking logs..."
            echo "Shell test results:"
            cat /tmp/shell-test.log 2>/dev/null | head -10 || echo "No shell test logs"
          fi

          # Check if broker failed to start and show logs
          if [ "$broker_ready" = false ]; then
            echo "ERROR: MQ broker failed to start. Broker logs:"
            cat /tmp/weed-mq-broker.log || echo "No broker logs found"
            echo "Server logs:"
            tail -20 /tmp/weed-server.log || echo "No server logs found"
            exit 1
          fi

      - name: Run Client Compatibility Tests
        run: |
          cd test/kafka
          go test -v -run "^TestClientCompatibility" -timeout 180s ./integration/...
        env:
          GOMAXPROCS: 1
          SEAWEEDFS_MASTERS: 127.0.0.1:9333

  kafka-smq-integration-tests:
    name: Kafka SMQ Integration Tests (Full Stack)
    runs-on: ubuntu-latest
    timeout-minutes: 20
    strategy:
      fail-fast: false
      matrix:
        container-id: [smq-integration-1]
    container:
      image: golang:1.24-alpine
      options: --cpus 1.0 --memory 2g --hostname kafka-smq-${{ matrix.container-id }}
    env:
      GOMAXPROCS: 1
      CGO_ENABLED: 0
      KAFKA_SMQ_INTEGRATION: "true"
      CONTAINER_ID: ${{ matrix.container-id }}
    steps:
      - name: Check out code
        uses: actions/checkout@v6

      - name: Set up Go 1.x
        uses: actions/setup-go@v6
        with:
          go-version: ^1.25
          cache: true
          cache-dependency-path: |
            **/go.sum
        id: go

      - name: Setup SMQ Integration Container Environment
        run: |
          apk add --no-cache git procps curl netcat-openbsd
          ulimit -n 1024 || echo "Warning: Could not set file descriptor limit"

      - name: Warm Go module cache
        run: |
          # Warm cache for root module
          go mod download || true
          # Warm cache for kafka test module
          cd test/kafka
          go mod download || true

      - name: Get dependencies
        run: |
          cd test/kafka
          timeout 90s go mod download || echo "Warning: Dependency download timed out, continuing with cached modules"
          go mod tidy

      - name: Build and start SeaweedFS MQ
        run: |
          set -e
          cd $GITHUB_WORKSPACE
          # Build weed binary
          go build -o /usr/local/bin/weed ./weed
          # Start SeaweedFS components with MQ brokers
          export WEED_DATA_DIR=/tmp/seaweedfs-smq-$RANDOM
          mkdir -p "$WEED_DATA_DIR"

          # Start SeaweedFS server (master, volume, filer) with consistent IP advertising
          nohup weed -v 1 server \
            -ip="127.0.0.1" \
            -ip.bind="0.0.0.0" \
            -dir="$WEED_DATA_DIR" \
            -master.raftHashicorp \
            -master.port=9333 \
            -volume.port=8081 \
            -filer.port=8888 \
            -filer=true \
            -metricsPort=9325 \
            -master.peers=none \
            > /tmp/weed-server.log 2>&1 &

          # Wait for master to be ready
          for i in $(seq 1 30); do
            if curl -s http://127.0.0.1:9333/cluster/status >/dev/null; then
              echo "SeaweedFS master HTTP is up"; break
            fi
            echo "Waiting for SeaweedFS master HTTP... ($i/30)"; sleep 1
          done

          # Wait for master gRPC to be ready (this is what broker discovery uses)
          echo "Waiting for master gRPC port..."
          for i in $(seq 1 30); do
            if nc -z 127.0.0.1 19333; then
              echo "✓ SeaweedFS master gRPC is up (port 19333)"
              break
            fi
            echo " Waiting for master gRPC... ($i/30)"; sleep 1
          done

          # Give server time to initialize all components including gRPC services
          echo "Waiting for SeaweedFS components to initialize..."
          sleep 15

          # Additional wait specifically for gRPC services to be ready for streaming
          echo "Allowing extra time for master gRPC streaming services to initialize..."
          sleep 10

          # Start MQ broker with maximum verbosity for debugging
          echo "Starting MQ broker..."
          nohup weed -v 3 mq.broker \
            -master="127.0.0.1:9333" \
            -ip="127.0.0.1" \
            -port=17777 \
            -logFlushInterval=0 \
            > /tmp/weed-mq-broker.log 2>&1 &

          # Wait for broker to be ready with better error reporting
          sleep 15
          broker_ready=false
          for i in $(seq 1 20); do
            if nc -z 127.0.0.1 17777; then
              echo "SeaweedFS MQ broker is up"
              broker_ready=true
              break
            fi
            echo "Waiting for MQ broker... ($i/20)"; sleep 1
          done

          # Give broker additional time to register with master
          if [ "$broker_ready" = true ]; then
            echo "Allowing broker to register with master..."
            sleep 30

            # Check if broker is properly registered by querying cluster nodes
            echo "Cluster status after broker registration:"
            curl -s "http://127.0.0.1:9333/cluster/status" || echo "Could not check cluster status"

            echo "Checking cluster topology (includes registered components):"
            curl -s "http://127.0.0.1:9333/dir/status" | head -20 || echo "Could not check dir status"

            echo "Verifying broker discovery via master client debug:"
            echo "If broker registration is successful, it should appear in dir status"

            echo "Testing gRPC connectivity with weed binary:"
            echo "This simulates what the gateway does during broker discovery..."
            timeout 10s weed shell -master=127.0.0.1:9333 -filer=127.0.0.1:8888 > /tmp/shell-test.log 2>&1 || echo "weed shell test completed or timed out - checking logs..."
            echo "Shell test results:"
            cat /tmp/shell-test.log 2>/dev/null | head -10 || echo "No shell test logs"
          fi

          # Check if broker failed to start and show logs
          if [ "$broker_ready" = false ]; then
            echo "ERROR: MQ broker failed to start. Broker logs:"
            cat /tmp/weed-mq-broker.log || echo "No broker logs found"
            echo "Server logs:"
            tail -20 /tmp/weed-server.log || echo "No server logs found"
            exit 1
          fi

      - name: Run SMQ Integration Tests
        run: |
          cd test/kafka
          ulimit -n 512 || echo "Warning: Could not set file descriptor limit"
          ulimit -u 100 || echo "Warning: Could not set process limit"
          # Run the dedicated SMQ integration tests
          go test -v -run "^TestSMQIntegration" -timeout 180s ./integration/...
        env:
          GOMAXPROCS: 1
          SEAWEEDFS_MASTERS: 127.0.0.1:9333

  kafka-protocol-tests:
    name: Kafka Protocol Tests (Isolated)
    runs-on: ubuntu-latest
    timeout-minutes: 5
    strategy:
      fail-fast: false
      matrix:
        container-id: [protocol-1]
    container:
      image: golang:1.24-alpine
      options: --cpus 1.0 --memory 1g --tmpfs /tmp:exec --hostname kafka-protocol-${{ matrix.container-id }}
    env:
      GOMAXPROCS: 1
      CGO_ENABLED: 0
      KAFKA_PROTOCOL_ISOLATION: "true"
      CONTAINER_ID: ${{ matrix.container-id }}
    steps:
      - name: Set up Go 1.x
        uses: actions/setup-go@v6
        with:
          go-version: ^1.25
        id: go

      - name: Check out code
        uses: actions/checkout@v6

      - name: Setup Protocol Container Environment
        run: |
          apk add --no-cache git procps
          # Ensure proper permissions for test execution
          chmod -R 755 /tmp || true
          export TMPDIR=/tmp
          export GOCACHE=/tmp/go-cache
          mkdir -p $GOCACHE
          chmod 755 $GOCACHE

      - name: Get dependencies
        run: |
          cd test/kafka
          go mod download
          go mod tidy

      - name: Run Protocol Tests
        run: |
          cd test/kafka
          export TMPDIR=/tmp
          export GOCACHE=/tmp/go-cache
          # Run protocol tests from the weed/mq/kafka directory since they test the protocol implementation
          cd ../../weed/mq/kafka
          go test -v -run "^Test.*" -timeout 10s ./...
        env:
          GOMAXPROCS: 1
          TMPDIR: /tmp
          GOCACHE: /tmp/go-cache