diff --git a/cl/beacon/handler/handler.go b/cl/beacon/handler/handler.go index 9ead9cb0b..47baf6175 100644 --- a/cl/beacon/handler/handler.go +++ b/cl/beacon/handler/handler.go @@ -32,14 +32,16 @@ type ApiHandler struct { stateReader *historical_states_reader.HistoricalStatesReader sentinel sentinel.SentinelClient + version string // Node's version + // pools randaoMixesPool sync.Pool } -func NewApiHandler(genesisConfig *clparams.GenesisConfig, beaconChainConfig *clparams.BeaconChainConfig, source persistence.RawBeaconBlockChain, indiciesDB kv.RoDB, forkchoiceStore forkchoice.ForkChoiceStorage, operationsPool pool.OperationsPool, rcsn freezeblocks.BeaconSnapshotReader, syncedData *synced_data.SyncedDataManager, stateReader *historical_states_reader.HistoricalStatesReader, sentinel sentinel.SentinelClient) *ApiHandler { +func NewApiHandler(genesisConfig *clparams.GenesisConfig, beaconChainConfig *clparams.BeaconChainConfig, source persistence.RawBeaconBlockChain, indiciesDB kv.RoDB, forkchoiceStore forkchoice.ForkChoiceStorage, operationsPool pool.OperationsPool, rcsn freezeblocks.BeaconSnapshotReader, syncedData *synced_data.SyncedDataManager, stateReader *historical_states_reader.HistoricalStatesReader, sentinel sentinel.SentinelClient, version string) *ApiHandler { return &ApiHandler{o: sync.Once{}, genesisCfg: genesisConfig, beaconChainCfg: beaconChainConfig, indiciesDB: indiciesDB, forkchoiceStore: forkchoiceStore, operationsPool: operationsPool, blockReader: rcsn, syncedData: syncedData, stateReader: stateReader, randaoMixesPool: sync.Pool{New: func() interface{} { return solid.NewHashVector(int(beaconChainConfig.EpochsPerHistoricalVector)) - }}, sentinel: sentinel} + }}, sentinel: sentinel, version: version} } func (a *ApiHandler) init() { @@ -53,6 +55,7 @@ func (a *ApiHandler) init() { r.Get("/events", http.NotFound) r.Route("/node", func(r chi.Router) { r.Get("/health", a.GetEthV1NodeHealth) + r.Get("/version", a.GetEthV1NodeVersion) }) r.Get("/debug/fork_choice", a.GetEthV1DebugBeaconForkChoice) r.Route("/config", func(r chi.Router) { diff --git a/cl/beacon/handler/node.go b/cl/beacon/handler/node.go index 26f4fc46f..72687b2e3 100644 --- a/cl/beacon/handler/node.go +++ b/cl/beacon/handler/node.go @@ -1,6 +1,11 @@ package handler -import "net/http" +import ( + "encoding/json" + "fmt" + "net/http" + "runtime" +) func (a *ApiHandler) GetEthV1NodeHealth(w http.ResponseWriter, r *http.Request) { syncingStatus, err := uint64FromQueryParams(r, "syncing_status") @@ -18,3 +23,14 @@ func (a *ApiHandler) GetEthV1NodeHealth(w http.ResponseWriter, r *http.Request) } w.WriteHeader(http.StatusOK) } + +func (a *ApiHandler) GetEthV1NodeVersion(w http.ResponseWriter, r *http.Request) { + // Get OS and Arch + if err := json.NewEncoder(w).Encode(map[string]interface{}{ + "data": map[string]interface{}{ + "version": fmt.Sprintf("Caplin/%s %s/%s", a.version, runtime.GOOS, runtime.GOARCH), + }, + }); err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + } +} diff --git a/cl/beacon/handler/node_test.go b/cl/beacon/handler/node_test.go index 094412ddd..8572f1430 100644 --- a/cl/beacon/handler/node_test.go +++ b/cl/beacon/handler/node_test.go @@ -1,15 +1,17 @@ package handler import ( + "encoding/json" "net/http" "net/http/httptest" + "strings" "testing" "github.com/ledgerwatch/erigon/cl/clparams" "github.com/stretchr/testify/require" ) -func TestNodeSyncing(t *testing.T) { +func TestNodeHealthSyncing(t *testing.T) { // i just want the correct schema to be generated _, _, _, _, _, handler, _, _, _ := setupTestingHandler(t, clparams.Phase0Version) @@ -26,7 +28,7 @@ func TestNodeSyncing(t *testing.T) { require.Equal(t, 666, resp.StatusCode) } -func TestNodeSyncingTip(t *testing.T) { +func TestNodeHealthSyncingTip(t *testing.T) { // i just want the correct schema to be generated _, _, _, _, post, handler, _, sm, _ := setupTestingHandler(t, clparams.Phase0Version) @@ -47,3 +49,23 @@ func TestNodeSyncingTip(t *testing.T) { defer resp.Body.Close() require.Equal(t, 200, resp.StatusCode) } + +func TestNodeVersion(t *testing.T) { + // i just want the correct schema to be generated + _, _, _, _, _, handler, _, _, _ := setupTestingHandler(t, clparams.Phase0Version) + + // Call GET /eth/v1/node/health + server := httptest.NewServer(handler.mux) + defer server.Close() + + req, err := http.NewRequest("GET", server.URL+"/eth/v1/node/version", nil) + require.NoError(t, err) + + resp, err := http.DefaultClient.Do(req) + require.NoError(t, err) + defer resp.Body.Close() + out := map[string]interface{}{} + require.NoError(t, json.NewDecoder(resp.Body).Decode(&out)) + v := out["data"].(map[string]interface{})["version"].(string) + require.True(t, strings.Contains(v, "Caplin")) +} diff --git a/cl/beacon/handler/utils_test.go b/cl/beacon/handler/utils_test.go index f4117fa17..345cd94d7 100644 --- a/cl/beacon/handler/utils_test.go +++ b/cl/beacon/handler/utils_test.go @@ -62,7 +62,8 @@ func setupTestingHandler(t *testing.T, v clparams.StateVersion) (db kv.RwDB, blo reader, syncedData, statesReader, - nil) + nil, + "test-version") handler.init() return } diff --git a/cl/phase1/core/state/accessors.go b/cl/phase1/core/state/accessors.go index 14d6d74c0..90974b2cc 100644 --- a/cl/phase1/core/state/accessors.go +++ b/cl/phase1/core/state/accessors.go @@ -1,6 +1,7 @@ package state import ( + "encoding/binary" "fmt" "github.com/Giulio2002/bls" @@ -28,6 +29,12 @@ func Epoch(b abstract.BeaconStateBasic) uint64 { return GetEpochAtSlot(b.BeaconConfig(), b.Slot()) } +func IsAggregator(cfg *clparams.BeaconChainConfig, committeeLength, slot, committeeIndex uint64, slotSignature libcommon.Bytes96) bool { + modulo := utils.Max64(1, committeeLength/cfg.TargetAggregatorsPerCommittee) + hashSlotSignatue := utils.Sha256(slotSignature[:]) + return binary.LittleEndian.Uint64(hashSlotSignatue[:8])%modulo == 0 +} + // GetTotalBalance return the sum of all balances within the given validator set. func GetTotalBalance(b abstract.BeaconStateBasic, validatorSet []uint64) (uint64, error) { var ( diff --git a/cl/phase1/core/state/cache.go b/cl/phase1/core/state/cache.go index fcd410f08..63ada3623 100644 --- a/cl/phase1/core/state/cache.go +++ b/cl/phase1/core/state/cache.go @@ -334,11 +334,16 @@ func readUint64WithBuffer(r io.Reader, buf []byte, out *uint64) error { // internal encoding/decoding algos func (b *CachingBeaconState) encodeActiveValidatorsCache(w io.Writer, num []byte) error { - keys := b.activeValidatorsCache.Keys() - lists := make([][]uint64, len(keys)) - - for i, key := range keys { - lists[i], _ = b.activeValidatorsCache.Get(key) + keysA := b.activeValidatorsCache.Keys() + keys := make([]uint64, 0, len(keysA)) + lists := make([][]uint64, 0, len(keys)) + for _, key := range keysA { + l, ok := b.activeValidatorsCache.Get(key) + if !ok || len(l) == 0 { + continue + } + keys = append(keys, key) + lists = append(lists, l) } // Write the total length if err := writeUint64WithBuffer(w, uint64(len(keys)), num); err != nil { @@ -396,11 +401,17 @@ func (b *CachingBeaconState) decodeActiveValidatorsCache(r io.Reader, num []byte // internal encoding/decoding algos func (b *CachingBeaconState) encodeShuffledSetsCache(w io.Writer, num []byte) error { - keys := b.shuffledSetsCache.Keys() - lists := make([][]uint64, len(keys)) + keysA := b.shuffledSetsCache.Keys() + keys := make([]common.Hash, 0, len(keysA)) + lists := make([][]uint64, 0, len(keys)) - for i, key := range keys { - lists[i], _ = b.shuffledSetsCache.Get(key) + for _, key := range keysA { + l, ok := b.shuffledSetsCache.Get(key) + if !ok || len(l) == 0 { + continue + } + keys = append(keys, key) + lists = append(lists, l) } // Write the total length if err := writeUint64WithBuffer(w, uint64(len(keys)), num); err != nil { diff --git a/cl/phase1/forkchoice/fork_choice_test.go b/cl/phase1/forkchoice/fork_choice_test.go index f712a4471..fa3559490 100644 --- a/cl/phase1/forkchoice/fork_choice_test.go +++ b/cl/phase1/forkchoice/fork_choice_test.go @@ -92,7 +92,7 @@ func TestForkChoiceBasic(t *testing.T) { require.Equal(t, headSlot, uint64(3)) require.Equal(t, headRoot, libcommon.HexToHash("0x744cc484f6503462f0f3a5981d956bf4fcb3e57ab8687ed006467e05049ee033")) // lastly do attestation - require.NoError(t, store.OnAttestation(testAttestation, false)) + require.NoError(t, store.OnAttestation(testAttestation, false, false)) // Try processing a voluntary exit err = store.OnVoluntaryExit(&cltypes.SignedVoluntaryExit{ VoluntaryExit: &cltypes.VoluntaryExit{ diff --git a/cl/phase1/forkchoice/forkchoice_mock.go b/cl/phase1/forkchoice/forkchoice_mock.go index 6ae413d4f..691b832f8 100644 --- a/cl/phase1/forkchoice/forkchoice_mock.go +++ b/cl/phase1/forkchoice/forkchoice_mock.go @@ -160,7 +160,7 @@ func (f *ForkChoiceStorageMock) Time() uint64 { return f.TimeVal } -func (f *ForkChoiceStorageMock) OnAttestation(attestation *solid.Attestation, fromBlock bool) error { +func (f *ForkChoiceStorageMock) OnAttestation(attestation *solid.Attestation, fromBlock, insert bool) error { f.Pool.AttestationsPool.Insert(attestation.Signature(), attestation) return nil } diff --git a/cl/phase1/forkchoice/interface.go b/cl/phase1/forkchoice/interface.go index 7da33e5ac..81fa579e2 100644 --- a/cl/phase1/forkchoice/interface.go +++ b/cl/phase1/forkchoice/interface.go @@ -44,7 +44,7 @@ type ForkChoiceStorageReader interface { } type ForkChoiceStorageWriter interface { - OnAttestation(attestation *solid.Attestation, fromBlock bool) error + OnAttestation(attestation *solid.Attestation, fromBlock, insert bool) error OnAttesterSlashing(attesterSlashing *cltypes.AttesterSlashing, test bool) error OnVoluntaryExit(signedVoluntaryExit *cltypes.SignedVoluntaryExit, test bool) error OnProposerSlashing(proposerSlashing *cltypes.ProposerSlashing, test bool) error diff --git a/cl/phase1/forkchoice/on_attestation.go b/cl/phase1/forkchoice/on_attestation.go index ed734b4b8..5dd9326b0 100644 --- a/cl/phase1/forkchoice/on_attestation.go +++ b/cl/phase1/forkchoice/on_attestation.go @@ -4,6 +4,7 @@ import ( "fmt" "time" + "github.com/ledgerwatch/erigon/cl/cltypes" "github.com/ledgerwatch/erigon/cl/cltypes/solid" "github.com/ledgerwatch/erigon/cl/phase1/cache" "github.com/ledgerwatch/erigon/cl/phase1/core/state" @@ -13,7 +14,7 @@ import ( ) // OnAttestation processes incoming attestations. -func (f *ForkChoiceStore) OnAttestation(attestation *solid.Attestation, fromBlock bool) error { +func (f *ForkChoiceStore) OnAttestation(attestation *solid.Attestation, fromBlock bool, insert bool) error { f.mu.Lock() defer f.mu.Unlock() f.headHash = libcommon.Hash{} @@ -61,11 +62,41 @@ func (f *ForkChoiceStore) OnAttestation(attestation *solid.Attestation, fromBloc cache.StoreAttestation(&data, attestation.AggregationBits(), attestationIndicies) // Lastly update latest messages. f.processAttestingIndicies(attestation, attestationIndicies) + if !fromBlock && insert { + // Add to the pool when verified. + f.operationsPool.AttestationsPool.Insert(attestation.Signature(), attestation) + } return nil } +func (f *ForkChoiceStore) OnAggregateAndProof(aggregateAndProof *cltypes.SignedAggregateAndProof, test bool) error { + slot := aggregateAndProof.Message.Aggregate.AttestantionData().Slot() + selectionProof := aggregateAndProof.Message.SelectionProof + committeeIndex := aggregateAndProof.Message.Aggregate.AttestantionData().ValidatorIndex() + epoch := state.GetEpochAtSlot(f.beaconCfg, slot) + + target := aggregateAndProof.Message.Aggregate.AttestantionData().Target() + targetState, err := f.getCheckpointState(target) + if err != nil { + return nil + } + + activeIndicies := targetState.getActiveIndicies(epoch) + activeIndiciesLength := uint64(len(activeIndicies)) + + count := targetState.committeeCount(epoch, activeIndiciesLength) * f.beaconCfg.SlotsPerEpoch + start := (activeIndiciesLength * committeeIndex) / count + end := (activeIndiciesLength * (committeeIndex + 1)) / count + committeeLength := end - start + if !state.IsAggregator(f.beaconCfg, committeeLength, slot, committeeIndex, selectionProof) { + log.Warn("invalid aggregate and proof") + return fmt.Errorf("invalid aggregate and proof") + } + return f.OnAttestation(aggregateAndProof.Message.Aggregate, false, false) +} + // scheduleAttestationForLaterProcessing scheudules an attestation for later processing -func (f *ForkChoiceStore) scheduleAttestationForLaterProcessing(attestation *solid.Attestation, fromBlock bool) { +func (f *ForkChoiceStore) scheduleAttestationForLaterProcessing(attestation *solid.Attestation, insert bool) { go func() { logInterval := time.NewTicker(50 * time.Millisecond) for { @@ -76,8 +107,8 @@ func (f *ForkChoiceStore) scheduleAttestationForLaterProcessing(attestation *sol if f.Slot() < attestation.AttestantionData().Slot()+1 { continue } - if err := f.OnAttestation(attestation, false); err != nil { - log.Trace("could not process scheduled attestation", "reason", err) + if err := f.OnAttestation(attestation, false, insert); err != nil { + log.Debug("could not process scheduled attestation", "reason", err) } return } diff --git a/cl/phase1/network/gossip_manager.go b/cl/phase1/network/gossip_manager.go index 1207c1e92..88c0841de 100644 --- a/cl/phase1/network/gossip_manager.go +++ b/cl/phase1/network/gossip_manager.go @@ -160,6 +160,10 @@ func (g *GossipManager) onRecv(ctx context.Context, data *sentinel.GossipData, l if err := operationsContract[*cltypes.SignedBLSToExecutionChange](ctx, g, l, data, int(version), "bls to execution change", g.forkChoice.OnBlsToExecutionChange); err != nil { return err } + case gossip.TopicNameBeaconAggregateAndProof: + if err := operationsContract[*cltypes.SignedAggregateAndProof](ctx, g, l, data, int(version), "aggregate and proof", g.forkChoice.OnAggregateAndProof); err != nil { + return err + } } return nil } diff --git a/cl/phase1/stages/clstages.go b/cl/phase1/stages/clstages.go index b2d57dd01..8c5710bfe 100644 --- a/cl/phase1/stages/clstages.go +++ b/cl/phase1/stages/clstages.go @@ -405,7 +405,7 @@ func ConsensusClStages(ctx context.Context, continue MainLoop } block.Block.Body.Attestations.Range(func(idx int, a *solid.Attestation, total int) bool { - if err = cfg.forkChoice.OnAttestation(a, true); err != nil { + if err = cfg.forkChoice.OnAttestation(a, true, false); err != nil { log.Debug("bad attestation received", "err", err) } return true diff --git a/cl/pool/operation_pool.go b/cl/pool/operation_pool.go index 449621355..348680559 100644 --- a/cl/pool/operation_pool.go +++ b/cl/pool/operation_pool.go @@ -1,13 +1,19 @@ package pool import ( + "time" + "github.com/ledgerwatch/erigon/cl/phase1/core/state/lru" ) +const lifeSpan = 30 * time.Minute + var operationsMultiplier = 20 // Cap the amount of cached element to max_operations_per_block * operations_multiplier type OperationPool[K comparable, T any] struct { - pool *lru.Cache[K, T] // Map the Signature to the underlying object + pool *lru.Cache[K, T] // Map the Signature to the underlying object + recentlySeen map[K]time.Time + lastPruned time.Time } func NewOperationPool[K comparable, T any](maxOperationsPerBlock int, matricName string) *OperationPool[K, T] { @@ -15,11 +21,30 @@ func NewOperationPool[K comparable, T any](maxOperationsPerBlock int, matricName if err != nil { panic(err) } - return &OperationPool[K, T]{pool: pool} + return &OperationPool[K, T]{ + pool: pool, + recentlySeen: make(map[K]time.Time), + } } func (o *OperationPool[K, T]) Insert(k K, operation T) { + if _, ok := o.recentlySeen[k]; ok { + return + } o.pool.Add(k, operation) + o.recentlySeen[k] = time.Now() + if time.Since(o.lastPruned) > lifeSpan { + deleteList := make([]K, 0, len(o.recentlySeen)) + for k, t := range o.recentlySeen { + if time.Since(t) > lifeSpan { + deleteList = append(deleteList, k) + } + } + for _, k := range deleteList { + delete(o.recentlySeen, k) + } + o.lastPruned = time.Now() + } } func (o *OperationPool[K, T]) DeleteIfExist(k K) (removed bool) { diff --git a/cl/sentinel/service/start.go b/cl/sentinel/service/start.go index 50ad1b38c..f84de0094 100644 --- a/cl/sentinel/service/start.go +++ b/cl/sentinel/service/start.go @@ -31,7 +31,7 @@ func createSentinel(cfg *sentinel.SentinelConfig, db persistence.RawBeaconBlockC } gossipTopics := []sentinel.GossipTopic{ sentinel.BeaconBlockSsz, - //sentinel.BeaconAggregateAndProofSsz, + sentinel.BeaconAggregateAndProofSsz, sentinel.VoluntaryExitSsz, sentinel.ProposerSlashingSsz, sentinel.AttesterSlashingSsz, diff --git a/cl/spectest/consensus_tests/fork_choice.go b/cl/spectest/consensus_tests/fork_choice.go index ca39a83c0..ac33534e0 100644 --- a/cl/spectest/consensus_tests/fork_choice.go +++ b/cl/spectest/consensus_tests/fork_choice.go @@ -3,10 +3,11 @@ package consensus_tests import ( "context" "fmt" - "github.com/ledgerwatch/erigon/spectest" "io/fs" "testing" + "github.com/ledgerwatch/erigon/spectest" + "github.com/ledgerwatch/erigon/cl/abstract" "github.com/ledgerwatch/erigon/cl/clparams" "github.com/ledgerwatch/erigon/cl/cltypes/solid" @@ -194,7 +195,7 @@ func (b *ForkChoice) Run(t *testing.T, root fs.FS, c spectest.TestCase) (err err att := &solid.Attestation{} err := spectest.ReadSsz(root, c.Version(), step.GetAttestation()+".ssz_snappy", att) require.NoError(t, err, stepstr) - err = forkStore.OnAttestation(att, false) + err = forkStore.OnAttestation(att, false, false) if step.GetValid() { require.NoError(t, err, stepstr) } else { diff --git a/cmd/caplin-regression/regression/tester.go b/cmd/caplin-regression/regression/tester.go index badaebe54..9734b2189 100644 --- a/cmd/caplin-regression/regression/tester.go +++ b/cmd/caplin-regression/regression/tester.go @@ -88,7 +88,7 @@ func TestRegressionWithValidation(store *forkchoice.ForkChoiceStore, block *clty return err } block.Block.Body.Attestations.Range(func(index int, value *solid2.Attestation, length int) bool { - store.OnAttestation(value, true) + store.OnAttestation(value, true, true) return true }) return nil diff --git a/cmd/caplin/caplin1/run.go b/cmd/caplin/caplin1/run.go index 7455d911f..6edd48572 100644 --- a/cmd/caplin/caplin1/run.go +++ b/cmd/caplin/caplin1/run.go @@ -18,6 +18,7 @@ import ( "github.com/ledgerwatch/erigon/cl/freezer" freezer2 "github.com/ledgerwatch/erigon/cl/freezer" "github.com/ledgerwatch/erigon/eth/ethconfig" + "github.com/ledgerwatch/erigon/params" "github.com/ledgerwatch/erigon/turbo/snapshotsync/freezeblocks" "github.com/ledgerwatch/erigon/cl/persistence" @@ -212,7 +213,7 @@ func RunCaplinPhase1(ctx context.Context, sentinel sentinel.SentinelClient, engi statesReader := historical_states_reader.NewHistoricalStatesReader(beaconConfig, rcsn, vTables, af, genesisState) syncedDataManager := synced_data.NewSyncedDataManager(cfg.Active, beaconConfig) if cfg.Active { - apiHandler := handler.NewApiHandler(genesisConfig, beaconConfig, rawDB, indexDB, forkChoice, pool, rcsn, syncedDataManager, statesReader, sentinel) + apiHandler := handler.NewApiHandler(genesisConfig, beaconConfig, rawDB, indexDB, forkChoice, pool, rcsn, syncedDataManager, statesReader, sentinel, params.GitTag) headApiHandler := &validatorapi.ValidatorApiHandler{ FC: forkChoice, BeaconChainCfg: beaconConfig,