mirror of
https://gitlab.com/pulsechaincom/prysm-pulse.git
synced 2025-01-11 12:10:05 +00:00
Stream p2p agg attestation (#5809)
* stream aggreagted attestations from p2p network to indexed attestation stream * remove excessive log * fix test * handle nil attestation as well * Update beacon-chain/sync/subscriber_beacon_aggregate_proof.go * Update beacon-chain/sync/subscriber_beacon_aggregate_proof.go * Update beacon-chain/sync/subscriber_beacon_aggregate_proof_test.go * terence feedback * sort imports * sort imports * Change to received buffer * preston feedback * error log * raul feedback * more logging changes * fix duplicate package name Co-authored-by: terence tsao <terence@prysmaticlabs.com> Co-authored-by: prylabs-bulldozer[bot] <58059840+prylabs-bulldozer[bot]@users.noreply.github.com> Co-authored-by: Ivan Martinez <ivanthegreatdev@gmail.com>
This commit is contained in:
parent
08762af5a2
commit
ffa08f5a85
@ -25,6 +25,8 @@ import (
|
||||
"google.golang.org/grpc/status"
|
||||
)
|
||||
|
||||
var log = logrus.WithField("prefix", "rpc")
|
||||
|
||||
// sortableAttestations implements the Sort interface to sort attestations
|
||||
// by slot as the canonical sorting attribute.
|
||||
type sortableAttestations []*ethpb.Attestation
|
||||
@ -238,20 +240,41 @@ func (bs *Server) StreamIndexedAttestations(
|
||||
go bs.collectReceivedAttestations(stream.Context())
|
||||
for {
|
||||
select {
|
||||
case event := <-attestationsChannel:
|
||||
case event, ok := <-attestationsChannel:
|
||||
if !ok {
|
||||
log.Error("Indexed attestations stream channel closed")
|
||||
}
|
||||
if event.Type == operation.UnaggregatedAttReceived {
|
||||
data, ok := event.Data.(*operation.UnAggregatedAttReceivedData)
|
||||
if !ok {
|
||||
// Got bad data over the stream.
|
||||
log.Warningf("Indexed attestations stream got data of wrong type on stream expected *UnAggregatedAttReceivedData, received %T", event.Data)
|
||||
continue
|
||||
}
|
||||
if data.Attestation == nil {
|
||||
// One nil attestation shouldn't stop the stream.
|
||||
log.Debug("Indexed attestations stream got a nil attestation")
|
||||
continue
|
||||
}
|
||||
bs.ReceivedAttestationsBuffer <- data.Attestation
|
||||
} else if event.Type == operation.AggregatedAttReceived {
|
||||
data, ok := event.Data.(*operation.AggregatedAttReceivedData)
|
||||
if !ok {
|
||||
// Got bad data over the stream.
|
||||
log.Warningf("Indexed attestations stream got data of wrong type on stream expected *AggregatedAttReceivedData, received %T", event.Data)
|
||||
continue
|
||||
}
|
||||
if data.Attestation == nil || data.Attestation.Aggregate == nil {
|
||||
// One nil attestation shouldn't stop the stream.
|
||||
log.Info("Indexed attestations stream got nil attestation or nil attestation aggregate")
|
||||
continue
|
||||
}
|
||||
bs.CollectedAttestationsBuffer <- []*ethpb.Attestation{data.Attestation.Aggregate}
|
||||
}
|
||||
case atts, ok := <-bs.CollectedAttestationsBuffer:
|
||||
if !ok {
|
||||
log.Error("Indexed attestations stream collected attestations channel closed")
|
||||
}
|
||||
case atts := <-bs.CollectedAttestationsBuffer:
|
||||
// We aggregate the received attestations.
|
||||
aggAtts, err := helpers.AggregateAttestations(atts)
|
||||
if err != nil {
|
||||
@ -326,7 +349,7 @@ func (bs *Server) collectReceivedAttestations(ctx context.Context) {
|
||||
case att := <-bs.ReceivedAttestationsBuffer:
|
||||
attDataRoot, err := ssz.HashTreeRoot(att.Data)
|
||||
if err != nil {
|
||||
logrus.Errorf("Could not hash tree root data: %v", err)
|
||||
log.Errorf("Could not hash tree root data: %v", err)
|
||||
continue
|
||||
}
|
||||
attsByRoot[attDataRoot] = append(attsByRoot[attDataRoot], att)
|
||||
|
@ -25,7 +25,6 @@ import (
|
||||
"github.com/prysmaticlabs/prysm/shared/bytesutil"
|
||||
"github.com/prysmaticlabs/prysm/shared/event"
|
||||
"github.com/prysmaticlabs/prysm/shared/params"
|
||||
log "github.com/sirupsen/logrus"
|
||||
"google.golang.org/grpc/codes"
|
||||
"google.golang.org/grpc/status"
|
||||
)
|
||||
|
@ -7,6 +7,8 @@ import (
|
||||
|
||||
"github.com/gogo/protobuf/proto"
|
||||
ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1"
|
||||
"github.com/prysmaticlabs/prysm/beacon-chain/core/feed"
|
||||
"github.com/prysmaticlabs/prysm/beacon-chain/core/feed/operation"
|
||||
)
|
||||
|
||||
// beaconAggregateProofSubscriber forwards the incoming validated aggregated attestation and proof to the
|
||||
@ -22,5 +24,14 @@ func (r *Service) beaconAggregateProofSubscriber(ctx context.Context, msg proto.
|
||||
}
|
||||
r.setAggregatorIndexEpochSeen(a.Message.Aggregate.Data.Target.Epoch, a.Message.AggregatorIndex)
|
||||
|
||||
// Broadcast the aggregated attestation on a feed to notify other services in the beacon node
|
||||
// of a received aggregated attestation.
|
||||
r.attestationNotifier.OperationFeed().Send(&feed.Event{
|
||||
Type: operation.AggregatedAttReceived,
|
||||
Data: &operation.AggregatedAttReceivedData{
|
||||
Attestation: a.Message,
|
||||
},
|
||||
})
|
||||
|
||||
return r.attPool.SaveAggregatedAttestation(a.Message.Aggregate)
|
||||
}
|
||||
|
@ -8,6 +8,7 @@ import (
|
||||
lru "github.com/hashicorp/golang-lru"
|
||||
ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1"
|
||||
"github.com/prysmaticlabs/go-bitfield"
|
||||
mock "github.com/prysmaticlabs/prysm/beacon-chain/blockchain/testing"
|
||||
"github.com/prysmaticlabs/prysm/beacon-chain/operations/attestations"
|
||||
)
|
||||
|
||||
@ -19,6 +20,7 @@ func TestBeaconAggregateProofSubscriber_CanSave(t *testing.T) {
|
||||
r := &Service{
|
||||
attPool: attestations.NewPool(),
|
||||
seenAttestationCache: c,
|
||||
attestationNotifier: (&mock.ChainService{}).OperationNotifier(),
|
||||
}
|
||||
|
||||
a := ðpb.SignedAggregateAttestationAndProof{Message: ðpb.AggregateAttestationAndProof{Aggregate: ðpb.Attestation{Data: ðpb.AttestationData{Target: ðpb.Checkpoint{}}, AggregationBits: bitfield.Bitlist{0x07}}, AggregatorIndex: 100}}
|
||||
|
Loading…
Reference in New Issue
Block a user