erigon-pulse/cmd/lightclient/sentinel/proto/ssz_snappy/stream.go
a 9f6fd488d2
[cl lightclient] - packet decoding & ssz code generation (#5558)
* added subscription to gossip

* added all handlers with logs

* disconnecting from peers with goodbye message received

* added time out for stream

* wip

* Remove unused

* remove extra structs

* status handler

* minor clean up

* add structs for altair light client

* begin writing out altair light client sync protocol to figure out what other structs are needed

* remove sszgen

* cleanup pt 1

* cleanup pt 2

* remove go 1.19 function

* less ambiguous variable name

* run go fmt

* rename snappy_ssz to ssz_snappy to better align with wire name

* move more structs over

* poof set deadline

Co-authored-by: Enrique Jose Avila Asapche <eavilaasapche@gmail.com>
Co-authored-by: a <a@a.a>
2022-09-29 19:07:57 +02:00

93 lines
2.1 KiB
Go

package ssz_snappy
import (
"encoding/binary"
"fmt"
"io"
"reflect"
"sync"
"gfx.cafe/util/go/bufpool"
ssz "github.com/ferranbt/fastssz"
"github.com/golang/snappy"
"github.com/ledgerwatch/erigon/cmd/lightclient/sentinel/proto"
"github.com/libp2p/go-libp2p/core/network"
)
// StreamCodec reads and writes packets on a single libp2p stream, pairing the
// raw stream with a snappy reader and a snappy writer layered on top of it.
type StreamCodec struct {
// s is the underlying stream; raw writes and length-prefix reads go through it directly.
s network.Stream
// sr yields snappy-decoded bytes read from s.
sr *snappy.Reader
// sw snappy-encodes whatever is written to it and forwards the result to s.
sw *snappy.Writer
// mu is not locked by any method visible in this file.
// NOTE(review): presumably intended to serialize stream access — confirm or remove.
mu sync.Mutex
}
// NewStreamCodec builds a StreamCodec around s, attaching a snappy reader and
// a snappy writer that both operate on that same stream.
func NewStreamCodec(s network.Stream) proto.StreamCodec {
	codec := new(StreamCodec)
	codec.s = s
	codec.sr = snappy.NewReader(s)
	codec.sw = snappy.NewWriter(s)
	return codec
}
// WritePacket serializes pkt as SSZ and sends it on the stream as one
// length-prefixed frame: an unsigned varint holding the uncompressed SSZ size,
// written directly to the stream, followed by the snappy-compressed SSZ bytes.
// This is the layout readPacket consumes (length from the raw stream, payload
// through the snappy reader).
//
// It returns the number of frame bytes produced before compression (length
// prefix plus SSZ payload). It returns 0 and an error if pkt does not
// implement ssz.Marshaler, if marshalling fails, or if a stream write fails.
func (d *StreamCodec) WritePacket(pkt proto.Packet) (n int, err error) {
	val, ok := pkt.(ssz.Marshaler)
	if !ok {
		return 0, fmt.Errorf("packet %s does not implement ssz.Marshaler", reflect.TypeOf(pkt))
	}
	sz := val.SizeSSZ()

	// The reader decodes the prefix with ReadUvarint, so it must be encoded as
	// an unsigned varint; PutVarint would zig-zag encode it and the peer would
	// see twice the real length.
	var prefix [binary.MaxVarintLen64]byte
	prefixLen := binary.PutUvarint(prefix[:], uint64(sz))

	bp := bufpool.Get(sz)
	defer bufpool.Put(bp)

	// MarshalSSZTo appends to the slice it is given and returns the grown
	// slice, so start from an empty view of the pooled storage and keep the
	// returned slice rather than discarding it.
	payload, err := val.MarshalSSZTo(bp.Bytes()[:0])
	if err != nil {
		return 0, fmt.Errorf("marshal ssz: %w", err)
	}

	// Marshal before touching the stream so a marshalling failure never leaves
	// a dangling length prefix on the wire.
	if _, err = d.s.Write(prefix[:prefixLen]); err != nil {
		return 0, err
	}
	if _, err = d.sw.Write(payload); err != nil {
		return 0, err
	}
	return prefixLen + len(payload), nil
}
// Write sends payload to the underlying stream as-is, bypassing the snappy
// writer, and reports the byte count and error returned by the stream.
func (d *StreamCodec) Write(payload []byte) (n int, err error) {
	n, err = d.s.Write(payload)
	return n, err
}
// Decode reads the next packet from the stream into p and returns the stream
// context produced by readPacket, along with any error it reports.
func (d *StreamCodec) Decode(p proto.Packet) (ctx *proto.StreamContext, err error) {
	return d.readPacket(p)
}
// readPacket builds a StreamContext for p and, when p implements
// ssz.Unmarshaler, reads one length-prefixed frame from the stream into it:
// an unsigned varint length read from the raw stream, followed by that many
// bytes read through the snappy reader and decoded with UnmarshalSSZ.
//
// The context is returned in every case, including alongside an error; on
// success its Raw field holds the decompressed SSZ bytes. If p does not
// implement ssz.Unmarshaler nothing is read and the bare context is returned.
func (d *StreamCodec) readPacket(p proto.Packet) (ctx *proto.StreamContext, err error) {
	// Upper bound on the payload length a peer may declare. The prefix comes
	// from the network, so without a cap a peer could force an arbitrarily
	// large allocation (or a makeslice panic) with a few bytes.
	// NOTE(review): 10 MiB mirrors the consensus p2p spec's largest chunk size
	// (Bellatrix MAX_CHUNK_SIZE) — confirm this suits the light client.
	const maxPayloadSize = 10 << 20

	c := &proto.StreamContext{
		Packet:   p,
		Stream:   d.s,
		Codec:    d,
		Protocol: d.s.Protocol(),
	}

	val, ok := p.(ssz.Unmarshaler)
	if !ok {
		return c, nil
	}

	ln, _, err := proto.ReadUvarint(d.s)
	if err != nil {
		return c, err
	}
	if ln > maxPayloadSize {
		return c, fmt.Errorf("readPacket: declared length %d exceeds limit %d", ln, maxPayloadSize)
	}

	c.Raw = make([]byte, ln)
	if _, err = io.ReadFull(d.sr, c.Raw); err != nil {
		return c, fmt.Errorf("readPacket: %w", err)
	}
	if err = val.UnmarshalSSZ(c.Raw); err != nil {
		return c, fmt.Errorf("readPacket: %w", err)
	}
	return c, nil
}