Prefetch block bodies (#1013)

* prefetched blocks 1

* fixups

* fix logs

* fixups

* linters

* remove logs

* fix an NPE in tests

* setup the correct origin

* run blocks on prefetched bodies

* fix log

* smaller cache size
This commit is contained in:
Igor Mandrigin 2020-08-30 19:34:40 +02:00 committed by GitHub
parent ada52a8cd5
commit 7bc892e886
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 92 additions and 7 deletions

View File

@ -14,7 +14,12 @@ import (
)
// extensions for the downloader needed for staged sync
func (d *Downloader) SpawnBodyDownloadStage(id string, s *stagedsync.StageState, u stagedsync.Unwinder) (bool, error) {
func (d *Downloader) SpawnBodyDownloadStage(
id string,
s *stagedsync.StageState,
u stagedsync.Unwinder,
prefetchedBlocks *stagedsync.PrefetchedBlocks,
) (bool, error) {
d.bodiesState = s
d.bodiesUnwinder = u
defer func() {
@ -85,6 +90,28 @@ func (d *Downloader) SpawnBodyDownloadStage(id string, s *stagedsync.StageState,
// No more bodies to download
return false, nil
}
prefetchedHashes := 0
for prefetchedHashes < hashCount {
h := hashes[prefetchedHashes]
if block := prefetchedBlocks.Pop(h); block != nil {
fr := fetchResultFromBlock(block)
execute := false
_, err := d.importBlockResults([]*fetchResult{fr}, execute)
if err != nil {
return false, err
}
prefetchedHashes++
} else {
break
}
}
if prefetchedHashes > 0 {
log.Debug("Used prefetched bodies", "count", prefetchedHashes, "to", origin+uint64(prefetchedHashes))
return true, nil
}
log.Info("Downloading block bodies", "count", hashCount)
from := origin + 1
d.queue.Prepare(from, d.getMode())
d.queue.ScheduleBodies(from, hashes[:hashCount], headers)
@ -106,9 +133,18 @@ func (d *Downloader) SpawnBodyDownloadStage(id string, s *stagedsync.StageState,
if err := d.spawnSync(fetchers); err != nil {
return false, err
}
return true, nil
}
// fetchResultFromBlock wraps an already-available block in the
// fetchResult form consumed by the body import pipeline, copying over
// its header, uncle list, and transactions.
func fetchResultFromBlock(b *types.Block) *fetchResult {
	fr := new(fetchResult)
	fr.Header = b.Header()
	fr.Uncles = b.Uncles()
	fr.Transactions = b.Transactions()
	return fr
}
// processBodiesStage takes fetch results from the queue and imports them into the chain.
// it doesn't execute blocks
func (d *Downloader) processBodiesStage(to uint64) error {

View File

@ -16,6 +16,7 @@ import (
"github.com/ledgerwatch/turbo-geth/core/state"
"github.com/ledgerwatch/turbo-geth/core/types"
"github.com/ledgerwatch/turbo-geth/core/vm"
"github.com/ledgerwatch/turbo-geth/eth/stagedsync"
"github.com/ledgerwatch/turbo-geth/ethdb"
"github.com/ledgerwatch/turbo-geth/event"
"github.com/ledgerwatch/turbo-geth/log"
@ -41,6 +42,7 @@ func newStagedSyncTester() (*stagedSyncTester, func()) {
rawdb.WriteTd(tester.db, tester.genesis.Hash(), tester.genesis.NumberU64(), tester.genesis.Difficulty())
rawdb.WriteBlock(context.Background(), tester.db, testGenesis)
tester.downloader = New(uint64(StagedSync), tester.db, new(event.TypeMux), params.TestChainConfig, tester, nil, tester.dropPeer, ethdb.DefaultStorageMode)
tester.downloader.SetStagedSync(stagedsync.New())
clear := func() {
tester.db.Close()
}

View File

@ -858,6 +858,12 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
if err := pm.blockFetcher.Enqueue(p.id, request.Block); err != nil {
return err
}
} else {
	// Block was not handed to the fetcher: stash it so staged sync can
	// reuse the body instead of re-downloading it.
	log.Debug("Adding block to staged sync prefetch",
		// BUG FIX: NumberU64 must be called — logging the bare method
		// value prints a function pointer, not the block number
		// (contrast with Hash().Hex() on the next line).
		"number", request.Block.NumberU64(),
		"hash", request.Block.Hash().Hex(),
	)
	pm.stagedSync.PrefetchedBlocks.Add(request.Block)
}
// Assuming the block is importable by the peer, but possibly not yet done so,

View File

@ -0,0 +1,38 @@
package stagedsync
import (
"github.com/ledgerwatch/turbo-geth/common"
"github.com/ledgerwatch/turbo-geth/core/types"
lru "github.com/hashicorp/golang-lru"
)
// PrefetchedBlocks is an LRU-bounded cache of blocks received from peers,
// keyed by block hash, so staged sync can consume already-delivered
// bodies instead of downloading them again.
type PrefetchedBlocks struct {
blocks *lru.Cache
}
// NewPrefetchedBlocks creates a prefetch cache holding at most 1000
// blocks; the least recently used entries are evicted first.
func NewPrefetchedBlocks() *PrefetchedBlocks {
	// lru.New only fails for a non-positive size, so an error here is a
	// programmer bug — but include the cause in the panic message
	// instead of discarding err, so the failure is diagnosable.
	cache, err := lru.New(1000)
	if err != nil {
		panic("error creating prefetching cache for blocks: " + err.Error())
	}
	return &PrefetchedBlocks{blocks: cache}
}
// Pop returns the block cached under hash and removes it from the
// cache. It returns nil when the hash is unknown or the cached value is
// not a *types.Block.
// NOTE(review): Get followed by Remove is not atomic — two concurrent
// callers could both receive the same block before either removes it;
// confirm this is only used by a single consumer.
func (pb *PrefetchedBlocks) Pop(hash common.Hash) *types.Block {
if val, ok := pb.blocks.Get(hash); ok && val != nil {
pb.blocks.Remove(hash)
if block, ok := val.(*types.Block); ok {
return block
}
}
return nil
}
// Add caches b under its hash for a later Pop; a nil block is a no-op.
func (pb *PrefetchedBlocks) Add(b *types.Block) {
	if b == nil {
		return
	}
	// ContainsOrAdd leaves an existing entry for this hash untouched.
	pb.blocks.ContainsOrAdd(b.Hash(), b)
}

View File

@ -6,8 +6,8 @@ import (
"github.com/ledgerwatch/turbo-geth/ethdb"
)
func spawnBodyDownloadStage(s *StageState, u Unwinder, d DownloaderGlue, pid string) error {
cont, err := d.SpawnBodyDownloadStage(pid, s, u)
func spawnBodyDownloadStage(s *StageState, u Unwinder, d DownloaderGlue, pid string, pb *PrefetchedBlocks) error {
cont, err := d.SpawnBodyDownloadStage(pid, s, u, pb)
if err != nil {
return err
}

View File

@ -15,13 +15,16 @@ import (
const prof = false // whether to profile
type StagedSync struct {
PrefetchedBlocks *PrefetchedBlocks
}
func New() *StagedSync {
return &StagedSync{}
return &StagedSync{
PrefetchedBlocks: NewPrefetchedBlocks(),
}
}
func (*StagedSync) Prepare(
func (stagedSync *StagedSync) Prepare(
d DownloaderGlue,
chainConfig *params.ChainConfig,
chainContext core.ChainContext,
@ -64,7 +67,7 @@ func (*StagedSync) Prepare(
ID: stages.Bodies,
Description: "Download block bodies",
ExecFunc: func(s *StageState, u Unwinder) error {
return spawnBodyDownloadStage(s, u, d, pid)
return spawnBodyDownloadStage(s, u, d, pid, stagedSync.PrefetchedBlocks)
},
UnwindFunc: func(u *UnwindState, s *StageState) error {
return unwindBodyDownloadStage(u, db)

View File

@ -2,5 +2,5 @@ package stagedsync
type DownloaderGlue interface {
SpawnHeaderDownloadStage([]func() error, *StageState, Unwinder) error
SpawnBodyDownloadStage(string, *StageState, Unwinder) (bool, error)
SpawnBodyDownloadStage(string, *StageState, Unwinder, *PrefetchedBlocks) (bool, error)
}