Caplin: Fixed crash in OnAttestation (#9159)

* Added handling of AggregateAndProof
Giulio rebuffo 2024-01-08 17:13:25 +01:00 committed by GitHub
parent 2521f47e7b
commit b2fa618f74
17 changed files with 152 additions and 30 deletions


@@ -32,14 +32,16 @@ type ApiHandler struct {
stateReader *historical_states_reader.HistoricalStatesReader
sentinel sentinel.SentinelClient
version string // Node's version
// pools
randaoMixesPool sync.Pool
}
func NewApiHandler(genesisConfig *clparams.GenesisConfig, beaconChainConfig *clparams.BeaconChainConfig, source persistence.RawBeaconBlockChain, indiciesDB kv.RoDB, forkchoiceStore forkchoice.ForkChoiceStorage, operationsPool pool.OperationsPool, rcsn freezeblocks.BeaconSnapshotReader, syncedData *synced_data.SyncedDataManager, stateReader *historical_states_reader.HistoricalStatesReader, sentinel sentinel.SentinelClient) *ApiHandler {
func NewApiHandler(genesisConfig *clparams.GenesisConfig, beaconChainConfig *clparams.BeaconChainConfig, source persistence.RawBeaconBlockChain, indiciesDB kv.RoDB, forkchoiceStore forkchoice.ForkChoiceStorage, operationsPool pool.OperationsPool, rcsn freezeblocks.BeaconSnapshotReader, syncedData *synced_data.SyncedDataManager, stateReader *historical_states_reader.HistoricalStatesReader, sentinel sentinel.SentinelClient, version string) *ApiHandler {
return &ApiHandler{o: sync.Once{}, genesisCfg: genesisConfig, beaconChainCfg: beaconChainConfig, indiciesDB: indiciesDB, forkchoiceStore: forkchoiceStore, operationsPool: operationsPool, blockReader: rcsn, syncedData: syncedData, stateReader: stateReader, randaoMixesPool: sync.Pool{New: func() interface{} {
return solid.NewHashVector(int(beaconChainConfig.EpochsPerHistoricalVector))
}}, sentinel: sentinel}
}}, sentinel: sentinel, version: version}
}
func (a *ApiHandler) init() {
@@ -53,6 +55,7 @@ func (a *ApiHandler) init() {
r.Get("/events", http.NotFound)
r.Route("/node", func(r chi.Router) {
r.Get("/health", a.GetEthV1NodeHealth)
r.Get("/version", a.GetEthV1NodeVersion)
})
r.Get("/debug/fork_choice", a.GetEthV1DebugBeaconForkChoice)
r.Route("/config", func(r chi.Router) {


@@ -1,6 +1,11 @@
package handler
import "net/http"
import (
"encoding/json"
"fmt"
"net/http"
"runtime"
)
func (a *ApiHandler) GetEthV1NodeHealth(w http.ResponseWriter, r *http.Request) {
syncingStatus, err := uint64FromQueryParams(r, "syncing_status")
@@ -18,3 +23,14 @@ func (a *ApiHandler) GetEthV1NodeHealth(w http.ResponseWriter, r *http.Request)
}
w.WriteHeader(http.StatusOK)
}
func (a *ApiHandler) GetEthV1NodeVersion(w http.ResponseWriter, r *http.Request) {
// Get OS and Arch
if err := json.NewEncoder(w).Encode(map[string]interface{}{
"data": map[string]interface{}{
"version": fmt.Sprintf("Caplin/%s %s/%s", a.version, runtime.GOOS, runtime.GOARCH),
},
}); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
}
}
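For reference, the new endpoint returns a JSON payload shaped like the following (the version and platform values are hypothetical examples; the real string is assembled from the configured version plus runtime.GOOS and runtime.GOARCH):

GET /eth/v1/node/version

{
  "data": {
    "version": "Caplin/v2.57.0 linux/amd64"
  }
}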


@@ -1,15 +1,17 @@
package handler
import (
"encoding/json"
"net/http"
"net/http/httptest"
"strings"
"testing"
"github.com/ledgerwatch/erigon/cl/clparams"
"github.com/stretchr/testify/require"
)
func TestNodeSyncing(t *testing.T) {
func TestNodeHealthSyncing(t *testing.T) {
// I just want the correct schema to be generated
_, _, _, _, _, handler, _, _, _ := setupTestingHandler(t, clparams.Phase0Version)
@@ -26,7 +28,7 @@ func TestNodeSyncing(t *testing.T) {
require.Equal(t, 666, resp.StatusCode)
}
func TestNodeSyncingTip(t *testing.T) {
func TestNodeHealthSyncingTip(t *testing.T) {
// I just want the correct schema to be generated
_, _, _, _, post, handler, _, sm, _ := setupTestingHandler(t, clparams.Phase0Version)
@@ -47,3 +49,23 @@ func TestNodeSyncingTip(t *testing.T) {
defer resp.Body.Close()
require.Equal(t, 200, resp.StatusCode)
}
func TestNodeVersion(t *testing.T) {
// I just want the correct schema to be generated
_, _, _, _, _, handler, _, _, _ := setupTestingHandler(t, clparams.Phase0Version)
// Call GET /eth/v1/node/version
server := httptest.NewServer(handler.mux)
defer server.Close()
req, err := http.NewRequest("GET", server.URL+"/eth/v1/node/version", nil)
require.NoError(t, err)
resp, err := http.DefaultClient.Do(req)
require.NoError(t, err)
defer resp.Body.Close()
out := map[string]interface{}{}
require.NoError(t, json.NewDecoder(resp.Body).Decode(&out))
v := out["data"].(map[string]interface{})["version"].(string)
require.True(t, strings.Contains(v, "Caplin"))
}


@@ -62,7 +62,8 @@ func setupTestingHandler(t *testing.T, v clparams.StateVersion) (db kv.RwDB, blo
reader,
syncedData,
statesReader,
nil)
nil,
"test-version")
handler.init()
return
}


@@ -1,6 +1,7 @@
package state
import (
"encoding/binary"
"fmt"
"github.com/Giulio2002/bls"
@@ -28,6 +29,12 @@ func Epoch(b abstract.BeaconStateBasic) uint64 {
return GetEpochAtSlot(b.BeaconConfig(), b.Slot())
}
func IsAggregator(cfg *clparams.BeaconChainConfig, committeeLength, slot, committeeIndex uint64, slotSignature libcommon.Bytes96) bool {
modulo := utils.Max64(1, committeeLength/cfg.TargetAggregatorsPerCommittee)
hashSlotSignature := utils.Sha256(slotSignature[:])
return binary.LittleEndian.Uint64(hashSlotSignature[:8])%modulo == 0
}
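The rule above selects a validator as an aggregator when the first eight bytes of SHA-256(slot signature), read little-endian, are divisible by max(1, committeeLength/TargetAggregatorsPerCommittee). A minimal, self-contained sketch of the same computation with plain stdlib calls (isAggregator and the zeroed signature are hypothetical stand-ins, not the erigon API):

package main

import (
	"crypto/sha256"
	"encoding/binary"
	"fmt"
)

// isAggregator mirrors the selection rule above.
func isAggregator(committeeLength, targetAggregatorsPerCommittee uint64, slotSignature []byte) bool {
	modulo := committeeLength / targetAggregatorsPerCommittee
	if modulo < 1 {
		modulo = 1
	}
	hash := sha256.Sum256(slotSignature)
	return binary.LittleEndian.Uint64(hash[:8])%modulo == 0
}

func main() {
	sig := make([]byte, 96) // hypothetical 96-byte BLS slot signature
	// With committeeLength=128 and a target of 16, modulo is 8, so on
	// average one committee member in eight is selected to aggregate.
	fmt.Println(isAggregator(128, 16, sig))
}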
// GetTotalBalance return the sum of all balances within the given validator set.
func GetTotalBalance(b abstract.BeaconStateBasic, validatorSet []uint64) (uint64, error) {
var (


@@ -334,11 +334,16 @@ func readUint64WithBuffer(r io.Reader, buf []byte, out *uint64) error {
// internal encoding/decoding algos
func (b *CachingBeaconState) encodeActiveValidatorsCache(w io.Writer, num []byte) error {
keys := b.activeValidatorsCache.Keys()
lists := make([][]uint64, len(keys))
for i, key := range keys {
lists[i], _ = b.activeValidatorsCache.Get(key)
keysA := b.activeValidatorsCache.Keys()
keys := make([]uint64, 0, len(keysA))
lists := make([][]uint64, 0, len(keysA))
for _, key := range keysA {
l, ok := b.activeValidatorsCache.Get(key)
if !ok || len(l) == 0 {
continue
}
keys = append(keys, key)
lists = append(lists, l)
}
// Write the total length
if err := writeUint64WithBuffer(w, uint64(len(keys)), num); err != nil {
@@ -396,11 +401,17 @@ func (b *CachingBeaconState) decodeActiveValidatorsCache(r io.Reader, num []byte
// internal encoding/decoding algos
func (b *CachingBeaconState) encodeShuffledSetsCache(w io.Writer, num []byte) error {
keys := b.shuffledSetsCache.Keys()
lists := make([][]uint64, len(keys))
keysA := b.shuffledSetsCache.Keys()
keys := make([]common.Hash, 0, len(keysA))
lists := make([][]uint64, 0, len(keysA))
for i, key := range keys {
lists[i], _ = b.shuffledSetsCache.Get(key)
for _, key := range keysA {
l, ok := b.shuffledSetsCache.Get(key)
if !ok || len(l) == 0 {
continue
}
keys = append(keys, key)
lists = append(lists, l)
}
// Write the total length
if err := writeUint64WithBuffer(w, uint64(len(keys)), num); err != nil {
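Both encoders now apply the same filter-before-serialize pattern: an entry can be evicted from the LRU between Keys() and Get(), and the previous code wrote the original key count regardless, leaving nil or empty lists behind the length prefix. A reduced sketch of the pattern (collectLive and the get callback are hypothetical stand-ins for the cache accessors):

// collectLive keeps only keys whose values are still present and non-empty,
// so the serialized count always matches the entries actually written.
func collectLive[K comparable](keys []K, get func(K) ([]uint64, bool)) ([]K, [][]uint64) {
	liveKeys := make([]K, 0, len(keys))
	lists := make([][]uint64, 0, len(keys))
	for _, key := range keys {
		list, ok := get(key)
		if !ok || len(list) == 0 {
			continue
		}
		liveKeys = append(liveKeys, key)
		lists = append(lists, list)
	}
	return liveKeys, lists
}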


@@ -92,7 +92,7 @@ func TestForkChoiceBasic(t *testing.T) {
require.Equal(t, headSlot, uint64(3))
require.Equal(t, headRoot, libcommon.HexToHash("0x744cc484f6503462f0f3a5981d956bf4fcb3e57ab8687ed006467e05049ee033"))
// lastly do attestation
require.NoError(t, store.OnAttestation(testAttestation, false))
require.NoError(t, store.OnAttestation(testAttestation, false, false))
// Try processing a voluntary exit
err = store.OnVoluntaryExit(&cltypes.SignedVoluntaryExit{
VoluntaryExit: &cltypes.VoluntaryExit{


@@ -160,7 +160,7 @@ func (f *ForkChoiceStorageMock) Time() uint64 {
return f.TimeVal
}
func (f *ForkChoiceStorageMock) OnAttestation(attestation *solid.Attestation, fromBlock bool) error {
func (f *ForkChoiceStorageMock) OnAttestation(attestation *solid.Attestation, fromBlock, insert bool) error {
f.Pool.AttestationsPool.Insert(attestation.Signature(), attestation)
return nil
}


@@ -44,7 +44,7 @@ type ForkChoiceStorageReader interface {
}
type ForkChoiceStorageWriter interface {
OnAttestation(attestation *solid.Attestation, fromBlock bool) error
OnAttestation(attestation *solid.Attestation, fromBlock, insert bool) error
OnAttesterSlashing(attesterSlashing *cltypes.AttesterSlashing, test bool) error
OnVoluntaryExit(signedVoluntaryExit *cltypes.SignedVoluntaryExit, test bool) error
OnProposerSlashing(proposerSlashing *cltypes.ProposerSlashing, test bool) error


@@ -4,6 +4,7 @@ import (
"fmt"
"time"
"github.com/ledgerwatch/erigon/cl/cltypes"
"github.com/ledgerwatch/erigon/cl/cltypes/solid"
"github.com/ledgerwatch/erigon/cl/phase1/cache"
"github.com/ledgerwatch/erigon/cl/phase1/core/state"
@@ -13,7 +14,7 @@ import (
)
// OnAttestation processes incoming attestations.
func (f *ForkChoiceStore) OnAttestation(attestation *solid.Attestation, fromBlock bool) error {
func (f *ForkChoiceStore) OnAttestation(attestation *solid.Attestation, fromBlock bool, insert bool) error {
f.mu.Lock()
defer f.mu.Unlock()
f.headHash = libcommon.Hash{}
@@ -61,11 +62,41 @@ func (f *ForkChoiceStore) OnAttestation(attestation *solid.Attestation, fromBloc
cache.StoreAttestation(&data, attestation.AggregationBits(), attestationIndicies)
// Lastly update latest messages.
f.processAttestingIndicies(attestation, attestationIndicies)
if !fromBlock && insert {
// Add to the pool when verified.
f.operationsPool.AttestationsPool.Insert(attestation.Signature(), attestation)
}
return nil
}
func (f *ForkChoiceStore) OnAggregateAndProof(aggregateAndProof *cltypes.SignedAggregateAndProof, test bool) error {
slot := aggregateAndProof.Message.Aggregate.AttestantionData().Slot()
selectionProof := aggregateAndProof.Message.SelectionProof
committeeIndex := aggregateAndProof.Message.Aggregate.AttestantionData().ValidatorIndex()
epoch := state.GetEpochAtSlot(f.beaconCfg, slot)
target := aggregateAndProof.Message.Aggregate.AttestantionData().Target()
targetState, err := f.getCheckpointState(target)
if err != nil {
return nil
}
activeIndicies := targetState.getActiveIndicies(epoch)
activeIndiciesLength := uint64(len(activeIndicies))
count := targetState.committeeCount(epoch, activeIndiciesLength) * f.beaconCfg.SlotsPerEpoch
start := (activeIndiciesLength * committeeIndex) / count
end := (activeIndiciesLength * (committeeIndex + 1)) / count
committeeLength := end - start
if !state.IsAggregator(f.beaconCfg, committeeLength, slot, committeeIndex, selectionProof) {
log.Warn("invalid aggregate and proof")
return fmt.Errorf("invalid aggregate and proof")
}
return f.OnAttestation(aggregateAndProof.Message.Aggregate, false, false)
}
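To make the committee-slicing arithmetic above concrete, a runnable sketch with hypothetical numbers (1024 active indices split across 64 committees per epoch):

package main

import "fmt"

func main() {
	activeIndiciesLength := uint64(1024)
	count := uint64(64) // committeeCount(epoch, ...) * SlotsPerEpoch
	committeeIndex := uint64(3)

	start := (activeIndiciesLength * committeeIndex) / count     // 48
	end := (activeIndiciesLength * (committeeIndex + 1)) / count // 64
	fmt.Println(end - start)                                     // committeeLength = 16
}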
// scheduleAttestationForLaterProcessing schedules an attestation for later processing
func (f *ForkChoiceStore) scheduleAttestationForLaterProcessing(attestation *solid.Attestation, fromBlock bool) {
func (f *ForkChoiceStore) scheduleAttestationForLaterProcessing(attestation *solid.Attestation, insert bool) {
go func() {
logInterval := time.NewTicker(50 * time.Millisecond)
for {
@@ -76,8 +107,8 @@ func (f *ForkChoiceStore) scheduleAttestationForLaterProcessing(attestation *sol
if f.Slot() < attestation.AttestantionData().Slot()+1 {
continue
}
if err := f.OnAttestation(attestation, false); err != nil {
log.Trace("could not process scheduled attestation", "reason", err)
if err := f.OnAttestation(attestation, false, insert); err != nil {
log.Debug("could not process scheduled attestation", "reason", err)
}
return
}


@@ -160,6 +160,10 @@ func (g *GossipManager) onRecv(ctx context.Context, data *sentinel.GossipData, l
if err := operationsContract[*cltypes.SignedBLSToExecutionChange](ctx, g, l, data, int(version), "bls to execution change", g.forkChoice.OnBlsToExecutionChange); err != nil {
return err
}
case gossip.TopicNameBeaconAggregateAndProof:
if err := operationsContract[*cltypes.SignedAggregateAndProof](ctx, g, l, data, int(version), "aggregate and proof", g.forkChoice.OnAggregateAndProof); err != nil {
return err
}
}
return nil
}


@@ -405,7 +405,7 @@ func ConsensusClStages(ctx context.Context,
continue MainLoop
}
block.Block.Body.Attestations.Range(func(idx int, a *solid.Attestation, total int) bool {
if err = cfg.forkChoice.OnAttestation(a, true); err != nil {
if err = cfg.forkChoice.OnAttestation(a, true, false); err != nil {
log.Debug("bad attestation received", "err", err)
}
return true


@@ -1,13 +1,19 @@
package pool
import (
"time"
"github.com/ledgerwatch/erigon/cl/phase1/core/state/lru"
)
const lifeSpan = 30 * time.Minute
var operationsMultiplier = 20 // Cap the number of cached elements at max_operations_per_block * operations_multiplier
type OperationPool[K comparable, T any] struct {
pool *lru.Cache[K, T] // Map the Signature to the underlying object
pool *lru.Cache[K, T] // Map the Signature to the underlying object
recentlySeen map[K]time.Time
lastPruned time.Time
}
func NewOperationPool[K comparable, T any](maxOperationsPerBlock int, metricName string) *OperationPool[K, T] {
@@ -15,11 +21,30 @@ func NewOperationPool[K comparable, T any](maxOperationsPerBlock int, metricName
if err != nil {
panic(err)
}
return &OperationPool[K, T]{pool: pool}
return &OperationPool[K, T]{
pool: pool,
recentlySeen: make(map[K]time.Time),
}
}
func (o *OperationPool[K, T]) Insert(k K, operation T) {
if _, ok := o.recentlySeen[k]; ok {
return
}
o.pool.Add(k, operation)
o.recentlySeen[k] = time.Now()
if time.Since(o.lastPruned) > lifeSpan {
deleteList := make([]K, 0, len(o.recentlySeen))
for k, t := range o.recentlySeen {
if time.Since(t) > lifeSpan {
deleteList = append(deleteList, k)
}
}
for _, k := range deleteList {
delete(o.recentlySeen, k)
}
o.lastPruned = time.Now()
}
}
func (o *OperationPool[K, T]) DeleteIfExist(k K) (removed bool) {
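A usage sketch of the new dedup behavior (key and value types here are hypothetical; the attestation pool keys on the 96-byte signature):

// Insert skips any key seen within the last lifeSpan (30 minutes),
// so a re-gossiped operation is not re-added to the pool.
pool := NewOperationPool[[96]byte, string](64, "test-pool")
var sig [96]byte
pool.Insert(sig, "attestation-a") // stored; sig recorded in recentlySeen
pool.Insert(sig, "attestation-b") // dropped: sig was recently seen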


@@ -31,7 +31,7 @@ func createSentinel(cfg *sentinel.SentinelConfig, db persistence.RawBeaconBlockC
}
gossipTopics := []sentinel.GossipTopic{
sentinel.BeaconBlockSsz,
//sentinel.BeaconAggregateAndProofSsz,
sentinel.BeaconAggregateAndProofSsz,
sentinel.VoluntaryExitSsz,
sentinel.ProposerSlashingSsz,
sentinel.AttesterSlashingSsz,


@@ -3,10 +3,11 @@ package consensus_tests
import (
"context"
"fmt"
"github.com/ledgerwatch/erigon/spectest"
"io/fs"
"testing"
"github.com/ledgerwatch/erigon/spectest"
"github.com/ledgerwatch/erigon/cl/abstract"
"github.com/ledgerwatch/erigon/cl/clparams"
"github.com/ledgerwatch/erigon/cl/cltypes/solid"
@@ -194,7 +195,7 @@ func (b *ForkChoice) Run(t *testing.T, root fs.FS, c spectest.TestCase) (err err
att := &solid.Attestation{}
err := spectest.ReadSsz(root, c.Version(), step.GetAttestation()+".ssz_snappy", att)
require.NoError(t, err, stepstr)
err = forkStore.OnAttestation(att, false)
err = forkStore.OnAttestation(att, false, false)
if step.GetValid() {
require.NoError(t, err, stepstr)
} else {


@@ -88,7 +88,7 @@ func TestRegressionWithValidation(store *forkchoice.ForkChoiceStore, block *clty
return err
}
block.Block.Body.Attestations.Range(func(index int, value *solid2.Attestation, length int) bool {
store.OnAttestation(value, true)
store.OnAttestation(value, true, true)
return true
})
return nil


@@ -18,6 +18,7 @@ import (
"github.com/ledgerwatch/erigon/cl/freezer"
freezer2 "github.com/ledgerwatch/erigon/cl/freezer"
"github.com/ledgerwatch/erigon/eth/ethconfig"
"github.com/ledgerwatch/erigon/params"
"github.com/ledgerwatch/erigon/turbo/snapshotsync/freezeblocks"
"github.com/ledgerwatch/erigon/cl/persistence"
@@ -212,7 +213,7 @@ func RunCaplinPhase1(ctx context.Context, sentinel sentinel.SentinelClient, engi
statesReader := historical_states_reader.NewHistoricalStatesReader(beaconConfig, rcsn, vTables, af, genesisState)
syncedDataManager := synced_data.NewSyncedDataManager(cfg.Active, beaconConfig)
if cfg.Active {
apiHandler := handler.NewApiHandler(genesisConfig, beaconConfig, rawDB, indexDB, forkChoice, pool, rcsn, syncedDataManager, statesReader, sentinel)
apiHandler := handler.NewApiHandler(genesisConfig, beaconConfig, rawDB, indexDB, forkChoice, pool, rcsn, syncedDataManager, statesReader, sentinel, params.GitTag)
headApiHandler := &validatorapi.ValidatorApiHandler{
FC: forkChoice,
BeaconChainCfg: beaconConfig,