added support of Clique consensus for block processing for erigon2 (#3705)

* added support of Clique consensus for block processing for erigon2

* erigon2: remove block reader which doesn't use system pre and post block transactions
This commit is contained in:
Artem Tsebrovskiy 2022-03-17 10:30:47 +03:00 committed by GitHub
parent 426d972356
commit f14df3faa3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 92 additions and 43 deletions

View File

@ -24,6 +24,7 @@ import (
"github.com/ledgerwatch/erigon/cmd/rpcdaemon/interfaces"
"github.com/ledgerwatch/erigon/common"
"github.com/ledgerwatch/erigon/consensus"
"github.com/ledgerwatch/erigon/consensus/ethash"
"github.com/ledgerwatch/erigon/consensus/misc"
"github.com/ledgerwatch/erigon/core"
@ -40,8 +41,9 @@ import (
)
const (
aggregationStep = 15625 /* this is 500'000 / 32 */
unwindLimit = 90000 /* how it is in geth */
aggregationStep = 15625 /* this is 500'000 / 32 */
unwindLimit = 90000 /* how it is in geth */
logInterval = 30 * time.Second // time period to print aggregation stat to log
)
var (
@ -77,10 +79,6 @@ var erigon2Cmd = &cobra.Command{
},
}
const (
logInterval = 30 * time.Second
)
func Erigon2(genesis *core.Genesis, chainConfig *params.ChainConfig, logger log.Logger) error {
sigs := make(chan os.Signal, 1)
interruptCh := make(chan bool, 1)
@ -96,12 +94,14 @@ func Erigon2(genesis *core.Genesis, chainConfig *params.ChainConfig, logger log.
return fmt.Errorf("opening chaindata as read only: %v", err)
}
defer historyDb.Close()
ctx := context.Background()
historyTx, err1 := historyDb.BeginRo(ctx)
if err1 != nil {
return err1
}
defer historyTx.Rollback()
aggPath := filepath.Join(datadir, "aggregator")
if block == 0 {
if _, err = os.Stat(aggPath); err != nil {
@ -115,28 +115,13 @@ func Erigon2(genesis *core.Genesis, chainConfig *params.ChainConfig, logger log.
return err
}
}
agg, err3 := aggregator.NewAggregator(aggPath, unwindLimit, aggregationStep, changesets, commitments, 100_000_000)
if err3 != nil {
return fmt.Errorf("create aggregator: %w", err3)
}
defer agg.Close()
var vmConfig vm.Config
var blockReader interfaces.FullBlockReader
if snapshotBlocks {
snConfig := snapshothashes.KnownConfig(chainConfig.ChainName)
snConfig.ExpectBlocks, err = eth.RestoreExpectedExternalSnapshot(historyDb, snConfig)
if err != nil {
return err
}
allSnapshots := snapshotsync.NewRoSnapshots(ethconfig.NewSnapshotCfg(true, true), path.Join(datadir, "snapshots"))
defer allSnapshots.Close()
blockReader = snapshotsync.NewBlockReaderWithSnapshots(allSnapshots)
} else {
blockReader = snapshotsync.NewBlockReader()
}
interrupt := false
w := agg.MakeStateWriter(false /* beforeOn */)
var rootHash []byte
@ -172,9 +157,11 @@ func Erigon2(genesis *core.Genesis, chainConfig *params.ChainConfig, logger log.
logger.Info("Initialised chain configuration", "config", chainConfig)
var (
blockNum uint64 = 0
txNum uint64 = 2
blockNum uint64
trace bool
vmConfig vm.Config
txNum uint64 = 2 // Consider that each block contains at least first system tx and enclosing transactions, except for Clique consensus engine
)
logEvery := time.NewTicker(logInterval)
@ -185,38 +172,57 @@ func Erigon2(genesis *core.Genesis, chainConfig *params.ChainConfig, logger log.
prevTime: time.Now(),
}
for !interrupt {
select {
default:
case <-logEvery.C:
go func() {
for range logEvery.C {
aStats := agg.Stats()
statx.delta(aStats, blockNum).print(aStats, logger)
}
}()
engine := initConsensusEngine(chainConfig, logger)
var blockReader interfaces.FullBlockReader
if snapshotBlocks {
snConfig := snapshothashes.KnownConfig(chainConfig.ChainName)
snConfig.ExpectBlocks, err = eth.RestoreExpectedExternalSnapshot(historyDb, snConfig)
if err != nil {
return err
}
allSnapshots := snapshotsync.NewRoSnapshots(ethconfig.NewSnapshotCfg(true, false), path.Join(datadir, "snapshots"))
defer allSnapshots.Close()
blockReader = snapshotsync.NewBlockReaderWithSnapshots(allSnapshots)
} else {
blockReader = snapshotsync.NewBlockReader()
}
for !interrupt {
blockNum++
trace = traceBlock != 0 && blockNum == uint64(traceBlock)
trace = traceBlock > 0 && blockNum == uint64(traceBlock)
blockHash, err := blockReader.CanonicalHash(ctx, historyTx, blockNum)
if err != nil {
return err
}
if blockNum <= block {
if blockNum < block {
_, _, txAmount := rawdb.ReadBody(historyTx, blockHash, blockNum)
// Skip that block, but increase txNum
txNum += uint64(txAmount) + 2
txNum += uint64(txAmount) + 2 // Pre and Post block transaction
continue
}
var b *types.Block
b, _, err = blockReader.BlockWithSenders(ctx, historyTx, blockHash, blockNum)
block, _, err := blockReader.BlockWithSenders(ctx, historyTx, blockHash, blockNum)
if err != nil {
return err
}
if b == nil {
if block == nil {
log.Info("history: block is nil", "block", blockNum)
break
}
r := agg.MakeStateReader(blockNum)
if err = w.Reset(blockNum); err != nil {
return err
}
r := agg.MakeStateReader(blockNum)
readWrapper := &ReaderWrapper{r: r, blockNum: blockNum}
writeWrapper := &WriterWrapper{w: w, blockNum: blockNum}
getHeader := func(hash common.Hash, number uint64) *types.Header {
@ -226,29 +232,34 @@ func Erigon2(genesis *core.Genesis, chainConfig *params.ChainConfig, logger log.
}
return h
}
txNum++ // Pre-block transaction
if txNum, _, err = runBlock2(trace, txNum, readWrapper, writeWrapper, chainConfig, getHeader, b, vmConfig); err != nil {
return fmt.Errorf("block %d: %w", blockNum, err)
if txNum, _, err = processBlock(trace, txNum, readWrapper, writeWrapper, chainConfig, engine, getHeader, block, vmConfig); err != nil {
return fmt.Errorf("processing block %d: %w", blockNum, err)
}
if err := w.FinishTx(txNum, trace); err != nil {
return fmt.Errorf("final finish failed: %w", err)
return fmt.Errorf("failed to finish tx: %w", err)
}
if trace {
fmt.Printf("FinishTx called for %d block %d\n", txNum, blockNum)
}
txNum++ // Post-block transaction
// Check for interrupts
select {
case interrupt = <-interruptCh:
log.Info(fmt.Sprintf("interrupted, please wait for cleanup, next time start with --block %d", blockNum))
default:
}
if commitments && (interrupt || blockNum%uint64(commitmentFrequency) == 0) {
if rootHash, err = w.ComputeCommitment(trace /* trace */); err != nil {
return err
}
if !bytes.Equal(rootHash, b.Header().Root[:]) {
return fmt.Errorf("root hash mismatch for block %d, expected [%x], was [%x]", blockNum, b.Header().Root[:], rootHash)
if !bytes.Equal(rootHash, block.Header().Root[:]) {
return fmt.Errorf("root hash mismatch for block %d, expected [%x], was [%x]", blockNum, block.Header().Root[:], rootHash)
}
}
if err = w.Aggregate(trace); err != nil {
@ -315,17 +326,18 @@ func (s *stat) delta(aStats aggregator.FilesStats, blockNum uint64) *stat {
return s
}
func runBlock2(trace bool, txNumStart uint64, rw *ReaderWrapper, ww *WriterWrapper, chainConfig *params.ChainConfig, getHeader func(hash common.Hash, number uint64) *types.Header, block *types.Block, vmConfig vm.Config) (uint64, types.Receipts, error) {
func processBlock(trace bool, txNumStart uint64, rw *ReaderWrapper, ww *WriterWrapper, chainConfig *params.ChainConfig, engine consensus.Engine, getHeader func(hash common.Hash, number uint64) *types.Header, block *types.Block, vmConfig vm.Config) (uint64, types.Receipts, error) {
defer blockExecutionTimer.UpdateDuration(time.Now())
header := block.Header()
vmConfig.TraceJumpDest = true
engine := ethash.NewFullFaker()
gp := new(core.GasPool).AddGas(block.GasLimit())
usedGas := new(uint64)
var receipts types.Receipts
daoBlock := chainConfig.DAOForkSupport && chainConfig.DAOForkBlock != nil && chainConfig.DAOForkBlock.Cmp(block.Number()) == 0
rules := chainConfig.Rules(block.NumberU64())
txNum := txNumStart
for i, tx := range block.Transactions() {
ibs := state.New(rw)
if daoBlock {
@ -348,6 +360,7 @@ func runBlock2(trace bool, txNumStart uint64, rw *ReaderWrapper, ww *WriterWrapp
}
ibs := state.New(rw)
// Finalize the block, applying any consensus engine specific extras (e.g. block rewards)
if _, _, _, err := engine.FinalizeAndAssemble(chainConfig, header, ibs, block.Transactions(), block.Uncles(), receipts, nil, nil, nil, nil); err != nil {
return 0, nil, fmt.Errorf("finalize of block %d failed: %w", block.NumberU64(), err)
@ -526,3 +539,26 @@ func (ww *WriterWrapper) WriteAccountStorage(address common.Address, incarnation
func (ww *WriterWrapper) CreateContract(address common.Address) error {
return nil
}
func initConsensusEngine(chainConfig *params.ChainConfig, logger log.Logger) (engine consensus.Engine) {
config := ethconfig.Defaults
switch {
case chainConfig.Clique != nil:
c := params.CliqueSnapshot
c.DBPath = filepath.Join(datadir, "clique", "db")
engine = ethconfig.CreateConsensusEngine(chainConfig, logger, c, config.Miner.Notify, config.Miner.Noverify, "", true, datadir)
case chainConfig.Aura != nil:
consensusConfig := &params.AuRaConfig{DBPath: filepath.Join(datadir, "aura")}
engine = ethconfig.CreateConsensusEngine(chainConfig, logger, consensusConfig, config.Miner.Notify, config.Miner.Noverify, "", true, datadir)
case chainConfig.Parlia != nil:
consensusConfig := &params.ParliaConfig{DBPath: filepath.Join(datadir, "parlia")}
engine = ethconfig.CreateConsensusEngine(chainConfig, logger, consensusConfig, config.Miner.Notify, config.Miner.Noverify, "", true, datadir)
case chainConfig.Bor != nil:
consensusConfig := &config.Bor
engine = ethconfig.CreateConsensusEngine(chainConfig, logger, consensusConfig, config.Miner.Notify, config.Miner.Noverify, "http://localhost:1317", false, datadir)
default: //ethash
engine = ethash.NewFaker()
}
return
}

View File

@ -26,13 +26,14 @@ import (
"time"
"github.com/ledgerwatch/erigon-lib/kv"
"github.com/ledgerwatch/log/v3"
"github.com/ledgerwatch/erigon/cmd/rpcdaemon/interfaces"
"github.com/ledgerwatch/erigon/common"
"github.com/ledgerwatch/erigon/common/dbutils"
"github.com/ledgerwatch/erigon/core/types"
"github.com/ledgerwatch/erigon/ethdb/cbor"
"github.com/ledgerwatch/erigon/rlp"
"github.com/ledgerwatch/log/v3"
)
// ReadCanonicalHash retrieves the hash assigned to a canonical block number.
@ -509,6 +510,7 @@ func ReadBody(db kv.Getter, hash common.Hash, number uint64) (*types.Body, uint6
}
body := new(types.Body)
body.Uncles = bodyForStorage.Uncles
if bodyForStorage.TxAmount < 2 {
panic(fmt.Sprintf("block body hash too few txs amount: %d, %d", number, bodyForStorage.TxAmount))
}

View File

@ -493,6 +493,9 @@ func InitPreverifiedHashes(chain string) (map[common.Hash]struct{}, uint64) {
case networkname.SepoliaChainName:
encodings = sepoliaPreverifiedHashes
height = sepoliaPreverifiedHeight
case networkname.GoerliChainName:
encodings = goerliPreverifiedHashes
height = goerliPreferifiedHeight
default:
log.Debug("Preverified hashes not found for", "chain", chain)
return nil, 0

View File

@ -0,0 +1,8 @@
package headerdownload
var goerliPreferifiedHeight uint64 = 2
var goerliPreverifiedHashes = []string{
"696d95da6726a67afd5be2a37d3883e9be8008491b30d5bd1069ea5922fa2a41",
"89b977ffab1052f5f6f778e4680354a8a1a1e0f623db9b2f20beba9f36f68721",
}