Update stage_finish.go : notifications to rpc daemon (#2755)

This commit is contained in:
Andrea Lanfranchi 2021-10-04 02:30:42 +02:00 committed by GitHub
parent ca3dda2a5f
commit f70dd63a30
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 43 additions and 24 deletions

View File

@ -1,14 +1,18 @@
package stagedsync
import (
"bytes"
"context"
"fmt"
"encoding/binary"
"reflect"
"github.com/ledgerwatch/erigon/params"
"github.com/ledgerwatch/erigon/rlp"
libcommon "github.com/ledgerwatch/erigon-lib/common"
"github.com/ledgerwatch/erigon-lib/kv"
"github.com/ledgerwatch/erigon/core/rawdb"
"github.com/ledgerwatch/erigon/eth/stagedsync/stages"
"github.com/ledgerwatch/erigon/core/types"
"github.com/ledgerwatch/erigon/turbo/snapshotsync"
"github.com/ledgerwatch/log/v3"
)
@ -124,29 +128,46 @@ func PruneFinish(u *PruneState, tx kv.RwTx, cfg FinishCfg, ctx context.Context)
return nil
}
func NotifyNewHeaders(ctx context.Context, finishStageBeforeSync uint64, unwindTo *uint64, notifier ChainEventNotifier, tx kv.Tx) error {
notifyTo, err := stages.GetStageProgress(tx, stages.Finish) // because later stages can be disabled
if err != nil {
func NotifyNewHeaders(ctx context.Context, finishStageBeforeSync uint64, finishStageAfterSync uint64, unwindTo *uint64, notifier ChainEventNotifier, tx kv.Tx) error {
if notifier == nil {
log.Trace("RPC Daemon notification channel not set. No headers notifications will be sent")
return nil
}
// Notify all headers we have (either canonical or not) in a maximum range span of 1024
var notifyFrom uint64
if unwindTo != nil && *unwindTo != 0 && (*unwindTo) < finishStageBeforeSync {
notifyFrom = *unwindTo
} else {
heightSpan := finishStageAfterSync - finishStageBeforeSync
if heightSpan > 1024 {
heightSpan = 1024
}
notifyFrom = finishStageAfterSync - heightSpan
}
startKey := make([]byte, reflect.TypeOf(notifyFrom).Size()+32)
var notifyTo uint64
binary.BigEndian.PutUint64(startKey, notifyFrom)
if err := tx.ForEach(kv.Headers, startKey, func(k, headerRLP []byte) error {
if len(headerRLP) == 0 {
return nil
}
header := new(types.Header)
if err := rlp.Decode(bytes.NewReader(headerRLP), header); err != nil {
log.Error("Invalid block header RLP", "err", err)
return err
}
notifyFrom := finishStageBeforeSync
if unwindTo != nil && *unwindTo != 0 && (*unwindTo) < finishStageBeforeSync {
notifyFrom = *unwindTo + 1
}
if notifier == nil {
log.Warn("rpc notifier is not set, rpc daemon won't be updated about headers")
return nil
}
for i := notifyFrom; i <= notifyTo; i++ {
header := rawdb.ReadHeaderByNumber(tx, i)
if header == nil {
return fmt.Errorf("could not find canonical header for number: %d", i)
}
notifyTo = header.Number.Uint64()
notifier.OnNewHeader(header)
return libcommon.Stopped(ctx.Done())
}); err != nil {
log.Error("RPC Daemon notification failed", "error", err)
return err
}
log.Info("Updated current block for the RPC API", "from", notifyFrom, "to", notifyTo)
log.Info("RPC Daemon notified of new headers", "from", notifyFrom, "to", notifyTo)
return nil
}

View File

@ -172,20 +172,18 @@ func StageLoopStep(
}
updateHead(ctx, head, headHash, headTd256)
if notifications.Accumulator != nil {
if notifications != nil && notifications.Accumulator != nil {
if err := db.View(ctx, func(tx kv.Tx) error {
header := rawdb.ReadCurrentHeader(tx)
if header == nil {
return nil
}
pendingBaseFee := misc.CalcBaseFee(notifications.Accumulator.ChainConfig(), header)
notifications.Accumulator.SendAndReset(ctx, notifications.StateChangesConsumer, pendingBaseFee.Uint64())
err = stagedsync.NotifyNewHeaders(ctx, finishProgressBefore, sync.PrevUnwindPoint(), notifications.Events, tx)
if err != nil {
return err
}
return nil
return stagedsync.NotifyNewHeaders(ctx, finishProgressBefore, head, sync.PrevUnwindPoint(), notifications.Events, tx)
}); err != nil {
return err
}