mirror of
https://gitlab.com/pulsechaincom/erigon-pulse.git
synced 2025-01-06 19:12:19 +00:00
79ed8cad35
This change introduces additional processes to manage snapshot uploading for E2 snapshots: ## erigon snapshots upload The `snapshots uploader` command starts a version of erigon customized for uploading snapshot files to a remote location. It breaks the stage execution process after the senders stage and then uses the snapshot stage to send uploaded headers, bodies and (in the case of polygon) bor spans and events to snapshot files. Because this process avoids execution in run signifigantly faster than a standard erigon configuration. The uploader uses rclone to send seedable (100K or 500K blocks) to a remote storage location specified in the rclone config file. The **uploader** is configured to minimize disk usage by doing the following: * It removes snapshots once they are loaded * It aggressively prunes the database once entities are transferred to snapshots in addition to this it has the following performance related features: * maximizes the workers allocated to snapshot processing to improve throughput * Can be started from scratch by downloading the latest snapshots from the remote location to seed processing ## snapshots command Is a stand alone command for managing remote snapshots it has the following sub commands * **cmp** - compare snapshots * **copy** - copy snapshots * **verify** - verify snapshots * **manifest** - manage the manifest file in the root of remote snapshot locations * **torrent** - manage snapshot torrent files
371 lines
12 KiB
Go
371 lines
12 KiB
Go
package eth1
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"time"
|
|
|
|
libcommon "github.com/ledgerwatch/erigon-lib/common"
|
|
"github.com/ledgerwatch/erigon-lib/gointerfaces"
|
|
"github.com/ledgerwatch/erigon-lib/gointerfaces/execution"
|
|
"github.com/ledgerwatch/erigon-lib/kv"
|
|
"github.com/ledgerwatch/erigon-lib/kv/rawdbv3"
|
|
"github.com/ledgerwatch/erigon/core/rawdb"
|
|
"github.com/ledgerwatch/erigon/eth/stagedsync"
|
|
"github.com/ledgerwatch/erigon/eth/stagedsync/stages"
|
|
)
|
|
|
|
type forkchoiceOutcome struct {
|
|
receipt *execution.ForkChoiceReceipt
|
|
err error
|
|
}
|
|
|
|
func sendForkchoiceReceiptWithoutWaiting(ch chan forkchoiceOutcome, receipt *execution.ForkChoiceReceipt) {
|
|
select {
|
|
case ch <- forkchoiceOutcome{receipt: receipt}:
|
|
default:
|
|
}
|
|
}
|
|
|
|
func sendForkchoiceErrorWithoutWaiting(ch chan forkchoiceOutcome, err error) {
|
|
select {
|
|
case ch <- forkchoiceOutcome{err: err}:
|
|
default:
|
|
}
|
|
}
|
|
|
|
// verifyForkchoiceHashes verifies the finalized and safe hash of the forkchoice state
|
|
func (e *EthereumExecutionModule) verifyForkchoiceHashes(ctx context.Context, tx kv.Tx, blockHash, finalizedHash, safeHash libcommon.Hash) (bool, error) {
|
|
// Client software MUST return -38002: Invalid forkchoice state error if the payload referenced by
|
|
// forkchoiceState.headBlockHash is VALID and a payload referenced by either forkchoiceState.finalizedBlockHash or
|
|
// forkchoiceState.safeBlockHash does not belong to the chain defined by forkchoiceState.headBlockHash
|
|
headNumber := rawdb.ReadHeaderNumber(tx, blockHash)
|
|
finalizedNumber := rawdb.ReadHeaderNumber(tx, finalizedHash)
|
|
safeNumber := rawdb.ReadHeaderNumber(tx, safeHash)
|
|
|
|
if finalizedHash != (libcommon.Hash{}) && finalizedHash != blockHash {
|
|
canonical, err := e.isCanonicalHash(ctx, tx, finalizedHash)
|
|
if err != nil {
|
|
return false, err
|
|
}
|
|
if !canonical || *headNumber <= *finalizedNumber {
|
|
return false, nil
|
|
}
|
|
|
|
}
|
|
if safeHash != (libcommon.Hash{}) && safeHash != blockHash {
|
|
canonical, err := e.isCanonicalHash(ctx, tx, safeHash)
|
|
if err != nil {
|
|
return false, err
|
|
}
|
|
if !canonical || *headNumber <= *safeNumber {
|
|
return false, nil
|
|
}
|
|
}
|
|
return true, nil
|
|
}
|
|
|
|
func (e *EthereumExecutionModule) UpdateForkChoice(ctx context.Context, req *execution.ForkChoice) (*execution.ForkChoiceReceipt, error) {
|
|
blockHash := gointerfaces.ConvertH256ToHash(req.HeadBlockHash)
|
|
safeHash := gointerfaces.ConvertH256ToHash(req.SafeBlockHash)
|
|
finalizedHash := gointerfaces.ConvertH256ToHash(req.FinalizedBlockHash)
|
|
|
|
outcomeCh := make(chan forkchoiceOutcome, 1)
|
|
|
|
// So we wait at most the amount specified by req.Timeout before just sending out
|
|
go e.updateForkChoice(ctx, blockHash, safeHash, finalizedHash, outcomeCh)
|
|
fcuTimer := time.NewTimer(time.Duration(req.Timeout) * time.Millisecond)
|
|
|
|
select {
|
|
case <-fcuTimer.C:
|
|
e.logger.Debug("treating forkChoiceUpdated as asynchronous as it is taking too long")
|
|
return &execution.ForkChoiceReceipt{
|
|
LatestValidHash: gointerfaces.ConvertHashToH256(libcommon.Hash{}),
|
|
Status: execution.ExecutionStatus_Busy,
|
|
}, nil
|
|
case outcome := <-outcomeCh:
|
|
return outcome.receipt, outcome.err
|
|
}
|
|
|
|
}
|
|
|
|
func writeForkChoiceHashes(tx kv.RwTx, blockHash, safeHash, finalizedHash libcommon.Hash) {
|
|
if finalizedHash != (libcommon.Hash{}) {
|
|
rawdb.WriteForkchoiceFinalized(tx, finalizedHash)
|
|
}
|
|
if safeHash != (libcommon.Hash{}) {
|
|
rawdb.WriteForkchoiceSafe(tx, safeHash)
|
|
}
|
|
rawdb.WriteHeadBlockHash(tx, blockHash)
|
|
rawdb.WriteForkchoiceHead(tx, blockHash)
|
|
}
|
|
|
|
func (e *EthereumExecutionModule) updateForkChoice(ctx context.Context, blockHash, safeHash, finalizedHash libcommon.Hash, outcomeCh chan forkchoiceOutcome) {
|
|
if !e.semaphore.TryAcquire(1) {
|
|
sendForkchoiceReceiptWithoutWaiting(outcomeCh, &execution.ForkChoiceReceipt{
|
|
LatestValidHash: gointerfaces.ConvertHashToH256(libcommon.Hash{}),
|
|
Status: execution.ExecutionStatus_Busy,
|
|
})
|
|
return
|
|
}
|
|
defer e.semaphore.Release(1)
|
|
|
|
type canonicalEntry struct {
|
|
hash libcommon.Hash
|
|
number uint64
|
|
}
|
|
tx, err := e.db.BeginRwNosync(ctx)
|
|
if err != nil {
|
|
sendForkchoiceErrorWithoutWaiting(outcomeCh, err)
|
|
return
|
|
}
|
|
defer tx.Rollback()
|
|
|
|
defer e.forkValidator.ClearWithUnwind(e.accumulator, e.stateChangeConsumer)
|
|
// Step one, find reconnection point, and mark all of those headers as canonical.
|
|
fcuHeader, err := e.blockReader.HeaderByHash(ctx, tx, blockHash)
|
|
if err != nil {
|
|
sendForkchoiceErrorWithoutWaiting(outcomeCh, err)
|
|
return
|
|
}
|
|
if fcuHeader == nil {
|
|
sendForkchoiceErrorWithoutWaiting(outcomeCh, fmt.Errorf("forkchoice: block %x not found or was marked invalid", blockHash))
|
|
return
|
|
}
|
|
canonicalHash, err := e.blockReader.CanonicalHash(ctx, tx, fcuHeader.Number.Uint64())
|
|
if err != nil {
|
|
sendForkchoiceErrorWithoutWaiting(outcomeCh, err)
|
|
return
|
|
}
|
|
|
|
if canonicalHash == blockHash {
|
|
// if block hash is part of the canonical chain treat it as no-op.
|
|
writeForkChoiceHashes(tx, blockHash, safeHash, finalizedHash)
|
|
valid, err := e.verifyForkchoiceHashes(ctx, tx, blockHash, finalizedHash, safeHash)
|
|
if err != nil {
|
|
sendForkchoiceErrorWithoutWaiting(outcomeCh, err)
|
|
return
|
|
}
|
|
if !valid {
|
|
sendForkchoiceReceiptWithoutWaiting(outcomeCh, &execution.ForkChoiceReceipt{
|
|
LatestValidHash: gointerfaces.ConvertHashToH256(libcommon.Hash{}),
|
|
Status: execution.ExecutionStatus_InvalidForkchoice,
|
|
})
|
|
return
|
|
}
|
|
sendForkchoiceReceiptWithoutWaiting(outcomeCh, &execution.ForkChoiceReceipt{
|
|
LatestValidHash: gointerfaces.ConvertHashToH256(blockHash),
|
|
Status: execution.ExecutionStatus_Success,
|
|
})
|
|
return
|
|
}
|
|
|
|
// If we don't have it, too bad
|
|
if fcuHeader == nil {
|
|
sendForkchoiceReceiptWithoutWaiting(outcomeCh, &execution.ForkChoiceReceipt{
|
|
LatestValidHash: gointerfaces.ConvertHashToH256(libcommon.Hash{}),
|
|
Status: execution.ExecutionStatus_MissingSegment,
|
|
})
|
|
return
|
|
}
|
|
currentParentHash := fcuHeader.ParentHash
|
|
currentParentNumber := fcuHeader.Number.Uint64() - 1
|
|
isCanonicalHash, err := rawdb.IsCanonicalHash(tx, currentParentHash, currentParentNumber)
|
|
if err != nil {
|
|
sendForkchoiceErrorWithoutWaiting(outcomeCh, err)
|
|
return
|
|
}
|
|
// Find such point, and collect all hashes
|
|
newCanonicals := make([]*canonicalEntry, 0, 64)
|
|
newCanonicals = append(newCanonicals, &canonicalEntry{
|
|
hash: fcuHeader.Hash(),
|
|
number: fcuHeader.Number.Uint64(),
|
|
})
|
|
for !isCanonicalHash {
|
|
newCanonicals = append(newCanonicals, &canonicalEntry{
|
|
hash: currentParentHash,
|
|
number: currentParentNumber,
|
|
})
|
|
currentHeader, err := e.blockReader.Header(ctx, tx, currentParentHash, currentParentNumber)
|
|
if err != nil {
|
|
sendForkchoiceErrorWithoutWaiting(outcomeCh, err)
|
|
return
|
|
}
|
|
if currentHeader == nil {
|
|
sendForkchoiceReceiptWithoutWaiting(outcomeCh, &execution.ForkChoiceReceipt{
|
|
LatestValidHash: gointerfaces.ConvertHashToH256(libcommon.Hash{}),
|
|
Status: execution.ExecutionStatus_MissingSegment,
|
|
})
|
|
return
|
|
}
|
|
currentParentHash = currentHeader.ParentHash
|
|
currentParentNumber = currentHeader.Number.Uint64() - 1
|
|
isCanonicalHash, err = rawdb.IsCanonicalHash(tx, currentParentHash, currentParentNumber)
|
|
if err != nil {
|
|
sendForkchoiceErrorWithoutWaiting(outcomeCh, err)
|
|
return
|
|
}
|
|
}
|
|
|
|
e.executionPipeline.UnwindTo(currentParentNumber, stagedsync.ForkChoice)
|
|
if e.historyV3 {
|
|
if err := rawdbv3.TxNums.Truncate(tx, currentParentNumber); err != nil {
|
|
sendForkchoiceErrorWithoutWaiting(outcomeCh, err)
|
|
return
|
|
}
|
|
}
|
|
|
|
var finishProgressBefore, headersProgressBefore uint64
|
|
if finishProgressBefore, err = stages.GetStageProgress(tx, stages.Finish); err != nil {
|
|
sendForkchoiceErrorWithoutWaiting(outcomeCh, err)
|
|
return
|
|
}
|
|
if headersProgressBefore, err = stages.GetStageProgress(tx, stages.Headers); err != nil {
|
|
sendForkchoiceErrorWithoutWaiting(outcomeCh, err)
|
|
return
|
|
}
|
|
|
|
isSynced := finishProgressBefore > 0 && finishProgressBefore > e.blockReader.FrozenBlocks() && finishProgressBefore == headersProgressBefore
|
|
if e.hook != nil {
|
|
if err = e.hook.BeforeRun(tx, isSynced); err != nil {
|
|
sendForkchoiceErrorWithoutWaiting(outcomeCh, err)
|
|
return
|
|
}
|
|
}
|
|
|
|
// Run the unwind
|
|
if err := e.executionPipeline.RunUnwind(e.db, tx); err != nil {
|
|
err = fmt.Errorf("updateForkChoice: %w", err)
|
|
sendForkchoiceErrorWithoutWaiting(outcomeCh, err)
|
|
return
|
|
}
|
|
|
|
// Truncate tx nums
|
|
if e.historyV3 {
|
|
if err := rawdbv3.TxNums.Truncate(tx, currentParentNumber); err != nil {
|
|
sendForkchoiceErrorWithoutWaiting(outcomeCh, err)
|
|
return
|
|
}
|
|
}
|
|
// Mark all new canonicals as canonicals
|
|
for _, canonicalSegment := range newCanonicals {
|
|
chainReader := stagedsync.NewChainReaderImpl(e.config, tx, e.blockReader, e.logger)
|
|
|
|
b, _, _ := rawdb.ReadBody(tx, canonicalSegment.hash, canonicalSegment.number)
|
|
h := rawdb.ReadHeader(tx, canonicalSegment.hash, canonicalSegment.number)
|
|
|
|
if b == nil || h == nil {
|
|
sendForkchoiceErrorWithoutWaiting(outcomeCh, fmt.Errorf("unexpected chain cap: %d", canonicalSegment.number))
|
|
return
|
|
}
|
|
|
|
if err := e.engine.VerifyHeader(chainReader, h, true); err != nil {
|
|
sendForkchoiceErrorWithoutWaiting(outcomeCh, err)
|
|
return
|
|
}
|
|
|
|
if err := e.engine.VerifyUncles(chainReader, h, b.Uncles); err != nil {
|
|
sendForkchoiceErrorWithoutWaiting(outcomeCh, err)
|
|
return
|
|
}
|
|
|
|
if err := rawdb.WriteCanonicalHash(tx, canonicalSegment.hash, canonicalSegment.number); err != nil {
|
|
sendForkchoiceErrorWithoutWaiting(outcomeCh, err)
|
|
return
|
|
}
|
|
if e.historyV3 {
|
|
if err := rawdb.AppendCanonicalTxNums(tx, canonicalSegment.number); err != nil {
|
|
sendForkchoiceErrorWithoutWaiting(outcomeCh, err)
|
|
return
|
|
}
|
|
}
|
|
}
|
|
// Set Progress for headers and bodies accordingly.
|
|
if err := stages.SaveStageProgress(tx, stages.Headers, fcuHeader.Number.Uint64()); err != nil {
|
|
sendForkchoiceErrorWithoutWaiting(outcomeCh, err)
|
|
return
|
|
}
|
|
if err := stages.SaveStageProgress(tx, stages.BlockHashes, fcuHeader.Number.Uint64()); err != nil {
|
|
sendForkchoiceErrorWithoutWaiting(outcomeCh, err)
|
|
return
|
|
}
|
|
if err := stages.SaveStageProgress(tx, stages.Bodies, fcuHeader.Number.Uint64()); err != nil {
|
|
sendForkchoiceErrorWithoutWaiting(outcomeCh, err)
|
|
return
|
|
}
|
|
if err = rawdb.WriteHeadHeaderHash(tx, blockHash); err != nil {
|
|
sendForkchoiceErrorWithoutWaiting(outcomeCh, err)
|
|
return
|
|
}
|
|
if blockHash == e.forkValidator.ExtendingForkHeadHash() {
|
|
e.logger.Info("[updateForkchoice] Fork choice update: flushing in-memory state (built by previous newPayload)")
|
|
if err := e.forkValidator.FlushExtendingFork(tx, e.accumulator); err != nil {
|
|
sendForkchoiceErrorWithoutWaiting(outcomeCh, err)
|
|
return
|
|
}
|
|
}
|
|
// Run the forkchoice
|
|
if _, err := e.executionPipeline.Run(e.db, tx, false); err != nil {
|
|
err = fmt.Errorf("updateForkChoice: %w", err)
|
|
sendForkchoiceErrorWithoutWaiting(outcomeCh, err)
|
|
return
|
|
}
|
|
// if head hash was set then success otherwise no
|
|
headHash := rawdb.ReadHeadBlockHash(tx)
|
|
headNumber := rawdb.ReadHeaderNumber(tx, headHash)
|
|
log := headNumber != nil && e.logger != nil
|
|
// Update forks...
|
|
writeForkChoiceHashes(tx, blockHash, safeHash, finalizedHash)
|
|
status := execution.ExecutionStatus_Success
|
|
if headHash != blockHash {
|
|
status = execution.ExecutionStatus_BadBlock
|
|
if log {
|
|
e.logger.Warn("bad forkchoice", "head", headHash, "hash", blockHash)
|
|
}
|
|
} else {
|
|
valid, err := e.verifyForkchoiceHashes(ctx, tx, blockHash, finalizedHash, safeHash)
|
|
if err != nil {
|
|
sendForkchoiceErrorWithoutWaiting(outcomeCh, err)
|
|
return
|
|
}
|
|
if !valid {
|
|
sendForkchoiceReceiptWithoutWaiting(outcomeCh, &execution.ForkChoiceReceipt{
|
|
Status: execution.ExecutionStatus_InvalidForkchoice,
|
|
LatestValidHash: gointerfaces.ConvertHashToH256(libcommon.Hash{}),
|
|
})
|
|
return
|
|
}
|
|
if err := rawdb.TruncateCanonicalChain(ctx, tx, *headNumber+1); err != nil {
|
|
sendForkchoiceErrorWithoutWaiting(outcomeCh, err)
|
|
return
|
|
}
|
|
|
|
if err := tx.Commit(); err != nil {
|
|
sendForkchoiceErrorWithoutWaiting(outcomeCh, err)
|
|
return
|
|
}
|
|
if e.hook != nil {
|
|
if err := e.db.View(ctx, func(tx kv.Tx) error {
|
|
return e.hook.AfterRun(tx, finishProgressBefore)
|
|
}); err != nil {
|
|
sendForkchoiceErrorWithoutWaiting(outcomeCh, err)
|
|
return
|
|
}
|
|
}
|
|
if log {
|
|
e.logger.Info("head updated", "hash", headHash, "number", *headNumber)
|
|
}
|
|
|
|
if err := e.db.Update(ctx, func(tx kv.RwTx) error { return e.executionPipeline.RunPrune(e.db, tx, false) }); err != nil {
|
|
err = fmt.Errorf("updateForkChoice: %w", err)
|
|
sendForkchoiceErrorWithoutWaiting(outcomeCh, err)
|
|
return
|
|
}
|
|
}
|
|
|
|
sendForkchoiceReceiptWithoutWaiting(outcomeCh, &execution.ForkChoiceReceipt{
|
|
LatestValidHash: gointerfaces.ConvertHashToH256(headHash),
|
|
Status: status,
|
|
})
|
|
}
|