Compare commits

...

4 Commits

Author SHA1 Message Date
William Banfield
d729f4cc06 leak check 2021-07-12 18:09:35 -04:00
William Banfield
80103913f3 Merge branch 'master' into wb/dispatch-lightblock-race-cond 2021-07-12 17:48:50 -04:00
William Banfield
721b05deee use require over assert and spell canceled the american way 2021-07-12 17:31:08 -04:00
William Banfield
f4700c6cff statesync: remove outgoing calls race condition 2021-07-12 16:57:15 -04:00
2 changed files with 121 additions and 4 deletions

View File

@@ -49,13 +49,12 @@ func newDispatcher(requestCh chan<- p2p.Envelope, timeout time.Duration) *dispat
// in a list, tracks the call and waits for the reactor to pass along the response
func (d *dispatcher) LightBlock(ctx context.Context, height int64) (*types.LightBlock, types.NodeID, error) {
d.mtx.Lock()
outgoingCalls := len(d.calls)
d.mtx.Unlock()
// check to see that the dispatcher is connected to at least one peer
if d.availablePeers.Len() == 0 && outgoingCalls == 0 {
if d.availablePeers.Len() == 0 && len(d.calls) == 0 {
d.mtx.Unlock()
return nil, "", errNoConnectedPeers
}
d.mtx.Unlock()
// fetch the next peer id in the list and request a light block from that
// peer

View File

@@ -8,6 +8,7 @@ import (
"testing"
"time"
"github.com/fortytw2/leaktest"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
@@ -17,6 +18,7 @@ import (
)
func TestDispatcherBasic(t *testing.T) {
t.Cleanup(leaktest.Check(t))
ch := make(chan p2p.Envelope, 100)
closeCh := make(chan struct{})
@@ -49,7 +51,77 @@ func TestDispatcherBasic(t *testing.T) {
wg.Wait()
}
func TestDispatcherReturnsNoBlock(t *testing.T) {
t.Cleanup(leaktest.Check(t))
ch := make(chan p2p.Envelope, 100)
d := newDispatcher(ch, 1*time.Second)
peerFromSet := createPeerSet(1)[0]
d.addPeer(peerFromSet)
doneCh := make(chan struct{})
go func() {
err := d.respond(nil, peerFromSet)
require.Nil(t, err)
close(doneCh)
}()
lb, peerResult, err := d.LightBlock(context.Background(), 1)
<-doneCh
require.Nil(t, lb)
require.Nil(t, err)
require.Equal(t, peerFromSet, peerResult)
}
func TestDispatcherErrorsWhenNoPeers(t *testing.T) {
t.Cleanup(leaktest.Check(t))
ch := make(chan p2p.Envelope, 100)
d := newDispatcher(ch, 1*time.Second)
lb, peerResult, err := d.LightBlock(context.Background(), 1)
require.Nil(t, lb)
require.Empty(t, peerResult)
require.Equal(t, errNoConnectedPeers, err)
}
func TestDispatcherReturnsBlockOncePeerAvailable(t *testing.T) {
t.Cleanup(leaktest.Check(t))
ch := make(chan p2p.Envelope, 100)
d := newDispatcher(ch, 1*time.Second)
peerFromSet := createPeerSet(1)[0]
d.addPeer(peerFromSet)
ctx := context.Background()
wrapped, cancelFunc := context.WithCancel(ctx)
doneCh := make(chan struct{})
go func() {
lb, peerResult, err := d.LightBlock(wrapped, 1)
require.Nil(t, lb)
require.Equal(t, peerFromSet, peerResult)
require.Nil(t, err)
close(doneCh)
}()
cancelFunc()
<-doneCh
go func() {
lb := &types.LightBlock{}
asProto, err := lb.ToProto()
require.Nil(t, err)
err = d.respond(asProto, peerFromSet)
require.Nil(t, err)
}()
lb, peerResult, err := d.LightBlock(context.Background(), 1)
require.NotNil(t, lb)
require.Equal(t, peerFromSet, peerResult)
require.Nil(t, err)
}
func TestDispatcherProviders(t *testing.T) {
t.Cleanup(leaktest.Check(t))
ch := make(chan p2p.Envelope, 100)
chainID := "state-sync-test"
@@ -78,6 +150,7 @@ func TestDispatcherProviders(t *testing.T) {
}
func TestPeerListBasic(t *testing.T) {
t.Cleanup(leaktest.Check(t))
peerList := newPeerList()
assert.Zero(t, peerList.Len())
numPeers := 10
@@ -108,7 +181,52 @@ func TestPeerListBasic(t *testing.T) {
}
func TestPeerListBlocksWhenEmpty(t *testing.T) {
t.Cleanup(leaktest.Check(t))
peerList := newPeerList()
require.Zero(t, peerList.Len())
doneCh := make(chan struct{})
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
go func() {
peerList.Pop(ctx)
close(doneCh)
}()
select {
case <-doneCh:
t.Error("empty peer list should not have returned result")
case <-time.After(100 * time.Millisecond):
}
}
func TestEmptyPeerListReturnsWhenContextCanceled(t *testing.T) {
t.Cleanup(leaktest.Check(t))
peerList := newPeerList()
require.Zero(t, peerList.Len())
doneCh := make(chan struct{})
ctx := context.Background()
wrapped, cancel := context.WithCancel(ctx)
go func() {
peerList.Pop(wrapped)
close(doneCh)
}()
select {
case <-doneCh:
t.Error("empty peer list should not have returned result")
case <-time.After(100 * time.Millisecond):
}
cancel()
select {
case <-doneCh:
case <-time.After(100 * time.Millisecond):
t.Error("peer list should have returned after context canceled")
}
}
func TestPeerListConcurrent(t *testing.T) {
t.Cleanup(leaktest.Check(t))
peerList := newPeerList()
numPeers := 10