mirror of
https://gitlab.com/pulsechaincom/go-pulse.git
synced 2024-12-23 03:51:09 +00:00
28ec26094b
* eth/downloader: retrieve pivot header from local chain if necessary * eth/downloader: improve readability * eth/downloader: update fix * eth/downloader: add beacon sync tests * eth/downloader: remove duplicated code
313 lines
11 KiB
Go
313 lines
11 KiB
Go
// Copyright 2021 The go-ethereum Authors
|
|
// This file is part of the go-ethereum library.
|
|
//
|
|
// The go-ethereum library is free software: you can redistribute it and/or modify
|
|
// it under the terms of the GNU Lesser General Public License as published by
|
|
// the Free Software Foundation, either version 3 of the License, or
|
|
// (at your option) any later version.
|
|
//
|
|
// The go-ethereum library is distributed in the hope that it will be useful,
|
|
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
// GNU Lesser General Public License for more details.
|
|
//
|
|
// You should have received a copy of the GNU Lesser General Public License
|
|
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
|
|
|
|
package downloader
|
|
|
|
import (
|
|
"fmt"
|
|
"sync"
|
|
"sync/atomic"
|
|
"time"
|
|
|
|
"github.com/ethereum/go-ethereum/common"
|
|
"github.com/ethereum/go-ethereum/core/types"
|
|
"github.com/ethereum/go-ethereum/log"
|
|
)
|
|
|
|
// beaconBackfiller is the chain and state backfilling that can be commenced once
|
|
// the skeleton syncer has successfully reverse downloaded all the headers up to
|
|
// the genesis block or an existing header in the database. Its operation is fully
|
|
// directed by the skeleton sync's head/tail events.
|
|
type beaconBackfiller struct {
|
|
downloader *Downloader // Downloader to direct via this callback implementation
|
|
syncMode SyncMode // Sync mode to use for backfilling the skeleton chains
|
|
success func() // Callback to run on successful sync cycle completion
|
|
filling bool // Flag whether the downloader is backfilling or not
|
|
started chan struct{} // Notification channel whether the downloader inited
|
|
lock sync.Mutex // Mutex protecting the sync lock
|
|
}
|
|
|
|
// newBeaconBackfiller is a helper method to create the backfiller.
|
|
func newBeaconBackfiller(dl *Downloader, success func()) backfiller {
|
|
return &beaconBackfiller{
|
|
downloader: dl,
|
|
success: success,
|
|
}
|
|
}
|
|
|
|
// suspend cancels any background downloader threads.
|
|
func (b *beaconBackfiller) suspend() {
|
|
// If no filling is running, don't waste cycles
|
|
b.lock.Lock()
|
|
filling := b.filling
|
|
started := b.started
|
|
b.lock.Unlock()
|
|
|
|
if !filling {
|
|
return
|
|
}
|
|
// A previous filling should be running, though it may happen that it hasn't
|
|
// yet started (being done on a new goroutine). Many concurrent beacon head
|
|
// announcements can lead to sync start/stop thrashing. In that case we need
|
|
// to wait for initialization before we can safely cancel it. It is safe to
|
|
// read this channel multiple times, it gets closed on startup.
|
|
<-started
|
|
|
|
// Now that we're sure the downloader successfully started up, we can cancel
|
|
// it safely without running the risk of data races.
|
|
b.downloader.Cancel()
|
|
}
|
|
|
|
// resume starts the downloader threads for backfilling state and chain data.
|
|
func (b *beaconBackfiller) resume() {
|
|
b.lock.Lock()
|
|
if b.filling {
|
|
// If a previous filling cycle is still running, just ignore this start
|
|
// request. // TODO(karalabe): We should make this channel driven
|
|
b.lock.Unlock()
|
|
return
|
|
}
|
|
b.filling = true
|
|
b.started = make(chan struct{})
|
|
mode := b.syncMode
|
|
b.lock.Unlock()
|
|
|
|
// Start the backfilling on its own thread since the downloader does not have
|
|
// its own lifecycle runloop.
|
|
go func() {
|
|
// Set the backfiller to non-filling when download completes
|
|
defer func() {
|
|
b.lock.Lock()
|
|
b.filling = false
|
|
b.lock.Unlock()
|
|
}()
|
|
// If the downloader fails, report an error as in beacon chain mode there
|
|
// should be no errors as long as the chain we're syncing to is valid.
|
|
if err := b.downloader.synchronise("", common.Hash{}, nil, nil, mode, true, b.started); err != nil {
|
|
log.Error("Beacon backfilling failed", "err", err)
|
|
return
|
|
}
|
|
// Synchronization succeeded. Since this happens async, notify the outer
|
|
// context to disable snap syncing and enable transaction propagation.
|
|
if b.success != nil {
|
|
b.success()
|
|
}
|
|
}()
|
|
}
|
|
|
|
// setMode updates the sync mode from the current one to the requested one. If
|
|
// there's an active sync in progress, it will be cancelled and restarted.
|
|
func (b *beaconBackfiller) setMode(mode SyncMode) {
|
|
// Update the old sync mode and track if it was changed
|
|
b.lock.Lock()
|
|
updated := b.syncMode != mode
|
|
filling := b.filling
|
|
b.syncMode = mode
|
|
b.lock.Unlock()
|
|
|
|
// If the sync mode was changed mid-sync, restart. This should never ever
|
|
// really happen, we just handle it to detect programming errors.
|
|
if !updated || !filling {
|
|
return
|
|
}
|
|
log.Error("Downloader sync mode changed mid-run", "old", mode.String(), "new", mode.String())
|
|
b.suspend()
|
|
b.resume()
|
|
}
|
|
|
|
// BeaconSync is the post-merge version of the chain synchronization, where the
|
|
// chain is not downloaded from genesis onward, rather from trusted head announces
|
|
// backwards.
|
|
//
|
|
// Internally backfilling and state sync is done the same way, but the header
|
|
// retrieval and scheduling is replaced.
|
|
func (d *Downloader) BeaconSync(mode SyncMode, head *types.Header) error {
|
|
return d.beaconSync(mode, head, true)
|
|
}
|
|
|
|
// BeaconExtend is an optimistic version of BeaconSync, where an attempt is made
|
|
// to extend the current beacon chain with a new header, but in case of a mismatch,
|
|
// the old sync will not be terminated and reorged, rather the new head is dropped.
|
|
//
|
|
// This is useful if a beacon client is feeding us large chunks of payloads to run,
|
|
// but is not setting the head after each.
|
|
func (d *Downloader) BeaconExtend(mode SyncMode, head *types.Header) error {
|
|
return d.beaconSync(mode, head, false)
|
|
}
|
|
|
|
// beaconSync is the post-merge version of the chain synchronization, where the
|
|
// chain is not downloaded from genesis onward, rather from trusted head announces
|
|
// backwards.
|
|
//
|
|
// Internally backfilling and state sync is done the same way, but the header
|
|
// retrieval and scheduling is replaced.
|
|
func (d *Downloader) beaconSync(mode SyncMode, head *types.Header, force bool) error {
|
|
// When the downloader starts a sync cycle, it needs to be aware of the sync
|
|
// mode to use (full, snap). To keep the skeleton chain oblivious, inject the
|
|
// mode into the backfiller directly.
|
|
//
|
|
// Super crazy dangerous type cast. Should be fine (TM), we're only using a
|
|
// different backfiller implementation for skeleton tests.
|
|
d.skeleton.filler.(*beaconBackfiller).setMode(mode)
|
|
|
|
// Signal the skeleton sync to switch to a new head, however it wants
|
|
if err := d.skeleton.Sync(head, force); err != nil {
|
|
return err
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// findBeaconAncestor tries to locate the common ancestor link of the local chain
|
|
// and the beacon chain just requested. In the general case when our node was in
|
|
// sync and on the correct chain, checking the top N links should already get us
|
|
// a match. In the rare scenario when we ended up on a long reorganisation (i.e.
|
|
// none of the head links match), we do a binary search to find the ancestor.
|
|
func (d *Downloader) findBeaconAncestor() (uint64, error) {
|
|
// Figure out the current local head position
|
|
var chainHead *types.Header
|
|
|
|
switch d.getMode() {
|
|
case FullSync:
|
|
chainHead = d.blockchain.CurrentBlock().Header()
|
|
case SnapSync:
|
|
chainHead = d.blockchain.CurrentFastBlock().Header()
|
|
default:
|
|
chainHead = d.lightchain.CurrentHeader()
|
|
}
|
|
number := chainHead.Number.Uint64()
|
|
|
|
// Retrieve the skeleton bounds and ensure they are linked to the local chain
|
|
beaconHead, beaconTail, err := d.skeleton.Bounds()
|
|
if err != nil {
|
|
// This is a programming error. The chain backfiller was called with an
|
|
// invalid beacon sync state. Ideally we would panic here, but erroring
|
|
// gives us at least a remote chance to recover. It's still a big fault!
|
|
log.Error("Failed to retrieve beacon bounds", "err", err)
|
|
return 0, err
|
|
}
|
|
var linked bool
|
|
switch d.getMode() {
|
|
case FullSync:
|
|
linked = d.blockchain.HasBlock(beaconTail.ParentHash, beaconTail.Number.Uint64()-1)
|
|
case SnapSync:
|
|
linked = d.blockchain.HasFastBlock(beaconTail.ParentHash, beaconTail.Number.Uint64()-1)
|
|
default:
|
|
linked = d.blockchain.HasHeader(beaconTail.ParentHash, beaconTail.Number.Uint64()-1)
|
|
}
|
|
if !linked {
|
|
// This is a programming error. The chain backfiller was called with a
|
|
// tail that's not linked to the local chain. Whilst this should never
|
|
// happen, there might be some weirdnesses if beacon sync backfilling
|
|
// races with the user (or beacon client) calling setHead. Whilst panic
|
|
// would be the ideal thing to do, it is safer long term to attempt a
|
|
// recovery and fix any noticed issue after the fact.
|
|
log.Error("Beacon sync linkup unavailable", "number", beaconTail.Number.Uint64()-1, "hash", beaconTail.ParentHash)
|
|
return 0, fmt.Errorf("beacon linkup unavailable locally: %d [%x]", beaconTail.Number.Uint64()-1, beaconTail.ParentHash)
|
|
}
|
|
// Binary search to find the ancestor
|
|
start, end := beaconTail.Number.Uint64()-1, number
|
|
if number := beaconHead.Number.Uint64(); end > number {
|
|
// This shouldn't really happen in a healty network, but if the consensus
|
|
// clients feeds us a shorter chain as the canonical, we should not attempt
|
|
// to access non-existent skeleton items.
|
|
log.Warn("Beacon head lower than local chain", "beacon", number, "local", end)
|
|
end = number
|
|
}
|
|
for start+1 < end {
|
|
// Split our chain interval in two, and request the hash to cross check
|
|
check := (start + end) / 2
|
|
|
|
h := d.skeleton.Header(check)
|
|
n := h.Number.Uint64()
|
|
|
|
var known bool
|
|
switch d.getMode() {
|
|
case FullSync:
|
|
known = d.blockchain.HasBlock(h.Hash(), n)
|
|
case SnapSync:
|
|
known = d.blockchain.HasFastBlock(h.Hash(), n)
|
|
default:
|
|
known = d.lightchain.HasHeader(h.Hash(), n)
|
|
}
|
|
if !known {
|
|
end = check
|
|
continue
|
|
}
|
|
start = check
|
|
}
|
|
return start, nil
|
|
}
|
|
|
|
// fetchBeaconHeaders feeds skeleton headers to the downloader queue for scheduling
|
|
// until sync errors or is finished.
|
|
func (d *Downloader) fetchBeaconHeaders(from uint64) error {
|
|
head, _, err := d.skeleton.Bounds()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
for {
|
|
// Retrieve a batch of headers and feed it to the header processor
|
|
var (
|
|
headers = make([]*types.Header, 0, maxHeadersProcess)
|
|
hashes = make([]common.Hash, 0, maxHeadersProcess)
|
|
)
|
|
for i := 0; i < maxHeadersProcess && from <= head.Number.Uint64(); i++ {
|
|
header := d.skeleton.Header(from)
|
|
if header == nil {
|
|
header = d.lightchain.GetHeaderByNumber(from)
|
|
}
|
|
headers = append(headers, header)
|
|
hashes = append(hashes, headers[i].Hash())
|
|
from++
|
|
}
|
|
if len(headers) > 0 {
|
|
log.Trace("Scheduling new beacon headers", "count", len(headers), "from", from-uint64(len(headers)))
|
|
select {
|
|
case d.headerProcCh <- &headerTask{
|
|
headers: headers,
|
|
hashes: hashes,
|
|
}:
|
|
case <-d.cancelCh:
|
|
return errCanceled
|
|
}
|
|
}
|
|
// If we still have headers to import, loop and keep pushing them
|
|
if from <= head.Number.Uint64() {
|
|
continue
|
|
}
|
|
// If the pivot block is committed, signal header sync termination
|
|
if atomic.LoadInt32(&d.committed) == 1 {
|
|
select {
|
|
case d.headerProcCh <- nil:
|
|
return nil
|
|
case <-d.cancelCh:
|
|
return errCanceled
|
|
}
|
|
}
|
|
// State sync still going, wait a bit for new headers and retry
|
|
log.Trace("Pivot not yet committed, waiting...")
|
|
select {
|
|
case <-time.After(fsHeaderContCheck):
|
|
case <-d.cancelCh:
|
|
return errCanceled
|
|
}
|
|
head, _, err = d.skeleton.Bounds()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
}
|