Same range parallel downloader (#8554)

will do a more advanced version later
Giulio rebuffo 2023-10-22 17:30:27 +02:00 committed by GitHub
parent ee942473d9
commit 8f29ca7405
10 changed files with 93 additions and 44 deletions
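
The gist of the change: instead of sending a single blocks-by-range request and giving up on error, the downloader now re-issues the same range on a ticker and keeps whichever response arrives first. A minimal, self-contained sketch of that first-response-wins loop (raceRequest and its fetch callback are illustrative names, not Erigon APIs):

	package main

	import (
		"context"
		"errors"
		"fmt"
		"sync/atomic"
		"time"
	)

	// raceRequest re-issues fetch on every tick and returns the first
	// successful result. Failed attempts are silently retried on the next
	// tick; late winners are discarded by the non-blocking channel send.
	func raceRequest[T any](ctx context.Context, interval time.Duration, fetch func() (T, error)) (T, error) {
		var zero T
		ticker := time.NewTicker(interval)
		defer ticker.Stop()
		done := make(chan T, 1)
		for {
			select {
			case <-ticker.C:
				go func() {
					resp, err := fetch()
					if err != nil {
						return // this attempt failed; the next tick retries
					}
					select {
					case done <- resp: // first response wins
					default: // something already won; drop this one
					}
				}()
			case <-ctx.Done():
				return zero, ctx.Err()
			case resp := <-done:
				return resp, nil
			}
		}
	}

	func main() {
		ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
		defer cancel()
		var attempts atomic.Int64
		// Simulate two flaky peers before a good one answers.
		blocks, err := raceRequest(ctx, 200*time.Millisecond, func() ([]string, error) {
			if attempts.Add(1) < 3 {
				return nil, errors.New("bad peer")
			}
			return []string{"block-1", "block-2"}, nil
		})
		fmt.Println(blocks, err)
	}

The buffered channel of size one plus the non-blocking send is what keeps late responses harmless: once a winner fills the buffer, slower goroutines hit the default branch and exit instead of blocking or leaking.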

View File

@@ -35,8 +35,8 @@ func (d *Downloader) DownloadEpoch(tx kv.RwTx, ctx context.Context, epoch uint64
 	}
 	// NOTE: the downloader does not perform any real verification on these blocks
 	// validation must be done separately
-	for _, v := range blocks {
-		err := d.beacondDB.WriteBlock(ctx, tx, v.Data, true)
+	for _, v := range blocks.Data {
+		err := d.beacondDB.WriteBlock(ctx, tx, v, true)
 		if err != nil {
 			return err
 		}

View File

@@ -33,7 +33,7 @@ func NewBeaconChainDatabaseFilesystem(rawDB RawBeaconBlockChain, executionEngine
 	}
 }

-func (b beaconChainDatabaseFilesystem) GetRange(ctx context.Context, tx kv.Tx, from uint64, count uint64) ([]*peers.PeeredObject[*cltypes.SignedBeaconBlock], error) {
+func (b beaconChainDatabaseFilesystem) GetRange(ctx context.Context, tx kv.Tx, from uint64, count uint64) (*peers.PeeredObject[[]*cltypes.SignedBeaconBlock], error) {
 	// Retrieve block roots for each ranged slot
 	beaconBlockRooots, slots, err := beacon_indicies.ReadBeaconBlockRootsInSlotRange(ctx, tx, from, count)
 	if err != nil {
@@ -41,10 +41,10 @@ func (b beaconChainDatabaseFilesystem) GetRange(ctx context.Context, tx kv.Tx, f
 	}

 	if len(beaconBlockRooots) == 0 {
-		return nil, nil
+		return &peers.PeeredObject[[]*cltypes.SignedBeaconBlock]{}, nil
 	}

-	blocks := []*peers.PeeredObject[*cltypes.SignedBeaconBlock]{}
+	blocks := []*cltypes.SignedBeaconBlock{}
 	for idx, blockRoot := range beaconBlockRooots {
 		slot := slots[idx]
@@ -60,9 +60,9 @@ func (b beaconChainDatabaseFilesystem) GetRange(ctx context.Context, tx kv.Tx, f
 			return nil, err
 		}

-		blocks = append(blocks, &peers.PeeredObject[*cltypes.SignedBeaconBlock]{Data: block})
+		blocks = append(blocks, block)
 	}

-	return blocks, nil
+	return &peers.PeeredObject[[]*cltypes.SignedBeaconBlock]{Data: blocks}, nil
 }
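
The signature changes above all pivot on the peers.PeeredObject wrapper: the old code tagged each block with the peer that served it, while the new code tags the whole slice once, letting callers ban a single peer for an entire bad batch. Its assumed shape, inferred from the call sites in this diff (a sketch, not the verbatim Erigon declaration):

	// PeeredObject ties a payload to the peer that served it, so bad data
	// can be traced back to one peer id and punished once per batch.
	type PeeredObject[T any] struct {
		Data T      // the payload, e.g. []*cltypes.SignedBeaconBlock
		Peer string // peer id; the exact type here is an assumption
	}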

View File

@@ -120,12 +120,12 @@ func TestBlockSaverStoreLoadPurgeFull(t *testing.T) {
 	blks, err := store.GetRange(context.Background(), tx, block.Block.Slot, 1)
 	require.NoError(t, err)
-	require.Equal(t, len(blks), 1)
+	require.Equal(t, len(blks.Data), 1)

 	expectedRoot, err := block.HashSSZ()
 	require.NoError(t, err)

-	haveRoot, err := blks[0].Data.HashSSZ()
+	haveRoot, err := blks.Data[0].HashSSZ()
 	require.NoError(t, err)

 	require.Equal(t, expectedRoot, haveRoot)
@@ -134,5 +134,5 @@ func TestBlockSaverStoreLoadPurgeFull(t *testing.T) {

 	newBlks, err := store.GetRange(context.Background(), tx, block.Block.Slot, 1)
 	require.NoError(t, err)
-	require.Equal(t, len(newBlks), 0)
+	require.Equal(t, len(newBlks.Data), 0)
 }

View File

@@ -3,6 +3,7 @@ package persistence
 import (
 	"context"
 	"sync"
+	"time"

 	"github.com/ledgerwatch/erigon-lib/kv"
 	"github.com/ledgerwatch/erigon/cl/cltypes"
@@ -18,7 +19,7 @@ type BeaconRpcSource struct {
 	rpc *rpc.BeaconRpcP2P
 }

-func (b *BeaconRpcSource) SaveBlocks(ctx context.Context, blocks []*peers.PeeredObject[*cltypes.SignedBeaconBlock]) error {
+func (b *BeaconRpcSource) SaveBlocks(ctx context.Context, blocks *peers.PeeredObject[*cltypes.SignedBeaconBlock]) error {
 	// it is a no-op because there is no need to do this
 	return nil
 }
@@ -29,20 +30,34 @@ func NewBeaconRpcSource(rpc *rpc.BeaconRpcP2P) *BeaconRpcSource {
 	}
 }

-func (b *BeaconRpcSource) GetRange(ctx context.Context, _ kv.Tx, from uint64, count uint64) ([]*peers.PeeredObject[*cltypes.SignedBeaconBlock], error) {
+func (b *BeaconRpcSource) GetRange(ctx context.Context, _ kv.Tx, from uint64, count uint64) (*peers.PeeredObject[[]*cltypes.SignedBeaconBlock], error) {
 	if count == 0 {
 		return nil, nil
 	}
-	responses, pid, err := b.rpc.SendBeaconBlocksByRangeReq(ctx, from, count)
-	if err != nil {
-		b.rpc.BanPeer(pid)
-		return nil, err
+	var responses *peers.PeeredObject[[]*cltypes.SignedBeaconBlock]
+	reqInterval := time.NewTicker(200 * time.Millisecond)
+	doneRespCh := make(chan *peers.PeeredObject[[]*cltypes.SignedBeaconBlock], 1)
+	defer reqInterval.Stop()
+	for {
+		select {
+		case <-reqInterval.C:
+			go func() {
+				responses, pid, err := b.rpc.SendBeaconBlocksByRangeReq(ctx, from, count)
+				if err != nil {
+					return
+				}
+				select {
+				case doneRespCh <- &peers.PeeredObject[[]*cltypes.SignedBeaconBlock]{Data: responses, Peer: pid}:
+				default:
+				}
+			}()
+		case <-ctx.Done():
+			return nil, ctx.Err()
+		case responses = <-doneRespCh:
+			return responses, nil
+		}
 	}
-	out := make([]*peers.PeeredObject[*cltypes.SignedBeaconBlock], 0, len(responses))
-	for _, v := range responses {
-		out = append(out, &peers.PeeredObject[*cltypes.SignedBeaconBlock]{Data: v, Peer: pid})
-	}
-	return out, nil
 }

 // a noop for rpc source since we always return new data
@@ -93,15 +108,16 @@ func (b *GossipSource) grabOrCreate(ctx context.Context, id uint64) chan *peers.
 	}
 	return ch
 }
-func (b *GossipSource) GetRange(ctx context.Context, _ kv.Tx, from uint64, count uint64) ([]*peers.PeeredObject[*cltypes.SignedBeaconBlock], error) {
-	out := make([]*peers.PeeredObject[*cltypes.SignedBeaconBlock], 0, count)
+func (b *GossipSource) GetRange(ctx context.Context, _ kv.Tx, from uint64, count uint64) (*peers.PeeredObject[[]*cltypes.SignedBeaconBlock], error) {
+	out := &peers.PeeredObject[[]*cltypes.SignedBeaconBlock]{}
 	for i := from; i < from+count; i++ {
 		ch := b.grabOrCreate(ctx, i)
 		select {
 		case <-ctx.Done():
 			return nil, ctx.Err()
 		case item := <-ch:
-			out = append(out, item)
+			out.Data = append(out.Data, item.Data)
+			out.Peer = item.Peer
 		}
 	}
 	return out, nil

View File

@@ -11,7 +11,7 @@ import (
 )

 type BlockSource interface {
-	GetRange(ctx context.Context, tx kv.Tx, from uint64, count uint64) ([]*peers.PeeredObject[*cltypes.SignedBeaconBlock], error)
+	GetRange(ctx context.Context, tx kv.Tx, from uint64, count uint64) (*peers.PeeredObject[[]*cltypes.SignedBeaconBlock], error)
 	PurgeRange(ctx context.Context, tx kv.RwTx, from uint64, count uint64) error
 }
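
For callers, the new contract means one peer id per range rather than one per block. A hedged sketch of a consumer, assumed to live in the same package as BlockSource (drainRange, handle, and ban are hypothetical names):

	// drainRange pulls a range from any BlockSource and processes it,
	// banning the serving peer once if any block in the batch is bad.
	func drainRange(ctx context.Context, tx kv.Tx, src BlockSource, from, count uint64,
		handle func(*cltypes.SignedBeaconBlock) error, ban func(peer string)) error {
		blocks, err := src.GetRange(ctx, tx, from, count)
		if err != nil {
			return err
		}
		if blocks == nil { // e.g. a zero-count request
			return nil
		}
		for _, blk := range blocks.Data {
			if err := handle(blk); err != nil {
				ban(blocks.Peer) // the whole range came from one peer
				return err
			}
		}
		return nil
	}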

View File

@@ -2,6 +2,7 @@ package network

 import (
 	"sync"
+	"time"

 	libcommon "github.com/ledgerwatch/erigon-lib/common"
 	"github.com/ledgerwatch/log/v3"
@@ -85,9 +86,33 @@ func (b *BackwardBeaconDownloader) RequestMore(ctx context.Context) {
 	if start > b.slotToDownload {
 		start = 0
 	}
-	responses, _, err := b.rpc.SendBeaconBlocksByRangeReq(ctx, start, count)
-	if err != nil {
-		return
+	reqInterval := time.NewTicker(100 * time.Millisecond)
+	doneRespCh := make(chan []*cltypes.SignedBeaconBlock, 1)
+	var responses []*cltypes.SignedBeaconBlock
+Loop:
+	for {
+		select {
+		case <-reqInterval.C:
+			go func() {
+				responses, peerId, err := b.rpc.SendBeaconBlocksByRangeReq(ctx, start, count)
+				if err != nil {
+					return
+				}
+				if len(responses) == 0 {
+					b.rpc.BanPeer(peerId)
+					return
+				}
+				select {
+				case doneRespCh <- responses:
+				default:
+				}
+			}()
+		case <-ctx.Done():
+			return
+		case responses = <-doneRespCh:
+			break Loop
+		}
 	}
 	// Import new blocks, order is forward so reverse the whole packet
 	for i := len(responses) - 1; i >= 0; i-- {

View File

@@ -166,14 +166,13 @@ func ConsensusClStages(ctx context.Context,
 ) *clstages.StageGraph[*Cfg, Args] {
 	rpcSource := persistence.NewBeaconRpcSource(cfg.rpc)
 	gossipSource := persistence.NewGossipSource(ctx, cfg.gossipManager)
-	processBlock := func(tx kv.RwTx, block *peers.PeeredObject[*cltypes.SignedBeaconBlock], newPayload, fullValidation bool) error {
-		if err := cfg.forkChoice.OnBlock(block.Data, newPayload, fullValidation); err != nil {
-			log.Warn("fail to process block", "reason", err, "slot", block.Data.Block.Slot)
-			cfg.rpc.BanPeer(block.Peer)
+	processBlock := func(tx kv.RwTx, block *cltypes.SignedBeaconBlock, newPayload, fullValidation bool) error {
+		if err := cfg.forkChoice.OnBlock(block, newPayload, fullValidation); err != nil {
+			log.Warn("fail to process block", "reason", err, "slot", block.Block.Slot)
 			return err
 		}
 		// Write block to database optimistically if we are very behind.
-		return cfg.beaconDB.WriteBlock(ctx, tx, block.Data, false)
+		return cfg.beaconDB.WriteBlock(ctx, tx, block, false)
 	}

 	// TODO: this is an ugly hack, but it works! Basically, we want shared state in the clstages.
@@ -282,21 +281,29 @@ func ConsensusClStages(ctx context.Context,
 	if err != nil {
 		return err
 	}
+	// If we got an empty packet ban the peer
+	if len(blocks.Data) == 0 {
+		cfg.rpc.BanPeer(blocks.Peer)
+		continue MainLoop
+	}

 	logger.Info("[Caplin] Epoch downloaded", "epoch", currentEpoch)
-	for _, block := range blocks {
-		if shouldInsert && block.Data.Version() >= clparams.BellatrixVersion {
-			executionPayload := block.Data.Block.Body.ExecutionPayload
+	for _, block := range blocks.Data {
+		if shouldInsert && block.Version() >= clparams.BellatrixVersion {
+			executionPayload := block.Block.Body.ExecutionPayload
 			body := executionPayload.Body()
 			txs, err := types.DecodeTransactions(body.Transactions)
 			if err != nil {
 				log.Warn("bad blocks segment received", "err", err)
+				cfg.rpc.BanPeer(blocks.Peer)
 				currentEpoch = utils.Max64(args.seenEpoch, currentEpoch-1)
 				continue MainLoop
 			}
 			header, err := executionPayload.RlpHeader()
 			if err != nil {
 				log.Warn("bad blocks segment received", "err", err)
+				cfg.rpc.BanPeer(blocks.Peer)
 				currentEpoch = utils.Max64(args.seenEpoch, currentEpoch-1)
 				continue MainLoop
 			}
@@ -304,6 +311,7 @@ func ConsensusClStages(ctx context.Context,
 		}
 		if err := processBlock(tx, block, false, true); err != nil {
 			log.Warn("bad blocks segment received", "err", err)
+			cfg.rpc.BanPeer(blocks.Peer)
 			currentEpoch = utils.Max64(args.seenEpoch, currentEpoch-1)
 			continue MainLoop
 		}
@@ -337,7 +345,7 @@ func ConsensusClStages(ctx context.Context,
 		"targetSlot", args.targetSlot,
 		"requestedSlots", totalRequest,
 	)
-	respCh := make(chan []*peers.PeeredObject[*cltypes.SignedBeaconBlock])
+	respCh := make(chan *peers.PeeredObject[[]*cltypes.SignedBeaconBlock])
 	errCh := make(chan error)

 	sources := []persistence.BlockSource{gossipSource}
@@ -372,7 +380,7 @@ func ConsensusClStages(ctx context.Context,
 	case err := <-errCh:
 		return err
 	case blocks := <-respCh:
-		for _, block := range blocks {
+		for _, block := range blocks.Data {
 			if err := processBlock(tx, block, true, true); err != nil {
 				return err
 			}
@@ -505,7 +513,7 @@ func ConsensusClStages(ctx context.Context,
 			return err
 		}

-		for _, block := range blocks {
+		for _, block := range blocks.Data {
 			err := processBlock(tx, block, true, true)
 			if err != nil {
 				// its okay if block processing fails

View File

@@ -67,7 +67,7 @@ func (b *BeaconRpcP2P) sendBlocksRequest(ctx context.Context, topic string, reqD
 	if message.Error {
 		rd := snappy.NewReader(bytes.NewBuffer(message.Data))
 		errBytes, _ := io.ReadAll(rd)
-		log.Debug("received range req error", "err", string(errBytes), "raw", string(message.Data))
+		log.Trace("received range req error", "err", string(errBytes), "raw", string(message.Data))
 		return nil, message.Peer.Pid, nil
 	}

View File

@@ -233,7 +233,7 @@ func (s *SentinelServer) SendRequest(ctx context.Context, req *sentinelrpc.Reque
 	}
 	resp, err := s.requestPeer(ctx, pid, req)
 	if err != nil {
-		s.logger.Debug("[sentinel] peer gave us bad data", "peer", pid, "err", err)
+		s.logger.Trace("[sentinel] peer gave us bad data", "peer", pid, "err", err)
 		// we simply retry
 		return false
 	}

View File

@@ -228,7 +228,7 @@ func (b *Epochs) Run(cctx *Context) error {
 	for i := b.FromEpoch; i <= b.ToEpoch; i = i + 1 {
 		ii := i
 		egg.Go(func() error {
-			var blocks []*peers.PeeredObject[*cltypes.SignedBeaconBlock]
+			var blocks *peers.PeeredObject[[]*cltypes.SignedBeaconBlock]
 			for {
 				blocks, err = rpcSource.GetRange(ctx, tx, uint64(ii)*beaconConfig.SlotsPerEpoch, beaconConfig.SlotsPerEpoch)
 				if err != nil {
@@ -237,10 +237,10 @@ func (b *Epochs) Run(cctx *Context) error {
 					break
 				}
 			}
-			for _, v := range blocks {
+			for _, v := range blocks.Data {
 				tk.Increment(1)
 				_, _ = beaconDB, v
-				err := beaconDB.WriteBlock(ctx, tx, v.Data, true)
+				err := beaconDB.WriteBlock(ctx, tx, v, true)
 				if err != nil {
 					return err
 				}