Avoid constantly triggering stageloop when using Engine API (#4797)

* avoid constantly triggering stageloop when using Engine API

* fix lint + test

* fixed comments

* ops

* little fixes here and there

Co-authored-by: giuliorebuffo <giuliorebuffo@system76-pc.localdomain>
This commit is contained in:
Giulio rebuffo 2022-07-23 18:57:23 +02:00 committed by GitHub
parent d2389a1f26
commit 1cb6be02a5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 208 additions and 186 deletions

View File

@ -40,7 +40,7 @@ func TestEthSubscribe(t *testing.T) {
m.ReceiveWg.Wait() // Wait for all messages to be processed before we proceeed
ctx := context.Background()
backendServer := privateapi.NewEthBackendServer(ctx, nil, m.DB, m.Notifications.Events, snapshotsync.NewBlockReader(), nil, nil, nil, nil, false)
backendServer := privateapi.NewEthBackendServer(ctx, nil, m.DB, m.Notifications.Events, snapshotsync.NewBlockReader(), nil, nil, nil, false)
backendClient := direct.NewEthBackendClientDirect(backendServer)
backend := rpcservices.NewRemoteBackend(backendClient, m.DB, snapshotsync.NewBlockReader())
ff := rpchelper.New(ctx, backend, nil, nil, func() {})

View File

@ -292,7 +292,7 @@ func CreateTestGrpcConn(t *testing.T, m *stages.MockSentry) (context.Context, *g
ethashApi := apis[1].Service.(*ethash.API)
server := grpc.NewServer()
remote.RegisterETHBACKENDServer(server, privateapi.NewEthBackendServer(ctx, nil, m.DB, m.Notifications.Events, snapshotsync.NewBlockReader(), nil, nil, nil, nil, false))
remote.RegisterETHBACKENDServer(server, privateapi.NewEthBackendServer(ctx, nil, m.DB, m.Notifications.Events, snapshotsync.NewBlockReader(), nil, nil, nil, false))
txpool.RegisterTxpoolServer(server, m.TxPoolGrpcServer)
txpool.RegisterMiningServer(server, privateapi.NewMiningServer(ctx, &IsMiningMock{}, ethashApi))
starknet.RegisterCAIROVMServer(server, &starknet.UnimplementedCAIROVMServer{})

View File

@ -293,7 +293,7 @@ func CreateTestGrpcConn(t *testing.T, m *stages.MockSentry) (context.Context, *g
ethashApi := apis[1].Service.(*ethash.API)
server := grpc.NewServer()
remote.RegisterETHBACKENDServer(server, privateapi.NewEthBackendServer(ctx, nil, m.DB, m.Notifications.Events, snapshotsync.NewBlockReader(), nil, nil, nil, nil, false))
remote.RegisterETHBACKENDServer(server, privateapi.NewEthBackendServer(ctx, nil, m.DB, m.Notifications.Events, snapshotsync.NewBlockReader(), nil, nil, nil, false))
txpool.RegisterTxpoolServer(server, m.TxPoolGrpcServer)
txpool.RegisterMiningServer(server, privateapi.NewMiningServer(ctx, &IsMiningMock{}, ethashApi))
starknet.RegisterCAIROVMServer(server, &starknet.UnimplementedCAIROVMServer{})

View File

@ -411,8 +411,7 @@ func New(stack *node.Node, config *ethconfig.Config, logger log.Logger) (*Ethere
// Initialize ethbackend
ethBackendRPC := privateapi.NewEthBackendServer(ctx, backend, backend.chainDB, backend.notifications.Events,
blockReader, chainConfig, backend.sentriesClient.Hd.BeaconRequestList, backend.sentriesClient.Hd.PayloadStatusCh,
assembleBlockPOS, config.Miner.EnabledPOS)
blockReader, chainConfig, assembleBlockPOS, backend.sentriesClient.Hd, config.Miner.EnabledPOS)
miningRPC = privateapi.NewMiningServer(ctx, backend, ethashApi)
if stack.Config().PrivateApiAddr != "" {

View File

@ -180,7 +180,7 @@ func HeadersPOS(
cfg.hd.ClearPendingPayloadHash()
cfg.hd.SetPendingPayloadStatus(nil)
var payloadStatus *privateapi.PayloadStatus
var payloadStatus *engineapi.PayloadStatus
if forkChoiceInsteadOfNewPayload {
payloadStatus, err = startHandlingForkChoice(forkChoiceMessage, requestStatus, requestId, s, u, ctx, tx, cfg, headerInserter)
} else {
@ -190,7 +190,7 @@ func HeadersPOS(
if err != nil {
if requestStatus == engineapi.New {
cfg.hd.PayloadStatusCh <- privateapi.PayloadStatus{CriticalError: err}
cfg.hd.PayloadStatusCh <- engineapi.PayloadStatus{CriticalError: err}
}
return err
}
@ -257,7 +257,7 @@ func startHandlingForkChoice(
tx kv.RwTx,
cfg HeadersCfg,
headerInserter *headerdownload.HeaderInserter,
) (*privateapi.PayloadStatus, error) {
) (*engineapi.PayloadStatus, error) {
if cfg.memoryOverlay {
defer cfg.forkValidator.ClearWithUnwind(tx, cfg.notifications.Accumulator, cfg.notifications.StateChangesConsumer)
}
@ -274,27 +274,17 @@ func startHandlingForkChoice(
return nil, err
}
if canonical {
return &privateapi.PayloadStatus{
return &engineapi.PayloadStatus{
Status: remote.EngineStatus_VALID,
LatestValidHash: currentHeadHash,
}, nil
} else {
return &privateapi.PayloadStatus{
return &engineapi.PayloadStatus{
CriticalError: &privateapi.InvalidForkchoiceStateErr,
}, nil
}
}
bad, lastValidHash := cfg.hd.IsBadHeaderPoS(headerHash)
if bad {
log.Warn(fmt.Sprintf("[%s] Fork choice bad head block", s.LogPrefix()), "headerHash", headerHash)
cfg.hd.BeaconRequestList.Remove(requestId)
return &privateapi.PayloadStatus{
Status: remote.EngineStatus_INVALID,
LatestValidHash: lastValidHash,
}, nil
}
// Header itself may already be in the snapshots, if CL starts off at much earlier state than Erigon
header, err := cfg.blockReader.HeaderByHash(ctx, tx, headerHash)
if err != nil {
@ -307,33 +297,12 @@ func startHandlingForkChoice(
log.Info(fmt.Sprintf("[%s] Fork choice missing header with hash %x", s.LogPrefix(), headerHash))
cfg.hd.SetPoSDownloaderTip(headerHash)
schedulePoSDownload(requestId, headerHash, 0 /* header height is unknown, setting to 0 */, s, cfg)
return &privateapi.PayloadStatus{Status: remote.EngineStatus_SYNCING}, nil
return &engineapi.PayloadStatus{Status: remote.EngineStatus_SYNCING}, nil
}
cfg.hd.BeaconRequestList.Remove(requestId)
headerNumber := header.Number.Uint64()
// If header is canonical, then no reorgs are required
canonicalHash, err := rawdb.ReadCanonicalHash(tx, headerNumber)
if err != nil {
log.Warn(fmt.Sprintf("[%s] Fork choice err (reading canonical hash of %d)", s.LogPrefix(), headerNumber), "err", err)
cfg.hd.BeaconRequestList.Remove(requestId)
return nil, err
}
if headerHash == canonicalHash {
log.Info(fmt.Sprintf("[%s] Fork choice on previously known block", s.LogPrefix()))
cfg.hd.BeaconRequestList.Remove(requestId)
// Per the Engine API spec:
// Client software MAY skip an update of the forkchoice state and MUST NOT begin a payload build process
// if forkchoiceState.headBlockHash references an ancestor of the head of canonical chain.
// In the case of such an event, client software MUST return
// {payloadStatus: {status: VALID, latestValidHash: forkchoiceState.headBlockHash, validationError: null}, payloadId: null}.
return &privateapi.PayloadStatus{
Status: remote.EngineStatus_VALID,
LatestValidHash: headerHash,
}, nil
}
if cfg.memoryOverlay && headerHash == cfg.forkValidator.ExtendingForkHeadHash() {
log.Info("Flushing in-memory state")
@ -350,7 +319,7 @@ func startHandlingForkChoice(
cfg.hd.SetPendingPayloadHash(headerHash)
return nil, nil
} else {
return &privateapi.PayloadStatus{
return &engineapi.PayloadStatus{
CriticalError: &privateapi.InvalidForkchoiceStateErr,
}, nil
}
@ -369,7 +338,7 @@ func startHandlingForkChoice(
// TODO(yperbasis): what if some bodies are missing and we have to download them?
cfg.hd.SetPendingPayloadHash(headerHash)
} else {
cfg.hd.PayloadStatusCh <- privateapi.PayloadStatus{Status: remote.EngineStatus_SYNCING}
cfg.hd.PayloadStatusCh <- engineapi.PayloadStatus{Status: remote.EngineStatus_SYNCING}
}
}
@ -418,7 +387,7 @@ func finishHandlingForkChoice(
if !canonical {
if cfg.hd.GetPendingPayloadHash() != (common.Hash{}) {
cfg.hd.PayloadStatusCh <- privateapi.PayloadStatus{
cfg.hd.PayloadStatusCh <- engineapi.PayloadStatus{
CriticalError: &privateapi.InvalidForkchoiceStateErr,
}
}
@ -438,7 +407,7 @@ func handleNewPayload(
tx kv.RwTx,
cfg HeadersCfg,
headerInserter *headerdownload.HeaderInserter,
) (*privateapi.PayloadStatus, error) {
) (*engineapi.PayloadStatus, error) {
header := block.Header()
headerNumber := header.Number.Uint64()
headerHash := block.Hash()
@ -446,40 +415,6 @@ func handleNewPayload(
log.Debug(fmt.Sprintf("[%s] Handling new payload", s.LogPrefix()), "height", headerNumber, "hash", headerHash)
cfg.hd.UpdateTopSeenHeightPoS(headerNumber)
existingCanonicalHash, err := rawdb.ReadCanonicalHash(tx, headerNumber)
if err != nil {
log.Warn(fmt.Sprintf("[%s] New payload err", s.LogPrefix()), "err", err)
cfg.hd.BeaconRequestList.Remove(requestId)
return nil, err
}
if existingCanonicalHash != (common.Hash{}) && headerHash == existingCanonicalHash {
log.Info(fmt.Sprintf("[%s] New payload: previously received valid header %d", s.LogPrefix(), headerNumber))
cfg.hd.BeaconRequestList.Remove(requestId)
return &privateapi.PayloadStatus{
Status: remote.EngineStatus_VALID,
LatestValidHash: headerHash,
}, nil
}
bad, lastValidHash := cfg.hd.IsBadHeaderPoS(headerHash)
if bad {
log.Warn(fmt.Sprintf("[%s] Previously known bad block", s.LogPrefix()), "height", headerNumber, "hash", headerHash)
} else {
bad, lastValidHash = cfg.hd.IsBadHeaderPoS(header.ParentHash)
if bad {
log.Warn(fmt.Sprintf("[%s] Previously known bad parent", s.LogPrefix()), "height", headerNumber, "hash", headerHash, "parentHash", header.ParentHash)
}
}
if bad {
cfg.hd.BeaconRequestList.Remove(requestId)
cfg.hd.ReportBadHeaderPoS(headerHash, lastValidHash)
return &privateapi.PayloadStatus{
Status: remote.EngineStatus_INVALID,
LatestValidHash: lastValidHash,
}, nil
}
parent, err := cfg.blockReader.HeaderByHash(ctx, tx, header.ParentHash)
if err != nil {
return nil, err
@ -488,18 +423,7 @@ func handleNewPayload(
log.Info(fmt.Sprintf("[%s] New payload missing parent", s.LogPrefix()))
cfg.hd.SetPoSDownloaderTip(headerHash)
schedulePoSDownload(requestId, header.ParentHash, headerNumber-1, s, cfg)
return &privateapi.PayloadStatus{Status: remote.EngineStatus_SYNCING}, nil
}
if headerNumber != parent.Number.Uint64()+1 {
log.Warn(fmt.Sprintf("[%s] Invalid block number", s.LogPrefix()), "headerNumber", headerNumber, "parentNumber", parent.Number.Uint64())
cfg.hd.BeaconRequestList.Remove(requestId)
cfg.hd.ReportBadHeaderPoS(headerHash, header.ParentHash)
return &privateapi.PayloadStatus{
Status: remote.EngineStatus_INVALID,
LatestValidHash: header.ParentHash,
ValidationError: errors.New("invalid block number"),
}, nil
return &engineapi.PayloadStatus{Status: remote.EngineStatus_SYNCING}, nil
}
cfg.hd.BeaconRequestList.Remove(requestId)
@ -526,7 +450,7 @@ func verifyAndSaveNewPoSHeader(
cfg HeadersCfg,
block *types.Block,
headerInserter *headerdownload.HeaderInserter,
) (response *privateapi.PayloadStatus, success bool, err error) {
) (response *engineapi.PayloadStatus, success bool, err error) {
header := block.Header()
headerNumber := header.Number.Uint64()
headerHash := block.Hash()
@ -534,7 +458,7 @@ func verifyAndSaveNewPoSHeader(
if verificationErr := cfg.hd.VerifyHeader(header); verificationErr != nil {
log.Warn("Verification failed for header", "hash", headerHash, "height", headerNumber, "err", verificationErr)
cfg.hd.ReportBadHeaderPoS(headerHash, header.ParentHash)
return &privateapi.PayloadStatus{
return &engineapi.PayloadStatus{
Status: remote.EngineStatus_INVALID,
LatestValidHash: header.ParentHash,
ValidationError: verificationErr,
@ -565,7 +489,7 @@ func verifyAndSaveNewPoSHeader(
} else if err := headerInserter.FeedHeaderPoS(tx, header, headerHash); err != nil {
return nil, false, err
}
return &privateapi.PayloadStatus{
return &engineapi.PayloadStatus{
Status: status,
LatestValidHash: latestValidHash,
ValidationError: validationError,
@ -578,7 +502,7 @@ func verifyAndSaveNewPoSHeader(
if !canExtendCanonical {
log.Info("Side chain", "parentHash", header.ParentHash, "currentHead", currentHeadHash)
return &privateapi.PayloadStatus{Status: remote.EngineStatus_ACCEPTED}, true, nil
return &engineapi.PayloadStatus{Status: remote.EngineStatus_ACCEPTED}, true, nil
}
// OK, we're on the canonical chain
@ -725,7 +649,7 @@ func forkingPoint(
func handleInterrupt(interrupt engineapi.Interrupt, cfg HeadersCfg, tx kv.RwTx, headerInserter *headerdownload.HeaderInserter, useExternalTx bool) (bool, error) {
if interrupt != engineapi.None {
if interrupt == engineapi.Stopping {
cfg.hd.PayloadStatusCh <- privateapi.PayloadStatus{CriticalError: errors.New("server is stopping")}
cfg.hd.PayloadStatusCh <- engineapi.PayloadStatus{CriticalError: errors.New("server is stopping")}
}
if interrupt == engineapi.Synced {
verifyAndSaveDownloadedPoSHeaders(tx, cfg, headerInserter)

View File

@ -13,6 +13,7 @@ import (
"github.com/ledgerwatch/erigon/core/rawdb"
"github.com/ledgerwatch/erigon/params"
"github.com/ledgerwatch/erigon/turbo/engineapi"
"github.com/ledgerwatch/erigon/turbo/stages/headerdownload"
"github.com/stretchr/testify/require"
)
@ -89,11 +90,10 @@ func TestMockDownloadRequest(t *testing.T) {
require := require.New(t)
makeTestDb(ctx, db)
beaconRequestList := engineapi.NewRequestList()
statusCh := make(chan PayloadStatus)
hd := headerdownload.NewHeaderDownload(0, 0, nil, nil)
events := NewEvents()
backend := NewEthBackendServer(ctx, nil, db, events, nil, &params.ChainConfig{TerminalTotalDifficulty: common.Big1}, beaconRequestList, statusCh, nil, false)
backend := NewEthBackendServer(ctx, nil, db, events, nil, &params.ChainConfig{TerminalTotalDifficulty: common.Big1}, nil, hd, false)
var err error
var reply *remote.EnginePayloadStatus
@ -104,8 +104,8 @@ func TestMockDownloadRequest(t *testing.T) {
done <- true
}()
beaconRequestList.WaitForRequest(true)
statusCh <- PayloadStatus{Status: remote.EngineStatus_SYNCING}
hd.BeaconRequestList.WaitForRequest(true)
hd.PayloadStatusCh <- engineapi.PayloadStatus{Status: remote.EngineStatus_SYNCING}
<-done
require.NoError(err)
require.Equal(reply.Status, remote.EngineStatus_SYNCING)
@ -148,11 +148,10 @@ func TestMockValidExecution(t *testing.T) {
makeTestDb(ctx, db)
beaconRequestList := engineapi.NewRequestList()
statusCh := make(chan PayloadStatus)
hd := headerdownload.NewHeaderDownload(0, 0, nil, nil)
events := NewEvents()
backend := NewEthBackendServer(ctx, nil, db, events, nil, &params.ChainConfig{TerminalTotalDifficulty: common.Big1}, beaconRequestList, statusCh, nil, false)
backend := NewEthBackendServer(ctx, nil, db, events, nil, &params.ChainConfig{TerminalTotalDifficulty: common.Big1}, nil, hd, false)
var err error
var reply *remote.EnginePayloadStatus
@ -163,9 +162,9 @@ func TestMockValidExecution(t *testing.T) {
done <- true
}()
beaconRequestList.WaitForRequest(true)
hd.BeaconRequestList.WaitForRequest(true)
statusCh <- PayloadStatus{
hd.PayloadStatusCh <- engineapi.PayloadStatus{
Status: remote.EngineStatus_VALID,
LatestValidHash: payload3Hash,
}
@ -184,11 +183,10 @@ func TestMockInvalidExecution(t *testing.T) {
makeTestDb(ctx, db)
beaconRequestList := engineapi.NewRequestList()
statusCh := make(chan PayloadStatus)
hd := headerdownload.NewHeaderDownload(0, 0, nil, nil)
events := NewEvents()
backend := NewEthBackendServer(ctx, nil, db, events, nil, &params.ChainConfig{TerminalTotalDifficulty: common.Big1}, beaconRequestList, statusCh, nil, false)
backend := NewEthBackendServer(ctx, nil, db, events, nil, &params.ChainConfig{TerminalTotalDifficulty: common.Big1}, nil, hd, false)
var err error
var reply *remote.EnginePayloadStatus
@ -199,9 +197,9 @@ func TestMockInvalidExecution(t *testing.T) {
done <- true
}()
beaconRequestList.WaitForRequest(true)
hd.BeaconRequestList.WaitForRequest(true)
// Simulate invalid status
statusCh <- PayloadStatus{
hd.PayloadStatusCh <- engineapi.PayloadStatus{
Status: remote.EngineStatus_INVALID,
LatestValidHash: startingHeadHash,
}
@ -220,11 +218,10 @@ func TestNoTTD(t *testing.T) {
makeTestDb(ctx, db)
beaconRequestList := engineapi.NewRequestList()
statusCh := make(chan PayloadStatus)
hd := headerdownload.NewHeaderDownload(0, 0, nil, nil)
events := NewEvents()
backend := NewEthBackendServer(ctx, nil, db, events, nil, &params.ChainConfig{}, beaconRequestList, statusCh, nil, false)
backend := NewEthBackendServer(ctx, nil, db, events, nil, &params.ChainConfig{}, nil, hd, false)
var err error

View File

@ -24,6 +24,7 @@ import (
"github.com/ledgerwatch/erigon/turbo/builder"
"github.com/ledgerwatch/erigon/turbo/engineapi"
"github.com/ledgerwatch/erigon/turbo/services"
"github.com/ledgerwatch/erigon/turbo/stages/headerdownload"
"github.com/ledgerwatch/log/v3"
"golang.org/x/exp/slices"
"google.golang.org/protobuf/types/known/emptypb"
@ -55,14 +56,12 @@ type EthBackendServer struct {
// Block proposing for proof-of-stake
payloadId uint64
builders map[uint64]*builder.BlockBuilder
// Send Beacon Chain requests to staged sync
requestList *engineapi.RequestList
// Replies to newPayload & forkchoice requests
statusCh <-chan PayloadStatus
builderFunc builder.BlockBuilderFunc
proposing bool
lock sync.Mutex // Engine API is asynchronous, we want to avoid CL to call different APIs at the same time
logsFilter *LogsFilterAggregator
hd *headerdownload.HeaderDownload
}
type EthBackend interface {
@ -73,23 +72,12 @@ type EthBackend interface {
Peers(ctx context.Context) (*remote.PeersReply, error)
}
// This is the status of a newly execute block.
// Hash: Block hash
// Status: block's status
type PayloadStatus struct {
Status remote.EngineStatus
LatestValidHash common.Hash
ValidationError error
CriticalError error
}
func NewEthBackendServer(ctx context.Context, eth EthBackend, db kv.RwDB, events *Events, blockReader services.BlockAndTxnReader,
config *params.ChainConfig, requestList *engineapi.RequestList, statusCh <-chan PayloadStatus,
builderFunc builder.BlockBuilderFunc, proposing bool,
config *params.ChainConfig, builderFunc builder.BlockBuilderFunc, hd *headerdownload.HeaderDownload, proposing bool,
) *EthBackendServer {
s := &EthBackendServer{ctx: ctx, eth: eth, events: events, db: db, blockReader: blockReader, config: config,
requestList: requestList, statusCh: statusCh, builders: make(map[uint64]*builder.BlockBuilder),
builderFunc: builderFunc, proposing: proposing, logsFilter: NewLogsFilterAggregator(events),
builders: make(map[uint64]*builder.BlockBuilder),
builderFunc: builderFunc, proposing: proposing, logsFilter: NewLogsFilterAggregator(events), hd: hd,
}
ch, clean := s.events.AddLogsSubscription()
@ -244,7 +232,7 @@ func (s *EthBackendServer) Block(ctx context.Context, req *remote.BlockRequest)
return &remote.BlockReply{BlockRlp: blockRlp, Senders: sendersBytes}, nil
}
func convertPayloadStatus(payloadStatus *PayloadStatus) *remote.EnginePayloadStatus {
func convertPayloadStatus(payloadStatus *engineapi.PayloadStatus) *remote.EnginePayloadStatus {
reply := remote.EnginePayloadStatus{Status: payloadStatus.Status}
if payloadStatus.LatestValidHash != (common.Hash{}) {
reply.LatestValidHash = gointerfaces.ConvertHashToH256(payloadStatus.LatestValidHash)
@ -257,7 +245,7 @@ func convertPayloadStatus(payloadStatus *PayloadStatus) *remote.EnginePayloadSta
func (s *EthBackendServer) stageLoopIsBusy() bool {
for i := 0; i < 20; i++ {
if !s.requestList.IsWaiting() {
if !s.hd.BeaconRequestList.IsWaiting() {
// This might happen, for example, in the following scenario:
// 1) CL sends NewPayload and immediately after that ForkChoiceUpdated.
// 2) We happily process NewPayload and stage loop is at the end.
@ -269,7 +257,7 @@ func (s *EthBackendServer) stageLoopIsBusy() bool {
time.Sleep(5 * time.Millisecond)
}
}
return !s.requestList.IsWaiting()
return !s.hd.BeaconRequestList.IsWaiting()
}
// EngineNewPayloadV1 validates and possibly executes payload
@ -344,12 +332,21 @@ func (s *EthBackendServer) EngineNewPayloadV1(ctx context.Context, req *types2.E
if err != nil {
return nil, err
}
tx.Rollback()
if parentTd != nil && parentTd.Cmp(s.config.TerminalTotalDifficulty) < 0 {
log.Warn("[NewPayload] TTD not reached yet", "height", header.Number, "hash", common.Hash(blockHash))
return &remote.EnginePayloadStatus{Status: remote.EngineStatus_INVALID, LatestValidHash: gointerfaces.ConvertHashToH256(common.Hash{})}, nil
}
tx.Rollback()
possibleStatus, err := s.getPayloadStatusFromHashIfPossible(blockHash, req.BlockNumber, header.ParentHash, true)
if err != nil {
return nil, err
}
if possibleStatus != nil {
return convertPayloadStatus(possibleStatus), nil
}
// If another payload is already commissioned then we just reply with syncing
if s.stageLoopIsBusy() {
// We are still syncing a commissioned payload
@ -360,17 +357,13 @@ func (s *EthBackendServer) EngineNewPayloadV1(ctx context.Context, req *types2.E
log.Debug("[NewPayload] stage loop is busy")
return &remote.EnginePayloadStatus{Status: remote.EngineStatus_SYNCING}, nil
}
// Lock the thread (We modify shared resources).
log.Debug("[NewPayload] acquiring lock")
s.lock.Lock()
defer s.lock.Unlock()
log.Debug("[NewPayload] lock acquired")
log.Debug("[NewPayload] sending block", "height", header.Number, "hash", common.Hash(blockHash))
s.requestList.AddPayloadRequest(block)
s.hd.BeaconRequestList.AddPayloadRequest(block)
payloadStatus := <-s.statusCh
payloadStatus := <-s.hd.PayloadStatusCh
log.Debug("[NewPayload] got reply", "payloadStatus", payloadStatus)
if payloadStatus.CriticalError != nil {
@ -380,6 +373,100 @@ func (s *EthBackendServer) EngineNewPayloadV1(ctx context.Context, req *types2.E
return convertPayloadStatus(&payloadStatus), nil
}
// Check if we can make out a status from the payload hash/head hash.
func (s *EthBackendServer) getPayloadStatusFromHashIfPossible(blockHash common.Hash, blockNumber uint64, parentHash common.Hash, newPayload bool) (*engineapi.PayloadStatus, error) {
if s.hd == nil {
return nil, nil
}
var prefix string
if newPayload {
prefix = "NewPayload"
} else {
prefix = "ForkChoiceUpdated"
}
tx, err := s.db.BeginRo(s.ctx)
if err != nil {
return nil, err
}
defer tx.Rollback()
header, err := rawdb.ReadHeaderByHash(tx, blockHash)
if err != nil {
return nil, err
}
var parent *types.Header
if newPayload {
parent, err = rawdb.ReadHeaderByHash(tx, parentHash)
}
if err != nil {
return nil, err
}
var canonicalHash common.Hash
if header != nil {
canonicalHash, err = rawdb.ReadCanonicalHash(tx, header.Number.Uint64())
}
if err != nil {
return nil, err
}
if newPayload && parent != nil && blockNumber != parent.Number.Uint64()+1 {
log.Warn(fmt.Sprintf("[%s] Invalid block number", prefix), "headerNumber", blockNumber, "parentNumber", parent.Number.Uint64())
s.hd.ReportBadHeaderPoS(blockHash, parent.Hash())
return &engineapi.PayloadStatus{
Status: remote.EngineStatus_INVALID,
LatestValidHash: parent.Hash(),
ValidationError: errors.New("invalid block number"),
}, nil
}
// Check if we already determined if the hash is attributed to a previously received invalid header.
bad, lastValidHash := s.hd.IsBadHeaderPoS(blockHash)
if bad {
log.Warn(fmt.Sprintf("[%s] Previously known bad block", prefix), "hash", blockHash)
} else if newPayload {
bad, lastValidHash = s.hd.IsBadHeaderPoS(parentHash)
if bad {
log.Warn(fmt.Sprintf("[%s] Previously known bad block", prefix), "hash", blockHash, "parentHash", parentHash)
}
}
if bad {
s.hd.ReportBadHeaderPoS(blockHash, lastValidHash)
return &engineapi.PayloadStatus{Status: remote.EngineStatus_INVALID, LatestValidHash: lastValidHash}, nil
}
// If header is already validated or has a missing parent, you can either return VALID or SYNCING.
if newPayload {
if header != nil && canonicalHash == blockHash {
return &engineapi.PayloadStatus{Status: remote.EngineStatus_VALID, LatestValidHash: blockHash}, nil
}
if parent == nil && s.hd.PosStatus() == headerdownload.Syncing {
return &engineapi.PayloadStatus{Status: remote.EngineStatus_SYNCING}, nil
}
return nil, nil
}
if header == nil {
if s.hd.PosStatus() == headerdownload.Syncing {
return &engineapi.PayloadStatus{Status: remote.EngineStatus_SYNCING}, nil
}
return nil, nil
}
headHash := rawdb.ReadHeadBlockHash(tx)
if err != nil {
return nil, err
}
if blockHash != headHash && canonicalHash == blockHash {
return &engineapi.PayloadStatus{Status: remote.EngineStatus_VALID, LatestValidHash: blockHash}, nil
}
return nil, nil
}
// EngineGetPayloadV1 retrieves previously assembled payload (Validators only)
func (s *EthBackendServer) EngineGetPayloadV1(ctx context.Context, req *remote.EngineGetPayloadRequest) (*types2.ExecutionPayload, error) {
if !s.proposing {
@ -451,6 +538,7 @@ func (s *EthBackendServer) EngineForkChoiceUpdatedV1(ctx context.Context, req *r
return nil, err
}
defer tx1.Rollback()
td, err := rawdb.ReadTdByHash(tx1, forkChoice.HeadBlockHash)
tx1.Rollback()
if err != nil {
@ -463,31 +551,38 @@ func (s *EthBackendServer) EngineForkChoiceUpdatedV1(ctx context.Context, req *r
}, nil
}
if s.stageLoopIsBusy() {
log.Debug("[ForkChoiceUpdated] stage loop is busy")
return &remote.EngineForkChoiceUpdatedReply{
PayloadStatus: &remote.EnginePayloadStatus{Status: remote.EngineStatus_SYNCING},
}, nil
status, err := s.getPayloadStatusFromHashIfPossible(forkChoice.HeadBlockHash, 0, common.Hash{}, false)
if err != nil {
return nil, err
}
if status == nil {
if s.stageLoopIsBusy() {
log.Debug("[ForkChoiceUpdated] stage loop is busy")
return &remote.EngineForkChoiceUpdatedReply{
PayloadStatus: &remote.EnginePayloadStatus{Status: remote.EngineStatus_SYNCING},
}, nil
}
s.lock.Lock()
defer s.lock.Unlock()
log.Debug("[ForkChoiceUpdated] acquiring lock")
s.lock.Lock()
defer s.lock.Unlock()
log.Debug("[ForkChoiceUpdated] lock acquired")
log.Debug("[ForkChoiceUpdated] sending forkChoiceMessage", "head", forkChoice.HeadBlockHash)
s.hd.BeaconRequestList.AddForkChoiceRequest(&forkChoice)
log.Debug("[ForkChoiceUpdated] sending forkChoiceMessage", "head", forkChoice.HeadBlockHash)
s.requestList.AddForkChoiceRequest(&forkChoice)
statusRef := <-s.hd.PayloadStatusCh
status = &statusRef
log.Debug("[ForkChoiceUpdated] got reply", "payloadStatus", status)
status := <-s.statusCh
log.Debug("[ForkChoiceUpdated] got reply", "payloadStatus", status)
if status.CriticalError != nil {
return nil, status.CriticalError
if status.CriticalError != nil {
return nil, status.CriticalError
}
} else {
s.lock.Lock()
defer s.lock.Unlock()
}
// No need for payload building
if req.PayloadAttributes == nil || status.Status != remote.EngineStatus_VALID {
return &remote.EngineForkChoiceUpdatedReply{PayloadStatus: convertPayloadStatus(&status)}, nil
return &remote.EngineForkChoiceUpdatedReply{PayloadStatus: convertPayloadStatus(status)}, nil
}
if !s.proposing {
@ -514,7 +609,7 @@ func (s *EthBackendServer) EngineForkChoiceUpdatedV1(ctx context.Context, req *r
log.Warn("Skipping payload building because forkchoiceState.headBlockHash is not the head of the canonical chain",
"forkChoice.HeadBlockHash", forkChoice.HeadBlockHash, "headHeader.Hash", headHeader.Hash())
return &remote.EngineForkChoiceUpdatedReply{PayloadStatus: convertPayloadStatus(&status)}, nil
return &remote.EngineForkChoiceUpdatedReply{PayloadStatus: convertPayloadStatus(status)}, nil
}
if headHeader.Time >= req.PayloadAttributes.Timestamp {

View File

@ -6,10 +6,21 @@ import (
"github.com/emirpasic/gods/maps/treemap"
"github.com/ledgerwatch/erigon-lib/gointerfaces/remote"
"github.com/ledgerwatch/erigon/common"
"github.com/ledgerwatch/erigon/core/types"
)
// This is the status of a newly execute block.
// Hash: Block hash
// Status: block's status
type PayloadStatus struct {
Status remote.EngineStatus
LatestValidHash common.Hash
ValidationError error
CriticalError error
}
// The message we are going to send to the stage sync in ForkchoiceUpdated
type ForkChoiceMessage struct {
HeadBlockHash common.Hash

View File

@ -26,7 +26,6 @@ import (
"github.com/ledgerwatch/erigon/core/rawdb"
"github.com/ledgerwatch/erigon/core/types"
"github.com/ledgerwatch/erigon/eth/stagedsync/stages"
"github.com/ledgerwatch/erigon/ethdb/privateapi"
"github.com/ledgerwatch/erigon/params"
"github.com/ledgerwatch/erigon/rlp"
"github.com/ledgerwatch/erigon/turbo/engineapi"
@ -1134,13 +1133,13 @@ func (hd *HeaderDownload) ClearPendingPayloadHash() {
hd.pendingPayloadHash = common.Hash{}
}
func (hd *HeaderDownload) GetPendingPayloadStatus() *privateapi.PayloadStatus {
func (hd *HeaderDownload) GetPendingPayloadStatus() *engineapi.PayloadStatus {
hd.lock.RLock()
defer hd.lock.RUnlock()
return hd.pendingPayloadStatus
}
func (hd *HeaderDownload) SetPendingPayloadStatus(response *privateapi.PayloadStatus) {
func (hd *HeaderDownload) SetPendingPayloadStatus(response *engineapi.PayloadStatus) {
hd.lock.Lock()
defer hd.lock.Unlock()
hd.pendingPayloadStatus = response

View File

@ -12,7 +12,6 @@ import (
"github.com/ledgerwatch/erigon/common"
"github.com/ledgerwatch/erigon/consensus"
"github.com/ledgerwatch/erigon/core/types"
"github.com/ledgerwatch/erigon/ethdb/privateapi"
"github.com/ledgerwatch/erigon/rlp"
"github.com/ledgerwatch/erigon/turbo/engineapi"
"github.com/ledgerwatch/erigon/turbo/services"
@ -305,16 +304,16 @@ type HeaderDownload struct {
requestId int
posAnchor *Anchor
posStatus SyncStatus
posSync bool // Whether the chain is syncing in the PoS mode
headersCollector *etl.Collector // ETL collector for headers
BeaconRequestList *engineapi.RequestList // Requests from ethbackend to staged sync
PayloadStatusCh chan privateapi.PayloadStatus // Responses (validation/execution status)
pendingPayloadHash common.Hash // Header whose status we still should send to PayloadStatusCh
pendingPayloadStatus *privateapi.PayloadStatus // Alternatively, there can be an already prepared response to send to PayloadStatusCh
unsettledForkChoice *engineapi.ForkChoiceMessage // Forkchoice to process after unwind
unsettledHeadHeight uint64 // Height of unsettledForkChoice.headBlockHash
posDownloaderTip common.Hash // See https://hackmd.io/GDc0maGsQeKfP8o2C7L52w
badPoSHeaders map[common.Hash]common.Hash // Invalid Tip -> Last Valid Ancestor
posSync bool // Whether the chain is syncing in the PoS mode
headersCollector *etl.Collector // ETL collector for headers
BeaconRequestList *engineapi.RequestList // Requests from ethbackend to staged sync
PayloadStatusCh chan engineapi.PayloadStatus // Responses (validation/execution status)
pendingPayloadHash common.Hash // Header whose status we still should send to PayloadStatusCh
pendingPayloadStatus *engineapi.PayloadStatus // Alternatively, there can be an already prepared response to send to PayloadStatusCh
unsettledForkChoice *engineapi.ForkChoiceMessage // Forkchoice to process after unwind
unsettledHeadHeight uint64 // Height of unsettledForkChoice.headBlockHash
posDownloaderTip common.Hash // See https://hackmd.io/GDc0maGsQeKfP8o2C7L52w
badPoSHeaders map[common.Hash]common.Hash // Invalid Tip -> Last Valid Ancestor
}
// HeaderRecord encapsulates two forms of the same header - raw RLP encoding (to avoid duplicated decodings and encodings), and parsed value types.Header
@ -343,7 +342,7 @@ func NewHeaderDownload(
DeliveryNotify: make(chan struct{}, 1),
QuitPoWMining: make(chan struct{}),
BeaconRequestList: engineapi.NewRequestList(),
PayloadStatusCh: make(chan privateapi.PayloadStatus, 1),
PayloadStatusCh: make(chan engineapi.PayloadStatus, 1),
headerReader: headerReader,
badPoSHeaders: make(map[common.Hash]common.Hash),
}

View File

@ -530,7 +530,7 @@ func (ms *MockSentry) SendForkChoiceRequest(message *engineapi.ForkChoiceMessage
ms.sentriesClient.Hd.BeaconRequestList.AddForkChoiceRequest(message)
}
func (ms *MockSentry) ReceivePayloadStatus() privateapi.PayloadStatus {
func (ms *MockSentry) ReceivePayloadStatus() engineapi.PayloadStatus {
return <-ms.sentriesClient.Hd.PayloadStatusCh
}

View File

@ -669,11 +669,10 @@ func TestPoSSyncWithInvalidHeader(t *testing.T) {
FinalizedBlockHash: invalidTip.Hash(),
}
m.SendForkChoiceRequest(&forkChoiceMessage)
headBlockHash, err = stages.StageLoopStep(m.Ctx, m.DB, m.Sync, 0, m.Notifications, false, m.UpdateHead, nil)
_, err = stages.StageLoopStep(m.Ctx, m.DB, m.Sync, 0, m.Notifications, false, m.UpdateHead, nil)
require.NoError(t, err)
stages.SendPayloadStatus(m.HeaderDownload(), headBlockHash, err)
payloadStatus2 := m.ReceivePayloadStatus()
require.Equal(t, remote.EngineStatus_INVALID, payloadStatus2.Status)
assert.Equal(t, lastValidHeader.Hash(), payloadStatus2.LatestValidHash)
bad, lastValidHash := m.HeaderDownload().IsBadHeaderPoS(invalidTip.Hash())
assert.True(t, bad)
assert.Equal(t, lastValidHash, lastValidHeader.Hash())
}

View File

@ -23,7 +23,6 @@ import (
"github.com/ledgerwatch/erigon/eth/ethconfig"
"github.com/ledgerwatch/erigon/eth/stagedsync"
"github.com/ledgerwatch/erigon/eth/stagedsync/stages"
"github.com/ledgerwatch/erigon/ethdb/privateapi"
"github.com/ledgerwatch/erigon/p2p"
"github.com/ledgerwatch/erigon/turbo/engineapi"
"github.com/ledgerwatch/erigon/turbo/services"
@ -35,13 +34,13 @@ import (
func SendPayloadStatus(hd *headerdownload.HeaderDownload, headBlockHash common.Hash, err error) {
if pendingPayloadStatus := hd.GetPendingPayloadStatus(); pendingPayloadStatus != nil {
if err != nil {
hd.PayloadStatusCh <- privateapi.PayloadStatus{CriticalError: err}
hd.PayloadStatusCh <- engineapi.PayloadStatus{CriticalError: err}
} else {
hd.PayloadStatusCh <- *pendingPayloadStatus
}
} else if pendingPayloadHash := hd.GetPendingPayloadHash(); pendingPayloadHash != (common.Hash{}) {
if err != nil {
hd.PayloadStatusCh <- privateapi.PayloadStatus{CriticalError: err}
hd.PayloadStatusCh <- engineapi.PayloadStatus{CriticalError: err}
} else {
var status remote.EngineStatus
if headBlockHash == pendingPayloadHash {
@ -50,7 +49,7 @@ func SendPayloadStatus(hd *headerdownload.HeaderDownload, headBlockHash common.H
log.Warn("Failed to execute pending payload", "pendingPayload", pendingPayloadHash, "headBlock", headBlockHash)
status = remote.EngineStatus_INVALID
}
hd.PayloadStatusCh <- privateapi.PayloadStatus{
hd.PayloadStatusCh <- engineapi.PayloadStatus{
Status: status,
LatestValidHash: headBlockHash,
}