diff --git a/beacon-chain/sync/initial-sync/BUILD.bazel b/beacon-chain/sync/initial-sync/BUILD.bazel index 0a36c6313..f7b196c06 100644 --- a/beacon-chain/sync/initial-sync/BUILD.bazel +++ b/beacon-chain/sync/initial-sync/BUILD.bazel @@ -35,6 +35,8 @@ go_test( name = "go_default_test", srcs = ["round_robin_test.go"], embed = [":go_default_library"], + race = "on", + tags = ["race_on"], deps = [ "//beacon-chain/blockchain/testing:go_default_library", "//beacon-chain/p2p/testing:go_default_library", diff --git a/beacon-chain/sync/initial-sync/round_robin.go b/beacon-chain/sync/initial-sync/round_robin.go index 369bdd6a8..1d0e324be 100644 --- a/beacon-chain/sync/initial-sync/round_robin.go +++ b/beacon-chain/sync/initial-sync/round_robin.go @@ -5,7 +5,7 @@ import ( "fmt" "io" "sort" - "sync" + "sync/atomic" "time" "github.com/libp2p/go-libp2p-core/peer" @@ -42,13 +42,10 @@ func (s *InitialSync) roundRobinSync(genesis time.Time) error { counter := ratecounter.NewRateCounter(counterSeconds * time.Second) var lastEmptyRequests int - errChan := make(chan error) // Step 1 - Sync to end of finalized epoch. for s.chain.HeadSlot() < helpers.StartSlot(highestFinalizedEpoch()+1) { root, finalizedEpoch, peers := bestFinalized() - var blocks []*eth.BeaconBlock - // request a range of blocks to be requested from multiple peers. // Example: // - number of peers = 4 @@ -61,11 +58,14 @@ func (s *InitialSync) roundRobinSync(genesis time.Time) error { if len(peers) == 0 { return nil, errors.WithStack(errors.New("no peers left to request blocks")) } - var wg sync.WaitGroup + var p2pRequestCount int32 + errChan := make(chan error) + blocksChan := make(chan []*eth.BeaconBlock) // Handle block large block ranges of skipped slots. start += count * uint64(lastEmptyRequests*len(peers)) + atomic.AddInt32(&p2pRequestCount, int32(len(peers))) for i, pid := range peers { if ctx.Err() != nil { return nil, ctx.Err() @@ -90,10 +90,14 @@ func (s *InitialSync) roundRobinSync(genesis time.Time) error { Step: step, } - // Fulfill requests asynchronously, in parallel, and wait for results from all. - wg.Add(1) go func(i int, pid peer.ID) { - defer wg.Done() + defer func() { + zeroIfIAmTheLast := atomic.AddInt32(&p2pRequestCount, -1) + if zeroIfIAmTheLast == 0 { + close(blocksChan) + } + }() + resp, err := s.requestBlocks(ctx, req, pid) log.WithField("peer", pid.Pretty()).Debugf("Received %d blocks", len(resp)) if err != nil { @@ -110,28 +114,30 @@ func (s *InitialSync) roundRobinSync(genesis time.Time) error { errChan <- errors.WithStack(errors.New("no peers left to request blocks")) return } - _, err = request(start, step, count/uint64(len(ps)) /*count*/, ps, int(count)%len(ps) /*remainder*/) + resp, err = request(start, step, count/uint64(len(ps)) /*count*/, ps, int(count)%len(ps) /*remainder*/) if err != nil { errChan <- err return } } - blocks = append(blocks, resp...) + blocksChan <- resp }(i, pid) } - // Wait for done signal or any error. - done := make(chan interface{}) - go func() { - wg.Wait() - done <- true - }() + var unionRespBlocks []*eth.BeaconBlock for { select { case err := <-errChan: return nil, err - case <-done: - return blocks, nil + case resp, ok := <-blocksChan: + if ok { + // if this synchronization becomes a bottleneck: + // think about immediately allocating space for all peers in unionRespBlocks, + // and write without synchronization + unionRespBlocks = append(unionRespBlocks, resp...) + } else { + return unionRespBlocks, nil + } } } }