diff --git a/block_pool.go b/block_pool.go index f5c53b9f7..e31babfd6 100644 --- a/block_pool.go +++ b/block_pool.go @@ -11,6 +11,7 @@ import ( "github.com/ethereum/eth-go/ethchain" "github.com/ethereum/eth-go/ethlog" "github.com/ethereum/eth-go/ethutil" + "github.com/ethereum/eth-go/ethwire" ) var poollogger = ethlog.NewLogger("BPOOL") @@ -38,6 +39,8 @@ type BlockPool struct { downloadStartedAt time.Time ChainLength, BlocksProcessed int + + peer *Peer } func NewBlockPool(eth *Ethereum) *BlockPool { @@ -74,6 +77,27 @@ func (self *BlockPool) Blocks() (blocks ethchain.Blocks) { return } +func (self *BlockPool) FetchHashes(peer *Peer) { + highestTd := self.eth.HighestTDPeer() + + if (self.peer == nil && peer.td.Cmp(highestTd) >= 0) || (self.peer != nil && peer.td.Cmp(self.peer.td) >= 0) || self.peer == peer { + if self.peer != peer { + poollogger.Debugf("Found better suitable peer (%v vs %v)\n", self.td, peer.td) + } + + self.peer = peer + self.td = peer.td + + if !self.HasLatestHash() { + peer.doneFetchingHashes = false + + const amount = 256 + peerlogger.Debugf("Fetching hashes (%d)\n", amount) + peer.QueueMessage(ethwire.NewMessage(ethwire.MsgGetBlockHashesTy, []interface{}{peer.lastReceivedHash, uint32(amount)})) + } + } +} + func (self *BlockPool) AddHash(hash []byte, peer *Peer) { self.mut.Lock() defer self.mut.Unlock() @@ -99,7 +123,7 @@ func (self *BlockPool) Add(b *ethchain.Block, peer *Peer) { if !self.eth.BlockChain().HasBlock(b.PrevHash) && self.pool[string(b.PrevHash)] == nil && !self.fetchingHashes { poollogger.Infof("Unknown block, requesting parent (%x...)\n", b.PrevHash[0:4]) - //peer.QueueMessage(ethwire.NewMessage(ethwire.MsgGetBlockHashesTy, []interface{}{b.Hash(), uint32(256)})) + peer.QueueMessage(ethwire.NewMessage(ethwire.MsgGetBlockHashesTy, []interface{}{b.Hash(), uint32(256)})) } } else if self.pool[hash] != nil { self.pool[hash].block = b @@ -227,7 +251,7 @@ out: } func (self *BlockPool) chainThread() { - procTimer := time.NewTicker(500 * time.Millisecond) + procTimer := time.NewTicker(1000 * time.Millisecond) out: for { select { @@ -237,6 +261,14 @@ out: blocks := self.Blocks() ethchain.BlockBy(ethchain.Number).Sort(blocks) + // Find common block + for i, block := range blocks { + if self.eth.BlockChain().HasBlock(block.PrevHash) { + blocks = blocks[i:] + break + } + } + if len(blocks) > 0 { if self.eth.BlockChain().HasBlock(blocks[0].PrevHash) { for i, block := range blocks[1:] { @@ -253,13 +285,51 @@ out: } } - // Handle in batches of 4k - max := int(math.Min(4000, float64(len(blocks)))) - for _, block := range blocks[:max] { - self.eth.Eventer().Post("block", block) + if len(blocks) > 0 { + self.eth.Eventer().Post("blocks", blocks) + + } + + var err error + for i, block := range blocks { + //self.eth.Eventer().Post("block", block) + err = self.eth.StateManager().Process(block, false) + if err != nil { + poollogger.Infoln(err) + poollogger.Debugf("Block #%v failed (%x...)\n", block.Number, block.Hash()[0:4]) + poollogger.Debugln(block) + + blocks = blocks[i:] + + break + } self.Remove(block.Hash()) } + + if err != nil { + // Remove this bad chain + for _, block := range blocks { + self.Remove(block.Hash()) + } + + poollogger.Debugf("Punishing peer for supplying bad chain (%v)\n", self.peer.conn.RemoteAddr()) + // This peer gave us bad hashes and made us fetch a bad chain, therefor he shall be punished. + self.eth.BlacklistPeer(self.peer) + self.peer.StopWithReason(DiscBadPeer) + self.td = ethutil.Big0 + self.peer = nil + } + + /* + // Handle in batches of 4k + //max := int(math.Min(4000, float64(len(blocks)))) + for _, block := range blocks { + self.eth.Eventer().Post("block", block) + + self.Remove(block.Hash()) + } + */ } } } diff --git a/ethereum.go b/ethereum.go index 5fb3f2909..013214726 100644 --- a/ethereum.go +++ b/ethereum.go @@ -69,6 +69,8 @@ type Ethereum struct { Addr net.Addr Port string + blacklist [][]byte + peerMut sync.Mutex // Capabilities for outgoing peers @@ -211,6 +213,10 @@ func (s *Ethereum) HighestTDPeer() (td *big.Int) { return } +func (self *Ethereum) BlacklistPeer(peer *Peer) { + self.blacklist = append(self.blacklist, peer.pubkey) +} + func (s *Ethereum) AddPeer(conn net.Conn) { peer := NewPeer(conn, s, true) diff --git a/peer.go b/peer.go index b8f850b5a..2806e8a11 100644 --- a/peer.go +++ b/peer.go @@ -39,15 +39,15 @@ const ( // Values are given explicitly instead of by iota because these values are // defined by the wire protocol spec; it is easier for humans to ensure // correctness when values are explicit. - DiscReRequested = 0x00 - DiscReTcpSysErr = 0x01 - DiscBadProto = 0x02 - DiscBadPeer = 0x03 - DiscTooManyPeers = 0x04 - DiscConnDup = 0x05 - DiscGenesisErr = 0x06 - DiscProtoErr = 0x07 - DiscQuitting = 0x08 + DiscRequested DiscReason = iota + DiscReTcpSysErr + DiscBadProto + DiscBadPeer + DiscTooManyPeers + DiscConnDup + DiscGenesisErr + DiscProtoErr + DiscQuitting ) var discReasonToString = []string{ @@ -554,19 +554,22 @@ func (self *Peer) FetchBlocks(hashes [][]byte) { } func (self *Peer) FetchHashes() { - self.doneFetchingHashes = false - blockPool := self.ethereum.blockPool + blockPool.FetchHashes(self) - if self.td.Cmp(self.ethereum.HighestTDPeer()) >= 0 { - blockPool.td = self.td + /* + if self.td.Cmp(self.ethereum.HighestTDPeer()) >= 0 { + blockPool.td = self.td - if !blockPool.HasLatestHash() { - const amount = 256 - peerlogger.Debugf("Fetching hashes (%d)\n", amount) - self.QueueMessage(ethwire.NewMessage(ethwire.MsgGetBlockHashesTy, []interface{}{self.lastReceivedHash, uint32(amount)})) + if !blockPool.HasLatestHash() { + self.doneFetchingHashes = false + + const amount = 256 + peerlogger.Debugf("Fetching hashes (%d)\n", amount) + self.QueueMessage(ethwire.NewMessage(ethwire.MsgGetBlockHashesTy, []interface{}{self.lastReceivedHash, uint32(amount)})) + } } - } + */ } func (self *Peer) FetchingHashes() bool { @@ -631,18 +634,22 @@ func (p *Peer) Start() { } func (p *Peer) Stop() { + p.StopWithReason(DiscRequested) +} + +func (p *Peer) StopWithReason(reason DiscReason) { if atomic.AddInt32(&p.disconnect, 1) != 1 { return } - close(p.quit) - if atomic.LoadInt32(&p.connected) != 0 { - p.writeMessage(ethwire.NewMessage(ethwire.MsgDiscTy, "")) - p.conn.Close() - } - // Pre-emptively remove the peer; don't wait for reaping. We already know it's dead if we are here p.ethereum.RemovePeer(p) + + close(p.quit) + if atomic.LoadInt32(&p.connected) != 0 { + p.writeMessage(ethwire.NewMessage(ethwire.MsgDiscTy, reason)) + p.conn.Close() + } } func (p *Peer) peersMessage() *ethwire.Msg { @@ -764,6 +771,16 @@ func (p *Peer) handleHandshake(msg *ethwire.Msg) { return } + // Check for blacklisting + for _, pk := range p.ethereum.blacklist { + if bytes.Compare(pk, pub) == 0 { + peerlogger.Debugf("Blacklisted peer tried to connect (%x...)\n", pubkey[0:4]) + p.StopWithReason(DiscBadPeer) + + return + } + } + usedPub := 0 // This peer is already added to the peerlist so we expect to find a double pubkey at least once eachPeer(p.ethereum.Peers(), func(peer *Peer, e *list.Element) {