diff --git a/beacon-chain/rpc/beacon/attestations.go b/beacon-chain/rpc/beacon/attestations.go index a44a5219e..4157db61e 100644 --- a/beacon-chain/rpc/beacon/attestations.go +++ b/beacon-chain/rpc/beacon/attestations.go @@ -25,6 +25,8 @@ import ( "google.golang.org/grpc/status" ) +var log = logrus.WithField("prefix", "rpc") + // sortableAttestations implements the Sort interface to sort attestations // by slot as the canonical sorting attribute. type sortableAttestations []*ethpb.Attestation @@ -238,20 +240,41 @@ func (bs *Server) StreamIndexedAttestations( go bs.collectReceivedAttestations(stream.Context()) for { select { - case event := <-attestationsChannel: + case event, ok := <-attestationsChannel: + if !ok { + log.Error("Indexed attestations stream channel closed") + } if event.Type == operation.UnaggregatedAttReceived { data, ok := event.Data.(*operation.UnAggregatedAttReceivedData) if !ok { // Got bad data over the stream. + log.Warningf("Indexed attestations stream got data of wrong type on stream expected *UnAggregatedAttReceivedData, received %T", event.Data) continue } if data.Attestation == nil { // One nil attestation shouldn't stop the stream. + log.Debug("Indexed attestations stream got a nil attestation") continue } bs.ReceivedAttestationsBuffer <- data.Attestation + } else if event.Type == operation.AggregatedAttReceived { + data, ok := event.Data.(*operation.AggregatedAttReceivedData) + if !ok { + // Got bad data over the stream. + log.Warningf("Indexed attestations stream got data of wrong type on stream expected *AggregatedAttReceivedData, received %T", event.Data) + continue + } + if data.Attestation == nil || data.Attestation.Aggregate == nil { + // One nil attestation shouldn't stop the stream. + log.Info("Indexed attestations stream got nil attestation or nil attestation aggregate") + continue + } + bs.CollectedAttestationsBuffer <- []*ethpb.Attestation{data.Attestation.Aggregate} + } + case atts, ok := <-bs.CollectedAttestationsBuffer: + if !ok { + log.Error("Indexed attestations stream collected attestations channel closed") } - case atts := <-bs.CollectedAttestationsBuffer: // We aggregate the received attestations. aggAtts, err := helpers.AggregateAttestations(atts) if err != nil { @@ -326,7 +349,7 @@ func (bs *Server) collectReceivedAttestations(ctx context.Context) { case att := <-bs.ReceivedAttestationsBuffer: attDataRoot, err := ssz.HashTreeRoot(att.Data) if err != nil { - logrus.Errorf("Could not hash tree root data: %v", err) + log.Errorf("Could not hash tree root data: %v", err) continue } attsByRoot[attDataRoot] = append(attsByRoot[attDataRoot], att) diff --git a/beacon-chain/rpc/beacon/validators_stream.go b/beacon-chain/rpc/beacon/validators_stream.go index 9452864ba..9eefd9568 100644 --- a/beacon-chain/rpc/beacon/validators_stream.go +++ b/beacon-chain/rpc/beacon/validators_stream.go @@ -25,7 +25,6 @@ import ( "github.com/prysmaticlabs/prysm/shared/bytesutil" "github.com/prysmaticlabs/prysm/shared/event" "github.com/prysmaticlabs/prysm/shared/params" - log "github.com/sirupsen/logrus" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" ) diff --git a/beacon-chain/sync/subscriber_beacon_aggregate_proof.go b/beacon-chain/sync/subscriber_beacon_aggregate_proof.go index 7d2d81f1c..5ed77b0b1 100644 --- a/beacon-chain/sync/subscriber_beacon_aggregate_proof.go +++ b/beacon-chain/sync/subscriber_beacon_aggregate_proof.go @@ -7,6 +7,8 @@ import ( "github.com/gogo/protobuf/proto" ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1" + "github.com/prysmaticlabs/prysm/beacon-chain/core/feed" + "github.com/prysmaticlabs/prysm/beacon-chain/core/feed/operation" ) // beaconAggregateProofSubscriber forwards the incoming validated aggregated attestation and proof to the @@ -22,5 +24,14 @@ func (r *Service) beaconAggregateProofSubscriber(ctx context.Context, msg proto. } r.setAggregatorIndexEpochSeen(a.Message.Aggregate.Data.Target.Epoch, a.Message.AggregatorIndex) + // Broadcast the aggregated attestation on a feed to notify other services in the beacon node + // of a received aggregated attestation. + r.attestationNotifier.OperationFeed().Send(&feed.Event{ + Type: operation.AggregatedAttReceived, + Data: &operation.AggregatedAttReceivedData{ + Attestation: a.Message, + }, + }) + return r.attPool.SaveAggregatedAttestation(a.Message.Aggregate) } diff --git a/beacon-chain/sync/subscriber_beacon_aggregate_proof_test.go b/beacon-chain/sync/subscriber_beacon_aggregate_proof_test.go index 08b21547a..7a0e2b3f8 100644 --- a/beacon-chain/sync/subscriber_beacon_aggregate_proof_test.go +++ b/beacon-chain/sync/subscriber_beacon_aggregate_proof_test.go @@ -8,6 +8,7 @@ import ( lru "github.com/hashicorp/golang-lru" ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1" "github.com/prysmaticlabs/go-bitfield" + mock "github.com/prysmaticlabs/prysm/beacon-chain/blockchain/testing" "github.com/prysmaticlabs/prysm/beacon-chain/operations/attestations" ) @@ -19,6 +20,7 @@ func TestBeaconAggregateProofSubscriber_CanSave(t *testing.T) { r := &Service{ attPool: attestations.NewPool(), seenAttestationCache: c, + attestationNotifier: (&mock.ChainService{}).OperationNotifier(), } a := ðpb.SignedAggregateAttestationAndProof{Message: ðpb.AggregateAttestationAndProof{Aggregate: ðpb.Attestation{Data: ðpb.AttestationData{Target: ðpb.Checkpoint{}}, AggregationBits: bitfield.Bitlist{0x07}}, AggregatorIndex: 100}}