From fb10d1c70538461aae618eb8fb93fbe53fc83c33 Mon Sep 17 00:00:00 2001 From: Sam Kleinman Date: Wed, 12 Jan 2022 12:57:23 -0500 Subject: [PATCH] statesync: clarify test cleanup (#7565) --- internal/statesync/reactor_test.go | 9 ++++--- internal/statesync/syncer_test.go | 43 +++++++++++++++++++----------- 2 files changed, 32 insertions(+), 20 deletions(-) diff --git a/internal/statesync/reactor_test.go b/internal/statesync/reactor_test.go index 4ff02d0bc..4e81d53b0 100644 --- a/internal/statesync/reactor_test.go +++ b/internal/statesync/reactor_test.go @@ -193,13 +193,14 @@ func setup( rts.reactor.metrics, ) + ctx, cancel := context.WithCancel(ctx) + require.NoError(t, rts.reactor.Start(ctx)) require.True(t, rts.reactor.IsRunning()) - t.Cleanup(func() { - rts.reactor.Wait() - require.False(t, rts.reactor.IsRunning()) - }) + t.Cleanup(cancel) + t.Cleanup(rts.reactor.Wait) + t.Cleanup(leaktest.Check(t)) return rts } diff --git a/internal/statesync/syncer_test.go b/internal/statesync/syncer_test.go index 3126a7688..7ce293890 100644 --- a/internal/statesync/syncer_test.go +++ b/internal/statesync/syncer_test.go @@ -133,27 +133,38 @@ func TestSyncer_SyncAny(t *testing.T) { chunkRequests := make(map[uint32]int) chunkRequestsMtx := sync.Mutex{} - var wg sync.WaitGroup - wg.Add(4) + chunkProcessDone := make(chan struct{}) go func() { - for e := range rts.chunkOutCh { - msg, ok := e.Message.(*ssproto.ChunkRequest) - assert.True(t, ok) + defer close(chunkProcessDone) + var seen int + for { + if seen >= 4 { + return + } - assert.EqualValues(t, 1, msg.Height) - assert.EqualValues(t, 1, msg.Format) - assert.LessOrEqual(t, msg.Index, uint32(len(chunks))) + select { + case <-ctx.Done(): + t.Logf("sent %d chunks", seen) + return + case e := <-rts.chunkOutCh: + msg, ok := e.Message.(*ssproto.ChunkRequest) + assert.True(t, ok) - added, err := rts.syncer.AddChunk(chunks[msg.Index]) - assert.NoError(t, err) - assert.True(t, added) + assert.EqualValues(t, 1, msg.Height) + assert.EqualValues(t, 1, msg.Format) + assert.LessOrEqual(t, msg.Index, uint32(len(chunks))) - chunkRequestsMtx.Lock() - chunkRequests[msg.Index]++ - chunkRequestsMtx.Unlock() + added, err := rts.syncer.AddChunk(chunks[msg.Index]) + assert.NoError(t, err) + assert.True(t, added) - wg.Done() + chunkRequestsMtx.Lock() + chunkRequests[msg.Index]++ + chunkRequestsMtx.Unlock() + seen++ + t.Logf("added chunk (%d of 4): %d", seen, msg.Index) + } } }() @@ -186,7 +197,7 @@ func TestSyncer_SyncAny(t *testing.T) { newState, lastCommit, err := rts.syncer.SyncAny(ctx, 0, func() error { return nil }) require.NoError(t, err) - wg.Wait() + <-chunkProcessDone chunkRequestsMtx.Lock() require.Equal(t, map[uint32]int{0: 1, 1: 2, 2: 1}, chunkRequests)