mirror of
https://gitlab.com/pulsechaincom/erigon-pulse.git
synced 2024-12-22 03:30:37 +00:00
Fixed ungraceful shutdown (#8307)
This commit is contained in:
parent
adaba41538
commit
d555fc450d
32
cl/pool/operation_pool.go
Normal file
32
cl/pool/operation_pool.go
Normal file
@ -0,0 +1,32 @@
|
||||
package pool
|
||||
|
||||
import (
|
||||
libcommon "github.com/ledgerwatch/erigon-lib/common"
|
||||
"github.com/ledgerwatch/erigon/cl/phase1/core/state/lru"
|
||||
)
|
||||
|
||||
var operationsMultiplier = 10 // Cap the amount of cached element to max_operations_per_block * operations_multiplier
|
||||
|
||||
type OperationPool[T any] struct {
|
||||
pool *lru.Cache[libcommon.Bytes96, T] // Map the Signature to the underlying object
|
||||
}
|
||||
|
||||
func NewOperationPool[T any](maxOperationsPerBlock int, matricName string) *OperationPool[T] {
|
||||
pool, err := lru.New[libcommon.Bytes96, T](matricName, maxOperationsPerBlock*operationsMultiplier)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
return &OperationPool[T]{pool: pool}
|
||||
}
|
||||
|
||||
func (o *OperationPool[T]) Insert(signature libcommon.Bytes96, operation T) {
|
||||
o.pool.Add(signature, operation)
|
||||
}
|
||||
|
||||
func (o *OperationPool[T]) DeleteIfExist(signature libcommon.Bytes96) (removed bool) {
|
||||
return o.pool.Remove(signature)
|
||||
}
|
||||
|
||||
func (o *OperationPool[T]) Raw() []T {
|
||||
return o.pool.Values()
|
||||
}
|
43
cl/pool/operations_pool.go
Normal file
43
cl/pool/operations_pool.go
Normal file
@ -0,0 +1,43 @@
|
||||
package pool
|
||||
|
||||
import (
|
||||
libcommon "github.com/ledgerwatch/erigon-lib/common"
|
||||
"github.com/ledgerwatch/erigon/cl/clparams"
|
||||
"github.com/ledgerwatch/erigon/cl/cltypes"
|
||||
"github.com/ledgerwatch/erigon/cl/cltypes/solid"
|
||||
"github.com/ledgerwatch/erigon/cl/utils"
|
||||
)
|
||||
|
||||
// DoubleSignatureKey uses blake2b algorithm to merge two signatures together. blake2 is faster than sha3.
|
||||
func doubleSignatureKey(one, two libcommon.Bytes96) (out libcommon.Bytes96) {
|
||||
res := utils.Keccak256(one[:], two[:])
|
||||
copy(out[:], res[:])
|
||||
return
|
||||
}
|
||||
|
||||
func ComputeKeyForProposerSlashing(slashing *cltypes.ProposerSlashing) libcommon.Bytes96 {
|
||||
return doubleSignatureKey(slashing.Header1.Signature, slashing.Header2.Signature)
|
||||
}
|
||||
|
||||
func ComputeKeyForAttesterSlashing(slashing *cltypes.AttesterSlashing) libcommon.Bytes96 {
|
||||
return doubleSignatureKey(slashing.Attestation_1.Signature, slashing.Attestation_2.Signature)
|
||||
}
|
||||
|
||||
// OperationsPool is the collection of all gossip-collectable operations.
|
||||
type OperationsPool struct {
|
||||
AttestationsPool *OperationPool[*solid.Attestation]
|
||||
AttesterSlashingsPool *OperationPool[*cltypes.AttesterSlashing]
|
||||
ProposerSlashingsPool *OperationPool[*cltypes.ProposerSlashing]
|
||||
BLSToExecutionChangesPool *OperationPool[*cltypes.SignedBLSToExecutionChange]
|
||||
VoluntaryExistsPool *OperationPool[*cltypes.SignedVoluntaryExit]
|
||||
}
|
||||
|
||||
func NewOperationsPool(beaconCfg *clparams.BeaconChainConfig) OperationsPool {
|
||||
return OperationsPool{
|
||||
AttestationsPool: NewOperationPool[*solid.Attestation](int(beaconCfg.MaxAttestations), "attestationsPool"),
|
||||
AttesterSlashingsPool: NewOperationPool[*cltypes.AttesterSlashing](int(beaconCfg.MaxAttestations), "attesterSlashingsPool"),
|
||||
ProposerSlashingsPool: NewOperationPool[*cltypes.ProposerSlashing](int(beaconCfg.MaxAttestations), "proposerSlashingsPool"),
|
||||
BLSToExecutionChangesPool: NewOperationPool[*cltypes.SignedBLSToExecutionChange](int(beaconCfg.MaxBlsToExecutionChanges), "blsExecutionChangesPool"),
|
||||
VoluntaryExistsPool: NewOperationPool[*cltypes.SignedVoluntaryExit](int(beaconCfg.MaxBlsToExecutionChanges), "voluntaryExitsPool"),
|
||||
}
|
||||
}
|
71
cl/pool/operations_pool_test.go
Normal file
71
cl/pool/operations_pool_test.go
Normal file
@ -0,0 +1,71 @@
|
||||
package pool
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/ledgerwatch/erigon/cl/clparams"
|
||||
"github.com/ledgerwatch/erigon/cl/cltypes"
|
||||
"github.com/ledgerwatch/erigon/cl/cltypes/solid"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestOperationsPool(t *testing.T) {
|
||||
pools := NewOperationsPool(&clparams.MainnetBeaconConfig)
|
||||
|
||||
// AttestationsPool
|
||||
pools.AttestationsPool.Insert([96]byte{}, &solid.Attestation{})
|
||||
pools.AttestationsPool.Insert([96]byte{1}, &solid.Attestation{})
|
||||
require.Equal(t, 2, len(pools.AttestationsPool.Raw()))
|
||||
require.True(t, pools.AttestationsPool.DeleteIfExist([96]byte{}))
|
||||
require.Equal(t, 1, len(pools.AttestationsPool.Raw()))
|
||||
// ProposerSlashingsPool
|
||||
slashing1 := &cltypes.ProposerSlashing{
|
||||
Header1: &cltypes.SignedBeaconBlockHeader{
|
||||
Signature: [96]byte{1},
|
||||
},
|
||||
Header2: &cltypes.SignedBeaconBlockHeader{
|
||||
Signature: [96]byte{2},
|
||||
},
|
||||
}
|
||||
slashing2 := &cltypes.ProposerSlashing{
|
||||
Header1: &cltypes.SignedBeaconBlockHeader{
|
||||
Signature: [96]byte{3},
|
||||
},
|
||||
Header2: &cltypes.SignedBeaconBlockHeader{
|
||||
Signature: [96]byte{4},
|
||||
},
|
||||
}
|
||||
pools.ProposerSlashingsPool.Insert(ComputeKeyForProposerSlashing(slashing1), slashing1)
|
||||
pools.ProposerSlashingsPool.Insert(ComputeKeyForProposerSlashing(slashing2), slashing2)
|
||||
require.True(t, pools.ProposerSlashingsPool.DeleteIfExist(ComputeKeyForProposerSlashing(slashing2)))
|
||||
// AttesterSlashingsPool
|
||||
attesterSlashing1 := &cltypes.AttesterSlashing{
|
||||
Attestation_1: &cltypes.IndexedAttestation{
|
||||
Signature: [96]byte{1},
|
||||
},
|
||||
Attestation_2: &cltypes.IndexedAttestation{
|
||||
Signature: [96]byte{2},
|
||||
},
|
||||
}
|
||||
attesterSlashing2 := &cltypes.AttesterSlashing{
|
||||
Attestation_1: &cltypes.IndexedAttestation{
|
||||
Signature: [96]byte{3},
|
||||
},
|
||||
Attestation_2: &cltypes.IndexedAttestation{
|
||||
Signature: [96]byte{4},
|
||||
},
|
||||
}
|
||||
pools.AttesterSlashingsPool.Insert(ComputeKeyForAttesterSlashing(attesterSlashing1), attesterSlashing1)
|
||||
pools.AttesterSlashingsPool.Insert(ComputeKeyForAttesterSlashing(attesterSlashing2), attesterSlashing2)
|
||||
require.True(t, pools.AttesterSlashingsPool.DeleteIfExist(ComputeKeyForAttesterSlashing(attesterSlashing2)))
|
||||
require.Equal(t, 1, len(pools.AttesterSlashingsPool.Raw()))
|
||||
|
||||
// BLSToExecutionChangesPool
|
||||
pools.BLSToExecutionChangesPool.Insert([96]byte{}, &cltypes.SignedBLSToExecutionChange{})
|
||||
pools.BLSToExecutionChangesPool.Insert([96]byte{1}, &cltypes.SignedBLSToExecutionChange{})
|
||||
require.Equal(t, 2, len(pools.BLSToExecutionChangesPool.Raw()))
|
||||
require.True(t, pools.BLSToExecutionChangesPool.DeleteIfExist([96]byte{}))
|
||||
require.Equal(t, 1, len(pools.BLSToExecutionChangesPool.Raw()))
|
||||
|
||||
require.Equal(t, 1, len(pools.ProposerSlashingsPool.Raw()))
|
||||
}
|
@ -235,7 +235,13 @@ func (d *Downloader) mainLoop(silent bool) error {
|
||||
}
|
||||
}(t)
|
||||
}
|
||||
time.Sleep(10 * time.Second)
|
||||
timer := time.NewTimer(10 * time.Second)
|
||||
defer timer.Stop()
|
||||
select {
|
||||
case <-d.ctx.Done():
|
||||
return
|
||||
case <-timer.C:
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user