eth, eth/downloader: pass the eth protocol version through

This commit is contained in:
Péter Szilágyi 2015-06-29 17:37:55 +03:00
parent aac2b6ae4c
commit af51dc4d63
4 changed files with 45 additions and 36 deletions

View File

@ -175,7 +175,7 @@ func (d *Downloader) Synchronising() bool {
// RegisterPeer injects a new download peer into the set of block source to be // RegisterPeer injects a new download peer into the set of block source to be
// used for fetching hashes and blocks from. // used for fetching hashes and blocks from.
func (d *Downloader) RegisterPeer(id string, head common.Hash, getHashes hashFetcherFn, getBlocks blockFetcherFn) error { func (d *Downloader) RegisterPeer(id string, version int, head common.Hash, getHashes hashFetcherFn, getBlocks blockFetcherFn) error {
// If the peer wants to send a banned hash, reject // If the peer wants to send a banned hash, reject
if d.banned.Has(head) { if d.banned.Has(head) {
glog.V(logger.Debug).Infoln("Register rejected, head hash banned:", id) glog.V(logger.Debug).Infoln("Register rejected, head hash banned:", id)
@ -183,7 +183,7 @@ func (d *Downloader) RegisterPeer(id string, head common.Hash, getHashes hashFet
} }
// Otherwise try to construct and register the peer // Otherwise try to construct and register the peer
glog.V(logger.Detail).Infoln("Registering peer", id) glog.V(logger.Detail).Infoln("Registering peer", id)
if err := d.peers.Register(newPeer(id, head, getHashes, getBlocks)); err != nil { if err := d.peers.Register(newPeer(id, version, head, getHashes, getBlocks)); err != nil {
glog.V(logger.Error).Infoln("Register failed:", err) glog.V(logger.Error).Infoln("Register failed:", err)
return err return err
} }

View File

@ -16,6 +16,11 @@ import (
"github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/event"
) )
const (
eth60 = 60
eth61 = 61
)
var ( var (
testdb, _ = ethdb.NewMemDatabase() testdb, _ = ethdb.NewMemDatabase()
genesis = core.GenesisBlockForTesting(testdb, common.Address{}, big.NewInt(0)) genesis = core.GenesisBlockForTesting(testdb, common.Address{}, big.NewInt(0))
@ -112,15 +117,15 @@ func (dl *downloadTester) insertChain(blocks types.Blocks) (int, error) {
} }
// newPeer registers a new block download source into the downloader. // newPeer registers a new block download source into the downloader.
func (dl *downloadTester) newPeer(id string, hashes []common.Hash, blocks map[common.Hash]*types.Block) error { func (dl *downloadTester) newPeer(id string, version int, hashes []common.Hash, blocks map[common.Hash]*types.Block) error {
return dl.newSlowPeer(id, hashes, blocks, 0) return dl.newSlowPeer(id, version, hashes, blocks, 0)
} }
// newSlowPeer registers a new block download source into the downloader, with a // newSlowPeer registers a new block download source into the downloader, with a
// specific delay time on processing the network packets sent to it, simulating // specific delay time on processing the network packets sent to it, simulating
// potentially slow network IO. // potentially slow network IO.
func (dl *downloadTester) newSlowPeer(id string, hashes []common.Hash, blocks map[common.Hash]*types.Block, delay time.Duration) error { func (dl *downloadTester) newSlowPeer(id string, version int, hashes []common.Hash, blocks map[common.Hash]*types.Block, delay time.Duration) error {
err := dl.downloader.RegisterPeer(id, hashes[0], dl.peerGetHashesFn(id, delay), dl.peerGetBlocksFn(id, delay)) err := dl.downloader.RegisterPeer(id, version, hashes[0], dl.peerGetHashesFn(id, delay), dl.peerGetBlocksFn(id, delay))
if err == nil { if err == nil {
// Assign the owned hashes and blocks to the peer (deep copy) // Assign the owned hashes and blocks to the peer (deep copy)
dl.peerHashes[id] = make([]common.Hash, len(hashes)) dl.peerHashes[id] = make([]common.Hash, len(hashes))
@ -201,7 +206,7 @@ func TestSynchronisation(t *testing.T) {
hashes, blocks := makeChain(targetBlocks, 0, genesis) hashes, blocks := makeChain(targetBlocks, 0, genesis)
tester := newTester() tester := newTester()
tester.newPeer("peer", hashes, blocks) tester.newPeer("peer", eth60, hashes, blocks)
// Synchronise with the peer and make sure all blocks were retrieved // Synchronise with the peer and make sure all blocks were retrieved
if err := tester.sync("peer"); err != nil { if err := tester.sync("peer"); err != nil {
@ -232,7 +237,7 @@ func TestCancel(t *testing.T) {
hashes, blocks := makeChain(targetBlocks, 0, genesis) hashes, blocks := makeChain(targetBlocks, 0, genesis)
tester := newTester() tester := newTester()
tester.newPeer("peer", hashes, blocks) tester.newPeer("peer", eth60, hashes, blocks)
// Make sure canceling works with a pristine downloader // Make sure canceling works with a pristine downloader
tester.downloader.cancel() tester.downloader.cancel()
@ -259,7 +264,7 @@ func TestThrottling(t *testing.T) {
hashes, blocks := makeChain(targetBlocks, 0, genesis) hashes, blocks := makeChain(targetBlocks, 0, genesis)
tester := newTester() tester := newTester()
tester.newPeer("peer", hashes, blocks) tester.newPeer("peer", eth60, hashes, blocks)
// Wrap the importer to allow stepping // Wrap the importer to allow stepping
done := make(chan int) done := make(chan int)
@ -317,7 +322,7 @@ func TestMultiSynchronisation(t *testing.T) {
tester := newTester() tester := newTester()
for i := 0; i < targetPeers; i++ { for i := 0; i < targetPeers; i++ {
id := fmt.Sprintf("peer #%d", i) id := fmt.Sprintf("peer #%d", i)
tester.newPeer(id, hashes[i*blockCacheLimit:], blocks) tester.newPeer(id, eth60, hashes[i*blockCacheLimit:], blocks)
} }
// Synchronise with the middle peer and make sure half of the blocks were retrieved // Synchronise with the middle peer and make sure half of the blocks were retrieved
id := fmt.Sprintf("peer #%d", targetPeers/2) id := fmt.Sprintf("peer #%d", targetPeers/2)
@ -347,8 +352,8 @@ func TestSlowSynchronisation(t *testing.T) {
targetIODelay := time.Second targetIODelay := time.Second
hashes, blocks := makeChain(targetBlocks, 0, genesis) hashes, blocks := makeChain(targetBlocks, 0, genesis)
tester.newSlowPeer("fast", hashes, blocks, 0) tester.newSlowPeer("fast", eth60, hashes, blocks, 0)
tester.newSlowPeer("slow", hashes, blocks, targetIODelay) tester.newSlowPeer("slow", eth60, hashes, blocks, targetIODelay)
// Try to sync with the peers (pull hashes from fast) // Try to sync with the peers (pull hashes from fast)
start := time.Now() start := time.Now()
@ -370,13 +375,14 @@ func TestSlowSynchronisation(t *testing.T) {
func TestNonExistingParentAttack(t *testing.T) { func TestNonExistingParentAttack(t *testing.T) {
tester := newTester() tester := newTester()
// Forge a single-link chain with a forged header
hashes, blocks := makeChain(1, 0, genesis) hashes, blocks := makeChain(1, 0, genesis)
tester.newPeer("valid", hashes, blocks) tester.newPeer("valid", eth60, hashes, blocks)
wrongblock := types.NewBlock(&types.Header{}, nil, nil, nil) wrongblock := types.NewBlock(&types.Header{}, nil, nil, nil)
wrongblock.Td = blocks[hashes[0]].Td wrongblock.Td = blocks[hashes[0]].Td
hashes, blocks = makeChain(1, 0, wrongblock) hashes, blocks = makeChain(1, 0, wrongblock)
tester.newPeer("attack", hashes, blocks) tester.newPeer("attack", eth60, hashes, blocks)
// Try and sync with the malicious node and check that it fails // Try and sync with the malicious node and check that it fails
if err := tester.sync("attack"); err == nil { if err := tester.sync("attack"); err == nil {
@ -401,8 +407,8 @@ func TestRepeatingHashAttack(t *testing.T) { // TODO: Is this thing valid??
// Create a valid chain, but drop the last link // Create a valid chain, but drop the last link
hashes, blocks := makeChain(blockCacheLimit, 0, genesis) hashes, blocks := makeChain(blockCacheLimit, 0, genesis)
tester.newPeer("valid", hashes, blocks) tester.newPeer("valid", eth60, hashes, blocks)
tester.newPeer("attack", hashes[:len(hashes)-1], blocks) tester.newPeer("attack", eth60, hashes[:len(hashes)-1], blocks)
// Try and sync with the malicious node // Try and sync with the malicious node
errc := make(chan error) errc := make(chan error)
@ -431,10 +437,10 @@ func TestNonExistingBlockAttack(t *testing.T) {
// Create a valid chain, but forge the last link // Create a valid chain, but forge the last link
hashes, blocks := makeChain(blockCacheLimit, 0, genesis) hashes, blocks := makeChain(blockCacheLimit, 0, genesis)
tester.newPeer("valid", hashes, blocks) tester.newPeer("valid", eth60, hashes, blocks)
hashes[len(hashes)/2] = common.Hash{} hashes[len(hashes)/2] = common.Hash{}
tester.newPeer("attack", hashes, blocks) tester.newPeer("attack", eth60, hashes, blocks)
// Try and sync with the malicious node and check that it fails // Try and sync with the malicious node and check that it fails
if err := tester.sync("attack"); err != errPeersUnavailable { if err := tester.sync("attack"); err != errPeersUnavailable {
@ -453,7 +459,7 @@ func TestInvalidHashOrderAttack(t *testing.T) {
// Create a valid long chain, but reverse some hashes within // Create a valid long chain, but reverse some hashes within
hashes, blocks := makeChain(4*blockCacheLimit, 0, genesis) hashes, blocks := makeChain(4*blockCacheLimit, 0, genesis)
tester.newPeer("valid", hashes, blocks) tester.newPeer("valid", eth60, hashes, blocks)
chunk1 := make([]common.Hash, blockCacheLimit) chunk1 := make([]common.Hash, blockCacheLimit)
chunk2 := make([]common.Hash, blockCacheLimit) chunk2 := make([]common.Hash, blockCacheLimit)
@ -462,7 +468,7 @@ func TestInvalidHashOrderAttack(t *testing.T) {
copy(hashes[2*blockCacheLimit:], chunk1) copy(hashes[2*blockCacheLimit:], chunk1)
copy(hashes[blockCacheLimit:], chunk2) copy(hashes[blockCacheLimit:], chunk2)
tester.newPeer("attack", hashes, blocks) tester.newPeer("attack", eth60, hashes, blocks)
// Try and sync with the malicious node and check that it fails // Try and sync with the malicious node and check that it fails
if err := tester.sync("attack"); err != errInvalidChain { if err := tester.sync("attack"); err != errInvalidChain {
@ -489,8 +495,8 @@ func TestMadeupHashChainAttack(t *testing.T) {
rand.Read(randomHashes[i][:]) rand.Read(randomHashes[i][:])
} }
tester.newPeer("valid", hashes, blocks) tester.newPeer("valid", eth60, hashes, blocks)
tester.newPeer("attack", randomHashes, nil) tester.newPeer("attack", eth60, randomHashes, nil)
// Try and sync with the malicious node and check that it fails // Try and sync with the malicious node and check that it fails
if err := tester.sync("attack"); err != errCrossCheckFailed { if err := tester.sync("attack"); err != errCrossCheckFailed {
@ -517,7 +523,7 @@ func TestMadeupHashChainDrippingAttack(t *testing.T) {
// Try and sync with the attacker, one hash at a time // Try and sync with the attacker, one hash at a time
tester.maxHashFetch = 1 tester.maxHashFetch = 1
tester.newPeer("attack", randomHashes, nil) tester.newPeer("attack", eth60, randomHashes, nil)
if err := tester.sync("attack"); err != errStallingPeer { if err := tester.sync("attack"); err != errStallingPeer {
t.Fatalf("synchronisation error mismatch: have %v, want %v", err, errStallingPeer) t.Fatalf("synchronisation error mismatch: have %v, want %v", err, errStallingPeer)
} }
@ -540,7 +546,7 @@ func TestMadeupBlockChainAttack(t *testing.T) {
} }
// Try and sync with the malicious node and check that it fails // Try and sync with the malicious node and check that it fails
tester := newTester() tester := newTester()
tester.newPeer("attack", gapped, blocks) tester.newPeer("attack", eth60, gapped, blocks)
if err := tester.sync("attack"); err != errCrossCheckFailed { if err := tester.sync("attack"); err != errCrossCheckFailed {
t.Fatalf("synchronisation error mismatch: have %v, want %v", err, errCrossCheckFailed) t.Fatalf("synchronisation error mismatch: have %v, want %v", err, errCrossCheckFailed)
} }
@ -548,13 +554,13 @@ func TestMadeupBlockChainAttack(t *testing.T) {
blockSoftTTL = defaultBlockTTL blockSoftTTL = defaultBlockTTL
crossCheckCycle = defaultCrossCheckCycle crossCheckCycle = defaultCrossCheckCycle
tester.newPeer("valid", hashes, blocks) tester.newPeer("valid", eth60, hashes, blocks)
if err := tester.sync("valid"); err != nil { if err := tester.sync("valid"); err != nil {
t.Fatalf("failed to synchronise blocks: %v", err) t.Fatalf("failed to synchronise blocks: %v", err)
} }
} }
// tests that if one/multiple malicious peers try to feed a banned blockchain to // Tests that if one/multiple malicious peers try to feed a banned blockchain to
// the downloader, it will not keep refetching the same chain indefinitely, but // the downloader, it will not keep refetching the same chain indefinitely, but
// gradually block pieces of it, until its head is also blocked. // gradually block pieces of it, until its head is also blocked.
func TestBannedChainStarvationAttack(t *testing.T) { func TestBannedChainStarvationAttack(t *testing.T) {
@ -565,8 +571,8 @@ func TestBannedChainStarvationAttack(t *testing.T) {
// Create the tester and ban the selected hash. // Create the tester and ban the selected hash.
tester := newTester() tester := newTester()
tester.downloader.banned.Add(forkHashes[fork-1]) tester.downloader.banned.Add(forkHashes[fork-1])
tester.newPeer("valid", hashes, blocks) tester.newPeer("valid", eth60, hashes, blocks)
tester.newPeer("attack", forkHashes, forkBlocks) tester.newPeer("attack", eth60, forkHashes, forkBlocks)
// Iteratively try to sync, and verify that the banned hash list grows until // Iteratively try to sync, and verify that the banned hash list grows until
// the head of the invalid chain is blocked too. // the head of the invalid chain is blocked too.
@ -586,7 +592,7 @@ func TestBannedChainStarvationAttack(t *testing.T) {
banned = bans banned = bans
} }
// Check that after banning an entire chain, bad peers get dropped // Check that after banning an entire chain, bad peers get dropped
if err := tester.newPeer("new attacker", forkHashes, forkBlocks); err != errBannedHead { if err := tester.newPeer("new attacker", eth60, forkHashes, forkBlocks); err != errBannedHead {
t.Fatalf("peer registration mismatch: have %v, want %v", err, errBannedHead) t.Fatalf("peer registration mismatch: have %v, want %v", err, errBannedHead)
} }
if peer := tester.downloader.peers.Peer("new attacker"); peer != nil { if peer := tester.downloader.peers.Peer("new attacker"); peer != nil {
@ -618,8 +624,8 @@ func TestBannedChainMemoryExhaustionAttack(t *testing.T) {
MaxBlockFetch = 4 MaxBlockFetch = 4
maxBannedHashes = 256 maxBannedHashes = 256
tester.newPeer("valid", hashes, blocks) tester.newPeer("valid", eth60, hashes, blocks)
tester.newPeer("attack", forkHashes, forkBlocks) tester.newPeer("attack", eth60, forkHashes, forkBlocks)
// Iteratively try to sync, and verify that the banned hash list grows until // Iteratively try to sync, and verify that the banned hash list grows until
// the head of the invalid chain is blocked too. // the head of the invalid chain is blocked too.
@ -664,7 +670,7 @@ func TestOverlappingDeliveryAttack(t *testing.T) {
// Register an attacker that always returns non-requested blocks too // Register an attacker that always returns non-requested blocks too
tester := newTester() tester := newTester()
tester.newPeer("attack", hashes, blocks) tester.newPeer("attack", eth60, hashes, blocks)
rawGetBlocks := tester.downloader.peers.Peer("attack").getBlocks rawGetBlocks := tester.downloader.peers.Peer("attack").getBlocks
tester.downloader.peers.Peer("attack").getBlocks = func(request []common.Hash) error { tester.downloader.peers.Peer("attack").getBlocks = func(request []common.Hash) error {
@ -712,7 +718,7 @@ func TestHashAttackerDropping(t *testing.T) {
for i, tt := range tests { for i, tt := range tests {
// Register a new peer and ensure it's presence // Register a new peer and ensure it's presence
id := fmt.Sprintf("test %d", i) id := fmt.Sprintf("test %d", i)
if err := tester.newPeer(id, []common.Hash{genesis.Hash()}, nil); err != nil { if err := tester.newPeer(id, eth60, []common.Hash{genesis.Hash()}, nil); err != nil {
t.Fatalf("test %d: failed to register new peer: %v", i, err) t.Fatalf("test %d: failed to register new peer: %v", i, err)
} }
if _, ok := tester.peerHashes[id]; !ok { if _, ok := tester.peerHashes[id]; !ok {
@ -744,7 +750,7 @@ func TestBlockAttackerDropping(t *testing.T) {
for i, tt := range tests { for i, tt := range tests {
// Register a new peer and ensure it's presence // Register a new peer and ensure it's presence
id := fmt.Sprintf("test %d", i) id := fmt.Sprintf("test %d", i)
if err := tester.newPeer(id, []common.Hash{common.Hash{}}, nil); err != nil { if err := tester.newPeer(id, eth60, []common.Hash{common.Hash{}}, nil); err != nil {
t.Fatalf("test %d: failed to register new peer: %v", i, err) t.Fatalf("test %d: failed to register new peer: %v", i, err)
} }
if _, ok := tester.peerHashes[id]; !ok { if _, ok := tester.peerHashes[id]; !ok {

View File

@ -39,11 +39,13 @@ type peer struct {
getHashes hashFetcherFn // Method to retrieve a batch of hashes (mockable for testing) getHashes hashFetcherFn // Method to retrieve a batch of hashes (mockable for testing)
getBlocks blockFetcherFn // Method to retrieve a batch of blocks (mockable for testing) getBlocks blockFetcherFn // Method to retrieve a batch of blocks (mockable for testing)
version int // Eth protocol version number to switch strategies
} }
// newPeer create a new downloader peer, with specific hash and block retrieval // newPeer create a new downloader peer, with specific hash and block retrieval
// mechanisms. // mechanisms.
func newPeer(id string, head common.Hash, getHashes hashFetcherFn, getBlocks blockFetcherFn) *peer { func newPeer(id string, version int, head common.Hash, getHashes hashFetcherFn, getBlocks blockFetcherFn) *peer {
return &peer{ return &peer{
id: id, id: id,
head: head, head: head,
@ -51,6 +53,7 @@ func newPeer(id string, head common.Hash, getHashes hashFetcherFn, getBlocks blo
getHashes: getHashes, getHashes: getHashes,
getBlocks: getBlocks, getBlocks: getBlocks,
ignored: set.New(), ignored: set.New(),
version: version,
} }
} }

View File

@ -181,7 +181,7 @@ func (pm *ProtocolManager) handle(p *peer) error {
defer pm.removePeer(p.id) defer pm.removePeer(p.id)
// Register the peer in the downloader. If the downloader considers it banned, we disconnect // Register the peer in the downloader. If the downloader considers it banned, we disconnect
if err := pm.downloader.RegisterPeer(p.id, p.Head(), p.RequestHashes, p.RequestBlocks); err != nil { if err := pm.downloader.RegisterPeer(p.id, p.version, p.Head(), p.RequestHashes, p.RequestBlocks); err != nil {
return err return err
} }
// Propagate existing transactions. new transactions appearing // Propagate existing transactions. new transactions appearing