mirror of
https://gitlab.com/pulsechaincom/erigon-pulse.git
synced 2025-01-15 15:28:19 +00:00
fd77eaf86a
* introduce PlainStateReader with fallbacks * no 10.000 changes in tests * even less iterations * remove even more iterations * add `go run ./cmd/geth --syncmode staged --plainstate` flag * fix serialization calls * make a more sensible file default doesn’t affect anything, because this flag is always overridden when parsing CLI. but still.
2510 lines
87 KiB
Go
2510 lines
87 KiB
Go
// Copyright 2014 The go-ethereum Authors
|
|
// This file is part of the go-ethereum library.
|
|
//
|
|
// The go-ethereum library is free software: you can redistribute it and/or modify
|
|
// it under the terms of the GNU Lesser General Public License as published by
|
|
// the Free Software Foundation, either version 3 of the License, or
|
|
// (at your option) any later version.
|
|
//
|
|
// The go-ethereum library is distributed in the hope that it will be useful,
|
|
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
// GNU Lesser General Public License for more details.
|
|
//
|
|
// You should have received a copy of the GNU Lesser General Public License
|
|
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
|
|
|
|
// Package core implements the Ethereum consensus protocol.
|
|
package core
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"io"
|
|
"math/big"
|
|
"sort"
|
|
"sync"
|
|
"sync/atomic"
|
|
"time"
|
|
|
|
lru "github.com/hashicorp/golang-lru"
|
|
|
|
"github.com/ledgerwatch/turbo-geth/common"
|
|
"github.com/ledgerwatch/turbo-geth/common/debug"
|
|
"github.com/ledgerwatch/turbo-geth/common/mclock"
|
|
"github.com/ledgerwatch/turbo-geth/common/prque"
|
|
"github.com/ledgerwatch/turbo-geth/consensus"
|
|
"github.com/ledgerwatch/turbo-geth/consensus/misc"
|
|
"github.com/ledgerwatch/turbo-geth/core/rawdb"
|
|
"github.com/ledgerwatch/turbo-geth/core/state"
|
|
"github.com/ledgerwatch/turbo-geth/core/types"
|
|
"github.com/ledgerwatch/turbo-geth/core/vm"
|
|
"github.com/ledgerwatch/turbo-geth/ethdb"
|
|
"github.com/ledgerwatch/turbo-geth/event"
|
|
"github.com/ledgerwatch/turbo-geth/log"
|
|
"github.com/ledgerwatch/turbo-geth/metrics"
|
|
"github.com/ledgerwatch/turbo-geth/params"
|
|
"github.com/ledgerwatch/turbo-geth/rlp"
|
|
)
|
|
|
|
var (
|
|
headBlockGauge = metrics.NewRegisteredGauge("chain/head/block", nil)
|
|
headHeaderGauge = metrics.NewRegisteredGauge("chain/head/header", nil)
|
|
headFastBlockGauge = metrics.NewRegisteredGauge("chain/head/receipt", nil)
|
|
|
|
accountReadTimer = metrics.NewRegisteredTimer("chain/account/reads", nil)
|
|
accountHashTimer = metrics.NewRegisteredTimer("chain/account/hashes", nil)
|
|
accountUpdateTimer = metrics.NewRegisteredTimer("chain/account/updates", nil)
|
|
accountCommitTimer = metrics.NewRegisteredTimer("chain/account/commits", nil)
|
|
|
|
storageReadTimer = metrics.NewRegisteredTimer("chain/storage/reads", nil)
|
|
storageHashTimer = metrics.NewRegisteredTimer("chain/storage/hashes", nil)
|
|
storageUpdateTimer = metrics.NewRegisteredTimer("chain/storage/updates", nil)
|
|
storageCommitTimer = metrics.NewRegisteredTimer("chain/storage/commits", nil)
|
|
|
|
blockInsertTimer = metrics.NewRegisteredTimer("chain/inserts", nil)
|
|
blockValidationTimer = metrics.NewRegisteredTimer("chain/validation", nil)
|
|
blockExecutionTimer = metrics.NewRegisteredTimer("chain/execution", nil)
|
|
blockWriteTimer = metrics.NewRegisteredTimer("chain/write", nil)
|
|
blockReorgAddMeter = metrics.NewRegisteredMeter("chain/reorg/drop", nil)
|
|
blockReorgDropMeter = metrics.NewRegisteredMeter("chain/reorg/add", nil)
|
|
|
|
blockPrefetchExecuteTimer = metrics.NewRegisteredTimer("chain/prefetch/executes", nil)
|
|
blockPrefetchInterruptMeter = metrics.NewRegisteredMeter("chain/prefetch/interrupts", nil)
|
|
|
|
errInsertionInterrupted = errors.New("insertion is interrupted")
|
|
|
|
// ErrNotFound is returned when sought data isn't found.
|
|
ErrNotFound = errors.New("data not found")
|
|
)
|
|
|
|
// Sizing limits used by the blockchain's in-memory caches.
const (
	bodyCacheLimit      = 256  // entries in each of the body and body-RLP caches
	blockCacheLimit     = 256  // entries in the full-block cache
	receiptsCacheLimit  = 32   // entries in the receipts cache
	txLookupCacheLimit  = 1024 // entries in the transaction lookup cache
	maxFutureBlocks     = 256  // entries in the future-blocks cache
	maxTimeFutureBlocks = 30   // NOTE(review): not used in this part of the file; presumably seconds — confirm
	badBlockLimit       = 10   // entries in the bad-block cache
	triesInMemory       = 128  // NOTE(review): not used in this part of the file — confirm meaning at use site
)

// BlockChainVersion ensures that an incompatible database forces a resync
// from scratch.
//
// Changelog:
//
// - Version 4
//   The following incompatible database changes were added:
//   * the `BlockNumber`, `TxHash`, `TxIndex`, `BlockHash` and `Index` fields of log are deleted
//   * the `Bloom` field of receipt is deleted
//   * the `BlockIndex` and `TxIndex` fields of txlookup are deleted
// - Version 5
//   The following incompatible database changes were added:
//   * the `TxHash`, `GasCost`, and `ContractAddress` fields are no longer stored for a receipt
//   * the `TxHash`, `GasCost`, and `ContractAddress` fields are computed by looking up the
//     receipts' corresponding block
// - Version 6
//   The following incompatible database changes were added:
//   * Transaction lookup information stores the corresponding block number instead of block hash
// - Version 7
//   The following incompatible database changes were added:
//   * Use freezer as the ancient database to maintain all ancient data
const BlockChainVersion uint64 = 7
|
|
|
|
// CacheConfig holds the trie caching/pruning settings that a blockchain
// instance keeps for its whole lifetime.
type CacheConfig struct {
	// Pruning, when true, makes NewBlockChain create and start a pruner.
	Pruning bool
	// TrieCleanLimit is the memory allowance (MB) for caching trie nodes in memory.
	TrieCleanLimit int
	// TrieCleanNoPrefetch disables heuristic state prefetching for followup blocks.
	TrieCleanNoPrefetch bool
	// TrieDirtyLimit is the memory limit (MB) at which dirty trie nodes start being flushed to disk.
	TrieDirtyLimit int
	// TrieTimeLimit is the time limit after which the current in-memory trie is flushed to disk.
	TrieTimeLimit time.Duration

	// NOTE(review): the three fields below are handed to the pruner as part
	// of this config; their exact semantics are defined there — confirm.
	BlocksBeforePruning uint64
	BlocksToPrune       uint64
	PruneTimeout        time.Duration
	// ArchiveSyncInterval is defaulted to 1024 by NewBlockChain when left at zero.
	ArchiveSyncInterval uint64
	// DownloadOnly, when true, stops GetTrieDbState from lazily creating a
	// trie state and stops writeHeadBlock from writing tx lookup entries.
	DownloadOnly bool
	// NoHistory — NOTE(review): presumably disables history writes; confirm at use site.
	NoHistory bool
}
|
|
|
|
// BlockChain represents the canonical chain given a database with a genesis
|
|
// block. The Blockchain manages chain imports, reverts, chain reorganisations.
|
|
//
|
|
// Importing blocks in to the block chain happens according to the set of rules
|
|
// defined by the two stage Validator. Processing of blocks is done using the
|
|
// Processor which processes the included transaction. The validation of the state
|
|
// is done in the second part of the Validator. Failing results in aborting of
|
|
// the import.
|
|
//
|
|
// The BlockChain also helps in returning blocks from **any** chain included
|
|
// in the database as well as blocks that represents the canonical chain. It's
|
|
// important to note that GetBlock can return any block and does not need to be
|
|
// included in the canonical one where as GetBlockByNumber always represents the
|
|
// canonical chain.
|
|
type BlockChain struct {
|
|
chainConfig *params.ChainConfig // Chain & network configuration
|
|
cacheConfig *CacheConfig // Cache configuration for pruning
|
|
|
|
db ethdb.DbWithPendingMutations // Low level persistent database to store final content in
|
|
triegc *prque.Prque // Priority queue mapping block numbers to tries to gc
|
|
gcproc time.Duration // Accumulates canonical block processing for trie dumping
|
|
|
|
hc *HeaderChain
|
|
rmLogsFeed event.Feed
|
|
chainFeed event.Feed
|
|
chainSideFeed event.Feed
|
|
chainHeadFeed event.Feed
|
|
logsFeed event.Feed
|
|
blockProcFeed event.Feed
|
|
scope event.SubscriptionScope
|
|
genesisBlock *types.Block
|
|
|
|
chainmu sync.RWMutex // blockchain insertion lock
|
|
|
|
currentBlock atomic.Value // Current head of the block chain
|
|
committedBlock atomic.Value // Committed head of the block chain
|
|
currentFastBlock atomic.Value // Current head of the fast-sync chain (may be above the block chain!)
|
|
|
|
trieDbState *state.TrieDbState
|
|
bodyCache *lru.Cache // Cache for the most recent block bodies
|
|
bodyRLPCache *lru.Cache // Cache for the most recent block bodies in RLP encoded format
|
|
receiptsCache *lru.Cache // Cache for the most recent receipts per block
|
|
blockCache *lru.Cache // Cache for the most recent entire blocks
|
|
txLookupCache *lru.Cache // Cache for the most recent transaction lookup data.
|
|
futureBlocks *lru.Cache // future blocks are blocks added for later processing
|
|
|
|
quit chan struct{} // blockchain quit channel
|
|
running int32 // running must be called atomically
|
|
// procInterrupt must be atomically called
|
|
procInterrupt int32 // interrupt signaler for block processing
|
|
wg sync.WaitGroup // chain processing wait group for shutting down
|
|
quitMu sync.RWMutex
|
|
|
|
engine consensus.Engine
|
|
validator Validator // Block and state validator interface
|
|
prefetcher Prefetcher // Block state prefetcher interface
|
|
processor Processor // Block transaction processor interface
|
|
vmConfig vm.Config
|
|
|
|
badBlocks *lru.Cache // Bad block cache
|
|
shouldPreserve func(*types.Block) bool // Function used to determine whether should preserve the given block.
|
|
terminateInsert func(common.Hash, uint64) bool // Testing hook used to terminate ancient receipt chain insertion.
|
|
highestKnownBlock uint64
|
|
highestKnownBlockMu sync.Mutex
|
|
enableReceipts bool // Whether receipts need to be written to the database
|
|
enableTxLookupIndex bool // Whether we store tx lookup index into the database
|
|
enablePreimages bool // Whether we store preimages into the database
|
|
resolveReads bool
|
|
pruner Pruner
|
|
}
|
|
|
|
// NewBlockChain returns a fully initialised block chain using information
|
|
// available in the database. It initialises the default Ethereum Validator and
|
|
// Processor.
|
|
func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, chainConfig *params.ChainConfig, engine consensus.Engine, vmConfig vm.Config, shouldPreserve func(block *types.Block) bool) (*BlockChain, error) {
|
|
if cacheConfig == nil {
|
|
cacheConfig = &CacheConfig{
|
|
Pruning: false,
|
|
BlocksBeforePruning: 1024,
|
|
TrieCleanLimit: 256,
|
|
TrieDirtyLimit: 256,
|
|
TrieTimeLimit: 5 * time.Minute,
|
|
DownloadOnly: false,
|
|
NoHistory: false,
|
|
}
|
|
}
|
|
if cacheConfig.ArchiveSyncInterval == 0 {
|
|
cacheConfig.ArchiveSyncInterval = 1024
|
|
}
|
|
|
|
bodyCache, _ := lru.New(bodyCacheLimit)
|
|
bodyRLPCache, _ := lru.New(bodyCacheLimit)
|
|
receiptsCache, _ := lru.New(receiptsCacheLimit)
|
|
blockCache, _ := lru.New(blockCacheLimit)
|
|
txLookupCache, _ := lru.New(txLookupCacheLimit)
|
|
futureBlocks, _ := lru.New(maxFutureBlocks)
|
|
badBlocks, _ := lru.New(badBlockLimit)
|
|
cdb := db.NewBatch()
|
|
|
|
bc := &BlockChain{
|
|
chainConfig: chainConfig,
|
|
cacheConfig: cacheConfig,
|
|
db: cdb,
|
|
triegc: prque.New(nil),
|
|
quit: make(chan struct{}),
|
|
shouldPreserve: shouldPreserve,
|
|
bodyCache: bodyCache,
|
|
bodyRLPCache: bodyRLPCache,
|
|
receiptsCache: receiptsCache,
|
|
blockCache: blockCache,
|
|
txLookupCache: txLookupCache,
|
|
futureBlocks: futureBlocks,
|
|
engine: engine,
|
|
vmConfig: vmConfig,
|
|
badBlocks: badBlocks,
|
|
enableTxLookupIndex: true,
|
|
enableReceipts: false,
|
|
enablePreimages: true,
|
|
}
|
|
bc.validator = NewBlockValidator(chainConfig, bc, engine)
|
|
bc.prefetcher = newStatePrefetcher(chainConfig, bc, engine)
|
|
bc.processor = NewStateProcessor(chainConfig, bc, engine)
|
|
|
|
var err error
|
|
bc.hc, err = NewHeaderChain(cdb, chainConfig, engine, bc.getProcInterrupt)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
bc.genesisBlock = bc.GetBlockByNumber(0)
|
|
if bc.genesisBlock == nil {
|
|
return nil, ErrNoGenesis
|
|
}
|
|
|
|
if err := bc.loadLastState(); err != nil {
|
|
return nil, err
|
|
}
|
|
// The first thing the node will do is reconstruct the verification data for
|
|
// the head block (ethash cache or clique voting snapshot). Might as well do
|
|
// it in advance.
|
|
bc.engine.VerifyHeader(bc, bc.CurrentHeader(), true)
|
|
|
|
if frozen, err := bc.db.Ancients(); err == nil && frozen > 0 {
|
|
var (
|
|
needRewind bool
|
|
low uint64
|
|
)
|
|
// The head full block may be rolled back to a very low height due to
|
|
// blockchain repair. If the head full block is even lower than the ancient
|
|
// chain, truncate the ancient store.
|
|
fullBlock := bc.CurrentBlock()
|
|
if fullBlock != nil && fullBlock != bc.genesisBlock && fullBlock.NumberU64() < frozen-1 {
|
|
needRewind = true
|
|
low = fullBlock.NumberU64()
|
|
}
|
|
// In fast sync, it may happen that ancient data has been written to the
|
|
// ancient store, but the LastFastBlock has not been updated, truncate the
|
|
// extra data here.
|
|
fastBlock := bc.CurrentFastBlock()
|
|
if fastBlock != nil && fastBlock.NumberU64() < frozen-1 {
|
|
needRewind = true
|
|
if fastBlock.NumberU64() < low || low == 0 {
|
|
low = fastBlock.NumberU64()
|
|
}
|
|
}
|
|
if needRewind {
|
|
var hashes []common.Hash
|
|
previous := bc.CurrentHeader().Number.Uint64()
|
|
for i := low + 1; i <= bc.CurrentHeader().Number.Uint64(); i++ {
|
|
hashes = append(hashes, rawdb.ReadCanonicalHash(bc.db, i))
|
|
}
|
|
bc.Rollback(hashes)
|
|
log.Warn("Truncate ancient chain", "from", previous, "to", low)
|
|
}
|
|
}
|
|
// Check the current state of the block hashes and make sure that we do not have any of the bad blocks in our chain
|
|
for hash := range BadHashes {
|
|
if header := bc.GetHeaderByHash(hash); header != nil {
|
|
// get the canonical block corresponding to the offending header's number
|
|
headerByNumber := bc.GetHeaderByNumber(header.Number.Uint64())
|
|
// make sure the headerByNumber (if present) is in our current canonical chain
|
|
if headerByNumber != nil && headerByNumber.Hash() == header.Hash() {
|
|
log.Error("Found bad hash, rewinding chain", "number", header.Number, "hash", header.ParentHash)
|
|
bc.SetHead(header.Number.Uint64() - 1)
|
|
log.Error("Chain rewind was successful, resuming normal operation")
|
|
}
|
|
}
|
|
}
|
|
// Take ownership of this particular state
|
|
go bc.update()
|
|
if cacheConfig.Pruning {
|
|
var innerErr error
|
|
bc.pruner, innerErr = NewBasicPruner(db, bc, bc.cacheConfig)
|
|
if innerErr != nil {
|
|
log.Error("Pruner init error", "err", innerErr)
|
|
return nil, innerErr
|
|
}
|
|
|
|
innerErr = bc.pruner.Start()
|
|
if innerErr != nil {
|
|
log.Error("Pruner start error", "err", innerErr)
|
|
return nil, innerErr
|
|
}
|
|
}
|
|
return bc, nil
|
|
}
|
|
|
|
func (bc *BlockChain) SetResolveReads(rr bool) {
|
|
bc.resolveReads = rr
|
|
}
|
|
|
|
func (bc *BlockChain) EnableReceipts(er bool) {
|
|
bc.enableReceipts = er
|
|
}
|
|
|
|
func (bc *BlockChain) EnableTxLookupIndex(et bool) {
|
|
bc.enableTxLookupIndex = et
|
|
}
|
|
|
|
func (bc *BlockChain) EnablePreimages(ep bool) {
|
|
bc.enablePreimages = ep
|
|
}
|
|
|
|
func (bc *BlockChain) GetTrieDbState() (*state.TrieDbState, error) {
|
|
if bc.trieDbState == nil && !bc.cacheConfig.DownloadOnly {
|
|
currentBlockNr := bc.CurrentBlock().NumberU64()
|
|
trieDbState, err := bc.GetTrieDbStateByBlock(bc.CurrentBlock().Header().Root, currentBlockNr)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
bc.setTrieDbState(trieDbState)
|
|
log.Info("Creation complete.")
|
|
}
|
|
return bc.trieDbState, nil
|
|
}
|
|
|
|
func (bc *BlockChain) setTrieDbState(trieDbState *state.TrieDbState) {
|
|
log.Warn("trieDbState has been changed", "isNil", trieDbState == nil, "callers", debug.Callers(20))
|
|
bc.trieDbState = trieDbState
|
|
}
|
|
|
|
func (bc *BlockChain) GetTrieDbStateByBlock(root common.Hash, blockNr uint64) (*state.TrieDbState, error) {
|
|
if bc.trieDbState == nil || bc.trieDbState.LastRoot() != root || bc.trieDbState.GetBlockNr() != blockNr {
|
|
log.Info("Creating IntraBlockState from latest state", "block", blockNr, "isNIl", bc.trieDbState == nil, "callers", debug.Callers(20))
|
|
tds := state.NewTrieDbState(root, bc.db, blockNr)
|
|
tds.SetNoHistory(bc.NoHistory())
|
|
tds.SetResolveReads(bc.resolveReads)
|
|
tds.EnablePreimages(bc.enablePreimages)
|
|
|
|
log.Info("Creation complete.")
|
|
return tds, nil
|
|
}
|
|
return bc.trieDbState, nil
|
|
}
|
|
|
|
func (bc *BlockChain) getProcInterrupt() bool {
|
|
return atomic.LoadInt32(&bc.procInterrupt) == 1
|
|
}
|
|
|
|
// GetVMConfig returns the block chain VM config.
|
|
func (bc *BlockChain) GetVMConfig() *vm.Config {
|
|
return &bc.vmConfig
|
|
}
|
|
|
|
// empty returns an indicator whether the blockchain is empty.
|
|
// Note, it's a special case that we connect a non-empty ancient
|
|
// database with an empty node, so that we can plugin the ancient
|
|
// into node seamlessly.
|
|
func (bc *BlockChain) empty() bool {
|
|
genesis := bc.genesisBlock.Hash()
|
|
for _, hash := range []common.Hash{rawdb.ReadHeadBlockHash(bc.db), rawdb.ReadHeadHeaderHash(bc.db), rawdb.ReadHeadFastBlockHash(bc.db)} {
|
|
if hash != genesis {
|
|
return false
|
|
}
|
|
}
|
|
return true
|
|
}
|
|
|
|
// loadLastState loads the last known chain state from the database. This method
|
|
// assumes that the chain manager mutex is held.
|
|
func (bc *BlockChain) loadLastState() error {
|
|
// Restore the last known head block
|
|
head := rawdb.ReadHeadBlockHash(bc.db)
|
|
if head == (common.Hash{}) {
|
|
// Corrupt or empty database, init from scratch
|
|
log.Warn("Empty database, resetting chain")
|
|
return bc.Reset()
|
|
}
|
|
// Make sure the entire head block is available
|
|
currentBlock := bc.GetBlockByHash(head)
|
|
if currentBlock == nil {
|
|
// Corrupt or empty database, init from scratch
|
|
log.Warn("Head block missing, resetting chain", "hash", head)
|
|
return bc.Reset()
|
|
}
|
|
// Make sure the state associated with the block is available
|
|
// Everything seems to be fine, set as the head block
|
|
bc.currentBlock.Store(currentBlock)
|
|
headBlockGauge.Update(int64(currentBlock.NumberU64()))
|
|
|
|
// Restore the last known head header
|
|
currentHeader := currentBlock.Header()
|
|
if head := rawdb.ReadHeadHeaderHash(bc.db); head != (common.Hash{}) {
|
|
if header := bc.GetHeaderByHash(head); header != nil {
|
|
currentHeader = header
|
|
}
|
|
}
|
|
bc.hc.SetCurrentHeader(bc.db, currentHeader)
|
|
|
|
// Restore the last known head fast block
|
|
bc.currentFastBlock.Store(currentBlock)
|
|
headFastBlockGauge.Update(int64(currentBlock.NumberU64()))
|
|
|
|
if head := rawdb.ReadHeadFastBlockHash(bc.db); head != (common.Hash{}) {
|
|
if block := bc.GetBlockByHash(head); block != nil {
|
|
bc.currentFastBlock.Store(block)
|
|
headFastBlockGauge.Update(int64(block.NumberU64()))
|
|
}
|
|
}
|
|
// Issue a status log for the user
|
|
currentFastBlock := bc.CurrentFastBlock()
|
|
|
|
headerTd := bc.GetTd(currentHeader.Hash(), currentHeader.Number.Uint64())
|
|
blockTd := bc.GetTd(currentBlock.Hash(), currentBlock.NumberU64())
|
|
fastTd := bc.GetTd(currentFastBlock.Hash(), currentFastBlock.NumberU64())
|
|
|
|
log.Info("Loaded most recent local header", "number", currentHeader.Number, "hash", currentHeader.Hash(), "td", headerTd, "age", common.PrettyAge(time.Unix(int64(currentHeader.Time), 0)))
|
|
log.Info("Loaded most recent local full block", "number", currentBlock.Number(), "hash", currentBlock.Hash(), "td", blockTd, "age", common.PrettyAge(time.Unix(int64(currentBlock.Time()), 0)))
|
|
log.Info("Loaded most recent local fast block", "number", currentFastBlock.Number(), "hash", currentFastBlock.Hash(), "td", fastTd, "age", common.PrettyAge(time.Unix(int64(currentFastBlock.Time()), 0)))
|
|
|
|
return nil
|
|
}
|
|
|
|
// SetHead rewinds the local chain to a new head. In the case of headers, everything
|
|
// above the new head will be deleted and the new one set. In the case of blocks
|
|
// though, the head may be further rewound if block bodies are missing (non-archive
|
|
// nodes after a fast sync).
|
|
func (bc *BlockChain) SetHead(head uint64) error {
|
|
log.Warn("Rewinding blockchain", "target", head)
|
|
|
|
bc.chainmu.Lock()
|
|
defer bc.chainmu.Unlock()
|
|
|
|
updateFn := func(db rawdb.DatabaseWriter, header *types.Header) {
|
|
// Rewind the block chain, ensuring we don't end up with a stateless head block
|
|
if currentBlock := bc.CurrentBlock(); currentBlock != nil && header.Number.Uint64() < currentBlock.NumberU64() {
|
|
newHeadBlock := bc.GetBlock(header.Hash(), header.Number.Uint64())
|
|
if newHeadBlock == nil {
|
|
newHeadBlock = bc.genesisBlock
|
|
}
|
|
rawdb.WriteHeadBlockHash(db, newHeadBlock.Hash())
|
|
|
|
// Degrade the chain markers if they are explicitly reverted.
|
|
// In theory we should update all in-memory markers in the
|
|
// last step, however the direction of SetHead is from high
|
|
// to low, so it's safe the update in-memory markers directly.
|
|
bc.currentBlock.Store(newHeadBlock)
|
|
headBlockGauge.Update(int64(newHeadBlock.NumberU64()))
|
|
}
|
|
|
|
// Rewind the fast block in a simpleton way to the target head
|
|
if currentFastBlock := bc.CurrentFastBlock(); currentFastBlock != nil && header.Number.Uint64() < currentFastBlock.NumberU64() {
|
|
newHeadFastBlock := bc.GetBlock(header.Hash(), header.Number.Uint64())
|
|
// If either blocks reached nil, reset to the genesis state
|
|
if newHeadFastBlock == nil {
|
|
newHeadFastBlock = bc.genesisBlock
|
|
}
|
|
rawdb.WriteHeadFastBlockHash(db, newHeadFastBlock.Hash())
|
|
|
|
// Degrade the chain markers if they are explicitly reverted.
|
|
// In theory we should update all in-memory markers in the
|
|
// last step, however the direction of SetHead is from high
|
|
// to low, so it's safe the update in-memory markers directly.
|
|
bc.currentFastBlock.Store(newHeadFastBlock)
|
|
headFastBlockGauge.Update(int64(newHeadFastBlock.NumberU64()))
|
|
}
|
|
}
|
|
|
|
// Rewind the header chain, deleting all block bodies until then
|
|
delFn := func(db rawdb.DatabaseDeleter, hash common.Hash, num uint64) {
|
|
// Ignore the error here since light client won't hit this path
|
|
frozen, _ := bc.db.Ancients()
|
|
if num+1 <= frozen {
|
|
// Truncate all relative data(header, total difficulty, body, receipt
|
|
// and canonical hash) from ancient store.
|
|
if err := bc.db.TruncateAncients(num + 1); err != nil {
|
|
log.Crit("Failed to truncate ancient data", "number", num, "err", err)
|
|
}
|
|
|
|
// Remove the hash <-> number mapping from the active store.
|
|
rawdb.DeleteHeaderNumber(db, hash)
|
|
} else {
|
|
// Remove relative body and receipts from the active store.
|
|
// The header, total difficulty and canonical hash will be
|
|
// removed in the hc.SetHead function.
|
|
rawdb.DeleteBody(db, hash, num)
|
|
rawdb.DeleteReceipts(db, hash, num)
|
|
}
|
|
// Todo(rjl493456442) txlookup, bloombits, etc
|
|
}
|
|
bc.hc.SetHead(head, updateFn, delFn)
|
|
|
|
// Clear out any stale content from the caches
|
|
bc.bodyCache.Purge()
|
|
bc.bodyRLPCache.Purge()
|
|
bc.receiptsCache.Purge()
|
|
bc.blockCache.Purge()
|
|
bc.txLookupCache.Purge()
|
|
bc.futureBlocks.Purge()
|
|
|
|
return bc.loadLastState()
|
|
}
|
|
|
|
// FastSyncCommitHead sets the current head block to the one defined by the hash
|
|
// irrelevant what the chain contents were prior.
|
|
func (bc *BlockChain) FastSyncCommitHead(hash common.Hash) error {
|
|
// Make sure that both the block as well at its state trie exists
|
|
block := bc.GetBlockByHash(hash)
|
|
if block == nil {
|
|
return fmt.Errorf("non existent block [%x…]", hash[:4])
|
|
}
|
|
// If all checks out, manually set the head block
|
|
bc.chainmu.Lock()
|
|
bc.currentBlock.Store(block)
|
|
headBlockGauge.Update(int64(block.NumberU64()))
|
|
bc.chainmu.Unlock()
|
|
|
|
log.Info("Committed new head block", "number", block.Number(), "hash", hash)
|
|
return nil
|
|
}
|
|
|
|
// GasLimit returns the gas limit of the current HEAD block.
|
|
func (bc *BlockChain) GasLimit() uint64 {
|
|
return bc.CurrentBlock().GasLimit()
|
|
}
|
|
|
|
// CurrentBlock retrieves the current head block of the canonical chain. The
|
|
// block is retrieved from the blockchain's internal cache.
|
|
func (bc *BlockChain) CurrentBlock() *types.Block {
|
|
return bc.currentBlock.Load().(*types.Block)
|
|
}
|
|
|
|
// CurrentFastBlock retrieves the current fast-sync head block of the canonical
|
|
// chain. The block is retrieved from the blockchain's internal cache.
|
|
func (bc *BlockChain) CurrentFastBlock() *types.Block {
|
|
return bc.currentFastBlock.Load().(*types.Block)
|
|
}
|
|
|
|
// Validator returns the current validator.
|
|
func (bc *BlockChain) Validator() Validator {
|
|
return bc.validator
|
|
}
|
|
|
|
// Processor returns the current processor.
|
|
func (bc *BlockChain) Processor() Processor {
|
|
return bc.processor
|
|
}
|
|
|
|
// State returns a new mutable state based on the current HEAD block.
|
|
func (bc *BlockChain) State() (*state.IntraBlockState, *state.DbState, error) {
|
|
return bc.StateAt(bc.CurrentBlock().NumberU64())
|
|
}
|
|
|
|
// StateAt returns a new mutable state based on a particular point in time.
|
|
func (bc *BlockChain) StateAt(blockNr uint64) (*state.IntraBlockState, *state.DbState, error) {
|
|
dbstate := state.NewDbState(bc.db, blockNr)
|
|
return state.New(dbstate), dbstate, nil
|
|
}
|
|
|
|
// GetAddressFromItsHash returns the preimage of a given address hash.
|
|
func (bc *BlockChain) GetAddressFromItsHash(hash common.Hash) (common.Address, error) {
|
|
var addr common.Address
|
|
|
|
_, dbstate, err := bc.State()
|
|
if err != nil {
|
|
return addr, err
|
|
}
|
|
|
|
key := dbstate.GetKey(hash.Bytes())
|
|
if len(key) != common.AddressLength {
|
|
return addr, ErrNotFound
|
|
}
|
|
|
|
addr.SetBytes(key)
|
|
return addr, nil
|
|
}
|
|
|
|
// Reset purges the entire blockchain, restoring it to its genesis state.
|
|
func (bc *BlockChain) Reset() error {
|
|
return bc.ResetWithGenesisBlock(bc.genesisBlock)
|
|
}
|
|
|
|
// ResetWithGenesisBlock purges the entire blockchain, restoring it to the
|
|
// specified genesis state.
|
|
func (bc *BlockChain) ResetWithGenesisBlock(genesis *types.Block) error {
|
|
// Dump the entire block chain and purge the caches
|
|
if err := bc.SetHead(0); err != nil {
|
|
return err
|
|
}
|
|
bc.chainmu.Lock()
|
|
defer bc.chainmu.Unlock()
|
|
|
|
rawdb.WriteTd(bc.db, genesis.Hash(), genesis.NumberU64(), genesis.Difficulty())
|
|
rawdb.WriteBlock(context.Background(), bc.db, genesis)
|
|
bc.writeHeadBlock(genesis)
|
|
|
|
// Last update all in-memory chain markers
|
|
bc.genesisBlock = genesis
|
|
bc.currentBlock.Store(bc.genesisBlock)
|
|
headBlockGauge.Update(int64(bc.genesisBlock.NumberU64()))
|
|
bc.hc.SetGenesis(bc.genesisBlock.Header())
|
|
bc.hc.SetCurrentHeader(bc.db, bc.genesisBlock.Header())
|
|
bc.currentFastBlock.Store(bc.genesisBlock)
|
|
headFastBlockGauge.Update(int64(bc.genesisBlock.NumberU64()))
|
|
return nil
|
|
}
|
|
|
|
// Export writes the active chain to the given writer.
|
|
func (bc *BlockChain) Export(w io.Writer) error {
|
|
return bc.ExportN(w, uint64(0), bc.CurrentBlock().NumberU64())
|
|
}
|
|
|
|
// ExportN writes a subset of the active chain to the given writer.
|
|
func (bc *BlockChain) ExportN(w io.Writer, first uint64, last uint64) error {
|
|
bc.chainmu.RLock()
|
|
defer bc.chainmu.RUnlock()
|
|
|
|
if first > last {
|
|
return fmt.Errorf("export failed: first (%d) is greater than last (%d)", first, last)
|
|
}
|
|
log.Info("Exporting batch of blocks", "count", last-first+1)
|
|
|
|
start, reported := time.Now(), time.Now()
|
|
for nr := first; nr <= last; nr++ {
|
|
block := bc.GetBlockByNumber(nr)
|
|
if block == nil {
|
|
return fmt.Errorf("export failed on #%d: not found", nr)
|
|
}
|
|
if err := block.EncodeRLP(w); err != nil {
|
|
return err
|
|
}
|
|
if time.Since(reported) >= statsReportLimit {
|
|
log.Info("Exporting blocks", "exported", block.NumberU64()-first, "elapsed", common.PrettyDuration(time.Since(start)))
|
|
reported = time.Now()
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// writeHeadBlock injects a new head block into the current block chain. This method
|
|
// assumes that the block is indeed a true head. It will also reset the head
|
|
// header and the head fast sync block to this very same block if they are older
|
|
// or if they are on a different side chain.
|
|
//
|
|
// Note, this function assumes that the `mu` mutex is held!
|
|
func (bc *BlockChain) writeHeadBlock(block *types.Block) {
|
|
// If the block is on a side chain or an unknown one, force other heads onto it too
|
|
updateHeads := rawdb.ReadCanonicalHash(bc.db, block.NumberU64()) != block.Hash()
|
|
|
|
// Add the block to the canonical chain number scheme and mark as the head
|
|
rawdb.WriteCanonicalHash(bc.db, block.Hash(), block.NumberU64())
|
|
if bc.enableTxLookupIndex && !bc.cacheConfig.DownloadOnly {
|
|
rawdb.WriteTxLookupEntries(bc.db, block)
|
|
}
|
|
rawdb.WriteHeadBlockHash(bc.db, block.Hash())
|
|
|
|
if updateHeads {
|
|
bc.hc.SetCurrentHeader(bc.db, block.Header())
|
|
rawdb.WriteHeadFastBlockHash(bc.db, block.Hash())
|
|
}
|
|
|
|
// If the block is better than our head or is on a different chain, force update heads
|
|
if updateHeads {
|
|
bc.currentFastBlock.Store(block)
|
|
headFastBlockGauge.Update(int64(block.NumberU64()))
|
|
}
|
|
bc.currentBlock.Store(block)
|
|
headBlockGauge.Update(int64(block.NumberU64()))
|
|
}
|
|
|
|
// Genesis retrieves the chain's genesis block.
|
|
func (bc *BlockChain) Genesis() *types.Block {
|
|
return bc.genesisBlock
|
|
}
|
|
|
|
// GetBody retrieves a block body (transactions and uncles) from the database by
|
|
// hash, caching it if found.
|
|
func (bc *BlockChain) GetBody(hash common.Hash) *types.Body {
|
|
// Short circuit if the body's already in the cache, retrieve otherwise
|
|
if cached, ok := bc.bodyCache.Get(hash); ok {
|
|
body := cached.(*types.Body)
|
|
return body
|
|
}
|
|
number := bc.hc.GetBlockNumber(bc.db, hash)
|
|
if number == nil {
|
|
return nil
|
|
}
|
|
body := rawdb.ReadBody(bc.db, hash, *number)
|
|
if body == nil {
|
|
return nil
|
|
}
|
|
// Cache the found body for next time and return
|
|
bc.bodyCache.Add(hash, body)
|
|
return body
|
|
}
|
|
|
|
// GetBodyRLP retrieves a block body in RLP encoding from the database by hash,
|
|
// caching it if found.
|
|
func (bc *BlockChain) GetBodyRLP(hash common.Hash) rlp.RawValue {
|
|
// Short circuit if the body's already in the cache, retrieve otherwise
|
|
if cached, ok := bc.bodyRLPCache.Get(hash); ok {
|
|
return cached.(rlp.RawValue)
|
|
}
|
|
number := bc.hc.GetBlockNumber(bc.db, hash)
|
|
if number == nil {
|
|
return nil
|
|
}
|
|
body := rawdb.ReadBodyRLP(bc.db, hash, *number)
|
|
if len(body) == 0 {
|
|
return nil
|
|
}
|
|
// Cache the found body for next time and return
|
|
bc.bodyRLPCache.Add(hash, body)
|
|
return body
|
|
}
|
|
|
|
// HasBlock checks if a block is fully present in the database or not.
|
|
func (bc *BlockChain) HasBlock(hash common.Hash, number uint64) bool {
|
|
if bc.blockCache.Contains(hash) {
|
|
return true
|
|
}
|
|
return rawdb.HasBody(bc.db, hash, number)
|
|
}
|
|
|
|
// HasFastBlock checks if a fast block is fully present in the database or not.
|
|
func (bc *BlockChain) HasFastBlock(hash common.Hash, number uint64) bool {
|
|
if !bc.HasBlock(hash, number) {
|
|
return false
|
|
}
|
|
if bc.receiptsCache.Contains(hash) {
|
|
return true
|
|
}
|
|
return rawdb.HasReceipts(bc.db, hash, number)
|
|
}
|
|
|
|
// HasBlockAndState checks if a block and associated state trie is fully present
|
|
// in the database or not, caching it if present.
|
|
func (bc *BlockChain) HasBlockAndState(hash common.Hash, number uint64) bool {
|
|
// Check first that the block itself is known
|
|
block := bc.GetBlock(hash, number)
|
|
if block == nil {
|
|
return false
|
|
}
|
|
return true
|
|
}
|
|
|
|
// CachedBlocks returns the hashes of the cached blocks.
|
|
func (bc *BlockChain) CachedBlocks() []common.Hash {
|
|
a := bc.blockCache.Keys()
|
|
b := make([]common.Hash, len(a))
|
|
for i := range a {
|
|
b[i] = a[i].(common.Hash)
|
|
}
|
|
return b
|
|
}
|
|
|
|
// AvailableBlocks returns the hashes of easily available blocks.
|
|
func (bc *BlockChain) AvailableBlocks() []common.Hash {
|
|
var res []common.Hash
|
|
blockNbr := bc.CurrentBlock().NumberU64()
|
|
for i := 0; i < blockCacheLimit; i++ {
|
|
block := bc.GetBlockByNumber(blockNbr)
|
|
if block == nil {
|
|
break
|
|
}
|
|
res = append(res, block.Hash())
|
|
|
|
if blockNbr == 0 {
|
|
break
|
|
}
|
|
blockNbr--
|
|
}
|
|
return res
|
|
}
|
|
|
|
// GetBlock retrieves a block from the database by hash and number,
|
|
// caching it if found.
|
|
func (bc *BlockChain) GetBlock(hash common.Hash, number uint64) *types.Block {
|
|
// Short circuit if the block's already in the cache, retrieve otherwise
|
|
if block, ok := bc.blockCache.Get(hash); ok {
|
|
return block.(*types.Block)
|
|
}
|
|
block := rawdb.ReadBlock(bc.db, hash, number)
|
|
if block == nil {
|
|
return nil
|
|
}
|
|
// Cache the found block for next time and return
|
|
bc.blockCache.Add(block.Hash(), block)
|
|
return block
|
|
}
|
|
|
|
// GetBlockByHash retrieves a block from the database by hash, caching it if found.
|
|
func (bc *BlockChain) GetBlockByHash(hash common.Hash) *types.Block {
|
|
number := bc.hc.GetBlockNumber(bc.db, hash)
|
|
if number == nil {
|
|
return nil
|
|
}
|
|
return bc.GetBlock(hash, *number)
|
|
}
|
|
|
|
// GetBlockByNumber retrieves a block from the database by number, caching it
|
|
// (associated with its hash) if found.
|
|
func (bc *BlockChain) GetBlockByNumber(number uint64) *types.Block {
|
|
hash := rawdb.ReadCanonicalHash(bc.db, number)
|
|
if hash == (common.Hash{}) {
|
|
return nil
|
|
}
|
|
return bc.GetBlock(hash, number)
|
|
}
|
|
|
|
// GetReceiptsByHash retrieves the receipts for all transactions in a given block.
|
|
func (bc *BlockChain) GetReceiptsByHash(hash common.Hash) types.Receipts {
|
|
if receipts, ok := bc.receiptsCache.Get(hash); ok {
|
|
return receipts.(types.Receipts)
|
|
}
|
|
number := rawdb.ReadHeaderNumber(bc.db, hash)
|
|
if number == nil {
|
|
return nil
|
|
}
|
|
receipts := rawdb.ReadReceipts(bc.db, hash, *number, bc.chainConfig)
|
|
if receipts == nil {
|
|
return nil
|
|
}
|
|
bc.receiptsCache.Add(hash, receipts)
|
|
return receipts
|
|
}
|
|
|
|
// GetBlocksFromHash returns the block corresponding to hash and up to n-1 ancestors.
|
|
// [deprecated by eth/62]
|
|
func (bc *BlockChain) GetBlocksFromHash(hash common.Hash, n int) (blocks []*types.Block) {
|
|
number := bc.hc.GetBlockNumber(bc.db, hash)
|
|
if number == nil {
|
|
return nil
|
|
}
|
|
for i := 0; i < n; i++ {
|
|
block := bc.GetBlock(hash, *number)
|
|
if block == nil {
|
|
break
|
|
}
|
|
blocks = append(blocks, block)
|
|
hash = block.ParentHash()
|
|
*number--
|
|
}
|
|
return
|
|
}
|
|
|
|
// GetUnclesInChain retrieves all the uncles from a given block backwards until
|
|
// a specific distance is reached.
|
|
func (bc *BlockChain) GetUnclesInChain(block *types.Block, length int) []*types.Header {
|
|
uncles := []*types.Header{}
|
|
for i := 0; block != nil && i < length; i++ {
|
|
uncles = append(uncles, block.Uncles()...)
|
|
block = bc.GetBlock(block.ParentHash(), block.NumberU64()-1)
|
|
}
|
|
return uncles
|
|
}
|
|
|
|
// ByteCode retrieves the runtime byte code associated with an account.
|
|
func (bc *BlockChain) ByteCode(addr common.Address) ([]byte, error) {
|
|
stateDB, _, err := bc.State()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return stateDB.GetCode(addr), nil
|
|
}
|
|
|
|
// Stop stops the blockchain service. If any imports are currently in progress
|
|
// it will abort them using the procInterrupt.
|
|
func (bc *BlockChain) Stop() {
|
|
if !atomic.CompareAndSwapInt32(&bc.running, 0, 1) {
|
|
return
|
|
}
|
|
// Unsubscribe all subscriptions registered from blockchain
|
|
bc.scope.Close()
|
|
close(bc.quit)
|
|
|
|
bc.waitJobs()
|
|
|
|
if bc.pruner != nil {
|
|
bc.pruner.Stop()
|
|
}
|
|
log.Info("Blockchain stopped")
|
|
}
|
|
|
|
func (bc *BlockChain) procFutureBlocks() {
|
|
blocks := make([]*types.Block, 0, bc.futureBlocks.Len())
|
|
for _, hash := range bc.futureBlocks.Keys() {
|
|
if block, exist := bc.futureBlocks.Peek(hash); exist {
|
|
blocks = append(blocks, block.(*types.Block))
|
|
}
|
|
}
|
|
if len(blocks) > 0 {
|
|
sort.Slice(blocks, func(i, j int) bool {
|
|
return blocks[i].NumberU64() < blocks[j].NumberU64()
|
|
})
|
|
// Insert one by one as chain insertion needs contiguous ancestry between blocks
|
|
for i := range blocks {
|
|
_, _ = bc.InsertChain(context.Background(), blocks[i:i+1])
|
|
}
|
|
}
|
|
}
|
|
|
|
// WriteStatus describes the outcome of writing a block to the chain.
type WriteStatus byte

const (
	// NonStatTy is the zero value; it is returned whenever a block write
	// fails with an error.
	NonStatTy WriteStatus = iota
	// CanonStatTy is returned when a block was written successfully.
	CanonStatTy
	// SideStatTy presumably marks a block stored on a side chain — confirm
	// against its users.
	SideStatTy
)
|
|
|
|
// Rollback is designed to remove a chain of links from the database that aren't
|
|
// certain enough to be valid.
|
|
func (bc *BlockChain) Rollback(chain []common.Hash) {
|
|
bc.chainmu.Lock()
|
|
defer bc.chainmu.Unlock()
|
|
|
|
for i := len(chain) - 1; i >= 0; i-- {
|
|
hash := chain[i]
|
|
|
|
// Degrade the chain markers if they are explicitly reverted.
|
|
// In theory we should update all in-memory markers in the
|
|
// last step, however the direction of rollback is from high
|
|
// to low, so it's safe the update in-memory markers directly.
|
|
currentHeader := bc.hc.CurrentHeader()
|
|
if currentHeader.Hash() == hash {
|
|
bc.hc.SetCurrentHeader(bc.db, bc.GetHeader(currentHeader.ParentHash, currentHeader.Number.Uint64()-1))
|
|
}
|
|
if currentFastBlock := bc.CurrentFastBlock(); currentFastBlock.Hash() == hash {
|
|
newFastBlock := bc.GetBlock(currentFastBlock.ParentHash(), currentFastBlock.NumberU64()-1)
|
|
rawdb.WriteHeadFastBlockHash(bc.db, currentFastBlock.ParentHash())
|
|
bc.currentFastBlock.Store(newFastBlock)
|
|
headFastBlockGauge.Update(int64(newFastBlock.NumberU64()))
|
|
}
|
|
if currentBlock := bc.CurrentBlock(); currentBlock.Hash() == hash {
|
|
newBlock := bc.GetBlock(currentBlock.ParentHash(), currentBlock.NumberU64()-1)
|
|
rawdb.WriteHeadBlockHash(bc.db, currentBlock.ParentHash())
|
|
bc.currentBlock.Store(newBlock)
|
|
headBlockGauge.Update(int64(newBlock.NumberU64()))
|
|
}
|
|
}
|
|
if _, err := bc.db.Commit(); err != nil {
|
|
log.Crit("Failed to rollback chain markers", "err", err)
|
|
}
|
|
// Truncate ancient data which exceeds the current header.
|
|
//
|
|
// Notably, it can happen that system crashes without truncating the ancient data
|
|
// but the head indicator has been updated in the active store. Regarding this issue,
|
|
// system will self recovery by truncating the extra data during the setup phase.
|
|
//if err := bc.truncateAncient(bc.hc.CurrentHeader().Number.Uint64()); err != nil {
|
|
// log.Crit("Truncate ancient store failed", "err", err)
|
|
//}
|
|
}
|
|
|
|
// truncateAncient rewinds the blockchain to the specified header and deletes all
|
|
// data in the ancient store that exceeds the specified header.
|
|
func (bc *BlockChain) truncateAncient(head uint64) error {
|
|
frozen, err := bc.db.Ancients()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
// Short circuit if there is no data to truncate in ancient store.
|
|
if frozen <= head+1 {
|
|
return nil
|
|
}
|
|
// Truncate all the data in the freezer beyond the specified head
|
|
if err := bc.db.TruncateAncients(head + 1); err != nil {
|
|
return err
|
|
}
|
|
// Clear out any stale content from the caches
|
|
bc.hc.headerCache.Purge()
|
|
bc.hc.tdCache.Purge()
|
|
bc.hc.numberCache.Purge()
|
|
|
|
// Clear out any stale content from the caches
|
|
bc.bodyCache.Purge()
|
|
bc.bodyRLPCache.Purge()
|
|
bc.receiptsCache.Purge()
|
|
bc.blockCache.Purge()
|
|
bc.txLookupCache.Purge()
|
|
bc.futureBlocks.Purge()
|
|
|
|
log.Info("Rewind ancient data", "number", head)
|
|
return nil
|
|
}
|
|
|
|
// numberHash is just a container for a number and a hash, to represent a block
|
|
type numberHash struct {
|
|
number uint64
|
|
hash common.Hash
|
|
}
|
|
|
|
// InsertReceiptChain attempts to complete an already existing header chain with
|
|
// transaction and receipt data.
|
|
func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain []types.Receipts, ancientLimit uint64) (int, error) {
|
|
// We don't require the chainMu here since we want to maximize the
|
|
// concurrency of header insertion and receipt insertion.
|
|
if err := bc.addJob(); err != nil {
|
|
return 0, err
|
|
}
|
|
defer bc.doneJob()
|
|
|
|
var (
|
|
ancientBlocks, liveBlocks types.Blocks
|
|
ancientReceipts, liveReceipts []types.Receipts
|
|
)
|
|
// Do a sanity check that the provided chain is actually ordered and linked
|
|
for i := 0; i < len(blockChain); i++ {
|
|
if i != 0 {
|
|
if blockChain[i].NumberU64() != blockChain[i-1].NumberU64()+1 || blockChain[i].ParentHash() != blockChain[i-1].Hash() {
|
|
log.Error("Non contiguous receipt insert", "number", blockChain[i].Number(), "hash", blockChain[i].Hash(), "parent", blockChain[i].ParentHash(),
|
|
"prevnumber", blockChain[i-1].Number(), "prevhash", blockChain[i-1].Hash())
|
|
return 0, fmt.Errorf("non contiguous insert: item %d is #%d [%x…], item %d is #%d [%x…] (parent [%x…])", i-1, blockChain[i-1].NumberU64(),
|
|
blockChain[i-1].Hash().Bytes()[:4], i, blockChain[i].NumberU64(), blockChain[i].Hash().Bytes()[:4], blockChain[i].ParentHash().Bytes()[:4])
|
|
}
|
|
}
|
|
if blockChain[i].NumberU64() <= ancientLimit {
|
|
ancientBlocks, ancientReceipts = append(ancientBlocks, blockChain[i]), append(ancientReceipts, receiptChain[i])
|
|
} else {
|
|
liveBlocks, liveReceipts = append(liveBlocks, blockChain[i]), append(liveReceipts, receiptChain[i])
|
|
}
|
|
}
|
|
|
|
var (
|
|
stats = struct{ processed, ignored int32 }{}
|
|
start = time.Now()
|
|
size = 0
|
|
)
|
|
// updateHead updates the head fast sync block if the inserted blocks are better
|
|
// and returns a indicator whether the inserted blocks are canonical.
|
|
updateHead := func(head *types.Block) bool {
|
|
bc.chainmu.Lock()
|
|
|
|
// Rewind may have occurred, skip in that case.
|
|
if bc.CurrentHeader().Number.Cmp(head.Number()) >= 0 {
|
|
currentFastBlock, td := bc.CurrentFastBlock(), bc.GetTd(head.Hash(), head.NumberU64())
|
|
if bc.GetTd(currentFastBlock.Hash(), currentFastBlock.NumberU64()).Cmp(td) < 0 {
|
|
rawdb.WriteHeadFastBlockHash(bc.db, head.Hash())
|
|
bc.currentFastBlock.Store(head)
|
|
headFastBlockGauge.Update(int64(head.NumberU64()))
|
|
bc.chainmu.Unlock()
|
|
return true
|
|
}
|
|
}
|
|
bc.chainmu.Unlock()
|
|
return false
|
|
}
|
|
// writeAncient writes blockchain and corresponding receipt chain into ancient store.
|
|
//
|
|
// this function only accepts canonical chain data. All side chain will be reverted
|
|
// eventually.
|
|
/*
|
|
writeAncient := func(blockChain types.Blocks, receiptChain []types.Receipts) (int, error) {
|
|
var (
|
|
previous = bc.CurrentFastBlock()
|
|
)
|
|
// If any error occurs before updating the head or we are inserting a side chain,
|
|
// all the data written this time wll be rolled back.
|
|
defer func() {
|
|
if previous != nil {
|
|
if err := bc.truncateAncient(previous.NumberU64()); err != nil {
|
|
log.Crit("Truncate ancient store failed", "err", err)
|
|
}
|
|
}
|
|
}()
|
|
var deleted []*numberHash
|
|
for i, block := range blockChain {
|
|
// Short circuit insertion if shutting down or processing failed
|
|
if bc.getProcInterrupt() {
|
|
return 0, errInsertionInterrupted
|
|
}
|
|
// Short circuit insertion if it is required(used in testing only)
|
|
if bc.terminateInsert != nil && bc.terminateInsert(block.Hash(), block.NumberU64()) {
|
|
return i, errors.New("insertion is terminated for testing purpose")
|
|
}
|
|
// Short circuit if the owner header is unknown
|
|
if !bc.HasHeader(block.Hash(), block.NumberU64()) {
|
|
return i, fmt.Errorf("containing header #%d [%x…] unknown", block.Number(), block.Hash().Bytes()[:4])
|
|
}
|
|
|
|
// Turbo-Geth doesn't have fast sync support
|
|
|
|
// Flush data into ancient database.
|
|
size += rawdb.WriteAncientBlock(bc.db, block, receiptChain[i], bc.GetTd(block.Hash(), block.NumberU64()))
|
|
if bc.enableTxLookupIndex {
|
|
rawdb.WriteTxLookupEntries(bc.db, block)
|
|
}
|
|
|
|
stats.processed++
|
|
}
|
|
|
|
if !updateHead(blockChain[len(blockChain)-1]) {
|
|
return 0, errors.New("side blocks can't be accepted as the ancient chain data")
|
|
}
|
|
previous = nil // disable rollback explicitly
|
|
|
|
// Wipe out canonical block data.
|
|
for _, nh := range deleted {
|
|
rawdb.DeleteBlockWithoutNumber(bc.db, nh.hash, nh.number)
|
|
rawdb.DeleteCanonicalHash(bc.db, nh.number)
|
|
}
|
|
for _, block := range blockChain {
|
|
// Always keep genesis block in active database.
|
|
if block.NumberU64() != 0 {
|
|
rawdb.DeleteBlockWithoutNumber(bc.db, block.Hash(), block.NumberU64())
|
|
rawdb.DeleteCanonicalHash(bc.db, block.NumberU64())
|
|
}
|
|
}
|
|
|
|
// Wipe out side chain too.
|
|
for _, nh := range deleted {
|
|
for _, hash := range rawdb.ReadAllHashes(bc.db, nh.number) {
|
|
rawdb.DeleteBlock(bc.db, hash, nh.number)
|
|
}
|
|
}
|
|
for _, block := range blockChain {
|
|
// Always keep genesis block in active database.
|
|
if block.NumberU64() != 0 {
|
|
for _, hash := range rawdb.ReadAllHashes(bc.db, block.NumberU64()) {
|
|
rawdb.DeleteBlock(bc.db, hash, block.NumberU64())
|
|
}
|
|
}
|
|
}
|
|
return 0, nil
|
|
}
|
|
*/
|
|
// writeLive writes blockchain and corresponding receipt chain into active store.
|
|
writeLive := func(blockChain types.Blocks, receiptChain []types.Receipts) (int, error) {
|
|
batch := bc.db.NewBatch()
|
|
for i, block := range blockChain {
|
|
// Short circuit insertion if shutting down or processing failed
|
|
if bc.getProcInterrupt() {
|
|
return 0, errInsertionInterrupted
|
|
}
|
|
// Short circuit if the owner header is unknown
|
|
if !bc.HasHeader(block.Hash(), block.NumberU64()) {
|
|
return i, fmt.Errorf("containing header #%d [%x…] unknown", block.Number(), block.Hash().Bytes()[:4])
|
|
}
|
|
if bc.HasBlock(block.Hash(), block.NumberU64()) {
|
|
stats.ignored++
|
|
continue
|
|
}
|
|
// Write all the data out into the database
|
|
rawdb.WriteBody(context.Background(), batch, block.Hash(), block.NumberU64(), block.Body())
|
|
rawdb.WriteReceipts(batch, block.Hash(), block.NumberU64(), receiptChain[i])
|
|
if bc.enableTxLookupIndex {
|
|
rawdb.WriteTxLookupEntries(batch, block)
|
|
}
|
|
|
|
stats.processed++
|
|
if batch.BatchSize() >= batch.IdealBatchSize() {
|
|
if _, err := batch.Commit(); err != nil {
|
|
return 0, err
|
|
}
|
|
size += batch.BatchSize()
|
|
batch = bc.db.NewBatch()
|
|
}
|
|
stats.processed++
|
|
}
|
|
if batch.BatchSize() > 0 {
|
|
size += batch.BatchSize()
|
|
if _, err := batch.Commit(); err != nil {
|
|
return 0, err
|
|
}
|
|
}
|
|
updateHead(blockChain[len(blockChain)-1])
|
|
return 0, nil
|
|
}
|
|
|
|
// Write downloaded chain data and corresponding receipt chain data.
|
|
/*
|
|
if len(ancientBlocks) > 0 {
|
|
if n, err := writeAncient(ancientBlocks, ancientReceipts); err != nil {
|
|
if err == errInsertionInterrupted {
|
|
return 0, nil
|
|
}
|
|
return n, err
|
|
}
|
|
}
|
|
*/
|
|
if len(liveBlocks) > 0 {
|
|
if n, err := writeLive(liveBlocks, liveReceipts); err != nil {
|
|
if err == errInsertionInterrupted {
|
|
return 0, nil
|
|
}
|
|
return n, err
|
|
}
|
|
}
|
|
|
|
head := blockChain[len(blockChain)-1]
|
|
context := []interface{}{
|
|
"count", stats.processed, "elapsed", common.PrettyDuration(time.Since(start)),
|
|
"number", head.Number(), "hash", head.Hash(), "age", common.PrettyAge(time.Unix(int64(head.Time()), 0)),
|
|
"size", common.StorageSize(size),
|
|
}
|
|
if stats.ignored > 0 {
|
|
context = append(context, []interface{}{"ignored", stats.ignored}...)
|
|
}
|
|
log.Info("Imported new block receipts", context...)
|
|
|
|
return 0, nil
|
|
}
|
|
|
|
// WriteBlockWithState writes the block and all associated state to the database.
|
|
func (bc *BlockChain) WriteBlockWithState(ctx context.Context, block *types.Block, receipts []*types.Receipt, logs []*types.Log, state *state.IntraBlockState, tds *state.TrieDbState, emitHeadEvent bool, execute bool) (status WriteStatus, err error) {
|
|
if err = bc.addJob(); err != nil {
|
|
return NonStatTy, err
|
|
}
|
|
defer bc.doneJob()
|
|
|
|
bc.chainmu.Lock()
|
|
defer bc.chainmu.Unlock()
|
|
|
|
return bc.writeBlockWithState(ctx, block, receipts, logs, state, tds, emitHeadEvent, execute)
|
|
}
|
|
|
|
// writeBlockWithState writes the block and all associated state to the database,
|
|
// but is expects the chain mutex to be held.
|
|
func (bc *BlockChain) writeBlockWithState(ctx context.Context, block *types.Block, receipts []*types.Receipt, logs []*types.Log, stateDb *state.IntraBlockState, tds *state.TrieDbState, emitHeadEvent bool, execute bool) (status WriteStatus, err error) {
|
|
// Make sure no inconsistent state is leaked during insertion
|
|
currentBlock := bc.CurrentBlock()
|
|
|
|
// Calculate the total difficulty of the block
|
|
ptd := bc.GetTd(block.ParentHash(), block.NumberU64()-1)
|
|
if ptd == nil {
|
|
return NonStatTy, consensus.ErrUnknownAncestor
|
|
}
|
|
externTd := new(big.Int).Add(block.Difficulty(), ptd)
|
|
|
|
// Irrelevant of the canonical status, write the block itself to the database
|
|
if common.IsCanceled(ctx) {
|
|
return NonStatTy, ctx.Err()
|
|
}
|
|
if execute {
|
|
rawdb.WriteTd(bc.db, block.Hash(), block.NumberU64(), externTd)
|
|
}
|
|
rawdb.WriteBody(ctx, bc.db, block.Hash(), block.NumberU64(), block.Body())
|
|
if execute {
|
|
rawdb.WriteHeader(ctx, bc.db, block.Header())
|
|
}
|
|
|
|
if tds != nil {
|
|
tds.SetBlockNr(block.NumberU64())
|
|
}
|
|
|
|
ctx = bc.WithContext(ctx, block.Number())
|
|
if stateDb != nil && execute {
|
|
blockWriter := tds.DbStateWriter()
|
|
if err := stateDb.CommitBlock(ctx, blockWriter); err != nil {
|
|
return NonStatTy, err
|
|
}
|
|
// Always write changesets
|
|
if err := blockWriter.WriteChangeSets(); err != nil {
|
|
return NonStatTy, err
|
|
}
|
|
// Optionally write history
|
|
if !bc.NoHistory() {
|
|
if err := blockWriter.WriteHistory(); err != nil {
|
|
return NonStatTy, err
|
|
}
|
|
}
|
|
}
|
|
if bc.enableReceipts && !bc.cacheConfig.DownloadOnly && execute {
|
|
rawdb.WriteReceipts(bc.db, block.Hash(), block.NumberU64(), receipts)
|
|
}
|
|
|
|
// If the total difficulty is higher than our known, add it to the canonical chain
|
|
// Second clause in the if statement reduces the vulnerability to selfish mining.
|
|
// Please refer to http://www.cs.cornell.edu/~ie53/publications/btcProcFC.pdf
|
|
//reorg := externTd.Cmp(localTd) > 0
|
|
//currentBlock = bc.CurrentBlock()
|
|
//if !reorg && externTd.Cmp(localTd) == 0 {
|
|
// // Split same-difficulty blocks by number, then preferentially select
|
|
// // the block generated by the local miner as the canonical block.
|
|
// if block.NumberU64() < currentBlock.NumberU64() {
|
|
// reorg = true
|
|
// } else if block.NumberU64() == currentBlock.NumberU64() {
|
|
// var currentPreserve, blockPreserve bool
|
|
// if bc.shouldPreserve != nil {
|
|
// currentPreserve, blockPreserve = bc.shouldPreserve(currentBlock), bc.shouldPreserve(block)
|
|
// }
|
|
// reorg = !currentPreserve && (blockPreserve || mrand.Float64() < 0.5)
|
|
// }
|
|
//}
|
|
//if reorg {
|
|
// Reorganise the chain if the parent is not the head block
|
|
|
|
if execute && block.ParentHash() != currentBlock.Hash() {
|
|
if err := bc.reorg(currentBlock, block); err != nil {
|
|
return NonStatTy, err
|
|
}
|
|
}
|
|
// Write the positional metadata for transaction/receipt lookups and preimages
|
|
|
|
if stateDb != nil && bc.enablePreimages && !bc.cacheConfig.DownloadOnly && execute {
|
|
rawdb.WritePreimages(bc.db, stateDb.Preimages())
|
|
}
|
|
|
|
status = CanonStatTy
|
|
|
|
// Set new head.
|
|
if execute {
|
|
bc.writeHeadBlock(block)
|
|
}
|
|
bc.futureBlocks.Remove(block.Hash())
|
|
|
|
if execute {
|
|
bc.chainFeed.Send(ChainEvent{Block: block, Hash: block.Hash(), Logs: logs})
|
|
if len(logs) > 0 {
|
|
bc.logsFeed.Send(logs)
|
|
}
|
|
}
|
|
// In theory we should fire a ChainHeadEvent when we inject
|
|
// a canonical block, but sometimes we can insert a batch of
|
|
// canonicial blocks. Avoid firing too much ChainHeadEvents,
|
|
// we will fire an accumulated ChainHeadEvent and disable fire
|
|
// event here.
|
|
if emitHeadEvent && execute {
|
|
bc.chainHeadFeed.Send(ChainHeadEvent{Block: block})
|
|
}
|
|
return status, nil
|
|
}
|
|
|
|
// addFutureBlock checks if the block is within the max allowed window to get
|
|
// accepted for future processing, and returns an error if the block is too far
|
|
// ahead and was not added.
|
|
func (bc *BlockChain) addFutureBlock(block *types.Block) error {
|
|
max := uint64(time.Now().Unix() + maxTimeFutureBlocks)
|
|
if block.Time() > max {
|
|
return fmt.Errorf("future block timestamp %v > allowed %v", block.Time(), max)
|
|
}
|
|
bc.futureBlocks.Add(block.Hash(), block)
|
|
return nil
|
|
}
|
|
|
|
// InsertBodyChain attempts to insert the given batch of block into the
|
|
// canonical chain, without executing those blocks
|
|
func (bc *BlockChain) InsertBodyChain(ctx context.Context, chain types.Blocks) (int, error) {
|
|
// Sanity check that we have something meaningful to import
|
|
if len(chain) == 0 {
|
|
return 0, nil
|
|
}
|
|
|
|
// Remove already known canon-blocks
|
|
var (
|
|
block, prev *types.Block
|
|
)
|
|
// Do a sanity check that the provided chain is actually ordered and linked
|
|
for i := 1; i < len(chain); i++ {
|
|
block = chain[i]
|
|
prev = chain[i-1]
|
|
if block.NumberU64() != prev.NumberU64()+1 || block.ParentHash() != prev.Hash() {
|
|
// Chain broke ancestry, log a message (programming error) and skip insertion
|
|
log.Error("Non contiguous block insert", "number", block.Number(), "hash", block.Hash(),
|
|
"parent", block.ParentHash(), "prevnumber", prev.Number(), "prevhash", prev.Hash())
|
|
|
|
return 0, fmt.Errorf("non contiguous insert: item %d is #%d [%x…], item %d is #%d [%x…] (parent [%x…])", i-1, prev.NumberU64(),
|
|
prev.Hash().Bytes()[:4], i, block.NumberU64(), block.Hash().Bytes()[:4], block.ParentHash().Bytes()[:4])
|
|
}
|
|
}
|
|
// Only insert if the difficulty of the inserted chain is bigger than existing chain
|
|
// Pre-checks passed, start the full block imports
|
|
if err := bc.addJob(); err != nil {
|
|
return 0, err
|
|
}
|
|
ctx = bc.WithContext(ctx, chain[0].Number())
|
|
bc.chainmu.Lock()
|
|
defer func() {
|
|
bc.chainmu.Unlock()
|
|
bc.doneJob()
|
|
}()
|
|
n, err := bc.insertChain(ctx, chain, false, false /* execute */)
|
|
|
|
return n, err
|
|
}
|
|
|
|
// InsertChain attempts to insert the given batch of blocks in to the canonical
|
|
// chain or, otherwise, create a fork. If an error is returned it will return
|
|
// the index number of the failing block as well an error describing what went
|
|
// wrong.
|
|
//
|
|
// After insertion is done, all accumulated events will be fired.
|
|
func (bc *BlockChain) InsertChain(ctx context.Context, chain types.Blocks) (int, error) {
|
|
// Sanity check that we have something meaningful to import
|
|
if len(chain) == 0 {
|
|
return 0, nil
|
|
}
|
|
|
|
bc.blockProcFeed.Send(true)
|
|
defer bc.blockProcFeed.Send(false)
|
|
|
|
// Remove already known canon-blocks
|
|
var (
|
|
block, prev *types.Block
|
|
)
|
|
// Do a sanity check that the provided chain is actually ordered and linked
|
|
for i := 1; i < len(chain); i++ {
|
|
block = chain[i]
|
|
prev = chain[i-1]
|
|
if block.NumberU64() != prev.NumberU64()+1 || block.ParentHash() != prev.Hash() {
|
|
// Chain broke ancestry, log a message (programming error) and skip insertion
|
|
log.Error("Non contiguous block insert", "number", block.Number(), "hash", block.Hash(),
|
|
"parent", block.ParentHash(), "prevnumber", prev.Number(), "prevhash", prev.Hash())
|
|
|
|
return 0, fmt.Errorf("non contiguous insert: item %d is #%d [%x…], item %d is #%d [%x…] (parent [%x…])", i-1, prev.NumberU64(),
|
|
prev.Hash().Bytes()[:4], i, block.NumberU64(), block.Hash().Bytes()[:4], block.ParentHash().Bytes()[:4])
|
|
}
|
|
}
|
|
// Only insert if the difficulty of the inserted chain is bigger than existing chain
|
|
// Pre-checks passed, start the full block imports
|
|
if err := bc.addJob(); err != nil {
|
|
return 0, err
|
|
}
|
|
ctx = bc.WithContext(ctx, chain[0].Number())
|
|
bc.chainmu.Lock()
|
|
defer func() {
|
|
bc.chainmu.Unlock()
|
|
bc.doneJob()
|
|
}()
|
|
n, err := bc.insertChain(ctx, chain, true, true /* execute */)
|
|
|
|
return n, err
|
|
}
|
|
|
|
// insertChain is the internal implementation of InsertChain, which assumes that
|
|
// 1) chains are contiguous, and 2) The chain mutex is held.
|
|
//
|
|
// This method is split out so that import batches that require re-injecting
|
|
// historical blocks can do so without releasing the lock, which could lead to
|
|
// racey behaviour. If a sidechain import is in progress, and the historic state
|
|
// is imported, but then new canon-head is added before the actual sidechain
|
|
// completes, then the historic state could be pruned again
|
|
func (bc *BlockChain) insertChain(ctx context.Context, chain types.Blocks, verifySeals bool, execute bool) (int, error) {
|
|
log.Info("Inserting chain",
|
|
"start", chain[0].NumberU64(), "end", chain[len(chain)-1].NumberU64(),
|
|
"current", bc.CurrentBlock().Number().Uint64(), "currentHeader", bc.CurrentHeader().Number.Uint64())
|
|
|
|
// If the chain is terminating, don't even bother starting up
|
|
if bc.getProcInterrupt() {
|
|
return 0, nil
|
|
}
|
|
// Start a parallel signature recovery (signer will fluke on fork transition, minimal perf loss)
|
|
senderCacher.recoverFromBlocks(types.MakeSigner(bc.chainConfig, chain[0].Number()), chain)
|
|
|
|
// A queued approach to delivering events. This is generally
|
|
// faster than direct delivery and requires much less mutex
|
|
// acquiring.
|
|
var (
|
|
stats = insertStats{startTime: mclock.Now()}
|
|
lastCanon *types.Block
|
|
)
|
|
// Fire a single chain head event if we've progressed the chain
|
|
if execute {
|
|
defer func() {
|
|
if lastCanon != nil && bc.CurrentBlock().Hash() == lastCanon.Hash() {
|
|
bc.chainHeadFeed.Send(ChainHeadEvent{lastCanon})
|
|
}
|
|
}()
|
|
}
|
|
// Start the parallel header verifier
|
|
headers := make([]*types.Header, len(chain))
|
|
seals := make([]bool, len(chain))
|
|
|
|
for i, block := range chain {
|
|
headers[i] = block.Header()
|
|
seals[i] = verifySeals
|
|
}
|
|
var abort chan<- struct{}
|
|
var results <-chan error
|
|
if execute {
|
|
abort, results = bc.engine.VerifyHeaders(bc, headers, seals)
|
|
defer close(abort)
|
|
}
|
|
|
|
externTd := big.NewInt(0)
|
|
if len(chain) > 0 && chain[0].NumberU64() > 0 {
|
|
d := bc.GetTd(chain[0].ParentHash(), chain[0].NumberU64()-1)
|
|
if d != nil {
|
|
externTd = externTd.Set(d)
|
|
}
|
|
}
|
|
|
|
localTd := bc.GetTd(bc.CurrentBlock().Hash(), bc.CurrentBlock().NumberU64())
|
|
|
|
var verifyFrom int
|
|
for verifyFrom = 0; verifyFrom < len(chain) && localTd.Cmp(externTd) >= 0; verifyFrom++ {
|
|
header := chain[verifyFrom].Header()
|
|
if execute {
|
|
err := <-results
|
|
if err != nil {
|
|
bc.reportBlock(chain[verifyFrom], nil, err)
|
|
return 0, err
|
|
}
|
|
}
|
|
externTd = externTd.Add(externTd, header.Difficulty)
|
|
}
|
|
|
|
if execute && localTd.Cmp(externTd) >= 0 {
|
|
log.Warn("Ignoring the chain segment because of insufficient difficulty",
|
|
"external", externTd,
|
|
"local", localTd,
|
|
"insertingNumber", chain[0].NumberU64(),
|
|
"currentNumber", bc.CurrentBlock().Number().Uint64(),
|
|
)
|
|
|
|
// But we still write the blocks to the database because others might build on top of them
|
|
td := bc.GetTd(chain[0].ParentHash(), chain[0].NumberU64()-1)
|
|
for _, block := range chain {
|
|
log.Warn("Saving", "block", block.NumberU64(), "hash", block.Hash())
|
|
td = new(big.Int).Add(block.Difficulty(), td)
|
|
rawdb.WriteBlock(ctx, bc.db, block)
|
|
rawdb.WriteTd(bc.db, block.Hash(), block.NumberU64(), td)
|
|
}
|
|
return 0, nil
|
|
}
|
|
var offset int
|
|
var parent *types.Block
|
|
var parentNumber = chain[0].NumberU64() - 1
|
|
// Find correct insertion point for this chain
|
|
|
|
preBlocks := []*types.Block{}
|
|
parentHash := chain[0].ParentHash()
|
|
parent = bc.GetBlock(parentHash, parentNumber)
|
|
if parent == nil {
|
|
log.Error("chain segment could not be inserted, missing parent", "hash", parentHash)
|
|
return 0, fmt.Errorf("chain segment could not be inserted, missing parent %x", parentHash)
|
|
}
|
|
|
|
canonicalHash := rawdb.ReadCanonicalHash(bc.db, parentNumber)
|
|
for canonicalHash != parentHash {
|
|
log.Warn("Chain segment's parent not on canonical hash, adding to pre-blocks", "block", parentNumber, "hash", parentHash)
|
|
preBlocks = append(preBlocks, parent)
|
|
parentNumber--
|
|
parentHash = parent.ParentHash()
|
|
parent = bc.GetBlock(parentHash, parentNumber)
|
|
if parent == nil {
|
|
log.Error("chain segment could not be inserted, missing parent", "hash", parentHash)
|
|
return 0, fmt.Errorf("chain segment could not be inserted, missing parent %x", parentHash)
|
|
}
|
|
canonicalHash = rawdb.ReadCanonicalHash(bc.db, parentNumber)
|
|
}
|
|
|
|
for left, right := 0, len(preBlocks)-1; left < right; left, right = left+1, right-1 {
|
|
preBlocks[left], preBlocks[right] = preBlocks[right], preBlocks[left]
|
|
}
|
|
|
|
offset = len(preBlocks)
|
|
if offset > 0 {
|
|
chain = append(preBlocks, chain...)
|
|
}
|
|
|
|
var k int
|
|
var committedK int
|
|
// Iterate over the blocks and insert when the verifier permits
|
|
for i, block := range chain {
|
|
start := time.Now()
|
|
if i >= offset {
|
|
k = i - offset
|
|
} else {
|
|
k = 0
|
|
}
|
|
|
|
// If the chain is terminating, stop processing blocks
|
|
if bc.getProcInterrupt() {
|
|
log.Debug("Premature abort during blocks processing")
|
|
break
|
|
}
|
|
|
|
// If the header is a banned one, straight out abort
|
|
if BadHashes[block.Hash()] {
|
|
bc.reportBlock(block, nil, ErrBlacklistedHash)
|
|
return k, ErrBlacklistedHash
|
|
}
|
|
|
|
// Wait for the block's verification to complete
|
|
var err error
|
|
if i >= offset && k >= verifyFrom && execute {
|
|
err = <-results
|
|
}
|
|
if err == nil {
|
|
ctx, _ = params.GetNoHistoryByBlock(ctx, block.Number())
|
|
err = bc.Validator().ValidateBody(ctx, block)
|
|
}
|
|
|
|
switch {
|
|
case err == ErrKnownBlock:
|
|
// Block and state both already known. However if the current block is below
|
|
// this number we did a rollback and we should reimport it nonetheless.
|
|
if bc.CurrentBlock().NumberU64() >= block.NumberU64() {
|
|
//fmt.Printf("Skipped known block %d\n", block.NumberU64())
|
|
stats.ignored++
|
|
continue
|
|
}
|
|
|
|
case err == consensus.ErrFutureBlock:
|
|
// Allow up to MaxFuture second in the future blocks. If this limit is exceeded
|
|
// the chain is discarded and processed at a later time if given.
|
|
max := big.NewInt(time.Now().Unix() + maxTimeFutureBlocks)
|
|
if block.Time() > max.Uint64() {
|
|
return k, fmt.Errorf("future block: %v > %v", block.Time(), max)
|
|
}
|
|
bc.futureBlocks.Add(block.Hash(), block)
|
|
stats.queued++
|
|
continue
|
|
|
|
case err == consensus.ErrUnknownAncestor && bc.futureBlocks.Contains(block.ParentHash()):
|
|
bc.futureBlocks.Add(block.Hash(), block)
|
|
stats.queued++
|
|
continue
|
|
|
|
case err == consensus.ErrPrunedAncestor:
|
|
// Block competing with the canonical chain, store in the db, but don't process
|
|
// until the competitor TD goes above the canonical TD
|
|
panic(err)
|
|
|
|
case err != nil:
|
|
bc.reportBlock(block, nil, err)
|
|
return k, err
|
|
}
|
|
// Create a new statedb using the parent block and report an
|
|
// error if it fails.
|
|
if i > 0 {
|
|
parent = chain[i-1]
|
|
}
|
|
readBlockNr := parentNumber
|
|
var root common.Hash
|
|
if bc.trieDbState == nil && !bc.cacheConfig.DownloadOnly && execute {
|
|
if _, err = bc.GetTrieDbState(); err != nil {
|
|
return k, err
|
|
}
|
|
}
|
|
|
|
if !bc.cacheConfig.DownloadOnly && execute {
|
|
root = bc.trieDbState.LastRoot()
|
|
}
|
|
|
|
var parentRoot common.Hash
|
|
if parent != nil {
|
|
parentRoot = parent.Root()
|
|
}
|
|
|
|
if parent != nil && root != parentRoot && !bc.cacheConfig.DownloadOnly && execute {
|
|
log.Info("Rewinding from", "block", bc.CurrentBlock().NumberU64(), "to block", readBlockNr,
|
|
"root", root.String(), "parentRoot", parentRoot.String())
|
|
|
|
if _, err = bc.db.Commit(); err != nil {
|
|
log.Error("Could not commit chainDb before rewinding", "error", err)
|
|
if bc.committedBlock.Load() != nil {
|
|
bc.currentBlock.Store(bc.committedBlock.Load())
|
|
}
|
|
bc.db.Rollback()
|
|
bc.setTrieDbState(nil)
|
|
return k, err
|
|
}
|
|
bc.committedBlock.Store(bc.currentBlock.Load())
|
|
|
|
if err = bc.trieDbState.UnwindTo(readBlockNr); err != nil {
|
|
bc.db.Rollback()
|
|
log.Error("Could not rewind", "error", err)
|
|
bc.setTrieDbState(nil)
|
|
return k, err
|
|
}
|
|
|
|
root := bc.trieDbState.LastRoot()
|
|
if root != parentRoot {
|
|
log.Error("Incorrect rewinding", "root", fmt.Sprintf("%x", root), "expected", fmt.Sprintf("%x", parentRoot))
|
|
bc.db.Rollback()
|
|
bc.setTrieDbState(nil)
|
|
return k, fmt.Errorf("incorrect rewinding: wrong root %x, expected %x", root, parentRoot)
|
|
}
|
|
currentBlock := bc.CurrentBlock()
|
|
if err = bc.reorg(currentBlock, parent); err != nil {
|
|
bc.db.Rollback()
|
|
bc.setTrieDbState(nil)
|
|
return k, err
|
|
}
|
|
|
|
if _, err = bc.db.Commit(); err != nil {
|
|
log.Error("Could not commit chainDb after rewinding", "error", err)
|
|
bc.db.Rollback()
|
|
bc.setTrieDbState(nil)
|
|
if bc.committedBlock.Load() != nil {
|
|
bc.currentBlock.Store(bc.committedBlock.Load())
|
|
}
|
|
return k, err
|
|
}
|
|
committedK = k
|
|
bc.committedBlock.Store(bc.currentBlock.Load())
|
|
}
|
|
|
|
var stateDB *state.IntraBlockState
|
|
var receipts types.Receipts
|
|
var usedGas uint64
|
|
var logs []*types.Log
|
|
if !bc.cacheConfig.DownloadOnly && execute {
|
|
stateDB = state.New(bc.trieDbState)
|
|
// Process block using the parent state as reference point.
|
|
receipts, logs, usedGas, root, err = bc.processor.PreProcess(block, stateDB, bc.trieDbState, bc.vmConfig)
|
|
reuseTrieDbState := true
|
|
if err != nil {
|
|
bc.rollbackBadBlock(block, receipts, err, reuseTrieDbState)
|
|
return k, err
|
|
}
|
|
|
|
err = bc.Validator().ValidateGasAndRoot(block, root, usedGas, bc.trieDbState)
|
|
if err != nil {
|
|
bc.rollbackBadBlock(block, receipts, err, reuseTrieDbState)
|
|
return k, err
|
|
}
|
|
|
|
reuseTrieDbState = false
|
|
err = bc.processor.PostProcess(block, bc.trieDbState, receipts)
|
|
if err != nil {
|
|
bc.rollbackBadBlock(block, receipts, err, reuseTrieDbState)
|
|
return k, err
|
|
}
|
|
|
|
err = bc.Validator().ValidateReceipts(block, receipts)
|
|
if err != nil {
|
|
bc.rollbackBadBlock(block, receipts, err, reuseTrieDbState)
|
|
return k, err
|
|
}
|
|
}
|
|
proctime := time.Since(start)
|
|
|
|
// Write the block to the chain and get the status.
|
|
status, err := bc.writeBlockWithState(ctx, block, receipts, logs, stateDB, bc.trieDbState, false, execute)
|
|
if err != nil {
|
|
bc.db.Rollback()
|
|
bc.setTrieDbState(nil)
|
|
if bc.committedBlock.Load() != nil {
|
|
bc.currentBlock.Store(bc.committedBlock.Load())
|
|
}
|
|
return k, err
|
|
}
|
|
|
|
switch status {
|
|
case CanonStatTy:
|
|
log.Debug("Inserted new block", "number", block.Number(), "hash", block.Hash(),
|
|
"uncles", len(block.Uncles()), "txs", len(block.Transactions()), "gas", block.GasUsed(),
|
|
"elapsed", common.PrettyDuration(time.Since(start)),
|
|
"root", block.Root())
|
|
|
|
lastCanon = block
|
|
|
|
// Only count canonical blocks for GC processing time
|
|
bc.gcproc += proctime
|
|
|
|
case SideStatTy:
|
|
log.Debug("Inserted forked block", "number", block.Number(), "hash", block.Hash(),
|
|
"diff", block.Difficulty(), "elapsed", common.PrettyDuration(time.Since(start)),
|
|
"txs", len(block.Transactions()), "gas", block.GasUsed(), "uncles", len(block.Uncles()),
|
|
"root", block.Root())
|
|
|
|
default:
|
|
// This in theory is impossible, but lets be nice to our future selves and leave
|
|
// a log, instead of trying to track down blocks imports that don't emit logs.
|
|
log.Warn("Inserted block with unknown status", "number", block.Number(), "hash", block.Hash(),
|
|
"diff", block.Difficulty(), "elapsed", common.PrettyDuration(time.Since(start)),
|
|
"txs", len(block.Transactions()), "gas", block.GasUsed(), "uncles", len(block.Uncles()),
|
|
"root", block.Root())
|
|
}
|
|
stats.processed++
|
|
stats.usedGas += usedGas
|
|
toCommit := stats.needToCommit(chain, bc.db, i)
|
|
stats.report(chain, i, bc.db, toCommit)
|
|
if toCommit {
|
|
var written uint64
|
|
if written, err = bc.db.Commit(); err != nil {
|
|
log.Error("Could not commit chainDb", "error", err)
|
|
bc.db.Rollback()
|
|
bc.setTrieDbState(nil)
|
|
if bc.committedBlock.Load() != nil {
|
|
bc.currentBlock.Store(bc.committedBlock.Load())
|
|
}
|
|
return k, err
|
|
}
|
|
bc.committedBlock.Store(bc.currentBlock.Load())
|
|
committedK = k
|
|
if bc.trieDbState != nil {
|
|
bc.trieDbState.EvictTries(false)
|
|
}
|
|
log.Info("Database", "size", bc.db.DiskSize(), "written", written)
|
|
}
|
|
}
|
|
|
|
return committedK + 1, nil
|
|
}
|
|
|
|
const (
	// statsReportLimit is the time limit during import and export after which we
	// always print out progress. This avoids the user wondering what's going on.
	statsReportLimit = 8 * time.Second

	// commitLimit is the time since the insert statistics were last reset after
	// which needToCommit asks for the pending database batch to be committed.
	commitLimit = 60 * time.Second
)
|
|
|
|
func (st *insertStats) needToCommit(chain []*types.Block, db ethdb.DbWithPendingMutations, index int) bool {
|
|
var (
|
|
now = mclock.Now()
|
|
elapsed = time.Duration(now) - time.Duration(st.startTime)
|
|
)
|
|
if index == len(chain)-1 || elapsed >= commitLimit || db.BatchSize() >= db.IdealBatchSize() {
|
|
return true
|
|
}
|
|
return false
|
|
}
|
|
|
|
// report prints statistics if some number of blocks have been processed
|
|
// or more than a few seconds have passed since the last message.
|
|
func (st *insertStats) report(chain []*types.Block, index int, batch ethdb.DbWithPendingMutations, toCommit bool) {
|
|
// Fetch the timings for the batch
|
|
var (
|
|
now = mclock.Now()
|
|
elapsed = time.Duration(now) - time.Duration(st.startTime)
|
|
)
|
|
// If we're at the last block of the batch or report period reached, log
|
|
if index == len(chain)-1 || elapsed >= statsReportLimit || toCommit {
|
|
// Count the number of transactions in this segment
|
|
var txs int
|
|
for _, block := range chain[st.lastIndex : index+1] {
|
|
txs += len(block.Transactions())
|
|
}
|
|
end := chain[index]
|
|
context := []interface{}{
|
|
"blocks", st.processed, "txs", txs, "mgas", float64(st.usedGas) / 1000000,
|
|
"elapsed", common.PrettyDuration(elapsed), "mgasps", float64(st.usedGas) * 1000 / float64(elapsed),
|
|
"number", end.Number(), "hash", end.Hash(), "batch", batch.BatchSize(),
|
|
}
|
|
if timestamp := time.Unix(int64(end.Time()), 0); time.Since(timestamp) > time.Minute {
|
|
context = append(context, []interface{}{"age", common.PrettyAge(timestamp)}...)
|
|
}
|
|
if st.queued > 0 {
|
|
context = append(context, []interface{}{"queued", st.queued}...)
|
|
}
|
|
if st.ignored > 0 {
|
|
context = append(context, []interface{}{"ignored", st.ignored}...)
|
|
}
|
|
log.Info("Imported new chain segment", context...)
|
|
*st = insertStats{startTime: now, lastIndex: index + 1}
|
|
}
|
|
}
|
|
|
|
// reorg takes two blocks, an old chain and a new chain and will reconstruct the
|
|
// blocks and inserts them to be part of the new canonical chain and accumulates
|
|
// potential missing transactions and post an event about them.
|
|
func (bc *BlockChain) reorg(oldBlock, newBlock *types.Block) error {
|
|
var (
|
|
newChain types.Blocks
|
|
oldChain types.Blocks
|
|
commonBlock *types.Block
|
|
|
|
deletedTxs types.Transactions
|
|
addedTxs types.Transactions
|
|
|
|
deletedLogs [][]*types.Log
|
|
rebirthLogs [][]*types.Log
|
|
|
|
// collectLogs collects the logs that were generated or removed during
|
|
// the processing of the block that corresponds with the given hash.
|
|
// These logs are later announced as deleted or reborn
|
|
collectLogs = func(hash common.Hash, removed bool) {
|
|
// Coalesce logs and set 'Removed'.
|
|
number := bc.hc.GetBlockNumber(bc.db, hash)
|
|
if number == nil {
|
|
return
|
|
}
|
|
receipts := rawdb.ReadReceipts(bc.db, hash, *number, bc.chainConfig)
|
|
|
|
var logs []*types.Log
|
|
for _, receipt := range receipts {
|
|
for _, log := range receipt.Logs {
|
|
l := *log
|
|
if removed {
|
|
l.Removed = true
|
|
} else {
|
|
}
|
|
logs = append(logs, &l)
|
|
}
|
|
}
|
|
if len(logs) > 0 {
|
|
if removed {
|
|
deletedLogs = append(deletedLogs, logs)
|
|
} else {
|
|
rebirthLogs = append(rebirthLogs, logs)
|
|
}
|
|
}
|
|
}
|
|
// mergeLogs returns a merged log slice with specified sort order.
|
|
mergeLogs = func(logs [][]*types.Log, reverse bool) []*types.Log {
|
|
var ret []*types.Log
|
|
if reverse {
|
|
for i := len(logs) - 1; i >= 0; i-- {
|
|
ret = append(ret, logs[i]...)
|
|
}
|
|
} else {
|
|
for i := 0; i < len(logs); i++ {
|
|
ret = append(ret, logs[i]...)
|
|
}
|
|
}
|
|
return ret
|
|
}
|
|
)
|
|
|
|
// Reduce the longer chain to the same number as the shorter one
|
|
// first reduce whoever is higher bound
|
|
if oldBlock.NumberU64() > newBlock.NumberU64() {
|
|
// Old chain is longer, gather all transactions and logs as deleted ones
|
|
for ; oldBlock != nil && oldBlock.NumberU64() != newBlock.NumberU64(); oldBlock = bc.GetBlock(oldBlock.ParentHash(), oldBlock.NumberU64()-1) {
|
|
oldChain = append(oldChain, oldBlock)
|
|
deletedTxs = append(deletedTxs, oldBlock.Transactions()...)
|
|
collectLogs(oldBlock.Hash(), true)
|
|
}
|
|
} else {
|
|
// New chain is longer, stash all blocks away for subsequent insertion
|
|
for ; newBlock != nil && newBlock.NumberU64() != oldBlock.NumberU64(); newBlock = bc.GetBlock(newBlock.ParentHash(), newBlock.NumberU64()-1) {
|
|
newChain = append(newChain, newBlock)
|
|
}
|
|
}
|
|
if oldBlock == nil {
|
|
return fmt.Errorf("invalid old chain")
|
|
}
|
|
if newBlock == nil {
|
|
return fmt.Errorf("invalid new chain")
|
|
}
|
|
// Both sides of the reorg are at the same number, reduce both until the common
|
|
// ancestor is found
|
|
for {
|
|
// If the common ancestor was found, bail out
|
|
if oldBlock.Hash() == newBlock.Hash() {
|
|
commonBlock = oldBlock
|
|
break
|
|
}
|
|
// Remove an old block as well as stash away a new block
|
|
oldChain = append(oldChain, oldBlock)
|
|
deletedTxs = append(deletedTxs, oldBlock.Transactions()...)
|
|
collectLogs(oldBlock.Hash(), true)
|
|
|
|
newChain = append(newChain, newBlock)
|
|
|
|
// Step back with both chains
|
|
oldBlock = bc.GetBlock(oldBlock.ParentHash(), oldBlock.NumberU64()-1)
|
|
if oldBlock == nil {
|
|
return fmt.Errorf("invalid old chain")
|
|
}
|
|
newBlock = bc.GetBlock(newBlock.ParentHash(), newBlock.NumberU64()-1)
|
|
if newBlock == nil {
|
|
return fmt.Errorf("invalid new chain")
|
|
}
|
|
}
|
|
// Ensure the user sees large reorgs
|
|
if len(oldChain) > 0 && len(newChain) > 0 {
|
|
logFn := log.Info
|
|
msg := "Chain reorg detected"
|
|
if len(oldChain) > 63 {
|
|
msg = "Large chain reorg detected"
|
|
logFn = log.Warn
|
|
}
|
|
logFn(msg, "number", commonBlock.Number(), "hash", commonBlock.Hash(),
|
|
"drop", len(oldChain), "dropfrom", oldChain[0].Hash(), "add", len(newChain), "addfrom", newChain[0].Hash())
|
|
blockReorgAddMeter.Mark(int64(len(newChain)))
|
|
blockReorgDropMeter.Mark(int64(len(oldChain)))
|
|
} else {
|
|
log.Error("Impossible reorg, please file an issue", "oldnum", oldBlock.Number(), "oldhash", oldBlock.Hash(), "newnum", newBlock.Number(), "newhash", newBlock.Hash())
|
|
}
|
|
// Delete the old chain
|
|
for _, oldBlock := range oldChain {
|
|
rawdb.DeleteCanonicalHash(bc.db, oldBlock.NumberU64())
|
|
}
|
|
bc.writeHeadBlock(commonBlock)
|
|
// Insert the new chain, taking care of the proper incremental order
|
|
for i := len(newChain) - 1; i >= 0; i-- {
|
|
// insert the block in the canonical way, re-writing history
|
|
bc.writeHeadBlock(newChain[i])
|
|
|
|
// Collect reborn logs due to chain reorg
|
|
collectLogs(newChain[i].Hash(), false)
|
|
|
|
// Write lookup entries for hash based transaction/receipt searches
|
|
if bc.enableTxLookupIndex {
|
|
rawdb.WriteTxLookupEntries(bc.db, newChain[i])
|
|
}
|
|
addedTxs = append(addedTxs, newChain[i].Transactions()...)
|
|
}
|
|
// When transactions get deleted from the database, the receipts that were
|
|
// created in the fork must also be deleted
|
|
for _, tx := range types.TxDifference(deletedTxs, addedTxs) {
|
|
rawdb.DeleteTxLookupEntry(bc.db, tx.Hash())
|
|
}
|
|
// Delete any canonical number assignments above the new head
|
|
number := bc.CurrentBlock().NumberU64()
|
|
for i := number + 1; ; i++ {
|
|
hash := rawdb.ReadCanonicalHash(bc.db, i)
|
|
if hash == (common.Hash{}) {
|
|
break
|
|
}
|
|
rawdb.DeleteCanonicalHash(bc.db, i)
|
|
}
|
|
|
|
if _, err := bc.db.Commit(); err != nil {
|
|
return err
|
|
}
|
|
// If any logs need to be fired, do it now. In theory we could avoid creating
|
|
// this goroutine if there are no events to fire, but realistcally that only
|
|
// ever happens if we're reorging empty blocks, which will only happen on idle
|
|
// networks where performance is not an issue either way.
|
|
if len(deletedLogs) > 0 {
|
|
bc.rmLogsFeed.Send(RemovedLogsEvent{mergeLogs(deletedLogs, true)})
|
|
}
|
|
if len(rebirthLogs) > 0 {
|
|
bc.logsFeed.Send(mergeLogs(rebirthLogs, false))
|
|
}
|
|
if len(oldChain) > 0 {
|
|
for i := len(oldChain) - 1; i >= 0; i-- {
|
|
bc.chainSideFeed.Send(ChainSideEvent{Block: oldChain[i]})
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// PostChainEvents iterates over the events generated by a chain insertion and
|
|
// posts them into the event feed.
|
|
// TODO: Should not expose PostChainEvents. The chain events should be posted in WriteBlock.
|
|
func (bc *BlockChain) PostChainEvents(events []interface{}, logs []*types.Log) {
|
|
// post event logs for further processing
|
|
if logs != nil {
|
|
bc.logsFeed.Send(logs)
|
|
}
|
|
for _, event := range events {
|
|
switch ev := event.(type) {
|
|
case ChainEvent:
|
|
bc.chainFeed.Send(ev)
|
|
|
|
case ChainHeadEvent:
|
|
bc.chainHeadFeed.Send(ev)
|
|
|
|
case ChainSideEvent:
|
|
bc.chainSideFeed.Send(ev)
|
|
}
|
|
}
|
|
}
|
|
|
|
func (bc *BlockChain) update() {
|
|
futureTimer := time.NewTicker(5 * time.Second)
|
|
defer futureTimer.Stop()
|
|
for {
|
|
select {
|
|
case <-futureTimer.C:
|
|
bc.procFutureBlocks()
|
|
case <-bc.quit:
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
// BadBlocks returns a list of the last 'bad blocks' that the client has seen on the network
|
|
func (bc *BlockChain) BadBlocks() []*types.Block {
|
|
blocks := make([]*types.Block, 0, bc.badBlocks.Len())
|
|
for _, hash := range bc.badBlocks.Keys() {
|
|
if blk, exist := bc.badBlocks.Peek(hash); exist {
|
|
block := blk.(*types.Block)
|
|
blocks = append(blocks, block)
|
|
}
|
|
}
|
|
return blocks
|
|
}
|
|
|
|
// addBadBlock adds a bad block to the bad-block LRU cache
|
|
func (bc *BlockChain) addBadBlock(block *types.Block) {
|
|
bc.badBlocks.Add(block.Hash(), block)
|
|
}
|
|
|
|
// reportBlock logs a bad block error.
|
|
func (bc *BlockChain) reportBlock(block *types.Block, receipts types.Receipts, err error) {
|
|
bc.addBadBlock(block)
|
|
|
|
var receiptString string
|
|
for i, receipt := range receipts {
|
|
receiptString += fmt.Sprintf("\t %d: cumulative: %v gas: %v contract: %v status: %v tx: %v logs: %v bloom: %x state: %x\n",
|
|
i, receipt.CumulativeGasUsed, receipt.GasUsed, receipt.ContractAddress.Hex(),
|
|
receipt.Status, receipt.TxHash.Hex(), receipt.Logs, receipt.Bloom, receipt.PostState)
|
|
}
|
|
log.Error(fmt.Sprintf(`
|
|
########## BAD BLOCK #########
|
|
Chain config: %v
|
|
|
|
Number: %v
|
|
Hash: 0x%x
|
|
%v
|
|
|
|
Error: %v
|
|
Callers: %v
|
|
##############################
|
|
`, bc.chainConfig, block.Number(), block.Hash(), receiptString, err, debug.Callers(20)))
|
|
}
|
|
|
|
func (bc *BlockChain) rollbackBadBlock(block *types.Block, receipts types.Receipts, err error, reuseTrieDbState bool) {
|
|
bc.db.Rollback()
|
|
if reuseTrieDbState {
|
|
bc.setTrieDbState(bc.trieDbState.WithNewBuffer())
|
|
} else {
|
|
bc.setTrieDbState(nil)
|
|
}
|
|
bc.reportBlock(block, receipts, err)
|
|
if bc.committedBlock.Load() != nil {
|
|
bc.currentBlock.Store(bc.committedBlock.Load())
|
|
}
|
|
}
|
|
|
|
func (bc *BlockChain) insertHeaderChainStaged(chain []*types.Header) (int, int, int, bool, uint64, error) {
|
|
// The function below will insert headers, and track the lowest block
|
|
// number that have replace the canonical chain. This number will be
|
|
// used to trigger invalidation of further sync stages
|
|
var newCanonical bool
|
|
var lowestCanonicalNumber uint64
|
|
whFunc := func(header *types.Header) error {
|
|
status, err := bc.hc.WriteHeader(context.Background(), header)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if status == CanonStatTy {
|
|
number := header.Number.Uint64()
|
|
if !newCanonical || number < lowestCanonicalNumber {
|
|
lowestCanonicalNumber = number
|
|
newCanonical = true
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
// Collect some import statistics to report on
|
|
stats := struct{ processed, ignored int }{}
|
|
// All headers passed verification, import them into the database
|
|
for i, header := range chain {
|
|
// Short circuit insertion if shutting down
|
|
if bc.hc.procInterrupt() {
|
|
log.Debug("Premature abort during headers import")
|
|
return i, stats.processed, stats.ignored, newCanonical, lowestCanonicalNumber, errors.New("aborted")
|
|
}
|
|
// If the header's already known, skip it, otherwise store
|
|
hash := header.Hash()
|
|
if bc.hc.HasHeader(hash, header.Number.Uint64()) {
|
|
externTd := bc.hc.GetTd(bc.hc.chainDb, hash, header.Number.Uint64())
|
|
localTd := bc.hc.GetTd(bc.hc.chainDb, bc.hc.currentHeaderHash, bc.hc.CurrentHeader().Number.Uint64())
|
|
if externTd == nil || externTd.Cmp(localTd) <= 0 {
|
|
stats.ignored++
|
|
continue
|
|
}
|
|
}
|
|
if err := whFunc(header); err != nil {
|
|
return i, stats.processed, stats.ignored, newCanonical, lowestCanonicalNumber, err
|
|
}
|
|
stats.processed++
|
|
}
|
|
// Everything processed without errors
|
|
return len(chain), stats.processed, stats.ignored, newCanonical, lowestCanonicalNumber, nil
|
|
}
|
|
|
|
// InsertHeaderChainStaged attempts to add the chunk of headers to the headerchain
|
|
// It return the following values
|
|
// 1. (type int) number of items in the input chain that have been successfully processed and committed
|
|
// 2. (type bool) whether the insertion of this chunk has changed the canonical chain
|
|
// 3. (type uint64) the lowest block number that has been displaced from the canonical chain (this is to be used to invalidate further sync stages)
|
|
// 4. (type error) error happed during processing
|
|
func (bc *BlockChain) InsertHeaderChainStaged(chain []*types.Header, checkFreq int) (int, bool, uint64, error) {
|
|
start := time.Now()
|
|
if i, err := bc.hc.ValidateHeaderChain(chain, checkFreq); err != nil {
|
|
return i, false, 0, err
|
|
}
|
|
|
|
// Make sure only one thread manipulates the chain at once
|
|
bc.chainmu.Lock()
|
|
defer bc.chainmu.Unlock()
|
|
|
|
if err := bc.addJob(); err != nil {
|
|
return 0, false, 0, err
|
|
}
|
|
defer bc.doneJob()
|
|
|
|
n, processed, ignored, newCanonical, lowestCanonicalNumber, err := bc.insertHeaderChainStaged(chain)
|
|
|
|
// Report some public statistics so the user has a clue what's going on
|
|
last := chain[len(chain)-1]
|
|
|
|
context := []interface{}{
|
|
"count", processed, "elapsed", common.PrettyDuration(time.Since(start)),
|
|
"number", last.Number, "hash", last.Hash(),
|
|
}
|
|
if timestamp := time.Unix(int64(last.Time), 0); time.Since(timestamp) > time.Minute {
|
|
context = append(context, []interface{}{"age", common.PrettyAge(timestamp)}...)
|
|
}
|
|
if ignored > 0 {
|
|
context = append(context, []interface{}{"ignored", ignored}...)
|
|
}
|
|
log.Info("Imported new block headers", context...)
|
|
|
|
var written uint64
|
|
var err1 error
|
|
if written, err1 = bc.db.Commit(); err1 != nil {
|
|
log.Error("Could not commit chainDb", "error", err1)
|
|
return 0, false, 0, err1
|
|
}
|
|
log.Info("Database", "size", bc.db.DiskSize(), "written", written)
|
|
return n, newCanonical, lowestCanonicalNumber, err
|
|
}
|
|
|
|
// InsertHeaderChain attempts to insert the given header chain in to the local
|
|
// chain, possibly creating a reorg. If an error is returned, it will return the
|
|
// index number of the failing header as well an error describing what went wrong.
|
|
//
|
|
// The verify parameter can be used to fine tune whether nonce verification
|
|
// should be done or not. The reason behind the optional check is because some
|
|
// of the header retrieval mechanisms already need to verify nonces, as well as
|
|
// because nonces can be verified sparsely, not needing to check each.
|
|
func (bc *BlockChain) InsertHeaderChain(chain []*types.Header, checkFreq int) (int, error) {
|
|
start := time.Now()
|
|
if i, err := bc.hc.ValidateHeaderChain(chain, checkFreq); err != nil {
|
|
return i, err
|
|
}
|
|
|
|
// Make sure only one thread manipulates the chain at once
|
|
bc.chainmu.Lock()
|
|
defer bc.chainmu.Unlock()
|
|
|
|
if err := bc.addJob(); err != nil {
|
|
return 0, err
|
|
}
|
|
defer bc.doneJob()
|
|
|
|
whFunc := func(header *types.Header) error {
|
|
_, err := bc.hc.WriteHeader(context.Background(), header)
|
|
return err
|
|
}
|
|
n, err := bc.hc.InsertHeaderChain(chain, whFunc, start)
|
|
var written uint64
|
|
var err1 error
|
|
if written, err1 = bc.db.Commit(); err1 != nil {
|
|
log.Error("Could not commit chainDb", "error", err1)
|
|
return 0, err1
|
|
}
|
|
log.Info("Database", "size", bc.db.DiskSize(), "written", written)
|
|
return n, err
|
|
}
|
|
|
|
// CurrentHeader retrieves the current head header of the canonical chain. The
|
|
// header is retrieved from the HeaderChain's internal cache.
|
|
func (bc *BlockChain) CurrentHeader() *types.Header {
|
|
return bc.hc.CurrentHeader()
|
|
}
|
|
|
|
// GetTd retrieves a block's total difficulty in the canonical chain from the
|
|
// database by hash and number, caching it if found.
|
|
func (bc *BlockChain) GetTd(hash common.Hash, number uint64) *big.Int {
|
|
return bc.hc.GetTd(bc.db, hash, number)
|
|
}
|
|
|
|
// GetTdByHash retrieves a block's total difficulty in the canonical chain from the
|
|
// database by hash, caching it if found.
|
|
func (bc *BlockChain) GetTdByHash(hash common.Hash) *big.Int {
|
|
return bc.hc.GetTdByHash(hash)
|
|
}
|
|
|
|
// GetHeader retrieves a block header from the database by hash and number,
|
|
// caching it if found.
|
|
func (bc *BlockChain) GetHeader(hash common.Hash, number uint64) *types.Header {
|
|
return bc.hc.GetHeader(hash, number)
|
|
}
|
|
|
|
// GetHeaderByHash retrieves a block header from the database by hash, caching it if
|
|
// found.
|
|
func (bc *BlockChain) GetHeaderByHash(hash common.Hash) *types.Header {
|
|
return bc.hc.GetHeaderByHash(hash)
|
|
}
|
|
|
|
// HasHeader checks if a block header is present in the database or not, caching
|
|
// it if present.
|
|
func (bc *BlockChain) HasHeader(hash common.Hash, number uint64) bool {
|
|
return bc.hc.HasHeader(hash, number)
|
|
}
|
|
|
|
// GetCanonicalHash returns the canonical hash for a given block number
|
|
func (bc *BlockChain) GetCanonicalHash(number uint64) common.Hash {
|
|
return bc.hc.GetCanonicalHash(number)
|
|
}
|
|
|
|
// GetBlockHashesFromHash retrieves a number of block hashes starting at a given
|
|
// hash, fetching towards the genesis block.
|
|
func (bc *BlockChain) GetBlockHashesFromHash(hash common.Hash, max uint64) []common.Hash {
|
|
return bc.hc.GetBlockHashesFromHash(hash, max)
|
|
}
|
|
|
|
// GetAncestor retrieves the Nth ancestor of a given block. It assumes that either the given block or
|
|
// a close ancestor of it is canonical. maxNonCanonical points to a downwards counter limiting the
|
|
// number of blocks to be individually checked before we reach the canonical chain.
|
|
//
|
|
// Note: ancestor == 0 returns the same block, 1 returns its parent and so on.
|
|
func (bc *BlockChain) GetAncestor(hash common.Hash, number, ancestor uint64, maxNonCanonical *uint64) (common.Hash, uint64) {
|
|
return bc.hc.GetAncestor(hash, number, ancestor, maxNonCanonical)
|
|
}
|
|
|
|
// GetHeaderByNumber retrieves a block header from the database by number,
|
|
// caching it (associated with its hash) if found.
|
|
func (bc *BlockChain) GetHeaderByNumber(number uint64) *types.Header {
|
|
return bc.hc.GetHeaderByNumber(number)
|
|
}
|
|
|
|
// Config retrieves the chain's fork configuration.
|
|
func (bc *BlockChain) Config() *params.ChainConfig { return bc.chainConfig }
|
|
|
|
// Engine retrieves the blockchain's consensus engine.
|
|
func (bc *BlockChain) Engine() consensus.Engine { return bc.engine }
|
|
|
|
// SubscribeRemovedLogsEvent registers a subscription of RemovedLogsEvent.
|
|
func (bc *BlockChain) SubscribeRemovedLogsEvent(ch chan<- RemovedLogsEvent) event.Subscription {
|
|
return bc.scope.Track(bc.rmLogsFeed.Subscribe(ch))
|
|
}
|
|
|
|
// SubscribeChainEvent registers a subscription of ChainEvent.
|
|
func (bc *BlockChain) SubscribeChainEvent(ch chan<- ChainEvent) event.Subscription {
|
|
return bc.scope.Track(bc.chainFeed.Subscribe(ch))
|
|
}
|
|
|
|
// SubscribeChainHeadEvent registers a subscription of ChainHeadEvent.
|
|
func (bc *BlockChain) SubscribeChainHeadEvent(ch chan<- ChainHeadEvent) event.Subscription {
|
|
return bc.scope.Track(bc.chainHeadFeed.Subscribe(ch))
|
|
}
|
|
|
|
// SubscribeChainSideEvent registers a subscription of ChainSideEvent.
|
|
func (bc *BlockChain) SubscribeChainSideEvent(ch chan<- ChainSideEvent) event.Subscription {
|
|
return bc.scope.Track(bc.chainSideFeed.Subscribe(ch))
|
|
}
|
|
|
|
// SubscribeLogsEvent registers a subscription of []*types.Log.
|
|
func (bc *BlockChain) SubscribeLogsEvent(ch chan<- []*types.Log) event.Subscription {
|
|
return bc.scope.Track(bc.logsFeed.Subscribe(ch))
|
|
}
|
|
|
|
// SubscribeBlockProcessingEvent registers a subscription of bool where true means
|
|
// block processing has started while false means it has stopped.
|
|
func (bc *BlockChain) SubscribeBlockProcessingEvent(ch chan<- bool) event.Subscription {
|
|
return bc.scope.Track(bc.blockProcFeed.Subscribe(ch))
|
|
}
|
|
|
|
func (bc *BlockChain) ChainDb() ethdb.Database {
|
|
return bc.db
|
|
}
|
|
|
|
func (bc *BlockChain) NoHistory() bool {
|
|
return bc.cacheConfig.NoHistory
|
|
}
|
|
|
|
func (bc *BlockChain) IsNoHistory(currentBlock *big.Int) bool {
|
|
if currentBlock == nil {
|
|
return bc.cacheConfig.NoHistory
|
|
}
|
|
|
|
if !bc.cacheConfig.NoHistory {
|
|
return false
|
|
}
|
|
|
|
var isArchiveInterval bool
|
|
currentBlockNumber := bc.CurrentBlock().Number().Uint64()
|
|
highestKnownBlock := bc.GetHeightKnownBlock()
|
|
if highestKnownBlock > currentBlockNumber {
|
|
isArchiveInterval = (currentBlock.Uint64() - highestKnownBlock) <= bc.cacheConfig.ArchiveSyncInterval
|
|
} else {
|
|
isArchiveInterval = (currentBlock.Uint64() - currentBlockNumber) <= bc.cacheConfig.ArchiveSyncInterval
|
|
}
|
|
|
|
return bc.cacheConfig.NoHistory || isArchiveInterval
|
|
}
|
|
|
|
func (bc *BlockChain) NotifyHeightKnownBlock(h uint64) {
|
|
bc.highestKnownBlockMu.Lock()
|
|
if bc.highestKnownBlock < h {
|
|
bc.highestKnownBlock = h
|
|
}
|
|
bc.highestKnownBlockMu.Unlock()
|
|
}
|
|
|
|
func (bc *BlockChain) GetHeightKnownBlock() uint64 {
|
|
bc.highestKnownBlockMu.Lock()
|
|
defer bc.highestKnownBlockMu.Unlock()
|
|
return bc.highestKnownBlock
|
|
}
|
|
|
|
func (bc *BlockChain) WithContext(ctx context.Context, blockNum *big.Int) context.Context {
|
|
ctx = bc.Config().WithEIPsFlags(ctx, blockNum)
|
|
ctx = params.WithNoHistory(ctx, bc.NoHistory(), bc.IsNoHistory)
|
|
return ctx
|
|
}
|
|
|
|
// Pruner is a component with a start/stop lifecycle.
// NOTE(review): presumably implemented by the database pruning service —
// confirm against the implementers.
type Pruner interface {
	// Start launches the pruner and reports an error if it could not be started.
	Start() error
	// Stop shuts the pruner down.
	Stop()
}
|
|
|
|
// addJob should be called only for public methods
|
|
func (bc *BlockChain) addJob() error {
|
|
bc.quitMu.RLock()
|
|
defer bc.quitMu.RUnlock()
|
|
if bc.getProcInterrupt() {
|
|
return errors.New("blockchain is stopped")
|
|
}
|
|
bc.wg.Add(1)
|
|
|
|
return nil
|
|
}
|
|
|
|
func (bc *BlockChain) doneJob() {
|
|
bc.wg.Done()
|
|
}
|
|
|
|
func (bc *BlockChain) waitJobs() {
|
|
bc.quitMu.Lock()
|
|
atomic.StoreInt32(&bc.procInterrupt, 1)
|
|
bc.wg.Wait()
|
|
bc.quitMu.Unlock()
|
|
}
|
|
|
|
// ExecuteBlockEuphemerally runs a block from provided stateReader and
|
|
// writes the result to the provided stateWriter
|
|
func ExecuteBlockEuphemerally(
|
|
chainConfig *params.ChainConfig,
|
|
vmConfig *vm.Config,
|
|
chainContext ChainContext,
|
|
engine consensus.Engine,
|
|
block *types.Block,
|
|
stateReader state.StateReader,
|
|
stateWriter state.WriterWithChangeSets,
|
|
) error {
|
|
ibs := state.New(stateReader)
|
|
header := block.Header()
|
|
var receipts types.Receipts
|
|
usedGas := new(uint64)
|
|
gp := new(GasPool).AddGas(block.GasLimit())
|
|
|
|
if chainConfig.DAOForkSupport && chainConfig.DAOForkBlock != nil && chainConfig.DAOForkBlock.Cmp(block.Number()) == 0 {
|
|
misc.ApplyDAOHardFork(ibs)
|
|
}
|
|
noop := state.NewNoopWriter()
|
|
for i, tx := range block.Transactions() {
|
|
ibs.Prepare(tx.Hash(), block.Hash(), i)
|
|
receipt, err := ApplyTransaction(chainConfig, chainContext, nil, gp, ibs, noop, header, tx, usedGas, *vmConfig)
|
|
if err != nil {
|
|
return fmt.Errorf("tx %x failed: %v", tx.Hash(), err)
|
|
}
|
|
receipts = append(receipts, receipt)
|
|
}
|
|
|
|
if chainConfig.IsByzantium(header.Number) {
|
|
receiptSha := types.DeriveSha(receipts)
|
|
if receiptSha != block.Header().ReceiptHash {
|
|
return fmt.Errorf("mismatched receipt headers for block %d", block.NumberU64())
|
|
}
|
|
}
|
|
|
|
// Finalize the block, applying any consensus engine specific extras (e.g. block rewards)
|
|
if _, err := engine.FinalizeAndAssemble(chainConfig, header, ibs, block.Transactions(), block.Uncles(), receipts); err != nil {
|
|
return fmt.Errorf("finalize of block %d failed: %v", block.NumberU64(), err)
|
|
}
|
|
|
|
ctx := chainConfig.WithEIPsFlags(context.Background(), header.Number)
|
|
if err := ibs.CommitBlock(ctx, stateWriter); err != nil {
|
|
return fmt.Errorf("commiting block %d failed: %v", block.NumberU64(), err)
|
|
}
|
|
|
|
if err := stateWriter.WriteChangeSets(); err != nil {
|
|
return fmt.Errorf("writing changesets for block %d failed: %v", block.NumberU64(), err)
|
|
}
|
|
|
|
return nil
|
|
}
|