mirror of
https://gitlab.com/pulsechaincom/erigon-pulse.git
synced 2025-01-12 05:50:06 +00:00
eth, eth/downloader: fix processing interrupt caused by temp cancel
This commit is contained in:
parent
ae36beb38f
commit
2f4cbe22f5
@ -87,6 +87,8 @@ type Downloader struct {
|
||||
checks map[common.Hash]*crossCheck // Pending cross checks to verify a hash chain
|
||||
banned *set.Set // Set of hashes we've received and banned
|
||||
|
||||
interrupt int32 // Atomic boolean to signal termination
|
||||
|
||||
// Statistics
|
||||
importStart time.Time // Instance when the last blocks were taken from the cache
|
||||
importQueue []*Block // Previously taken blocks to check import progress
|
||||
@ -245,12 +247,6 @@ func (d *Downloader) synchronise(id string, hash common.Hash) error {
|
||||
if atomic.CompareAndSwapInt32(&d.notified, 0, 1) {
|
||||
glog.V(logger.Info).Infoln("Block synchronisation started")
|
||||
}
|
||||
|
||||
// Create cancel channel for aborting mid-flight
|
||||
d.cancelLock.Lock()
|
||||
d.cancelCh = make(chan struct{})
|
||||
d.cancelLock.Unlock()
|
||||
|
||||
// Abort if the queue still contains some leftover data
|
||||
if _, cached := d.queue.Size(); cached > 0 && d.queue.GetHeadBlock() != nil {
|
||||
return errPendingQueue
|
||||
@ -260,12 +256,16 @@ func (d *Downloader) synchronise(id string, hash common.Hash) error {
|
||||
d.peers.Reset()
|
||||
d.checks = make(map[common.Hash]*crossCheck)
|
||||
|
||||
// Create cancel channel for aborting mid-flight
|
||||
d.cancelLock.Lock()
|
||||
d.cancelCh = make(chan struct{})
|
||||
d.cancelLock.Unlock()
|
||||
|
||||
// Retrieve the origin peer and initiate the downloading process
|
||||
p := d.peers.Peer(id)
|
||||
if p == nil {
|
||||
return errUnknownPeer
|
||||
}
|
||||
|
||||
return d.syncWithPeer(p, hash)
|
||||
}
|
||||
|
||||
@ -282,7 +282,7 @@ func (d *Downloader) syncWithPeer(p *peer, hash common.Hash) (err error) {
|
||||
defer func() {
|
||||
// reset on error
|
||||
if err != nil {
|
||||
d.Cancel()
|
||||
d.cancel()
|
||||
d.mux.Post(FailedEvent{err})
|
||||
} else {
|
||||
d.mux.Post(DoneEvent{})
|
||||
@ -301,9 +301,9 @@ func (d *Downloader) syncWithPeer(p *peer, hash common.Hash) (err error) {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Cancel cancels all of the operations and resets the queue. It returns true
|
||||
// cancel cancels all of the operations and resets the queue. It returns true
|
||||
// if the cancel operation was completed.
|
||||
func (d *Downloader) Cancel() {
|
||||
func (d *Downloader) cancel() {
|
||||
// Close the current cancel channel
|
||||
d.cancelLock.Lock()
|
||||
if d.cancelCh != nil {
|
||||
@ -320,6 +320,12 @@ func (d *Downloader) Cancel() {
|
||||
d.queue.Reset()
|
||||
}
|
||||
|
||||
// Terminate interrupts the downloader, canceling all pending operations.
|
||||
func (d *Downloader) Terminate() {
|
||||
atomic.StoreInt32(&d.interrupt, 1)
|
||||
d.cancel()
|
||||
}
|
||||
|
||||
// fetchHahes starts retrieving hashes backwards from a specific peer and hash,
|
||||
// up until it finds a common ancestor. If the source peer times out, alternative
|
||||
// ones are tried for continuation.
|
||||
@ -737,12 +743,6 @@ func (d *Downloader) process() (err error) {
|
||||
|
||||
atomic.StoreInt32(&d.processing, 0)
|
||||
}()
|
||||
|
||||
// Fetch the current cancel channel to allow termination
|
||||
d.cancelLock.RLock()
|
||||
cancel := d.cancelCh
|
||||
d.cancelLock.RUnlock()
|
||||
|
||||
// Repeat the processing as long as there are blocks to import
|
||||
for {
|
||||
// Fetch the next batch of blocks
|
||||
@ -759,12 +759,10 @@ func (d *Downloader) process() (err error) {
|
||||
|
||||
// Actually import the blocks
|
||||
glog.V(logger.Debug).Infof("Inserting chain with %d blocks (#%v - #%v)\n", len(blocks), blocks[0].RawBlock.Number(), blocks[len(blocks)-1].RawBlock.Number())
|
||||
for len(blocks) != 0 { // TODO: quit
|
||||
for len(blocks) != 0 {
|
||||
// Check for any termination requests
|
||||
select {
|
||||
case <-cancel:
|
||||
if atomic.LoadInt32(&d.interrupt) == 1 {
|
||||
return errCancelChainImport
|
||||
default:
|
||||
}
|
||||
// Retrieve the first batch of blocks to insert
|
||||
max := int(math.Min(float64(len(blocks)), float64(maxBlockProcess)))
|
||||
@ -777,7 +775,7 @@ func (d *Downloader) process() (err error) {
|
||||
if err != nil {
|
||||
glog.V(logger.Debug).Infof("Block #%d import failed: %v", raw[index].NumberU64(), err)
|
||||
d.dropPeer(blocks[index].OriginPeer)
|
||||
d.Cancel()
|
||||
d.cancel()
|
||||
return errCancelChainImport
|
||||
}
|
||||
blocks = blocks[max:]
|
||||
|
@ -247,7 +247,7 @@ func TestCancel(t *testing.T) {
|
||||
tester.newPeer("peer", hashes, blocks)
|
||||
|
||||
// Make sure canceling works with a pristine downloader
|
||||
tester.downloader.Cancel()
|
||||
tester.downloader.cancel()
|
||||
hashCount, blockCount := tester.downloader.queue.Size()
|
||||
if hashCount > 0 || blockCount > 0 {
|
||||
t.Errorf("block or hash count mismatch: %d hashes, %d blocks, want 0", hashCount, blockCount)
|
||||
@ -256,7 +256,7 @@ func TestCancel(t *testing.T) {
|
||||
if err := tester.sync("peer"); err != nil {
|
||||
t.Fatalf("failed to synchronise blocks: %v", err)
|
||||
}
|
||||
tester.downloader.Cancel()
|
||||
tester.downloader.cancel()
|
||||
hashCount, blockCount = tester.downloader.queue.Size()
|
||||
if hashCount > 0 || blockCount > 0 {
|
||||
t.Errorf("block or hash count mismatch: %d hashes, %d blocks, want 0", hashCount, blockCount)
|
||||
|
@ -251,7 +251,7 @@ func (pm *ProtocolManager) fetcher() {
|
||||
// downloading hashes and blocks as well as retrieving cached ones.
|
||||
func (pm *ProtocolManager) syncer() {
|
||||
// Abort any pending syncs if we terminate
|
||||
defer pm.downloader.Cancel()
|
||||
defer pm.downloader.Terminate()
|
||||
|
||||
forceSync := time.Tick(forceSyncCycle)
|
||||
for {
|
||||
|
Loading…
Reference in New Issue
Block a user