prysm-pulse/slasher/service/data_update.go
Ivan Martinez 923d5fc903
Cleanup slasher codebase (#4698)
* First wave of changes
* More changes
* More renames, changes
* Fix errors
* Merge branch 'master' of https://github.com/prysmaticlabs/Prysm into cleanup-slasher
* Fix errors, more cleaning
* Merge branch 'master' of https://github.com/prysmaticlabs/Prysm into cleanup-slasher
* Merge branch 'master' into cleanup-slasher
* fix err
* Merge branch 'cleanup-slasher' of https://github.com/0xKiwi/Prysm into cleanup-slasher
* Fix strings
* More cleanup
* Merge branch 'master' of https://github.com/prysmaticlabs/Prysm into cleanup-slasher
* Fix interface
* Fix
* Merge branch 'master' into cleanup-slasher
* Merge branch 'master' into cleanup-slasher
* Merge branch 'master' into cleanup-slasher
* Address comments
* Merge branch 'master' of https://github.com/prysmaticlabs/Prysm into cleanup-slasher
* Merge branch 'cleanup-slasher' of https://github.com/0xKiwi/Prysm into cleanup-slasher
2020-02-04 18:48:51 +00:00

209 lines
6.3 KiB
Go

package service
import (
"fmt"
"time"
ptypes "github.com/gogo/protobuf/types"
"github.com/pkg/errors"
ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1"
"github.com/prysmaticlabs/prysm/shared/attestationutil"
"github.com/prysmaticlabs/prysm/shared/params"
"github.com/prysmaticlabs/prysm/slasher/db"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
// finalizedChangeUpdater this is a stub for the coming PRs #3133
// Store validator index to public key map Validate attestation signature.
func (s *Service) finalizedChangeUpdater() error {
secondsPerSlot := params.BeaconConfig().SecondsPerSlot
d := time.Duration(secondsPerSlot) * time.Second
tick := time.Tick(d)
var finalizedEpoch uint64
for {
select {
case <-tick:
ch, err := s.beaconClient.GetChainHead(s.context, &ptypes.Empty{})
if err != nil {
log.Error(err)
continue
}
if ch != nil {
if ch.FinalizedEpoch > finalizedEpoch {
log.Infof("finalized epoch %d", ch.FinalizedEpoch)
}
continue
}
log.Error("no chain head was returned by beacon chain.")
case <-s.context.Done():
err := status.Error(codes.Canceled, "Stream context canceled")
log.WithError(err)
return err
}
}
}
// attestationFeeder feeds attestations that were received by archive endpoint.
func (s *Service) attestationFeeder() error {
as, err := s.beaconClient.StreamAttestations(s.context, &ptypes.Empty{})
if err != nil {
log.WithError(err).Errorf("failed to retrieve attestation stream")
return err
}
for {
select {
default:
if as == nil {
err := fmt.Errorf("attestation stream is nil. please check your archiver node status")
log.WithError(err)
return err
}
at, err := as.Recv()
if err != nil {
log.WithError(err)
return err
}
committeeReq := &ethpb.ListCommitteesRequest{
QueryFilter: &ethpb.ListCommitteesRequest_Epoch{
Epoch: at.Data.Target.Epoch,
},
}
bCommittees, err := s.beaconClient.ListBeaconCommittees(s.context, committeeReq)
if err != nil {
log.WithError(err).Errorf("Could not list beacon committees for epoch %d", at.Data.Target.Epoch)
return err
}
err = s.detectAttestation(at, bCommittees)
if err != nil {
continue
}
log.Info("detected attestation for target: %d", at.Data.Target)
case <-s.context.Done():
err := status.Error(codes.Canceled, "Stream context canceled")
log.WithError(err)
return err
}
}
}
// slasherOldAttestationFeeder a function to kick start slashing detection
// after all the included attestations in the canonical chain have been
// slashing detected. latest epoch is being updated after each iteration
// in case it changed during the detection process.
func (s *Service) slasherOldAttestationFeeder() error {
ch, err := s.getChainHead()
if err != nil {
return err
}
errOut := make(chan error)
startFromEpoch, err := s.getLatestDetectedEpoch()
if err != nil {
return err
}
for epoch := startFromEpoch; epoch < ch.FinalizedEpoch; epoch++ {
ats, bcs, err := s.getDataForDetection(epoch)
if err != nil || bcs == nil {
log.Error(err)
continue
}
log.Infof("Detecting slashable events on: %v attestations from epoch: %v", len(ats.Attestations), epoch)
for _, attestation := range ats.Attestations {
if err := s.detectAttestation(attestation, bcs); err != nil {
continue
}
}
if err := s.slasherDb.SetLatestEpochDetected(epoch); err != nil {
log.Error(err)
continue
}
}
close(errOut)
for err := range errOut {
log.Error(errors.Wrap(err, "error while writing to db in background"))
}
return nil
}
func (s *Service) detectAttestation(attestation *ethpb.Attestation, beaconCommittee *ethpb.BeaconCommittees) error {
slotCommittees, ok := beaconCommittee.Committees[attestation.Data.Slot]
if !ok || slotCommittees == nil {
err := fmt.Errorf("beacon committees object doesnt contain the attestation slot: %d, number of committees: %d",
attestation.Data.Slot, len(beaconCommittee.Committees))
log.WithError(err)
return err
}
if attestation.Data.CommitteeIndex > uint64(len(slotCommittees.Committees)) {
err := fmt.Errorf("committee index is out of range in slot wanted: %d, actual: %d", attestation.Data.CommitteeIndex, len(slotCommittees.Committees))
log.WithError(err)
return err
}
attesterCommittee := slotCommittees.Committees[attestation.Data.CommitteeIndex]
validatorIndices := attesterCommittee.ValidatorIndices
ia, err := attestationutil.ConvertToIndexed(s.context, attestation, validatorIndices)
if err != nil {
log.WithError(err)
return err
}
sar, err := s.slasher.IsSlashableAttestation(s.context, ia)
if err != nil {
log.WithError(err)
return err
}
if err := s.slasherDb.SaveAttesterSlashings(db.Active, sar.AttesterSlashing); err != nil {
log.WithError(err)
return err
}
if len(sar.AttesterSlashing) > 0 {
for _, as := range sar.AttesterSlashing {
log.WithField("attesterSlashing", as).Info("detected slashing offence")
}
}
return nil
}
func (s *Service) getDataForDetection(epoch uint64) (*ethpb.ListAttestationsResponse, *ethpb.BeaconCommittees, error) {
ats, err := s.beaconClient.ListAttestations(s.context, &ethpb.ListAttestationsRequest{
QueryFilter: &ethpb.ListAttestationsRequest_TargetEpoch{TargetEpoch: epoch},
})
if err != nil {
log.WithError(err).Errorf("Could not list attestations for epoch: %d", epoch)
}
bcs, err := s.beaconClient.ListBeaconCommittees(s.context, &ethpb.ListCommitteesRequest{
QueryFilter: &ethpb.ListCommitteesRequest_Epoch{
Epoch: epoch,
},
})
if err != nil {
log.WithError(err).Errorf("Could not list beacon committees for epoch: %d", epoch)
}
return ats, bcs, err
}
func (s *Service) getLatestDetectedEpoch() (uint64, error) {
e, err := s.slasherDb.GetLatestEpochDetected()
if err != nil {
log.Error(err)
return 0, err
}
return e, nil
}
func (s *Service) getChainHead() (*ethpb.ChainHead, error) {
if s.beaconClient == nil {
return nil, errors.New("cannot feed old attestations to slasher, beacon client has not been started")
}
ch, err := s.beaconClient.GetChainHead(s.context, &ptypes.Empty{})
if err != nil {
log.Error(err)
return nil, err
}
if ch.FinalizedEpoch < 2 {
log.Info("archive node does not have historic data for slasher to process")
}
log.WithField("finalizedEpoch", ch.FinalizedEpoch).Info("current finalized epoch on archive node")
return ch, nil
}