erigon-pulse/cl/phase1/network/backward_beacon_downloader.go

148 lines
4.1 KiB
Go

package network
import (
"sync"
"time"
libcommon "github.com/ledgerwatch/erigon-lib/common"
"github.com/ledgerwatch/log/v3"
"golang.org/x/net/context"
"github.com/ledgerwatch/erigon/cl/cltypes"
"github.com/ledgerwatch/erigon/cl/rpc"
)
// OnNewBlock is the callback the backward downloader invokes for every block
// whose root matches the currently expected root. It reports through finished
// whether the reverse download has reached its target height or condition, and
// through err whether the block could not be processed.
type OnNewBlock func(blk *cltypes.SignedBeaconBlock) (finished bool, err error)
type BackwardBeaconDownloader struct {
ctx context.Context
slotToDownload uint64
expectedRoot libcommon.Hash
rpc *rpc.BeaconRpcP2P
onNewBlock OnNewBlock
finished bool
mu sync.Mutex
}
// NewBackwardBeaconDownloader builds a downloader bound to ctx that issues its
// requests through rpc. The start slot, expected root and callback are left at
// their zero values and have to be supplied through the Set* methods.
func NewBackwardBeaconDownloader(ctx context.Context, rpc *rpc.BeaconRpcP2P) *BackwardBeaconDownloader {
	downloader := new(BackwardBeaconDownloader)
	downloader.ctx = ctx
	downloader.rpc = rpc
	return downloader
}
// SetSlotToDownload stores slot as the slot the next range request counts
// back from.
func (b *BackwardBeaconDownloader) SetSlotToDownload(slot uint64) {
	b.mu.Lock()
	b.slotToDownload = slot
	b.mu.Unlock()
}
// SetExpectedRoot stores root as the block root the next accepted block must
// hash to.
func (b *BackwardBeaconDownloader) SetExpectedRoot(root libcommon.Hash) {
	b.mu.Lock()
	b.expectedRoot = root
	b.mu.Unlock()
}
// SetOnNewBlock installs the callback that is run on every accepted block and
// whose "finished" result decides when the download stops.
func (b *BackwardBeaconDownloader) SetOnNewBlock(onNewBlock OnNewBlock) {
	b.mu.Lock()
	b.onNewBlock = onNewBlock
	b.mu.Unlock()
}
// Finished reports the most recent "finished" value returned by the OnNewBlock
// callback; it is false until the callback has reported completion.
func (b *BackwardBeaconDownloader) Finished() bool {
	b.mu.Lock()
	done := b.finished
	b.mu.Unlock()
	return done
}
// Progress returns the slot the downloader will count back from on its next
// range request.
func (b *BackwardBeaconDownloader) Progress() uint64 {
	b.mu.Lock()
	slot := b.slotToDownload
	b.mu.Unlock()
	return slot
}
// Peers forwards the peer count, and any error, reported by the underlying
// beacon RPC client.
func (b *BackwardBeaconDownloader) Peers() (uint64, error) {
	peerCount, err := b.rpc.Peers()
	return peerCount, err
}
// RequestMore downloads one batch of blocks and processes it backwards.
//
// It asks for up to 64 slots ending at the current slot to download (the start
// is clamped to slot 0). A new request is launched every 300ms until one of
// them yields a non-empty response or ctx is cancelled; a peer that answers
// with an empty, non-nil list is banned. Requests that fail or return nil are
// silently dropped and retried on the next tick.
//
// The winning response is walked from its last block to its first. A block is
// accepted only if its SSZ root equals the expected root; other blocks are
// logged at debug level and skipped. For an accepted block the OnNewBlock
// callback is invoked: its "finished" result is always recorded, and when it
// returns no error the expected root moves to the block's parent root and the
// slot to download moves one slot below the block. When the callback returns
// an error the block is logged and skipped without advancing, so the remaining
// (older) blocks of the batch will not match the unchanged expected root.
// Processing stops as soon as the recorded "finished" flag is true.
//
// The callback must have been installed with SetOnNewBlock: a nil callback
// panics once a matching block arrives. The callback is run without the
// downloader's mutex held, so it may call the downloader's own accessors.
func (b *BackwardBeaconDownloader) RequestMore(ctx context.Context) {
	count := uint64(64)

	// Read the target slot under the lock, like every other accessor does.
	b.mu.Lock()
	target := b.slotToDownload
	b.mu.Unlock()

	start := target - count + 1
	// Unsigned underflow: fewer than count slots are left, so start at slot 0.
	if start > target {
		start = 0
	}

	reqInterval := time.NewTicker(300 * time.Millisecond)
	// Release the ticker on every return path.
	defer reqInterval.Stop()

	// Buffered so that the first successful request can deliver without a
	// receiver being ready; later deliveries are dropped by the non-blocking
	// send below, which also lets stray goroutines finish after we return.
	doneRespCh := make(chan []*cltypes.SignedBeaconBlock, 1)
	var responses []*cltypes.SignedBeaconBlock
Loop:
	for {
		select {
		case <-reqInterval.C:
			go func() {
				blocks, peerId, err := b.rpc.SendBeaconBlocksByRangeReq(ctx, start, count)
				if err != nil {
					return
				}
				if blocks == nil {
					return
				}
				if len(blocks) == 0 {
					b.rpc.BanPeer(peerId)
					return
				}
				select {
				case doneRespCh <- blocks:
				default:
				}
			}()
		case <-ctx.Done():
			return
		case responses = <-doneRespCh:
			break Loop
		}
	}

	// Import new blocks, order is forward so reverse the whole packet
	for i := len(responses) - 1; i >= 0; i-- {
		// Snapshot the shared state; the lock is not held across the callback.
		b.mu.Lock()
		finished := b.finished
		expectedRoot := b.expectedRoot
		onNewBlock := b.onNewBlock
		b.mu.Unlock()

		if finished {
			return
		}
		segment := responses[i]
		// is this new block root equal to the expected root?
		blockRoot, err := segment.Block.HashSSZ()
		if err != nil {
			log.Debug("Could not compute block root while processing packet", "err", err)
			continue
		}
		// No? Reject.
		if blockRoot != expectedRoot {
			log.Debug("Gotten unexpected root", "got", blockRoot, "expected", expectedRoot)
			continue
		}
		// Yes? then go for the callback.
		finished, err = onNewBlock(segment)

		b.mu.Lock()
		// The finished flag is recorded even when the callback failed.
		b.finished = finished
		if err == nil {
			// set expected root to the segment parent root
			b.expectedRoot = segment.Block.ParentRoot
			// update slot (might be inexact but whatever); clamp at 0 so that
			// slot 0 does not wrap around to the maximum uint64.
			if slot := segment.Block.Slot; slot > 0 {
				b.slotToDownload = slot - 1
			} else {
				b.slotToDownload = 0
			}
		}
		b.mu.Unlock()

		if err != nil {
			log.Warn("Found error while processing packet", "err", err)
			continue
		}
	}
}