mirror of
https://gitlab.com/pulsechaincom/prysm-pulse.git
synced 2024-12-22 03:30:35 +00:00
revert revert of f6764fe62b
(#12399)
This commit is contained in:
parent
b4f1fea029
commit
955a21fea4
@ -581,7 +581,8 @@ func (b *BeaconNode) fetchBuilderService() *builder.Service {
|
|||||||
|
|
||||||
func (b *BeaconNode) registerAttestationPool() error {
|
func (b *BeaconNode) registerAttestationPool() error {
|
||||||
s, err := attestations.NewService(b.ctx, &attestations.Config{
|
s, err := attestations.NewService(b.ctx, &attestations.Config{
|
||||||
Pool: b.attestationPool,
|
Pool: b.attestationPool,
|
||||||
|
InitialSyncComplete: b.initialSyncComplete,
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return errors.Wrap(err, "could not register atts pool service")
|
return errors.Wrap(err, "could not register atts pool service")
|
||||||
|
@ -18,6 +18,7 @@ go_library(
|
|||||||
deps = [
|
deps = [
|
||||||
"//beacon-chain/operations/attestations/kv:go_default_library",
|
"//beacon-chain/operations/attestations/kv:go_default_library",
|
||||||
"//cache/lru:go_default_library",
|
"//cache/lru:go_default_library",
|
||||||
|
"//config/features:go_default_library",
|
||||||
"//config/params:go_default_library",
|
"//config/params:go_default_library",
|
||||||
"//consensus-types/primitives:go_default_library",
|
"//consensus-types/primitives:go_default_library",
|
||||||
"//crypto/hash:go_default_library",
|
"//crypto/hash:go_default_library",
|
||||||
|
@ -7,6 +7,8 @@ import (
|
|||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/prysmaticlabs/go-bitfield"
|
"github.com/prysmaticlabs/go-bitfield"
|
||||||
|
"github.com/prysmaticlabs/prysm/v4/config/features"
|
||||||
|
"github.com/prysmaticlabs/prysm/v4/config/params"
|
||||||
"github.com/prysmaticlabs/prysm/v4/crypto/hash"
|
"github.com/prysmaticlabs/prysm/v4/crypto/hash"
|
||||||
ethpb "github.com/prysmaticlabs/prysm/v4/proto/prysm/v1alpha1"
|
ethpb "github.com/prysmaticlabs/prysm/v4/proto/prysm/v1alpha1"
|
||||||
attaggregation "github.com/prysmaticlabs/prysm/v4/proto/prysm/v1alpha1/attestation/aggregation/attestations"
|
attaggregation "github.com/prysmaticlabs/prysm/v4/proto/prysm/v1alpha1/attestation/aggregation/attestations"
|
||||||
@ -14,20 +16,30 @@ import (
|
|||||||
"go.opencensus.io/trace"
|
"go.opencensus.io/trace"
|
||||||
)
|
)
|
||||||
|
|
||||||
// Prepare attestations for fork choice three times per slot.
|
|
||||||
var prepareForkChoiceAttsPeriod = slots.DivideSlotBy(3 /* times-per-slot */)
|
|
||||||
|
|
||||||
// This prepares fork choice attestations by running batchForkChoiceAtts
|
// This prepares fork choice attestations by running batchForkChoiceAtts
|
||||||
// every prepareForkChoiceAttsPeriod.
|
// every prepareForkChoiceAttsPeriod.
|
||||||
func (s *Service) prepareForkChoiceAtts() {
|
func (s *Service) prepareForkChoiceAtts() {
|
||||||
ticker := time.NewTicker(prepareForkChoiceAttsPeriod)
|
intervals := features.Get().AggregateIntervals
|
||||||
defer ticker.Stop()
|
slotDuration := time.Duration(params.BeaconConfig().SecondsPerSlot) * time.Second
|
||||||
|
// Adjust intervals for networks with a lower slot duration (Hive, e2e, etc)
|
||||||
|
for {
|
||||||
|
if intervals[len(intervals)-1] >= slotDuration {
|
||||||
|
for i, offset := range intervals {
|
||||||
|
intervals[i] = offset / 2
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
ticker := slots.NewSlotTickerWithIntervals(time.Unix(int64(s.genesisTime), 0), intervals)
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case <-ticker.C:
|
case <-ticker.C():
|
||||||
|
t := time.Now()
|
||||||
if err := s.batchForkChoiceAtts(s.ctx); err != nil {
|
if err := s.batchForkChoiceAtts(s.ctx); err != nil {
|
||||||
log.WithError(err).Error("Could not prepare attestations for fork choice")
|
log.WithError(err).Error("Could not prepare attestations for fork choice")
|
||||||
}
|
}
|
||||||
|
log.WithField("latency", time.Since(t).Milliseconds()).Debug("batched forkchoice attestations")
|
||||||
case <-s.ctx.Done():
|
case <-s.ctx.Done():
|
||||||
log.Debug("Context closed, exiting routine")
|
log.Debug("Context closed, exiting routine")
|
||||||
return
|
return
|
||||||
|
@ -5,6 +5,7 @@ package attestations
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"errors"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
lru "github.com/hashicorp/golang-lru"
|
lru "github.com/hashicorp/golang-lru"
|
||||||
@ -26,8 +27,9 @@ type Service struct {
|
|||||||
|
|
||||||
// Config options for the service.
|
// Config options for the service.
|
||||||
type Config struct {
|
type Config struct {
|
||||||
Pool Pool
|
Pool Pool
|
||||||
pruneInterval time.Duration
|
pruneInterval time.Duration
|
||||||
|
InitialSyncComplete chan struct{}
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewService instantiates a new attestation pool service instance that will
|
// NewService instantiates a new attestation pool service instance that will
|
||||||
@ -51,10 +53,24 @@ func NewService(ctx context.Context, cfg *Config) (*Service, error) {
|
|||||||
|
|
||||||
// Start an attestation pool service's main event loop.
|
// Start an attestation pool service's main event loop.
|
||||||
func (s *Service) Start() {
|
func (s *Service) Start() {
|
||||||
|
if err := s.waitForSync(s.cfg.InitialSyncComplete); err != nil {
|
||||||
|
log.WithError(err).Error("failed to wait for initial sync")
|
||||||
|
return
|
||||||
|
}
|
||||||
go s.prepareForkChoiceAtts()
|
go s.prepareForkChoiceAtts()
|
||||||
go s.pruneAttsPool()
|
go s.pruneAttsPool()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// waitForSync waits until the beacon node is synced to the latest head.
|
||||||
|
func (s *Service) waitForSync(syncChan chan struct{}) error {
|
||||||
|
select {
|
||||||
|
case <-syncChan:
|
||||||
|
return nil
|
||||||
|
case <-s.ctx.Done():
|
||||||
|
return errors.New("context closed, exiting goroutine")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Stop the beacon block attestation pool service's main event loop
|
// Stop the beacon block attestation pool service's main event loop
|
||||||
// and associated goroutines.
|
// and associated goroutines.
|
||||||
func (s *Service) Stop() error {
|
func (s *Service) Stop() error {
|
||||||
|
@ -72,6 +72,9 @@ type Flags struct {
|
|||||||
// KeystoreImportDebounceInterval specifies the time duration the validator waits to reload new keys if they have
|
// KeystoreImportDebounceInterval specifies the time duration the validator waits to reload new keys if they have
|
||||||
// changed on disk. This feature is for advanced use cases only.
|
// changed on disk. This feature is for advanced use cases only.
|
||||||
KeystoreImportDebounceInterval time.Duration
|
KeystoreImportDebounceInterval time.Duration
|
||||||
|
|
||||||
|
// AggregateIntervals specifies the time durations at which we aggregate attestations preparing for forkchoice.
|
||||||
|
AggregateIntervals []time.Duration
|
||||||
}
|
}
|
||||||
|
|
||||||
var featureConfig *Flags
|
var featureConfig *Flags
|
||||||
@ -218,6 +221,7 @@ func ConfigureBeaconChain(ctx *cli.Context) error {
|
|||||||
logEnabled(buildBlockParallel)
|
logEnabled(buildBlockParallel)
|
||||||
cfg.BuildBlockParallel = true
|
cfg.BuildBlockParallel = true
|
||||||
}
|
}
|
||||||
|
cfg.AggregateIntervals = []time.Duration{aggregateFirstInterval.Value, aggregateSecondInterval.Value, aggregateThirdInterval.Value}
|
||||||
Init(cfg)
|
Init(cfg)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
@ -50,6 +50,24 @@ var (
|
|||||||
Usage: "(Danger): Writes the wallet password to the wallet directory on completing Prysm web onboarding. " +
|
Usage: "(Danger): Writes the wallet password to the wallet directory on completing Prysm web onboarding. " +
|
||||||
"We recommend against this flag unless you are an advanced user.",
|
"We recommend against this flag unless you are an advanced user.",
|
||||||
}
|
}
|
||||||
|
aggregateFirstInterval = &cli.DurationFlag{
|
||||||
|
Name: "aggregate-first-interval",
|
||||||
|
Usage: "(Advanced): Specifies the first interval in which attestations are aggregated in the slot (typically unnaggregated attestations are aggregated in this interval)",
|
||||||
|
Value: 7 * time.Second,
|
||||||
|
Hidden: true,
|
||||||
|
}
|
||||||
|
aggregateSecondInterval = &cli.DurationFlag{
|
||||||
|
Name: "aggregate-second-interval",
|
||||||
|
Usage: "(Advanced): Specifies the second interval in which attestations are aggregated in the slot",
|
||||||
|
Value: 9 * time.Second,
|
||||||
|
Hidden: true,
|
||||||
|
}
|
||||||
|
aggregateThirdInterval = &cli.DurationFlag{
|
||||||
|
Name: "aggregate-third-interval",
|
||||||
|
Usage: "(Advanced): Specifies the third interval in which attestations are aggregated in the slot",
|
||||||
|
Value: 11 * time.Second,
|
||||||
|
Hidden: true,
|
||||||
|
}
|
||||||
dynamicKeyReloadDebounceInterval = &cli.DurationFlag{
|
dynamicKeyReloadDebounceInterval = &cli.DurationFlag{
|
||||||
Name: "dynamic-key-reload-debounce-interval",
|
Name: "dynamic-key-reload-debounce-interval",
|
||||||
Usage: "(Advanced): Specifies the time duration the validator waits to reload new keys if they have " +
|
Usage: "(Advanced): Specifies the time duration the validator waits to reload new keys if they have " +
|
||||||
@ -168,6 +186,9 @@ var BeaconChainFlags = append(deprecatedBeaconFlags, append(deprecatedFlags, []c
|
|||||||
enableVerboseSigVerification,
|
enableVerboseSigVerification,
|
||||||
enableOptionalEngineMethods,
|
enableOptionalEngineMethods,
|
||||||
prepareAllPayloads,
|
prepareAllPayloads,
|
||||||
|
aggregateFirstInterval,
|
||||||
|
aggregateSecondInterval,
|
||||||
|
aggregateThirdInterval,
|
||||||
buildBlockParallel,
|
buildBlockParallel,
|
||||||
}...)...)
|
}...)...)
|
||||||
|
|
||||||
|
@ -37,5 +37,6 @@ go_test(
|
|||||||
"//time:go_default_library",
|
"//time:go_default_library",
|
||||||
"@com_github_sirupsen_logrus//:go_default_library",
|
"@com_github_sirupsen_logrus//:go_default_library",
|
||||||
"@com_github_sirupsen_logrus//hooks/test:go_default_library",
|
"@com_github_sirupsen_logrus//hooks/test:go_default_library",
|
||||||
|
"@com_github_stretchr_testify//require:go_default_library",
|
||||||
],
|
],
|
||||||
)
|
)
|
||||||
|
@ -4,6 +4,7 @@ package slots
|
|||||||
import (
|
import (
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/prysmaticlabs/prysm/v4/config/params"
|
||||||
"github.com/prysmaticlabs/prysm/v4/consensus-types/primitives"
|
"github.com/prysmaticlabs/prysm/v4/consensus-types/primitives"
|
||||||
prysmTime "github.com/prysmaticlabs/prysm/v4/time"
|
prysmTime "github.com/prysmaticlabs/prysm/v4/time"
|
||||||
)
|
)
|
||||||
@ -104,3 +105,65 @@ func (s *SlotTicker) start(
|
|||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// startWithIntervals starts a ticker that emits a tick every slot at the
|
||||||
|
// prescribed intervals. The caller is responsible to make these intervals increasing and
|
||||||
|
// less than secondsPerSlot
|
||||||
|
func (s *SlotTicker) startWithIntervals(
|
||||||
|
genesisTime time.Time,
|
||||||
|
until func(time.Time) time.Duration,
|
||||||
|
after func(time.Duration) <-chan time.Time,
|
||||||
|
intervals []time.Duration) {
|
||||||
|
go func() {
|
||||||
|
slot := Since(genesisTime)
|
||||||
|
slot++
|
||||||
|
interval := 0
|
||||||
|
nextTickTime := startFromTime(genesisTime, slot).Add(intervals[0])
|
||||||
|
|
||||||
|
for {
|
||||||
|
waitTime := until(nextTickTime)
|
||||||
|
select {
|
||||||
|
case <-after(waitTime):
|
||||||
|
s.c <- slot
|
||||||
|
interval++
|
||||||
|
if interval == len(intervals) {
|
||||||
|
interval = 0
|
||||||
|
slot++
|
||||||
|
}
|
||||||
|
nextTickTime = startFromTime(genesisTime, slot).Add(intervals[interval])
|
||||||
|
case <-s.done:
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewSlotTickerWithIntervals starts and returns a SlotTicker instance that allows
|
||||||
|
// several offsets of time from genesis,
|
||||||
|
// Caller is responsible to input the intervals in increasing order and none bigger or equal than
|
||||||
|
// SecondsPerSlot
|
||||||
|
func NewSlotTickerWithIntervals(genesisTime time.Time, intervals []time.Duration) *SlotTicker {
|
||||||
|
if genesisTime.Unix() == 0 {
|
||||||
|
panic("zero genesis time")
|
||||||
|
}
|
||||||
|
if len(intervals) == 0 {
|
||||||
|
panic("at least one interval has to be entered")
|
||||||
|
}
|
||||||
|
slotDuration := time.Duration(params.BeaconConfig().SecondsPerSlot) * time.Second
|
||||||
|
lastOffset := time.Duration(0)
|
||||||
|
for _, offset := range intervals {
|
||||||
|
if offset < lastOffset {
|
||||||
|
panic("invalid decreasing offsets")
|
||||||
|
}
|
||||||
|
if offset >= slotDuration {
|
||||||
|
panic("invalid ticker offset")
|
||||||
|
}
|
||||||
|
lastOffset = offset
|
||||||
|
}
|
||||||
|
ticker := &SlotTicker{
|
||||||
|
c: make(chan primitives.Slot),
|
||||||
|
done: make(chan struct{}),
|
||||||
|
}
|
||||||
|
ticker.startWithIntervals(genesisTime, prysmTime.Until, time.After, intervals)
|
||||||
|
return ticker
|
||||||
|
}
|
||||||
|
@ -4,7 +4,9 @@ import (
|
|||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/prysmaticlabs/prysm/v4/config/params"
|
||||||
"github.com/prysmaticlabs/prysm/v4/consensus-types/primitives"
|
"github.com/prysmaticlabs/prysm/v4/consensus-types/primitives"
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
)
|
)
|
||||||
|
|
||||||
var _ Ticker = (*SlotTicker)(nil)
|
var _ Ticker = (*SlotTicker)(nil)
|
||||||
@ -136,3 +138,49 @@ func TestGetSlotTickerWithOffset_OK(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestGetSlotTickerWitIntervals(t *testing.T) {
|
||||||
|
genesisTime := time.Now()
|
||||||
|
offset := time.Duration(params.BeaconConfig().SecondsPerSlot) * time.Second / 3
|
||||||
|
intervals := []time.Duration{offset, 2 * offset}
|
||||||
|
|
||||||
|
intervalTicker := NewSlotTickerWithIntervals(genesisTime, intervals)
|
||||||
|
normalTicker := NewSlotTicker(genesisTime, params.BeaconConfig().SecondsPerSlot)
|
||||||
|
|
||||||
|
firstTicked := 0
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-intervalTicker.C():
|
||||||
|
// interval ticks starts in second slot
|
||||||
|
if firstTicked < 2 {
|
||||||
|
t.Fatal("Expected other ticker to tick first")
|
||||||
|
}
|
||||||
|
return
|
||||||
|
case <-normalTicker.C():
|
||||||
|
if firstTicked > 1 {
|
||||||
|
t.Fatal("Expected normal ticker to tick first")
|
||||||
|
}
|
||||||
|
firstTicked++
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestSlotTickerWithIntervalsInputValidation(t *testing.T) {
|
||||||
|
var genesisTime time.Time
|
||||||
|
offset := time.Duration(params.BeaconConfig().SecondsPerSlot) * time.Second / 3
|
||||||
|
intervals := make([]time.Duration, 0)
|
||||||
|
panicCall := func() {
|
||||||
|
NewSlotTickerWithIntervals(genesisTime, intervals)
|
||||||
|
}
|
||||||
|
require.Panics(t, panicCall, "zero genesis time")
|
||||||
|
genesisTime = time.Now()
|
||||||
|
require.Panics(t, panicCall, "at least one interval has to be entered")
|
||||||
|
intervals = []time.Duration{2 * offset, offset}
|
||||||
|
require.Panics(t, panicCall, "invalid decreasing offsets")
|
||||||
|
intervals = []time.Duration{offset, 4 * offset}
|
||||||
|
require.Panics(t, panicCall, "invalid ticker offset")
|
||||||
|
intervals = []time.Duration{4 * offset, offset}
|
||||||
|
require.Panics(t, panicCall, "invalid ticker offset")
|
||||||
|
intervals = []time.Duration{offset, 2 * offset}
|
||||||
|
require.NotPanics(t, panicCall)
|
||||||
|
}
|
||||||
|
@ -16,12 +16,17 @@ import (
|
|||||||
// incoming objects. (24 mins with mainnet spec)
|
// incoming objects. (24 mins with mainnet spec)
|
||||||
const MaxSlotBuffer = uint64(1 << 7)
|
const MaxSlotBuffer = uint64(1 << 7)
|
||||||
|
|
||||||
|
// startFromTime returns the slot start in terms of genesis time.Time
|
||||||
|
func startFromTime(genesis time.Time, slot primitives.Slot) time.Time {
|
||||||
|
duration := time.Second * time.Duration(slot.Mul(params.BeaconConfig().SecondsPerSlot))
|
||||||
|
return genesis.Add(duration) // lint:ignore uintcast -- Genesis timestamp will not exceed int64 in your lifetime.
|
||||||
|
}
|
||||||
|
|
||||||
// StartTime returns the start time in terms of its unix epoch
|
// StartTime returns the start time in terms of its unix epoch
|
||||||
// value.
|
// value.
|
||||||
func StartTime(genesis uint64, slot primitives.Slot) time.Time {
|
func StartTime(genesis uint64, slot primitives.Slot) time.Time {
|
||||||
duration := time.Second * time.Duration(slot.Mul(params.BeaconConfig().SecondsPerSlot))
|
genesisTime := time.Unix(int64(genesis), 0) // lint:ignore uintcast -- Genesis timestamp will not exceed int64 in your lifetime.
|
||||||
startTime := time.Unix(int64(genesis), 0).Add(duration) // lint:ignore uintcast -- Genesis timestamp will not exceed int64 in your lifetime.
|
return startFromTime(genesisTime, slot)
|
||||||
return startTime
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// SinceGenesis returns the number of slots since
|
// SinceGenesis returns the number of slots since
|
||||||
|
Loading…
Reference in New Issue
Block a user