erigon-pulse/cmd/erigon-cl/network/beacon_downloader.go

124 lines
3.5 KiB
Go

package network
import (
"sync"
"time"
libcommon "github.com/ledgerwatch/erigon-lib/common"
"golang.org/x/net/context"
"github.com/ledgerwatch/erigon/cl/cltypes"
"github.com/ledgerwatch/erigon/cl/rpc"
)
// Input: the currently highest slot processed and the list of blocks we want to know process
// Output: the new last new highest slot processed and an error possibly?
type ProcessFn func(
highestSlotProcessed uint64,
highestBlockRootProcessed libcommon.Hash,
blocks []*cltypes.SignedBeaconBlock) (
newHighestSlotProcessed uint64,
newHighestBlockRootProcessed libcommon.Hash,
err error)
type ForwardBeaconDownloader struct {
ctx context.Context
highestSlotProcessed uint64
highestBlockRootProcessed libcommon.Hash
rpc *rpc.BeaconRpcP2P
process ProcessFn
segments []*cltypes.SignedBeaconBlock // Unprocessed downloaded segments
mu sync.Mutex
}
func NewForwardBeaconDownloader(ctx context.Context, rpc *rpc.BeaconRpcP2P) *ForwardBeaconDownloader {
return &ForwardBeaconDownloader{
ctx: ctx,
segments: []*cltypes.SignedBeaconBlock{},
rpc: rpc,
}
}
// SetProcessFunction sets the function used to process segments.
func (f *ForwardBeaconDownloader) SetProcessFunction(fn ProcessFn) {
f.mu.Lock()
defer f.mu.Unlock()
f.process = fn
}
// SetHighestProcessedSlot sets the highest processed slot so far.
func (f *ForwardBeaconDownloader) SetHighestProcessedSlot(highestSlotProcessed uint64) {
f.mu.Lock()
defer f.mu.Unlock()
f.highestSlotProcessed = highestSlotProcessed
}
// SetHighestProcessedRoot sets the highest processed block root so far.
func (f *ForwardBeaconDownloader) SetHighestProcessedRoot(root libcommon.Hash) {
f.mu.Lock()
defer f.mu.Unlock()
f.highestBlockRootProcessed = root
}
// HighestProcessedRoot returns the highest processed block root so far.
func (f *ForwardBeaconDownloader) HighestProcessedRoot() libcommon.Hash {
f.mu.Lock()
defer f.mu.Unlock()
return f.highestBlockRootProcessed
}
// addSegment process new block segment.
func (f *ForwardBeaconDownloader) addSegment(block *cltypes.SignedBeaconBlock) {
f.mu.Lock()
defer f.mu.Unlock()
// Skip if does continue the segment.
f.segments = append(f.segments, block)
}
func (f *ForwardBeaconDownloader) RequestMore() {
count := uint64(4) // dont need many
responses, pid, err := f.rpc.SendBeaconBlocksByRangeReq(f.highestSlotProcessed+1, count)
if err != nil {
f.rpc.BanPeer(pid)
// Wait a bit in this case (we do not need to be super performant here).
time.Sleep(time.Second)
return
}
for _, response := range responses {
f.addSegment(response)
}
}
// ProcessBlocks processes blocks we accumulated.
func (f *ForwardBeaconDownloader) ProcessBlocks() error {
if len(f.segments) == 0 {
return nil
}
f.mu.Lock()
defer f.mu.Unlock()
var err error
var highestSlotProcessed uint64
var highestBlockRootProcessed libcommon.Hash
if highestSlotProcessed, highestBlockRootProcessed, err = f.process(f.highestSlotProcessed, f.highestBlockRootProcessed, f.segments); err != nil {
return err
}
f.highestSlotProcessed = highestSlotProcessed
f.highestBlockRootProcessed = highestBlockRootProcessed
// clear segments
f.segments = f.segments[:0]
return nil
}
// GetHighestProcessedSlot retrieve the highest processed slot we accumulated.
func (f *ForwardBeaconDownloader) GetHighestProcessedSlot() uint64 {
f.mu.Lock()
defer f.mu.Unlock()
return f.highestSlotProcessed
}
func (f *ForwardBeaconDownloader) Peers() (uint64, error) {
return f.rpc.Peers()
}