mirror of
https://gitlab.com/pulsechaincom/erigon-pulse.git
synced 2024-12-24 20:47:16 +00:00
eth/downloader: instreument and test the sync peer drop
This commit is contained in:
parent
faae8b7dd8
commit
80833f8137
@ -31,7 +31,6 @@ var (
|
|||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
errLowTd = errors.New("peers TD is too low")
|
|
||||||
errBusy = errors.New("busy")
|
errBusy = errors.New("busy")
|
||||||
errUnknownPeer = errors.New("peer is unknown or unhealthy")
|
errUnknownPeer = errors.New("peer is unknown or unhealthy")
|
||||||
errBadPeer = errors.New("action from bad peer ignored")
|
errBadPeer = errors.New("action from bad peer ignored")
|
||||||
@ -94,8 +93,9 @@ type Downloader struct {
|
|||||||
dropPeer peerDropFn // Retrieved the TD of our own chain
|
dropPeer peerDropFn // Retrieved the TD of our own chain
|
||||||
|
|
||||||
// Status
|
// Status
|
||||||
synchronising int32
|
synchroniseMock func(id string, hash common.Hash) error // Replacement for synchronise during testing
|
||||||
notified int32
|
synchronising int32
|
||||||
|
notified int32
|
||||||
|
|
||||||
// Channels
|
// Channels
|
||||||
newPeerCh chan *peer
|
newPeerCh chan *peer
|
||||||
@ -202,7 +202,7 @@ func (d *Downloader) Synchronise(id string, head common.Hash) {
|
|||||||
case errBusy:
|
case errBusy:
|
||||||
glog.V(logger.Detail).Infof("Synchronisation already in progress")
|
glog.V(logger.Detail).Infof("Synchronisation already in progress")
|
||||||
|
|
||||||
case errTimeout, errBadPeer, errEmptyHashSet, errInvalidChain, errCrossCheckFailed:
|
case errTimeout, errBadPeer, errStallingPeer, errBannedHead, errEmptyHashSet, errPeersUnavailable, errInvalidChain, errCrossCheckFailed:
|
||||||
glog.V(logger.Debug).Infof("Removing peer %v: %v", id, err)
|
glog.V(logger.Debug).Infof("Removing peer %v: %v", id, err)
|
||||||
d.dropPeer(id)
|
d.dropPeer(id)
|
||||||
|
|
||||||
@ -218,6 +218,10 @@ func (d *Downloader) Synchronise(id string, head common.Hash) {
|
|||||||
// it will use the best peer possible and synchronize if it's TD is higher than our own. If any of the
|
// it will use the best peer possible and synchronize if it's TD is higher than our own. If any of the
|
||||||
// checks fail an error will be returned. This method is synchronous
|
// checks fail an error will be returned. This method is synchronous
|
||||||
func (d *Downloader) synchronise(id string, hash common.Hash) error {
|
func (d *Downloader) synchronise(id string, hash common.Hash) error {
|
||||||
|
// Mock out the synchonisation if testing
|
||||||
|
if d.synchroniseMock != nil {
|
||||||
|
return d.synchroniseMock(id, hash)
|
||||||
|
}
|
||||||
// Make sure only one goroutine is ever allowed past this point at once
|
// Make sure only one goroutine is ever allowed past this point at once
|
||||||
if !atomic.CompareAndSwapInt32(&d.synchronising, 0, 1) {
|
if !atomic.CompareAndSwapInt32(&d.synchronising, 0, 1) {
|
||||||
return errBusy
|
return errBusy
|
||||||
@ -226,7 +230,7 @@ func (d *Downloader) synchronise(id string, hash common.Hash) error {
|
|||||||
|
|
||||||
// If the head hash is banned, terminate immediately
|
// If the head hash is banned, terminate immediately
|
||||||
if d.banned.Has(hash) {
|
if d.banned.Has(hash) {
|
||||||
return errInvalidChain
|
return errBannedHead
|
||||||
}
|
}
|
||||||
// Post a user notification of the sync (only once per session)
|
// Post a user notification of the sync (only once per session)
|
||||||
if atomic.CompareAndSwapInt32(&d.notified, 0, 1) {
|
if atomic.CompareAndSwapInt32(&d.notified, 0, 1) {
|
||||||
|
@ -2,6 +2,7 @@ package downloader
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"encoding/binary"
|
"encoding/binary"
|
||||||
|
"fmt"
|
||||||
"math/big"
|
"math/big"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
@ -69,7 +70,7 @@ func newTester() *downloadTester {
|
|||||||
peerBlocks: make(map[string]map[common.Hash]*types.Block),
|
peerBlocks: make(map[string]map[common.Hash]*types.Block),
|
||||||
}
|
}
|
||||||
var mux event.TypeMux
|
var mux event.TypeMux
|
||||||
downloader := New(&mux, tester.hasBlock, tester.getBlock, nil)
|
downloader := New(&mux, tester.hasBlock, tester.getBlock, tester.dropPeer)
|
||||||
tester.downloader = downloader
|
tester.downloader = downloader
|
||||||
|
|
||||||
return tester
|
return tester
|
||||||
@ -130,6 +131,14 @@ func (dl *downloadTester) newPeer(id string, hashes []common.Hash, blocks map[co
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// dropPeer simulates a hard peer removal from the connection pool.
|
||||||
|
func (dl *downloadTester) dropPeer(id string) {
|
||||||
|
delete(dl.peerHashes, id)
|
||||||
|
delete(dl.peerBlocks, id)
|
||||||
|
|
||||||
|
dl.downloader.UnregisterPeer(id)
|
||||||
|
}
|
||||||
|
|
||||||
// peerGetBlocksFn constructs a getHashes function associated with a particular
|
// peerGetBlocksFn constructs a getHashes function associated with a particular
|
||||||
// peer in the download tester. The returned function can be used to retrieve
|
// peer in the download tester. The returned function can be used to retrieve
|
||||||
// batches of hashes from the particularly requested peer.
|
// batches of hashes from the particularly requested peer.
|
||||||
@ -544,14 +553,14 @@ func TestBannedChainStarvationAttack(t *testing.T) {
|
|||||||
for banned := tester.downloader.banned.Size(); ; {
|
for banned := tester.downloader.banned.Size(); ; {
|
||||||
// Try to sync with the attacker, check hash chain failure
|
// Try to sync with the attacker, check hash chain failure
|
||||||
if _, err := tester.syncTake("attack", hashes[0]); err != errInvalidChain {
|
if _, err := tester.syncTake("attack", hashes[0]); err != errInvalidChain {
|
||||||
|
if tester.downloader.banned.Has(hashes[0]) && err == errBannedHead {
|
||||||
|
break
|
||||||
|
}
|
||||||
t.Fatalf("synchronisation error mismatch: have %v, want %v", err, errInvalidChain)
|
t.Fatalf("synchronisation error mismatch: have %v, want %v", err, errInvalidChain)
|
||||||
}
|
}
|
||||||
// Check that the ban list grew with at least 1 new item, or all banned
|
// Check that the ban list grew with at least 1 new item, or all banned
|
||||||
bans := tester.downloader.banned.Size()
|
bans := tester.downloader.banned.Size()
|
||||||
if bans < banned+1 {
|
if bans < banned+1 {
|
||||||
if tester.downloader.banned.Has(hashes[0]) {
|
|
||||||
break
|
|
||||||
}
|
|
||||||
t.Fatalf("ban count mismatch: have %v, want %v+", bans, banned+1)
|
t.Fatalf("ban count mismatch: have %v, want %v+", bans, banned+1)
|
||||||
}
|
}
|
||||||
banned = bans
|
banned = bans
|
||||||
@ -606,3 +615,47 @@ func TestBannedChainMemoryExhaustionAttack(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Tests that misbehaving peers are disconnected, whilst behaving ones are not.
|
||||||
|
func TestAttackerDropping(t *testing.T) {
|
||||||
|
// Define the disconnection requirement for individual errors
|
||||||
|
tests := []struct {
|
||||||
|
result error
|
||||||
|
drop bool
|
||||||
|
}{
|
||||||
|
{nil, false}, // Sync succeeded, all is well
|
||||||
|
{errBusy, false}, // Sync is already in progress, no problem
|
||||||
|
{errUnknownPeer, false}, // Peer is unknown, was already dropped, don't double drop
|
||||||
|
{errBadPeer, true}, // Peer was deemed bad for some reason, drop it
|
||||||
|
{errStallingPeer, true}, // Peer was detected to be stalling, drop it
|
||||||
|
{errBannedHead, true}, // Peer's head hash is a known bad hash, drop it
|
||||||
|
{errNoPeers, false}, // No peers to download from, soft race, no issue
|
||||||
|
{errPendingQueue, false}, // There are blocks still cached, wait to exhaust, no issue
|
||||||
|
{errTimeout, true}, // No hashes received in due time, drop the peer
|
||||||
|
{errEmptyHashSet, true}, // No hashes were returned as a response, drop as it's a dead end
|
||||||
|
{errPeersUnavailable, true}, // Nobody had the advertised blocks, drop the advertiser
|
||||||
|
{errInvalidChain, true}, // Hash chain was detected as invalid, definitely drop
|
||||||
|
{errCrossCheckFailed, true}, // Hash-origin failed to pass a block cross check, drop
|
||||||
|
{errCancelHashFetch, false}, // Synchronisation was canceled, origin may be innocent, don't drop
|
||||||
|
{errCancelBlockFetch, false}, // Synchronisation was canceled, origin may be innocent, don't drop
|
||||||
|
}
|
||||||
|
// Run the tests and check disconnection status
|
||||||
|
tester := newTester()
|
||||||
|
for i, tt := range tests {
|
||||||
|
// Register a new peer and ensure it's presence
|
||||||
|
id := fmt.Sprintf("test %d", i)
|
||||||
|
if err := tester.newPeer(id, []common.Hash{knownHash}, nil); err != nil {
|
||||||
|
t.Fatalf("test %d: failed to register new peer: %v", i, err)
|
||||||
|
}
|
||||||
|
if _, ok := tester.peerHashes[id]; !ok {
|
||||||
|
t.Fatalf("test %d: registered peer not found", i)
|
||||||
|
}
|
||||||
|
// Simulate a synchronisation and check the required result
|
||||||
|
tester.downloader.synchroniseMock = func(string, common.Hash) error { return tt.result }
|
||||||
|
|
||||||
|
tester.downloader.Synchronise(id, knownHash)
|
||||||
|
if _, ok := tester.peerHashes[id]; !ok != tt.drop {
|
||||||
|
t.Errorf("test %d: peer drop mismatch for %v: have %v, want %v", i, tt.result, !ok, tt.drop)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user