Add senders recovery stage to staged sync (#494)

* Maintenance commit

* Fixes

* Fixes

* Fix linters

* Fix linter

* Fix linter
ledgerwatch 2020-04-28 06:36:51 +01:00 committed by GitHub
parent a09547e822
commit 7211b0f261
6 changed files with 115 additions and 10 deletions

View File

@ -236,6 +236,14 @@ func (tx *Transaction) AsMessage(s Signer) (Message, error) {
	return msg, err
}

func (tx *Transaction) SetFrom(from common.Address) {
	tx.from.Store(from)
}

func (tx *Transaction) HasFrom() bool {
	return tx.from.Load() != nil
}
// WithSignature returns a new transaction with the given signature.
// This signature needs to be in the [R || S || V] format where V is 0 or 1.
func (tx *Transaction) WithSignature(signer Signer, sig []byte) (*Transaction, error) {
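
The two new accessors let the senders stage pre-fill the transaction's sender cache so later consumers do not repeat ECDSA recovery. A minimal sketch of that pattern, assuming the turbo-geth core/types import; the helper name cacheSenders and the standalone package are illustrative, not part of this commit:

package example // illustrative sketch, not part of this commit

import "github.com/ledgerwatch/turbo-geth/core/types"

// cacheSenders recovers and caches the sender for every transaction in txs,
// mirroring what the new senders stage does per block body.
func cacheSenders(signer types.Signer, txs types.Transactions) error {
	for _, tx := range txs {
		if tx.HasFrom() {
			continue // already recovered, e.g. on a resumed sync
		}
		from, err := types.Sender(signer, tx) // ECDSA recovery from the signature
		if err != nil {
			return err
		}
		tx.SetFrom(from)
	}
	return nil
}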

View File

@ -220,6 +220,9 @@ type BlockChain interface {
	// GetBlockByNumber is necessary for staged sync
	GetBlockByNumber(number uint64) *types.Block

	// Config is necessary for staged sync
	Config() *params.ChainConfig
}
// New creates a new downloader to fetch hashes and blocks from remote peers.
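
Config() is what the new senders stage uses to build the right signer for each block height. A minimal fragment showing that dependency (variable names are illustrative; blockNumber is a *big.Int):

	config := d.blockchain.Config()                 // now available through the BlockChain interface
	signer := types.MakeSigner(config, blockNumber) // EIP-155-aware signer for this height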

View File

@ -33,6 +33,7 @@ import (
"github.com/ledgerwatch/turbo-geth/core/types"
"github.com/ledgerwatch/turbo-geth/ethdb"
"github.com/ledgerwatch/turbo-geth/event"
"github.com/ledgerwatch/turbo-geth/params"
"github.com/ledgerwatch/turbo-geth/trie"
)
@ -273,6 +274,10 @@ func (dl *downloadTester) InsertBodyChain(_ context.Context, blocks types.Blocks
	return 0, nil
}

func (dl *downloadTester) Config() *params.ChainConfig {
	return nil
}

// InsertChain injects a new batch of blocks into the simulated chain.
func (dl *downloadTester) InsertChain(_ context.Context, blocks types.Blocks) (i int, err error) {
	dl.lock.Lock()

View File

@ -4,7 +4,7 @@ import "github.com/ledgerwatch/turbo-geth/log"
func (d *Downloader) doStagedSyncWithFetchers(p *peerConnection, headersFetchers []func() error) error {
log.Info("Sync stage 1/4. Downloading headers...")
log.Info("Sync stage 1/5. Downloading headers...")
var err error
@ -15,8 +15,8 @@ func (d *Downloader) doStagedSyncWithFetchers(p *peerConnection, headersFetchers
		return err
	}

-	log.Info("Sync stage 1/4. Downloading headers... Complete!")
-	log.Info("Sync stage 2/4. Downloading block bodies...")
+	log.Info("Sync stage 1/5. Downloading headers... Complete!")
+	log.Info("Sync stage 2/5. Downloading block bodies...")

	/*
	* Stage 2. Download Block bodies
@ -30,11 +30,22 @@ func (d *Downloader) doStagedSyncWithFetchers(p *peerConnection, headersFetchers
		return err
	}

-	log.Info("Sync stage 2/4. Downloading block bodies... Complete!")
-	log.Info("Sync stage 3/4. Executing blocks w/o hash checks...")
+	log.Info("Sync stage 2/5. Downloading block bodies... Complete!")
+
+	/*
+	* Stage 3. Recover senders from tx signatures
+	*/
+	log.Info("Sync stage 3/5. Recovering senders from tx signatures...")
+	err = d.spawnRecoverSendersStage()
+	if err != nil {
+		return err
+	}
+
+	log.Info("Sync stage 3/5. Recovering senders from tx signatures... Complete!")
+	log.Info("Sync stage 4/5. Executing blocks w/o hash checks...")

	/*
-	* Stage 3. Execute block bodies w/o calculating trie roots
+	* Stage 4. Execute block bodies w/o calculating trie roots
	*/
	syncHeadNumber := uint64(0)
	syncHeadNumber, err = d.spawnExecuteBlocksStage()
@ -42,14 +53,14 @@ func (d *Downloader) doStagedSyncWithFetchers(p *peerConnection, headersFetchers
		return err
	}

-	log.Info("Sync stage 3/4. Executing blocks w/o hash checks... Complete!")
+	log.Info("Sync stage 4/5. Executing blocks w/o hash checks... Complete!")

	// Further stages go there
-	log.Info("Sync stage 4/4. Validating final hash")
+	log.Info("Sync stage 5/5. Validating final hash")
	if err = d.spawnCheckFinalHashStage(syncHeadNumber); err != nil {
		return err
	}
-	log.Info("Sync stage 4/4. Validating final hash... Complete!")
+	log.Info("Sync stage 5/5. Validating final hash... Complete!")
	return err
}
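
The "Further stages go there" comment marks where later stages would slot in, following the same wiring as above. A hedged sketch of that shape; spawnNewStage and the stage numbering are placeholders, not part of this commit:

	log.Info("Sync stage N/M. New stage...")
	if err = d.spawnNewStage(); err != nil { // hypothetical stage spawner
		return err
	}
	log.Info("Sync stage N/M. New stage... Complete!")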

View File

@ -29,7 +29,8 @@ type SyncStage byte
const (
	Headers SyncStage = iota // Headers are downloaded, their Proof-Of-Work validity and chaining is verified
-	Bodies // Block bodies are downloaded, TxHash and UncleHash are getting verified, "From" recovered from signatures
+	Bodies // Block bodies are downloaded, TxHash and UncleHash are getting verified
+	Senders // "From" recovered from signatures, bodies re-written
	Execution // Executing each block w/o building a trie
	HashCheck // Checking the root hash
	Finish // Nominal stage after all other stages
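
Each stage records how far it got under its SyncStage key, which is what makes the pipeline resumable. A hedged sketch of that pattern using the GetStageProgress/SaveStageProgress helpers the new stage relies on; the method name, the lastBlock bound, and writing progress straight to d.stateDB are assumptions for illustration:

// spawnExampleStage is illustrative only: it shows the resume pattern the
// stages share, not a real stage from this commit.
func (d *Downloader) spawnExampleStage(stage SyncStage, lastBlock uint64) error {
	progress, err := GetStageProgress(d.stateDB, stage) // last block this stage completed
	if err != nil {
		return err
	}
	for n := progress + 1; n <= lastBlock; n++ {
		// ... the stage's per-block work would go here ...
		if err = SaveStageProgress(d.stateDB, stage, n); err != nil { // assumes d.stateDB accepts writes like the batch does
			return err
		}
	}
	return nil
}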

View File

@ -0,0 +1,77 @@
package downloader

import (
	"context"
	"github.com/ledgerwatch/turbo-geth/common"
	"github.com/ledgerwatch/turbo-geth/core/rawdb"
	"github.com/ledgerwatch/turbo-geth/core/types"
	"github.com/ledgerwatch/turbo-geth/log"
	"math/big"
)

func (d *Downloader) spawnRecoverSendersStage() error {
	lastProcessedBlockNumber, err := GetStageProgress(d.stateDB, Senders)
	if err != nil {
		return err
	}

	nextBlockNumber := lastProcessedBlockNumber + 1

	mutation := d.stateDB.NewBatch()
	defer func() {
		_, dbErr := mutation.Commit()
		if dbErr != nil {
			log.Error("Sync (Senders): failed to write db commit", "err", dbErr)
		}
	}()

	config := d.blockchain.Config()
	emptyHash := common.Hash{}
	var blockNumber big.Int

	for {
		hash := rawdb.ReadCanonicalHash(mutation, nextBlockNumber)
		if hash == emptyHash {
			break
		}
		body := rawdb.ReadBody(mutation, hash, nextBlockNumber)
		if body == nil {
			break
		}
		blockNumber.SetUint64(nextBlockNumber)
		s := types.MakeSigner(config, &blockNumber)

		for _, tx := range body.Transactions {
			from, err1 := types.Sender(s, tx)
			if err1 != nil {
				log.Error("Recovering sender from signature", "tx", tx.Hash(), "block", nextBlockNumber, "error", err1)
				break
			}
			tx.SetFrom(from)
			if tx.Protected() && tx.ChainId().Cmp(s.ChainId()) != 0 {
				log.Error("Invalid chainId", "tx", tx.Hash(), "block", nextBlockNumber, "tx.chainId", tx.ChainId(), "expected", s.ChainId())
				break
			}
		}
		rawdb.WriteBody(context.Background(), mutation, hash, nextBlockNumber, body)

		if nextBlockNumber%1000 == 0 {
			log.Info("Recovered for blocks:", "blockNumber", nextBlockNumber)
		}

		if err = SaveStageProgress(mutation, Senders, nextBlockNumber); err != nil {
			return err
		}
		nextBlockNumber++

		if mutation.BatchSize() >= mutation.IdealBatchSize() {
			if _, err = mutation.Commit(); err != nil {
				return err
			}
			mutation = d.stateDB.NewBatch()
		}
	}
	return nil
}
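
For reference, a self-contained, hedged sketch of the recovery primitive the stage is built on, runnable outside the sync; the mainnet config, block number, and transaction fields are arbitrary illustrative choices:

package main

import (
	"fmt"
	"math/big"

	"github.com/ledgerwatch/turbo-geth/common"
	"github.com/ledgerwatch/turbo-geth/core/types"
	"github.com/ledgerwatch/turbo-geth/crypto"
	"github.com/ledgerwatch/turbo-geth/params"
)

func main() {
	key, _ := crypto.GenerateKey()
	addr := crypto.PubkeyToAddress(key.PublicKey)

	// Pick the signer the same way the stage does: from the chain config and block number.
	signer := types.MakeSigner(params.MainnetChainConfig, big.NewInt(1))

	tx := types.NewTransaction(0, common.Address{}, big.NewInt(0), 21000, big.NewInt(1), nil)
	signed, _ := types.SignTx(tx, signer, key)

	// Recover the sender from the signature, as spawnRecoverSendersStage does per transaction.
	from, err := types.Sender(signer, signed)
	fmt.Println(from == addr, err) // expected: true <nil>
}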