diff --git a/cl/antiquary/state_antiquary.go b/cl/antiquary/state_antiquary.go index c4b55bf87..892df0a2c 100644 --- a/cl/antiquary/state_antiquary.go +++ b/cl/antiquary/state_antiquary.go @@ -150,41 +150,42 @@ func (s *Antiquary) IncrementBeaconState(ctx context.Context, to uint64) error { return next(k, k, v) } - effectiveBalance := etl.NewCollector(kv.ValidatorEffectiveBalance, s.dirs.Tmp, etl.NewSortableBuffer(etl.BufferOptimalSize), s.logger) + etlBufSz := etl.BufferOptimalSize / 8 // 18 collectors * 256mb / 8 = 512mb in worst case + effectiveBalance := etl.NewCollector(kv.ValidatorEffectiveBalance, s.dirs.Tmp, etl.NewSortableBuffer(etlBufSz), s.logger) defer effectiveBalance.Close() - balances := etl.NewCollector(kv.ValidatorBalance, s.dirs.Tmp, etl.NewSortableBuffer(etl.BufferOptimalSize), s.logger) + balances := etl.NewCollector(kv.ValidatorBalance, s.dirs.Tmp, etl.NewSortableBuffer(etlBufSz), s.logger) defer balances.Close() - randaoMixes := etl.NewCollector(kv.RandaoMixes, s.dirs.Tmp, etl.NewSortableBuffer(etl.BufferOptimalSize), s.logger) + randaoMixes := etl.NewCollector(kv.RandaoMixes, s.dirs.Tmp, etl.NewSortableBuffer(etlBufSz), s.logger) defer randaoMixes.Close() - intraRandaoMixes := etl.NewCollector(kv.IntraRandaoMixes, s.dirs.Tmp, etl.NewSortableBuffer(etl.BufferOptimalSize), s.logger) + intraRandaoMixes := etl.NewCollector(kv.IntraRandaoMixes, s.dirs.Tmp, etl.NewSortableBuffer(etlBufSz), s.logger) defer intraRandaoMixes.Close() - proposers := etl.NewCollector(kv.Proposers, s.dirs.Tmp, etl.NewSortableBuffer(etl.BufferOptimalSize), s.logger) + proposers := etl.NewCollector(kv.Proposers, s.dirs.Tmp, etl.NewSortableBuffer(etlBufSz), s.logger) defer proposers.Close() - slashings := etl.NewCollector(kv.ValidatorSlashings, s.dirs.Tmp, etl.NewSortableBuffer(etl.BufferOptimalSize), s.logger) + slashings := etl.NewCollector(kv.ValidatorSlashings, s.dirs.Tmp, etl.NewSortableBuffer(etlBufSz), s.logger) defer slashings.Close() - blockRoots := etl.NewCollector(kv.BlockRoot, s.dirs.Tmp, etl.NewSortableBuffer(etl.BufferOptimalSize), s.logger) + blockRoots := etl.NewCollector(kv.BlockRoot, s.dirs.Tmp, etl.NewSortableBuffer(etlBufSz), s.logger) defer blockRoots.Close() - stateRoots := etl.NewCollector(kv.StateRoot, s.dirs.Tmp, etl.NewSortableBuffer(etl.BufferOptimalSize), s.logger) + stateRoots := etl.NewCollector(kv.StateRoot, s.dirs.Tmp, etl.NewSortableBuffer(etlBufSz), s.logger) defer stateRoots.Close() - slotData := etl.NewCollector(kv.SlotData, s.dirs.Tmp, etl.NewSortableBuffer(etl.BufferOptimalSize), s.logger) + slotData := etl.NewCollector(kv.SlotData, s.dirs.Tmp, etl.NewSortableBuffer(etlBufSz), s.logger) defer slotData.Close() - epochData := etl.NewCollector(kv.EpochData, s.dirs.Tmp, etl.NewSortableBuffer(etl.BufferOptimalSize), s.logger) + epochData := etl.NewCollector(kv.EpochData, s.dirs.Tmp, etl.NewSortableBuffer(etlBufSz), s.logger) defer epochData.Close() - inactivityScoresC := etl.NewCollector(kv.InactivityScores, s.dirs.Tmp, etl.NewSortableBuffer(etl.BufferOptimalSize), s.logger) + inactivityScoresC := etl.NewCollector(kv.InactivityScores, s.dirs.Tmp, etl.NewSortableBuffer(etlBufSz), s.logger) defer inactivityScoresC.Close() - nextSyncCommittee := etl.NewCollector(kv.NextSyncCommittee, s.dirs.Tmp, etl.NewSortableBuffer(etl.BufferOptimalSize), s.logger) + nextSyncCommittee := etl.NewCollector(kv.NextSyncCommittee, s.dirs.Tmp, etl.NewSortableBuffer(etlBufSz), s.logger) defer nextSyncCommittee.Close() - currentSyncCommittee := etl.NewCollector(kv.CurrentSyncCommittee, s.dirs.Tmp, etl.NewSortableBuffer(etl.BufferOptimalSize), s.logger) + currentSyncCommittee := etl.NewCollector(kv.CurrentSyncCommittee, s.dirs.Tmp, etl.NewSortableBuffer(etlBufSz), s.logger) defer currentSyncCommittee.Close() - currentEpochAttestations := etl.NewCollector(kv.CurrentEpochAttestations, s.dirs.Tmp, etl.NewSortableBuffer(etl.BufferOptimalSize), s.logger) + currentEpochAttestations := etl.NewCollector(kv.CurrentEpochAttestations, s.dirs.Tmp, etl.NewSortableBuffer(etlBufSz), s.logger) defer currentEpochAttestations.Close() - previousEpochAttestations := etl.NewCollector(kv.PreviousEpochAttestations, s.dirs.Tmp, etl.NewSortableBuffer(etl.BufferOptimalSize), s.logger) + previousEpochAttestations := etl.NewCollector(kv.PreviousEpochAttestations, s.dirs.Tmp, etl.NewSortableBuffer(etlBufSz), s.logger) defer previousEpochAttestations.Close() - eth1DataVotes := etl.NewCollector(kv.Eth1DataVotes, s.dirs.Tmp, etl.NewSortableBuffer(etl.BufferOptimalSize), s.logger) + eth1DataVotes := etl.NewCollector(kv.Eth1DataVotes, s.dirs.Tmp, etl.NewSortableBuffer(etlBufSz), s.logger) defer eth1DataVotes.Close() - stateEvents := etl.NewCollector(kv.StateEvents, s.dirs.Tmp, etl.NewSortableBuffer(etl.BufferOptimalSize), s.logger) + stateEvents := etl.NewCollector(kv.StateEvents, s.dirs.Tmp, etl.NewSortableBuffer(etlBufSz), s.logger) defer stateEvents.Close() - activeValidatorIndicies := etl.NewCollector(kv.ActiveValidatorIndicies, s.dirs.Tmp, etl.NewSortableBuffer(etl.BufferOptimalSize), s.logger) + activeValidatorIndicies := etl.NewCollector(kv.ActiveValidatorIndicies, s.dirs.Tmp, etl.NewSortableBuffer(etlBufSz), s.logger) defer activeValidatorIndicies.Close() progress, err := state_accessors.GetStateProcessingProgress(tx)