mirror of
https://github.com/seaweedfs/seaweedfs.git
synced 2026-05-21 09:11:29 +00:00
make test-basic
This commit is contained in:
@@ -119,37 +119,39 @@ logs:
|
||||
docker-compose -f docker-compose.test.yml logs -f
|
||||
|
||||
# Run all integration tests
|
||||
test: up
|
||||
test:
|
||||
@echo "Running all integration tests..."
|
||||
docker-compose -f docker-compose.test.yml run --rm test-runner \
|
||||
sh -c "go test -v -timeout=30m ./test/mq/integration/... -args -test.parallel=4"
|
||||
|
||||
# Run basic pub/sub tests
|
||||
test-basic: up
|
||||
@echo "Running basic pub/sub tests..."
|
||||
docker-compose -f docker-compose.test.yml run --rm test-runner \
|
||||
sh -c "go test -v -timeout=10m ./test/mq/integration/ -run TestBasic"
|
||||
test-basic:
|
||||
@echo "Running basic pub/sub tests natively (no container restart)..."
|
||||
cd ../.. && SEAWEED_MASTERS="localhost:19333,localhost:19334,localhost:19335" \
|
||||
SEAWEED_BROKERS="localhost:17777,localhost:17778,localhost:17779" \
|
||||
SEAWEED_FILERS="localhost:18888,localhost:18889" \
|
||||
go test -v -timeout=10m ./test/mq/integration/ -run TestBasic
|
||||
|
||||
# Run performance tests
|
||||
test-performance: up
|
||||
test-performance:
|
||||
@echo "Running performance tests..."
|
||||
docker-compose -f docker-compose.test.yml run --rm test-runner \
|
||||
sh -c "go test -v -timeout=20m ./test/mq/integration/ -run TestPerformance"
|
||||
|
||||
# Run failover tests
|
||||
test-failover: up
|
||||
test-failover:
|
||||
@echo "Running failover tests..."
|
||||
docker-compose -f docker-compose.test.yml run --rm test-runner \
|
||||
sh -c "go test -v -timeout=15m ./test/mq/integration/ -run TestFailover"
|
||||
|
||||
# Run agent tests
|
||||
test-agent: up
|
||||
test-agent:
|
||||
@echo "Running agent tests..."
|
||||
docker-compose -f docker-compose.test.yml run --rm test-runner \
|
||||
sh -c "go test -v -timeout=10m ./test/mq/integration/ -run TestAgent"
|
||||
|
||||
# Development targets (run tests natively without Docker container)
|
||||
test-dev: up-cluster
|
||||
test-dev:
|
||||
@echo "Running tests in development mode (using local binaries)..."
|
||||
SEAWEED_MASTERS="localhost:19333,localhost:19334,localhost:19335" \
|
||||
SEAWEED_BROKERS="localhost:17777,localhost:17778,localhost:17779" \
|
||||
@@ -157,7 +159,7 @@ test-dev: up-cluster
|
||||
go test -v -timeout=10m ./integration/...
|
||||
|
||||
# Native test running (no Docker container for tests)
|
||||
test-native: up
|
||||
test-native:
|
||||
@echo "Running tests natively (without Docker container for tests)..."
|
||||
cd ../.. && SEAWEED_MASTERS="localhost:19333,localhost:19334,localhost:19335" \
|
||||
SEAWEED_BROKERS="localhost:17777,localhost:17778,localhost:17779" \
|
||||
@@ -165,7 +167,7 @@ test-native: up
|
||||
go test -v -timeout=10m ./test/mq/integration/...
|
||||
|
||||
# Basic native tests
|
||||
test-basic-native: up
|
||||
test-basic-native:
|
||||
@echo "Running basic tests natively..."
|
||||
cd ../.. && SEAWEED_MASTERS="localhost:19333,localhost:19334,localhost:19335" \
|
||||
SEAWEED_BROKERS="localhost:17777,localhost:17778,localhost:17779" \
|
||||
@@ -173,13 +175,13 @@ test-basic-native: up
|
||||
go test -v -timeout=10m ./test/mq/integration/ -run TestBasic
|
||||
|
||||
# Quick smoke test
|
||||
smoke-test: up
|
||||
smoke-test:
|
||||
@echo "Running smoke test..."
|
||||
docker-compose -f docker-compose.test.yml run --rm test-runner \
|
||||
sh -c "go test -v -timeout=5m ./test/mq/integration/ -run TestBasicPublishSubscribe"
|
||||
|
||||
# Performance benchmarks
|
||||
benchmark: up
|
||||
benchmark:
|
||||
@echo "Running performance benchmarks..."
|
||||
docker-compose -f docker-compose.test.yml run --rm test-runner \
|
||||
sh -c "go test -v -timeout=30m -bench=. ./test/mq/integration/..."
|
||||
@@ -208,7 +210,7 @@ report:
|
||||
sh -c "go test -v -timeout=30m ./test/mq/integration/... -json > /test-results/test-report.json"
|
||||
|
||||
# Load testing
|
||||
load-test: up
|
||||
load-test:
|
||||
@echo "Running load tests..."
|
||||
docker-compose -f docker-compose.test.yml run --rm test-runner \
|
||||
sh -c "go test -v -timeout=45m ./test/mq/integration/ -run TestLoad"
|
||||
|
||||
@@ -213,7 +213,7 @@ services:
|
||||
mq.broker
|
||||
-master=master0:9333,master1:9334,master2:9335
|
||||
-port=17777
|
||||
-ip=localhost
|
||||
-ip=127.0.0.1
|
||||
-dataCenter=dc1
|
||||
-rack=rack1
|
||||
networks:
|
||||
@@ -237,7 +237,7 @@ services:
|
||||
mq.broker
|
||||
-master=master0:9333,master1:9334,master2:9335
|
||||
-port=17778
|
||||
-ip=localhost
|
||||
-ip=127.0.0.1
|
||||
-dataCenter=dc1
|
||||
-rack=rack2
|
||||
networks:
|
||||
@@ -256,7 +256,7 @@ services:
|
||||
mq.broker
|
||||
-master=master0:9333,master1:9334,master2:9335
|
||||
-port=17779
|
||||
-ip=localhost
|
||||
-ip=127.0.0.1
|
||||
-dataCenter=dc2
|
||||
-rack=rack1
|
||||
networks:
|
||||
|
||||
@@ -16,9 +16,8 @@ func TestBasicPublishSubscribe(t *testing.T) {
|
||||
suite := NewIntegrationTestSuite(t)
|
||||
require.NoError(t, suite.Setup())
|
||||
|
||||
// Test configuration
|
||||
namespace := "test"
|
||||
topicName := "basic-pubsub"
|
||||
topicName := fmt.Sprintf("basic-pubsub-%d", time.Now().UnixNano()) // Unique topic name per run
|
||||
testSchema := CreateTestSchema()
|
||||
messageCount := 10
|
||||
|
||||
@@ -27,12 +26,12 @@ func TestBasicPublishSubscribe(t *testing.T) {
|
||||
Namespace: namespace,
|
||||
TopicName: topicName,
|
||||
PartitionCount: 1,
|
||||
PublisherName: "test-publisher",
|
||||
PublisherName: "basic-publisher",
|
||||
RecordType: testSchema,
|
||||
}
|
||||
|
||||
publisher, err := suite.CreatePublisher(pubConfig)
|
||||
require.NoError(t, err, "Failed to create publisher")
|
||||
require.NoError(t, err)
|
||||
|
||||
// Create subscriber
|
||||
subConfig := &SubscriberTestConfig{
|
||||
@@ -51,6 +50,7 @@ func TestBasicPublishSubscribe(t *testing.T) {
|
||||
// Set up message collector
|
||||
collector := NewMessageCollector(messageCount)
|
||||
subscriber.SetOnDataMessageFn(func(m *mq_pb.SubscribeMessageResponse_Data) {
|
||||
t.Logf("[Subscriber] Received message with key: %s, ts: %d", string(m.Data.Key), m.Data.TsNs)
|
||||
collector.AddMessage(TestMessage{
|
||||
ID: fmt.Sprintf("msg-%d", len(collector.GetMessages())),
|
||||
Content: m.Data.Value,
|
||||
@@ -68,6 +68,7 @@ func TestBasicPublishSubscribe(t *testing.T) {
|
||||
}()
|
||||
|
||||
// Wait for subscriber to be ready
|
||||
t.Logf("[Test] Waiting for subscriber to be ready...")
|
||||
time.Sleep(2 * time.Second)
|
||||
|
||||
// Publish test messages
|
||||
@@ -80,12 +81,14 @@ func TestBasicPublishSubscribe(t *testing.T) {
|
||||
RecordEnd()
|
||||
|
||||
key := []byte(fmt.Sprintf("key-%d", i))
|
||||
t.Logf("[Publisher] Publishing message %d with key: %s", i, string(key))
|
||||
err := publisher.PublishRecord(key, record)
|
||||
require.NoError(t, err, "Failed to publish message %d", i)
|
||||
}
|
||||
|
||||
// Wait for messages to be received
|
||||
t.Logf("[Test] Waiting for messages to be received...")
|
||||
messages := collector.WaitForMessages(30 * time.Second)
|
||||
t.Logf("[Test] WaitForMessages returned. Received %d messages.", len(messages))
|
||||
|
||||
// Verify all messages were received
|
||||
assert.Len(t, messages, messageCount, "Expected %d messages, got %d", messageCount, len(messages))
|
||||
@@ -95,6 +98,8 @@ func TestBasicPublishSubscribe(t *testing.T) {
|
||||
assert.NotEmpty(t, msg.Content, "Message %d should have content", i)
|
||||
assert.NotEmpty(t, msg.Key, "Message %d should have key", i)
|
||||
}
|
||||
|
||||
t.Logf("[Test] TestBasicPublishSubscribe completed.")
|
||||
}
|
||||
|
||||
func TestMultipleConsumers(t *testing.T) {
|
||||
|
||||
@@ -37,6 +37,7 @@ type IntegrationTestSuite struct {
|
||||
agents map[string]*agent.MessageQueueAgent
|
||||
publishers map[string]*pub_client.TopicPublisher
|
||||
subscribers map[string]*sub_client.TopicSubscriber
|
||||
subCancels map[string]context.CancelFunc
|
||||
cleanupOnce sync.Once
|
||||
t *testing.T
|
||||
}
|
||||
@@ -55,6 +56,7 @@ func NewIntegrationTestSuite(t *testing.T) *IntegrationTestSuite {
|
||||
agents: make(map[string]*agent.MessageQueueAgent),
|
||||
publishers: make(map[string]*pub_client.TopicPublisher),
|
||||
subscribers: make(map[string]*sub_client.TopicSubscriber),
|
||||
subCancels: make(map[string]context.CancelFunc),
|
||||
t: t,
|
||||
}
|
||||
}
|
||||
@@ -75,16 +77,34 @@ func (its *IntegrationTestSuite) Setup() error {
|
||||
// Cleanup performs cleanup operations
|
||||
func (its *IntegrationTestSuite) Cleanup() {
|
||||
its.cleanupOnce.Do(func() {
|
||||
// Close all subscribers (they use context cancellation)
|
||||
for name, _ := range its.subscribers {
|
||||
// Close all subscribers first (they use context cancellation)
|
||||
for name := range its.subscribers {
|
||||
if cancel, ok := its.subCancels[name]; ok && cancel != nil {
|
||||
cancel()
|
||||
its.t.Logf("Cancelled subscriber context: %s", name)
|
||||
}
|
||||
its.t.Logf("Cleaned up subscriber: %s", name)
|
||||
}
|
||||
|
||||
// Wait a moment for gRPC connections to close gracefully
|
||||
time.Sleep(1 * time.Second)
|
||||
|
||||
// Close all publishers
|
||||
for name, publisher := range its.publishers {
|
||||
if publisher != nil {
|
||||
publisher.Shutdown()
|
||||
its.t.Logf("Cleaned up publisher: %s", name)
|
||||
// Add timeout to prevent deadlock during shutdown
|
||||
done := make(chan bool, 1)
|
||||
go func(p *pub_client.TopicPublisher, n string) {
|
||||
p.Shutdown()
|
||||
done <- true
|
||||
}(publisher, name)
|
||||
|
||||
select {
|
||||
case <-done:
|
||||
its.t.Logf("Cleaned up publisher: %s", name)
|
||||
case <-time.After(5 * time.Second):
|
||||
its.t.Logf("Publisher shutdown timed out: %s", name)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -135,8 +155,9 @@ func (its *IntegrationTestSuite) CreateSubscriber(config *SubscriberTestConfig)
|
||||
}
|
||||
|
||||
offsetChan := make(chan sub_client.KeyedOffset, 1024)
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
subscriber := sub_client.NewTopicSubscriber(
|
||||
context.Background(),
|
||||
ctx,
|
||||
its.env.Brokers,
|
||||
subscriberConfig,
|
||||
contentConfig,
|
||||
@@ -144,6 +165,7 @@ func (its *IntegrationTestSuite) CreateSubscriber(config *SubscriberTestConfig)
|
||||
)
|
||||
|
||||
its.subscribers[config.ConsumerInstanceId] = subscriber
|
||||
its.subCancels[config.ConsumerInstanceId] = cancel
|
||||
return subscriber, nil
|
||||
}
|
||||
|
||||
@@ -204,6 +226,7 @@ type MessageCollector struct {
|
||||
mutex sync.RWMutex
|
||||
waitCh chan struct{}
|
||||
expected int
|
||||
closed bool // protect against closing waitCh multiple times
|
||||
}
|
||||
|
||||
// NewMessageCollector creates a new message collector
|
||||
@@ -221,8 +244,9 @@ func (mc *MessageCollector) AddMessage(msg TestMessage) {
|
||||
defer mc.mutex.Unlock()
|
||||
|
||||
mc.messages = append(mc.messages, msg)
|
||||
if len(mc.messages) >= mc.expected {
|
||||
if len(mc.messages) >= mc.expected && !mc.closed {
|
||||
close(mc.waitCh)
|
||||
mc.closed = true
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user