2022-11-25 16:38:22 +01:00
package network
import (
"context"
2023-09-29 23:42:07 +02:00
"fmt"
2023-08-08 18:21:19 -05:00
"sync"
2022-11-25 16:38:22 +01:00
2023-12-30 09:55:01 -06:00
"github.com/ledgerwatch/erigon-lib/common"
2023-06-05 01:52:55 +02:00
"github.com/ledgerwatch/erigon/cl/freezer"
2023-12-30 09:55:01 -06:00
"github.com/ledgerwatch/erigon/cl/gossip"
2023-05-15 00:12:24 +02:00
"github.com/ledgerwatch/erigon/cl/phase1/forkchoice"
2023-08-08 18:21:19 -05:00
"github.com/ledgerwatch/erigon/cl/sentinel/peers"
2023-05-15 00:12:24 +02:00
2022-11-29 00:00:40 +01:00
"github.com/ledgerwatch/erigon-lib/gointerfaces/sentinel"
2023-05-11 04:54:20 -07:00
"github.com/ledgerwatch/erigon-lib/types/ssz"
2023-01-17 02:22:05 +01:00
"github.com/ledgerwatch/erigon/cl/clparams"
2022-11-25 16:38:22 +01:00
"github.com/ledgerwatch/erigon/cl/cltypes"
2023-04-17 20:06:50 +02:00
"github.com/ledgerwatch/erigon/cl/utils"
2022-11-25 16:38:22 +01:00
"github.com/ledgerwatch/log/v3"
)
2023-04-17 20:06:50 +02:00
// Gossip manager is sending all messages to fork choice or others
2022-11-25 16:38:22 +01:00
type GossipManager struct {
2023-06-05 01:52:55 +02:00
recorder freezer . Freezer
2023-04-17 20:06:50 +02:00
forkChoice * forkchoice . ForkChoiceStore
sentinel sentinel . SentinelClient
// configs
beaconConfig * clparams . BeaconChainConfig
genesisConfig * clparams . GenesisConfig
2023-08-08 18:21:19 -05:00
mu sync . RWMutex
subs map [ int ] chan * peers . PeeredObject [ * cltypes . SignedBeaconBlock ]
totalSubs int
2022-11-25 16:38:22 +01:00
}
2023-08-15 21:32:40 -05:00
func NewGossipReceiver ( s sentinel . SentinelClient , forkChoice * forkchoice . ForkChoiceStore ,
2023-06-05 01:52:55 +02:00
beaconConfig * clparams . BeaconChainConfig , genesisConfig * clparams . GenesisConfig , recorder freezer . Freezer ) * GossipManager {
2022-11-25 16:38:22 +01:00
return & GossipManager {
2023-04-17 20:06:50 +02:00
sentinel : s ,
forkChoice : forkChoice ,
beaconConfig : beaconConfig ,
genesisConfig : genesisConfig ,
2023-06-05 01:52:55 +02:00
recorder : recorder ,
2023-08-08 18:21:19 -05:00
subs : make ( map [ int ] chan * peers . PeeredObject [ * cltypes . SignedBeaconBlock ] ) ,
2022-11-25 16:38:22 +01:00
}
}
2023-08-08 18:21:19 -05:00
// this subscribes to signed beacon blocks..... i wish this was better
func ( g * GossipManager ) SubscribeSignedBeaconBlocks ( ctx context . Context ) <- chan * peers . PeeredObject [ * cltypes . SignedBeaconBlock ] {
// a really big limit because why not....
out := make ( chan * peers . PeeredObject [ * cltypes . SignedBeaconBlock ] , 512 )
g . mu . Lock ( )
g . totalSubs ++
idx := g . totalSubs
g . subs [ idx ] = out
g . mu . Unlock ( )
go func ( ) {
<- ctx . Done ( )
g . mu . Lock ( )
delete ( g . subs , idx )
g . mu . Unlock ( )
} ( )
return out
2023-09-29 23:42:07 +02:00
}
2023-08-08 18:21:19 -05:00
2023-09-29 23:42:07 +02:00
func operationsContract [ T ssz . EncodableSSZ ] ( ctx context . Context , g * GossipManager , l log . Ctx , data * sentinel . GossipData , version int , name string , fn func ( T , bool ) error ) error {
var t T
object := t . Clone ( ) . ( T )
if err := object . DecodeSSZ ( common . CopyBytes ( data . Data ) , version ) ; err != nil {
g . sentinel . BanPeer ( ctx , data . Peer )
l [ "at" ] = fmt . Sprintf ( "decoding %s" , name )
return err
}
if err := fn ( object /*test=*/ , false ) ; err != nil {
l [ "at" ] = fmt . Sprintf ( "verify %s" , name )
return err
}
if _ , err := g . sentinel . PublishGossip ( ctx , data ) ; err != nil {
log . Debug ( "failed publish gossip" , "err" , err )
}
return nil
2023-08-08 18:21:19 -05:00
}
2023-09-29 23:42:07 +02:00
func ( g * GossipManager ) onRecv ( ctx context . Context , data * sentinel . GossipData , l log . Ctx ) ( err error ) {
defer func ( ) {
r := recover ( )
if r != nil {
err = fmt . Errorf ( "%v" , r )
}
} ( )
2022-11-25 16:38:22 +01:00
2023-05-17 18:06:25 -05:00
currentEpoch := utils . GetCurrentEpoch ( g . genesisConfig . GenesisTime , g . beaconConfig . SecondsPerSlot , g . beaconConfig . SlotsPerEpoch )
version := g . beaconConfig . GetCurrentStateVersion ( currentEpoch )
// Depending on the type of the received data, we create an instance of a specific type that implements the ObjectSSZ interface,
// then attempts to deserialize the received data into it.
// If the deserialization fails, an error is logged and the loop returns to the next iteration.
// If the deserialization is successful, the object is set to the deserialized value and the loop returns to the next iteration.
var object ssz . Unmarshaler
2023-12-30 09:55:01 -06:00
switch data . Name {
case gossip . TopicNameBeaconBlock :
2023-08-10 22:34:58 +02:00
object = cltypes . NewSignedBeaconBlock ( g . beaconConfig )
2023-05-17 18:06:25 -05:00
if err := object . DecodeSSZ ( common . CopyBytes ( data . Data ) , int ( version ) ) ; err != nil {
2023-08-15 21:32:40 -05:00
g . sentinel . BanPeer ( ctx , data . Peer )
2023-05-17 18:06:25 -05:00
l [ "at" ] = "decoding block"
return err
}
block := object . ( * cltypes . SignedBeaconBlock )
l [ "slot" ] = block . Block . Slot
currentSlotByTime := utils . GetCurrentSlot ( g . genesisConfig . GenesisTime , g . beaconConfig . SecondsPerSlot )
maxGossipSlotThreshold := uint64 ( 4 )
// Skip if slot is too far behind.
if block . Block . Slot + maxGossipSlotThreshold < currentSlotByTime {
return nil
}
2023-06-07 20:01:32 +02:00
if block . Block . Slot == currentSlotByTime {
2023-08-15 21:32:40 -05:00
if _ , err := g . sentinel . PublishGossip ( ctx , data ) ; err != nil {
2023-05-17 18:06:25 -05:00
log . Debug ( "failed publish gossip" , "err" , err )
}
}
2023-08-15 21:32:40 -05:00
count , err := g . sentinel . GetPeers ( ctx , & sentinel . EmptyMessage { } )
2022-11-25 16:38:22 +01:00
if err != nil {
2023-05-17 18:06:25 -05:00
l [ "at" ] = "sentinel peer count"
return err
2022-11-25 16:38:22 +01:00
}
2023-04-17 20:06:50 +02:00
2023-05-17 18:06:25 -05:00
log . Debug ( "Received block via gossip" ,
"peers" , count . Amount ,
"slot" , block . Block . Slot ,
)
2023-04-17 20:06:50 +02:00
2023-06-06 03:24:59 +02:00
if err := freezer . PutObjectSSZIntoFreezer ( "signedBeaconBlock" , "caplin_core" , block . Block . Slot , block , g . recorder ) ; err != nil {
2023-06-05 01:52:55 +02:00
return err
}
2023-08-08 18:21:19 -05:00
g . mu . RLock ( )
for _ , v := range g . subs {
select {
case v <- & peers . PeeredObject [ * cltypes . SignedBeaconBlock ] { Data : block , Peer : data . Peer . Pid } :
default :
2022-11-25 16:38:22 +01:00
}
2023-05-17 18:06:25 -05:00
}
2023-08-08 18:21:19 -05:00
g . mu . RUnlock ( )
2023-12-30 09:55:01 -06:00
case gossip . TopicNameVoluntaryExit :
2023-09-29 23:42:07 +02:00
if err := operationsContract [ * cltypes . SignedVoluntaryExit ] ( ctx , g , l , data , int ( version ) , "voluntary exit" , g . forkChoice . OnVoluntaryExit ) ; err != nil {
2023-05-17 18:06:25 -05:00
return err
}
2023-12-30 09:55:01 -06:00
case gossip . TopicNameProposerSlashing :
2023-09-29 23:42:07 +02:00
if err := operationsContract [ * cltypes . ProposerSlashing ] ( ctx , g , l , data , int ( version ) , "proposer slashing" , g . forkChoice . OnProposerSlashing ) ; err != nil {
2023-05-17 18:06:25 -05:00
return err
}
2023-12-30 09:55:01 -06:00
case gossip . TopicNameAttesterSlashing :
2023-09-29 23:42:07 +02:00
if err := operationsContract [ * cltypes . AttesterSlashing ] ( ctx , g , l , data , int ( version ) , "attester slashing" , g . forkChoice . OnAttesterSlashing ) ; err != nil {
2023-05-17 18:06:25 -05:00
return err
}
2023-12-30 09:55:01 -06:00
case gossip . TopicNameBlsToExecutionChange :
2023-10-01 17:16:55 +02:00
if err := operationsContract [ * cltypes . SignedBLSToExecutionChange ] ( ctx , g , l , data , int ( version ) , "bls to execution change" , g . forkChoice . OnBlsToExecutionChange ) ; err != nil {
return err
}
2024-01-08 17:13:25 +01:00
case gossip . TopicNameBeaconAggregateAndProof :
if err := operationsContract [ * cltypes . SignedAggregateAndProof ] ( ctx , g , l , data , int ( version ) , "aggregate and proof" , g . forkChoice . OnAggregateAndProof ) ; err != nil {
return err
}
2023-05-17 18:06:25 -05:00
}
return nil
}
2023-04-17 20:06:50 +02:00
2023-08-15 21:32:40 -05:00
func ( g * GossipManager ) Start ( ctx context . Context ) {
2024-01-07 12:03:17 -06:00
subscription , err := g . sentinel . SubscribeGossip ( ctx , & sentinel . SubscriptionData { } )
2023-05-17 18:06:25 -05:00
if err != nil {
return
}
2023-05-05 11:19:24 +02:00
2023-05-17 18:06:25 -05:00
l := log . Ctx { }
for {
data , err := subscription . Recv ( )
if err != nil {
log . Warn ( "[Beacon Gossip] Fatal error receiving gossip" , "err" , err )
break
}
for k := range l {
delete ( l , k )
}
2023-08-15 21:32:40 -05:00
err = g . onRecv ( ctx , data , l )
2023-05-17 18:06:25 -05:00
if err != nil {
l [ "err" ] = err
log . Debug ( "[Beacon Gossip] Recoverable Error" , l )
2022-11-25 16:38:22 +01:00
}
}
}