mirror of
https://gitlab.com/pulsechaincom/erigon-pulse.git
synced 2025-01-20 17:31:11 +00:00
276 lines
5.9 KiB
Go
276 lines
5.9 KiB
Go
package p2p
|
|
|
|
import (
|
|
"bytes"
|
|
// "fmt"
|
|
"net"
|
|
"time"
|
|
|
|
"github.com/ethereum/eth-go/ethutil"
|
|
)
|
|
|
|
type Connection struct {
|
|
conn net.Conn
|
|
// conn NetworkConnection
|
|
timeout time.Duration
|
|
in chan []byte
|
|
out chan []byte
|
|
err chan *PeerError
|
|
closingIn chan chan bool
|
|
closingOut chan chan bool
|
|
}
|
|
|
|
// const readBufferLength = 2 //for testing
|
|
|
|
const readBufferLength = 1440
|
|
const partialsQueueSize = 10
|
|
const maxPendingQueueSize = 1
|
|
const defaultTimeout = 500
|
|
|
|
var magicToken = []byte{34, 64, 8, 145}
|
|
|
|
func (self *Connection) Open() {
|
|
go self.startRead()
|
|
go self.startWrite()
|
|
}
|
|
|
|
func (self *Connection) Close() {
|
|
self.closeIn()
|
|
self.closeOut()
|
|
}
|
|
|
|
func (self *Connection) closeIn() {
|
|
errc := make(chan bool)
|
|
self.closingIn <- errc
|
|
<-errc
|
|
}
|
|
|
|
func (self *Connection) closeOut() {
|
|
errc := make(chan bool)
|
|
self.closingOut <- errc
|
|
<-errc
|
|
}
|
|
|
|
func NewConnection(conn net.Conn, errchan chan *PeerError) *Connection {
|
|
return &Connection{
|
|
conn: conn,
|
|
timeout: defaultTimeout,
|
|
in: make(chan []byte),
|
|
out: make(chan []byte),
|
|
err: errchan,
|
|
closingIn: make(chan chan bool, 1),
|
|
closingOut: make(chan chan bool, 1),
|
|
}
|
|
}
|
|
|
|
func (self *Connection) Read() <-chan []byte {
|
|
return self.in
|
|
}
|
|
|
|
func (self *Connection) Write() chan<- []byte {
|
|
return self.out
|
|
}
|
|
|
|
func (self *Connection) Error() <-chan *PeerError {
|
|
return self.err
|
|
}
|
|
|
|
func (self *Connection) startRead() {
|
|
payloads := make(chan []byte)
|
|
done := make(chan *PeerError)
|
|
pending := [][]byte{}
|
|
var head []byte
|
|
var wait time.Duration // initally 0 (no delay)
|
|
read := time.After(wait * time.Millisecond)
|
|
|
|
for {
|
|
// if pending empty, nil channel blocks
|
|
var in chan []byte
|
|
if len(pending) > 0 {
|
|
in = self.in // enable send case
|
|
head = pending[0]
|
|
} else {
|
|
in = nil
|
|
}
|
|
|
|
select {
|
|
case <-read:
|
|
go self.read(payloads, done)
|
|
case err := <-done:
|
|
if err == nil { // no error but nothing to read
|
|
if len(pending) < maxPendingQueueSize {
|
|
wait = 100
|
|
} else if wait == 0 {
|
|
wait = 100
|
|
} else {
|
|
wait = 2 * wait
|
|
}
|
|
} else {
|
|
self.err <- err // report error
|
|
wait = 100
|
|
}
|
|
read = time.After(wait * time.Millisecond)
|
|
case payload := <-payloads:
|
|
pending = append(pending, payload)
|
|
if len(pending) < maxPendingQueueSize {
|
|
wait = 0
|
|
} else {
|
|
wait = 100
|
|
}
|
|
read = time.After(wait * time.Millisecond)
|
|
case in <- head:
|
|
pending = pending[1:]
|
|
case errc := <-self.closingIn:
|
|
errc <- true
|
|
close(self.in)
|
|
return
|
|
}
|
|
|
|
}
|
|
}
|
|
|
|
func (self *Connection) startWrite() {
|
|
pending := [][]byte{}
|
|
done := make(chan *PeerError)
|
|
writing := false
|
|
for {
|
|
if len(pending) > 0 && !writing {
|
|
writing = true
|
|
go self.write(pending[0], done)
|
|
}
|
|
select {
|
|
case payload := <-self.out:
|
|
pending = append(pending, payload)
|
|
case err := <-done:
|
|
if err == nil {
|
|
pending = pending[1:]
|
|
writing = false
|
|
} else {
|
|
self.err <- err // report error
|
|
}
|
|
case errc := <-self.closingOut:
|
|
errc <- true
|
|
close(self.out)
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
func pack(payload []byte) (packet []byte) {
|
|
length := ethutil.NumberToBytes(uint32(len(payload)), 32)
|
|
// return error if too long?
|
|
// Write magic token and payload length (first 8 bytes)
|
|
packet = append(magicToken, length...)
|
|
packet = append(packet, payload...)
|
|
return
|
|
}
|
|
|
|
func avoidPanic(done chan *PeerError) {
|
|
if rec := recover(); rec != nil {
|
|
err := NewPeerError(MiscError, " %v", rec)
|
|
logger.Debugln(err)
|
|
done <- err
|
|
}
|
|
}
|
|
|
|
func (self *Connection) write(payload []byte, done chan *PeerError) {
|
|
defer avoidPanic(done)
|
|
var err *PeerError
|
|
_, ok := self.conn.Write(pack(payload))
|
|
if ok != nil {
|
|
err = NewPeerError(WriteError, " %v", ok)
|
|
logger.Debugln(err)
|
|
}
|
|
done <- err
|
|
}
|
|
|
|
func (self *Connection) read(payloads chan []byte, done chan *PeerError) {
|
|
//defer avoidPanic(done)
|
|
|
|
partials := make(chan []byte, partialsQueueSize)
|
|
errc := make(chan *PeerError)
|
|
go self.readPartials(partials, errc)
|
|
|
|
packet := []byte{}
|
|
length := 8
|
|
start := true
|
|
var err *PeerError
|
|
out:
|
|
for {
|
|
// appends partials read via connection until packet is
|
|
// - either parseable (>=8bytes)
|
|
// - or complete (payload fully consumed)
|
|
for len(packet) < length {
|
|
partial, ok := <-partials
|
|
if !ok { // partials channel is closed
|
|
err = <-errc
|
|
if err == nil && len(packet) > 0 {
|
|
if start {
|
|
err = NewPeerError(PacketTooShort, "%v", packet)
|
|
} else {
|
|
err = NewPeerError(PayloadTooShort, "%d < %d", len(packet), length)
|
|
}
|
|
}
|
|
break out
|
|
}
|
|
packet = append(packet, partial...)
|
|
}
|
|
if start {
|
|
// at least 8 bytes read, can validate packet
|
|
if bytes.Compare(magicToken, packet[:4]) != 0 {
|
|
err = NewPeerError(MagicTokenMismatch, " received %v", packet[:4])
|
|
break
|
|
}
|
|
length = int(ethutil.BytesToNumber(packet[4:8]))
|
|
packet = packet[8:]
|
|
|
|
if length > 0 {
|
|
start = false // now consuming payload
|
|
} else { //penalize peer but read on
|
|
self.err <- NewPeerError(EmptyPayload, "")
|
|
length = 8
|
|
}
|
|
} else {
|
|
// packet complete (payload fully consumed)
|
|
payloads <- packet[:length]
|
|
packet = packet[length:] // resclice packet
|
|
start = true
|
|
length = 8
|
|
}
|
|
}
|
|
|
|
// this stops partials read via the connection, should we?
|
|
//if err != nil {
|
|
// select {
|
|
// case errc <- err
|
|
// default:
|
|
//}
|
|
done <- err
|
|
}
|
|
|
|
func (self *Connection) readPartials(partials chan []byte, errc chan *PeerError) {
|
|
defer close(partials)
|
|
for {
|
|
// Give buffering some time
|
|
self.conn.SetReadDeadline(time.Now().Add(self.timeout * time.Millisecond))
|
|
buffer := make([]byte, readBufferLength)
|
|
// read partial from connection
|
|
bytesRead, err := self.conn.Read(buffer)
|
|
if err == nil || err.Error() == "EOF" {
|
|
if bytesRead > 0 {
|
|
partials <- buffer[:bytesRead]
|
|
}
|
|
if err != nil && err.Error() == "EOF" {
|
|
break
|
|
}
|
|
} else {
|
|
// unexpected error, report to errc
|
|
err := NewPeerError(ReadError, " %v", err)
|
|
logger.Debugln(err)
|
|
errc <- err
|
|
return // will close partials channel
|
|
}
|
|
}
|
|
close(errc)
|
|
}
|