Use forkchoice to validate sync messages faster (#12430)

* Use forkchoice to validate sync messages faster

* add metric
This commit is contained in:
Potuz 2023-05-19 11:47:39 -03:00 committed by GitHub
parent aeaa72fdc2
commit b84dd40ba9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 79 additions and 20 deletions

View File

@ -30,8 +30,22 @@ func (s *Store) SaveSyncCommitteeMessage(msg *ethpb.SyncCommitteeMessage) error
return errors.New("not typed []ethpb.SyncCommitteeMessage")
}
messages = append(messages, copied)
savedSyncCommitteeMessageTotal.Inc()
idx := -1
for i, msg := range messages {
if msg.ValidatorIndex == copied.ValidatorIndex {
idx = i
break
}
}
if idx >= 0 {
// Override the existing messages with a new one
messages[idx] = copied
} else {
// Append the new message
messages = append(messages, copied)
savedSyncCommitteeMessageTotal.Inc()
}
return s.messageCache.Push(&queue.Item{
Key: syncCommitteeKey(msg.Slot),
Value: messages,

View File

@ -125,6 +125,14 @@ var (
Help: "Time to verify gossiped blocks",
},
)
// Sync committee verification performance.
syncMessagesForUnkownBlocks = promauto.NewCounter(
prometheus.CounterOpts{
Name: "sync_committee_messages_unnkown_root",
Help: "The number of sync committee messages that are checked against DB to see if there vote is for an unknown root",
},
)
)
func (s *Service) updateMetrics() {

View File

@ -90,7 +90,7 @@ func (s *Service) validateSyncCommitteeMessage(
ctx,
ignoreEmptyCommittee(committeeIndices),
s.rejectIncorrectSyncCommittee(committeeIndices, *msg.Topic),
s.ignoreHasSeenSyncMsg(m, committeeIndices),
s.ignoreHasSeenSyncMsg(ctx, m, committeeIndices),
s.rejectInvalidSyncCommitteeSignature(m),
); result != pubsub.ValidationAccept {
return result, err
@ -123,24 +123,45 @@ func (s *Service) markSyncCommitteeMessagesSeen(committeeIndices []primitives.Co
subCommitteeSize := params.BeaconConfig().SyncCommitteeSize / params.BeaconConfig().SyncCommitteeSubnetCount
for _, idx := range committeeIndices {
subnet := uint64(idx) / subCommitteeSize
s.setSeenSyncMessageIndexSlot(m.Slot, m.ValidatorIndex, subnet)
s.setSeenSyncMessageIndexSlot(m, subnet)
}
}
// Returns true if the node has received sync committee for the validator with index and slot.
func (s *Service) hasSeenSyncMessageIndexSlot(slot primitives.Slot, valIndex primitives.ValidatorIndex, subCommitteeIndex uint64) bool {
func (s *Service) hasSeenSyncMessageIndexSlot(ctx context.Context, m *ethpb.SyncCommitteeMessage, subCommitteeIndex uint64) bool {
s.seenSyncMessageLock.RLock()
defer s.seenSyncMessageLock.RUnlock()
_, seen := s.seenSyncMessageCache.Get(seenSyncCommitteeKey(slot, valIndex, subCommitteeIndex))
return seen
rt, seen := s.seenSyncMessageCache.Get(seenSyncCommitteeKey(m.Slot, m.ValidatorIndex, subCommitteeIndex))
if !seen {
// return early if this is the first message
return false
}
root, ok := rt.([32]byte)
if !ok {
return true // Impossible. Return true to be safe
}
if !s.cfg.chain.InForkchoice(root) && !s.cfg.beaconDB.HasBlock(ctx, root) {
syncMessagesForUnkownBlocks.Inc()
return true
}
msgRoot := [32]byte(m.BlockRoot)
if !s.cfg.chain.InForkchoice(msgRoot) && !s.cfg.beaconDB.HasBlock(ctx, msgRoot) {
syncMessagesForUnkownBlocks.Inc()
return false
}
headRoot := s.cfg.chain.CachedHeadRoot()
if root == headRoot {
return true
}
return msgRoot != headRoot
}
// Set sync committee message validator index and slot as seen.
func (s *Service) setSeenSyncMessageIndexSlot(slot primitives.Slot, valIndex primitives.ValidatorIndex, subCommitteeIndex uint64) {
func (s *Service) setSeenSyncMessageIndexSlot(m *ethpb.SyncCommitteeMessage, subCommitteeIndex uint64) {
s.seenSyncMessageLock.Lock()
defer s.seenSyncMessageLock.Unlock()
key := seenSyncCommitteeKey(slot, valIndex, subCommitteeIndex)
s.seenSyncMessageCache.Add(key, true)
key := seenSyncCommitteeKey(m.Slot, m.ValidatorIndex, subCommitteeIndex)
s.seenSyncMessageCache.Add(key, [32]byte(m.BlockRoot))
}
// The `subnet_id` is valid for the given validator. This implies the validator is part of the broader
@ -184,7 +205,7 @@ func (s *Service) rejectIncorrectSyncCommittee(
// There has been no other valid sync committee signature for the declared `slot`, `validator_index`,
// and `subcommittee_index`. In the event of `validator_index` belongs to multiple subnets, as long
// as one subnet has not been seen, we should let it in.
func (s *Service) ignoreHasSeenSyncMsg(
func (s *Service) ignoreHasSeenSyncMsg(ctx context.Context,
m *ethpb.SyncCommitteeMessage, committeeIndices []primitives.CommitteeIndex,
) validationFn {
return func(ctx context.Context) (pubsub.ValidationResult, error) {
@ -192,7 +213,7 @@ func (s *Service) ignoreHasSeenSyncMsg(
subCommitteeSize := params.BeaconConfig().SyncCommitteeSize / params.BeaconConfig().SyncCommitteeSubnetCount
for _, idx := range committeeIndices {
subnet := uint64(idx) / subCommitteeSize
if !s.hasSeenSyncMessageIndexSlot(m.Slot, m.ValidatorIndex, subnet) {
if !s.hasSeenSyncMessageIndexSlot(ctx, m, subnet) {
isValid = true
break
}

View File

@ -144,8 +144,12 @@ func TestService_ValidateSyncCommitteeMessage(t *testing.T) {
s.cfg.stateGen = stategen.New(beaconDB, doublylinkedtree.New())
s.cfg.beaconDB = beaconDB
s.initCaches()
s.setSeenSyncMessageIndexSlot(1, 1, 0)
m := &ethpb.SyncCommitteeMessage{
Slot: 1,
ValidatorIndex: 1,
BlockRoot: params.BeaconConfig().ZeroHash[:],
}
s.setSeenSyncMessageIndexSlot(m, 0)
return s, topic, startup.NewClock(time.Now(), [32]byte{})
},
args: args{
@ -441,10 +445,15 @@ func TestService_ignoreHasSeenSyncMsg(t *testing.T) {
name: "has seen",
setupSvc: func(s *Service, msg *ethpb.SyncCommitteeMessage, topic string) (*Service, string) {
s.initCaches()
s.setSeenSyncMessageIndexSlot(1, 0, 0)
m := &ethpb.SyncCommitteeMessage{
Slot: 1,
BlockRoot: params.BeaconConfig().ZeroHash[:],
}
s.setSeenSyncMessageIndexSlot(m, 0)
return s, ""
},
msg: &ethpb.SyncCommitteeMessage{ValidatorIndex: 0, Slot: 1},
msg: &ethpb.SyncCommitteeMessage{ValidatorIndex: 0, Slot: 1,
BlockRoot: params.BeaconConfig().ZeroHash[:]},
committee: []primitives.CommitteeIndex{1, 2, 3},
want: pubsub.ValidationIgnore,
},
@ -452,19 +461,26 @@ func TestService_ignoreHasSeenSyncMsg(t *testing.T) {
name: "has not seen",
setupSvc: func(s *Service, msg *ethpb.SyncCommitteeMessage, topic string) (*Service, string) {
s.initCaches()
s.setSeenSyncMessageIndexSlot(1, 0, 0)
m := &ethpb.SyncCommitteeMessage{
Slot: 1,
BlockRoot: params.BeaconConfig().ZeroHash[:],
}
s.setSeenSyncMessageIndexSlot(m, 0)
return s, ""
},
msg: &ethpb.SyncCommitteeMessage{ValidatorIndex: 1, Slot: 1},
msg: &ethpb.SyncCommitteeMessage{ValidatorIndex: 1, Slot: 1,
BlockRoot: bytesutil.PadTo([]byte{'A'}, 32)},
committee: []primitives.CommitteeIndex{1, 2, 3},
want: pubsub.ValidationAccept,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
s := &Service{}
s := &Service{
cfg: &config{chain: &mockChain.ChainService{}},
}
s, _ = tt.setupSvc(s, tt.msg, "")
f := s.ignoreHasSeenSyncMsg(tt.msg, tt.committee)
f := s.ignoreHasSeenSyncMsg(context.Background(), tt.msg, tt.committee)
result, err := f(context.Background())
_ = err
require.Equal(t, tt.want, result)