snapshots: reduce merge limit of blocks to 100K (#8614)

Reason: 
- produce and seed snapshots earlier on chain tip; reduce dependency on
"good peers with history" in the p2p network.
Some networks do not have many archive peers, and ConsensusLayer clients
are not good (not incentivised) at serving history.
- avoid having too many files:
more files (shards) means "more metadata", "more lookups for
non-indexed queries", "more dictionaries", "more bittorrent
connections", ...
fewer files means small files will be removed after merge (no peers for
these files).


ToDo:
[x] Recent 500K - merge up to 100K 
[x] Older than 500K - merge up to 500K 
[x] Start seeding 100k files
[x] Stop seeding 100k files after merge (right before delete)

In next PR: 
[] Old versions of Erigon must be able to download recent hashes. To achieve
this, at first start Erigon will download the preverified hashes .toml from
s3 - if it's newer than what we have (built-in) - use it.
This commit is contained in:
Alex Sharov 2023-11-01 23:22:35 +07:00 committed by GitHub
parent 35696afca1
commit 329d18ef6f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
21 changed files with 414 additions and 189 deletions

View File

@ -7,6 +7,7 @@ import (
"net"
"os"
"path/filepath"
"runtime"
"strings"
"time"
@ -177,6 +178,7 @@ func Downloader(ctx context.Context, logger log.Logger) error {
return err
}
cfg.ClientConfig.PieceHashersPerTorrent = 4 * runtime.NumCPU()
cfg.ClientConfig.DisableIPv6 = disableIPV6
cfg.ClientConfig.DisableIPv4 = disableIPV4

View File

@ -1551,7 +1551,7 @@ func newSync(ctx context.Context, db kv.RwDB, miningConfig *params.MiningConfig,
}
notifications := &shards.Notifications{}
blockRetire := freezeblocks.NewBlockRetire(1, dirs, blockReader, blockWriter, freezeblocks.MergeSteps, db, notifications.Events, logger)
blockRetire := freezeblocks.NewBlockRetire(1, dirs, blockReader, blockWriter, db, notifications.Events, logger)
stages := stages2.NewDefaultStages(context.Background(), db, p2p.Config{}, &cfg, sentryControlServer, notifications, nil, blockReader, blockRetire, agg, nil, nil, heimdallClient, logger)
sync := stagedsync.New(stages, stagedsync.DefaultUnwindOrder, stagedsync.DefaultPruneOrder, logger)

View File

@ -35,6 +35,9 @@ func NewDownloaderClient(server proto_downloader.DownloaderServer) *DownloaderCl
// Download forwards the request directly to the wrapped in-process server.
// NOTE(review): grpc.CallOption values in opts are ignored — this client is a
// direct (non-network) shim around a DownloaderServer, not a real gRPC stub.
func (c *DownloaderClient) Download(ctx context.Context, in *proto_downloader.DownloadRequest, opts ...grpc.CallOption) (*emptypb.Empty, error) {
	return c.server.Download(ctx, in)
}
// Delete forwards the request directly to the wrapped in-process server.
// NOTE(review): grpc.CallOption values in opts are ignored, matching the
// other methods of this in-process shim.
func (c *DownloaderClient) Delete(ctx context.Context, in *proto_downloader.DeleteRequest, opts ...grpc.CallOption) (*emptypb.Empty, error) {
	return c.server.Delete(ctx, in)
}
// Verify forwards the request directly to the wrapped in-process server.
// NOTE(review): grpc.CallOption values in opts are ignored, matching the
// other methods of this in-process shim.
func (c *DownloaderClient) Verify(ctx context.Context, in *proto_downloader.VerifyRequest, opts ...grpc.CallOption) (*emptypb.Empty, error) {
	return c.server.Verify(ctx, in)
}

View File

@ -19,6 +19,8 @@ package downloader
import (
"context"
"fmt"
"os"
"path/filepath"
"time"
"github.com/anacrolix/torrent/metainfo"
@ -44,6 +46,7 @@ type GrpcServer struct {
// Download - create new .torrent ONLY if initialSync, everything else Erigon can generate by itself
func (s *GrpcServer) Download(ctx context.Context, request *proto_downloader.DownloadRequest) (*emptypb.Empty, error) {
defer s.d.ReCalcStats(10 * time.Second) // immediately call ReCalc to set stat.Complete flag
logEvery := time.NewTicker(20 * time.Second)
defer logEvery.Stop()
@ -70,7 +73,33 @@ func (s *GrpcServer) Download(ctx context.Context, request *proto_downloader.Dow
return nil, err
}
}
s.d.ReCalcStats(10 * time.Second) // immediately call ReCalc to set stat.Complete flag
return &emptypb.Empty{}, nil
}
// Delete - stop seeding, remove file, remove .torrent.
//
// For each requested path: drop the matching torrent (stops seeding), then
// best-effort remove the data file and its .torrent metadata from SnapDir.
//
// BUG FIX: the previous select had `continue` in the `<-t.GotInfo()` case,
// which skipped exactly the torrents whose metadata had completed — so a
// fully-known torrent was never Drop()ed and seeding never stopped. The skip
// belongs in the `default:` branch: torrents whose metainfo has not arrived
// yet cannot be matched by name reliably, so those are the ones to pass over.
func (s *GrpcServer) Delete(ctx context.Context, request *proto_downloader.DeleteRequest) (*emptypb.Empty, error) {
	defer s.d.ReCalcStats(10 * time.Second) // immediately call ReCalc to set stat.Complete flag
	torrents := s.d.torrentClient.Torrents()
	for _, name := range request.Paths {
		if name == "" {
			return nil, fmt.Errorf("field 'path' is required")
		}
		for _, t := range torrents {
			select {
			case <-t.GotInfo():
				// metainfo is available — safe to match by name below
			default:
				continue // metainfo not fetched yet; skip
			}
			if t.Name() == name {
				t.Drop()
				break
			}
		}
		fPath := filepath.Join(s.d.SnapDir(), name)
		// Removal is deliberately best-effort: the file or .torrent may
		// already be gone, and a partial cleanup must not fail the RPC.
		_ = os.Remove(fPath)
		_ = os.Remove(fPath + ".torrent")
	}
	return &emptypb.Empty{}, nil
}

View File

@ -29,7 +29,6 @@ import (
lg "github.com/anacrolix/log"
"github.com/anacrolix/torrent"
"github.com/c2h5oh/datasize"
"github.com/ledgerwatch/erigon-lib/common/cmp"
"github.com/ledgerwatch/erigon-lib/common/datadir"
"github.com/ledgerwatch/erigon-lib/common/dir"
"github.com/ledgerwatch/log/v3"
@ -60,7 +59,10 @@ type Cfg struct {
func Default() *torrent.ClientConfig {
torrentConfig := torrent.NewDefaultClientConfig()
torrentConfig.PieceHashersPerTorrent = cmp.Max(1, runtime.NumCPU()-1)
// better don't increase because erigon periodically producing "new seedable files" - and adding them to downloader.
// it must not impact chain tip sync - so, limit resources to minimum by default.
// but when downloader is started as a separated process - rise it to max
//torrentConfig.PieceHashersPerTorrent = cmp.Max(1, runtime.NumCPU()-1)
torrentConfig.MinDialTimeout = 6 * time.Second //default: 3s
torrentConfig.HandshakesTimeout = 8 * time.Second //default: 4s

View File

@ -44,6 +44,8 @@ const (
BeaconBlocks
)
var BorSnapshotTypes = []Type{BorEvents, BorSpans}
func (ft Type) String() string {
switch ft {
case Headers:
@ -90,7 +92,7 @@ const (
func (it IdxType) String() string { return string(it) }
var AllSnapshotTypes = []Type{Headers, Bodies, Transactions}
var BlockSnapshotTypes = []Type{Headers, Bodies, Transactions}
var (
ErrInvalidFileName = fmt.Errorf("invalid compressed file name")
@ -175,8 +177,10 @@ type FileInfo struct {
}
func (f FileInfo) TorrentFileExists() bool { return dir.FileExist(f.Path + ".torrent") }
func (f FileInfo) Seedable() bool { return f.To-f.From == Erigon2MergeLimit }
func (f FileInfo) NeedTorrentFile() bool { return f.Seedable() && !f.TorrentFileExists() }
func (f FileInfo) Seedable() bool {
return f.To-f.From == Erigon2MergeLimit || f.To-f.From == Erigon2RecentMergeLimit
}
func (f FileInfo) NeedTorrentFile() bool { return f.Seedable() && !f.TorrentFileExists() }
func IdxFiles(dir string) (res []FileInfo, err error) { return FilesWithExt(dir, ".idx") }
func Segments(dir string) (res []FileInfo, err error) { return FilesWithExt(dir, ".seg") }

View File

@ -4,8 +4,8 @@ go 1.20
require (
github.com/erigontech/mdbx-go v0.35.2-0.20231101074031-9f999220e9ed
github.com/ledgerwatch/erigon-snapshot v1.3.1-0.20231019214918-3eb2303a41f3
github.com/ledgerwatch/interfaces v0.0.0-20231011121315-f58b806039f0
github.com/ledgerwatch/erigon-snapshot v1.3.1-0.20231101135659-d85154191b66
github.com/ledgerwatch/interfaces v0.0.0-20231031050643-c86352e41520
github.com/ledgerwatch/log/v3 v3.9.0
github.com/ledgerwatch/secp256k1 v1.0.0
)

View File

@ -291,10 +291,10 @@ github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/leanovate/gopter v0.2.9 h1:fQjYxZaynp97ozCzfOyOuAGOU4aU/z37zf/tOujFk7c=
github.com/ledgerwatch/erigon-snapshot v1.3.1-0.20231019214918-3eb2303a41f3 h1:59TKwBsS+Fn4iGh+PCxw7s73+/0WmDeK6bskDe3tFzY=
github.com/ledgerwatch/erigon-snapshot v1.3.1-0.20231019214918-3eb2303a41f3/go.mod h1:3AuPxZc85jkehh/HA9h8gabv5MSi3kb/ddtzBsTVJFo=
github.com/ledgerwatch/interfaces v0.0.0-20231011121315-f58b806039f0 h1:7z6cyoCKP6qxtKSO74eAY6XiHWKaOi+melvPeMCXLl8=
github.com/ledgerwatch/interfaces v0.0.0-20231011121315-f58b806039f0/go.mod h1:ugQv1QllJzBny3cKZKxUrSnykkjkBgm27eQM6dnGAcc=
github.com/ledgerwatch/erigon-snapshot v1.3.1-0.20231101135659-d85154191b66 h1:0GY0mXeC6ThOw1x50tF1dDWso0kL6a5SS7k/zrsIF60=
github.com/ledgerwatch/erigon-snapshot v1.3.1-0.20231101135659-d85154191b66/go.mod h1:3AuPxZc85jkehh/HA9h8gabv5MSi3kb/ddtzBsTVJFo=
github.com/ledgerwatch/interfaces v0.0.0-20231031050643-c86352e41520 h1:j/PRJWbPrbk8wpVjU77SWS8xJ/N+dcxPs1relNSolUs=
github.com/ledgerwatch/interfaces v0.0.0-20231031050643-c86352e41520/go.mod h1:ugQv1QllJzBny3cKZKxUrSnykkjkBgm27eQM6dnGAcc=
github.com/ledgerwatch/log/v3 v3.9.0 h1:iDwrXe0PVwBC68Dd94YSsHbMgQ3ufsgjzXtFNFVZFRk=
github.com/ledgerwatch/log/v3 v3.9.0/go.mod h1:EiAY6upmI/6LkNhOVxb4eVsmsP11HZCnZ3PlJMjYiqE=
github.com/ledgerwatch/secp256k1 v1.0.0 h1:Usvz87YoTG0uePIV8woOof5cQnLXGYa162rFf3YnwaQ=

View File

@ -127,6 +127,54 @@ func (x *DownloadRequest) GetItems() []*DownloadItem {
return nil
}
// DeleteRequest: stop seeding, delete file, delete .torrent
//
// NOTE(review): this type and all of its methods are protoc-generated
// boilerplate (protoc-gen-go). Change the .proto definition and regenerate
// rather than hand-editing this file.
type DeleteRequest struct {
	state         protoimpl.MessageState
	sizeCache     protoimpl.SizeCache
	unknownFields protoimpl.UnknownFields

	// Paths are the file names to stop seeding and delete; the server joins
	// them onto its snapshots dir (see GrpcServer.Delete).
	Paths []string `protobuf:"bytes,1,rep,name=paths,proto3" json:"paths,omitempty"`
}

func (x *DeleteRequest) Reset() {
	*x = DeleteRequest{}
	if protoimpl.UnsafeEnabled {
		mi := &file_downloader_downloader_proto_msgTypes[2]
		ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
		ms.StoreMessageInfo(mi)
	}
}

func (x *DeleteRequest) String() string {
	return protoimpl.X.MessageStringOf(x)
}

func (*DeleteRequest) ProtoMessage() {}

func (x *DeleteRequest) ProtoReflect() protoreflect.Message {
	mi := &file_downloader_downloader_proto_msgTypes[2]
	if protoimpl.UnsafeEnabled && x != nil {
		ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
		if ms.LoadMessageInfo() == nil {
			ms.StoreMessageInfo(mi)
		}
		return ms
	}
	return mi.MessageOf(x)
}

// Deprecated: Use DeleteRequest.ProtoReflect.Descriptor instead.
func (*DeleteRequest) Descriptor() ([]byte, []int) {
	return file_downloader_downloader_proto_rawDescGZIP(), []int{2}
}

// GetPaths returns the Paths field; nil-receiver safe, as with all generated getters.
func (x *DeleteRequest) GetPaths() []string {
	if x != nil {
		return x.Paths
	}
	return nil
}
type VerifyRequest struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
@ -136,7 +184,7 @@ type VerifyRequest struct {
func (x *VerifyRequest) Reset() {
*x = VerifyRequest{}
if protoimpl.UnsafeEnabled {
mi := &file_downloader_downloader_proto_msgTypes[2]
mi := &file_downloader_downloader_proto_msgTypes[3]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
@ -149,7 +197,7 @@ func (x *VerifyRequest) String() string {
func (*VerifyRequest) ProtoMessage() {}
func (x *VerifyRequest) ProtoReflect() protoreflect.Message {
mi := &file_downloader_downloader_proto_msgTypes[2]
mi := &file_downloader_downloader_proto_msgTypes[3]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
@ -162,7 +210,7 @@ func (x *VerifyRequest) ProtoReflect() protoreflect.Message {
// Deprecated: Use VerifyRequest.ProtoReflect.Descriptor instead.
func (*VerifyRequest) Descriptor() ([]byte, []int) {
return file_downloader_downloader_proto_rawDescGZIP(), []int{2}
return file_downloader_downloader_proto_rawDescGZIP(), []int{3}
}
type StatsRequest struct {
@ -174,7 +222,7 @@ type StatsRequest struct {
func (x *StatsRequest) Reset() {
*x = StatsRequest{}
if protoimpl.UnsafeEnabled {
mi := &file_downloader_downloader_proto_msgTypes[3]
mi := &file_downloader_downloader_proto_msgTypes[4]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
@ -187,7 +235,7 @@ func (x *StatsRequest) String() string {
func (*StatsRequest) ProtoMessage() {}
func (x *StatsRequest) ProtoReflect() protoreflect.Message {
mi := &file_downloader_downloader_proto_msgTypes[3]
mi := &file_downloader_downloader_proto_msgTypes[4]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
@ -200,7 +248,7 @@ func (x *StatsRequest) ProtoReflect() protoreflect.Message {
// Deprecated: Use StatsRequest.ProtoReflect.Descriptor instead.
func (*StatsRequest) Descriptor() ([]byte, []int) {
return file_downloader_downloader_proto_rawDescGZIP(), []int{3}
return file_downloader_downloader_proto_rawDescGZIP(), []int{4}
}
type StatsReply struct {
@ -228,7 +276,7 @@ type StatsReply struct {
func (x *StatsReply) Reset() {
*x = StatsReply{}
if protoimpl.UnsafeEnabled {
mi := &file_downloader_downloader_proto_msgTypes[4]
mi := &file_downloader_downloader_proto_msgTypes[5]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
@ -241,7 +289,7 @@ func (x *StatsReply) String() string {
func (*StatsReply) ProtoMessage() {}
func (x *StatsReply) ProtoReflect() protoreflect.Message {
mi := &file_downloader_downloader_proto_msgTypes[4]
mi := &file_downloader_downloader_proto_msgTypes[5]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
@ -254,7 +302,7 @@ func (x *StatsReply) ProtoReflect() protoreflect.Message {
// Deprecated: Use StatsReply.ProtoReflect.Descriptor instead.
func (*StatsReply) Descriptor() ([]byte, []int) {
return file_downloader_downloader_proto_rawDescGZIP(), []int{4}
return file_downloader_downloader_proto_rawDescGZIP(), []int{5}
}
func (x *StatsReply) GetMetadataReady() int32 {
@ -345,47 +393,53 @@ var file_downloader_downloader_proto_rawDesc = []byte{
0x12, 0x2e, 0x0a, 0x05, 0x69, 0x74, 0x65, 0x6d, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x0b, 0x32,
0x18, 0x2e, 0x64, 0x6f, 0x77, 0x6e, 0x6c, 0x6f, 0x61, 0x64, 0x65, 0x72, 0x2e, 0x44, 0x6f, 0x77,
0x6e, 0x6c, 0x6f, 0x61, 0x64, 0x49, 0x74, 0x65, 0x6d, 0x52, 0x05, 0x69, 0x74, 0x65, 0x6d, 0x73,
0x22, 0x0f, 0x0a, 0x0d, 0x56, 0x65, 0x72, 0x69, 0x66, 0x79, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73,
0x74, 0x22, 0x0e, 0x0a, 0x0c, 0x53, 0x74, 0x61, 0x74, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73,
0x74, 0x22, 0xee, 0x02, 0x0a, 0x0a, 0x53, 0x74, 0x61, 0x74, 0x73, 0x52, 0x65, 0x70, 0x6c, 0x79,
0x12, 0x25, 0x0a, 0x0e, 0x6d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0x5f, 0x72, 0x65, 0x61,
0x64, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x05, 0x52, 0x0d, 0x6d, 0x65, 0x74, 0x61, 0x64, 0x61,
0x74, 0x61, 0x52, 0x65, 0x61, 0x64, 0x79, 0x12, 0x1f, 0x0a, 0x0b, 0x66, 0x69, 0x6c, 0x65, 0x73,
0x5f, 0x74, 0x6f, 0x74, 0x61, 0x6c, 0x18, 0x02, 0x20, 0x01, 0x28, 0x05, 0x52, 0x0a, 0x66, 0x69,
0x6c, 0x65, 0x73, 0x54, 0x6f, 0x74, 0x61, 0x6c, 0x12, 0x21, 0x0a, 0x0c, 0x70, 0x65, 0x65, 0x72,
0x73, 0x5f, 0x75, 0x6e, 0x69, 0x71, 0x75, 0x65, 0x18, 0x04, 0x20, 0x01, 0x28, 0x05, 0x52, 0x0b,
0x70, 0x65, 0x65, 0x72, 0x73, 0x55, 0x6e, 0x69, 0x71, 0x75, 0x65, 0x12, 0x2b, 0x0a, 0x11, 0x63,
0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x5f, 0x74, 0x6f, 0x74, 0x61, 0x6c,
0x18, 0x05, 0x20, 0x01, 0x28, 0x04, 0x52, 0x10, 0x63, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x69,
0x6f, 0x6e, 0x73, 0x54, 0x6f, 0x74, 0x61, 0x6c, 0x12, 0x1c, 0x0a, 0x09, 0x63, 0x6f, 0x6d, 0x70,
0x6c, 0x65, 0x74, 0x65, 0x64, 0x18, 0x06, 0x20, 0x01, 0x28, 0x08, 0x52, 0x09, 0x63, 0x6f, 0x6d,
0x70, 0x6c, 0x65, 0x74, 0x65, 0x64, 0x12, 0x1a, 0x0a, 0x08, 0x70, 0x72, 0x6f, 0x67, 0x72, 0x65,
0x73, 0x73, 0x18, 0x07, 0x20, 0x01, 0x28, 0x02, 0x52, 0x08, 0x70, 0x72, 0x6f, 0x67, 0x72, 0x65,
0x73, 0x73, 0x12, 0x27, 0x0a, 0x0f, 0x62, 0x79, 0x74, 0x65, 0x73, 0x5f, 0x63, 0x6f, 0x6d, 0x70,
0x6c, 0x65, 0x74, 0x65, 0x64, 0x18, 0x08, 0x20, 0x01, 0x28, 0x04, 0x52, 0x0e, 0x62, 0x79, 0x74,
0x65, 0x73, 0x43, 0x6f, 0x6d, 0x70, 0x6c, 0x65, 0x74, 0x65, 0x64, 0x12, 0x1f, 0x0a, 0x0b, 0x62,
0x79, 0x74, 0x65, 0x73, 0x5f, 0x74, 0x6f, 0x74, 0x61, 0x6c, 0x18, 0x09, 0x20, 0x01, 0x28, 0x04,
0x52, 0x0a, 0x62, 0x79, 0x74, 0x65, 0x73, 0x54, 0x6f, 0x74, 0x61, 0x6c, 0x12, 0x1f, 0x0a, 0x0b,
0x75, 0x70, 0x6c, 0x6f, 0x61, 0x64, 0x5f, 0x72, 0x61, 0x74, 0x65, 0x18, 0x0a, 0x20, 0x01, 0x28,
0x04, 0x52, 0x0a, 0x75, 0x70, 0x6c, 0x6f, 0x61, 0x64, 0x52, 0x61, 0x74, 0x65, 0x12, 0x23, 0x0a,
0x0d, 0x64, 0x6f, 0x77, 0x6e, 0x6c, 0x6f, 0x61, 0x64, 0x5f, 0x72, 0x61, 0x74, 0x65, 0x18, 0x0b,
0x20, 0x01, 0x28, 0x04, 0x52, 0x0c, 0x64, 0x6f, 0x77, 0x6e, 0x6c, 0x6f, 0x61, 0x64, 0x52, 0x61,
0x74, 0x65, 0x32, 0xcb, 0x01, 0x0a, 0x0a, 0x44, 0x6f, 0x77, 0x6e, 0x6c, 0x6f, 0x61, 0x64, 0x65,
0x72, 0x12, 0x41, 0x0a, 0x08, 0x44, 0x6f, 0x77, 0x6e, 0x6c, 0x6f, 0x61, 0x64, 0x12, 0x1b, 0x2e,
0x64, 0x6f, 0x77, 0x6e, 0x6c, 0x6f, 0x61, 0x64, 0x65, 0x72, 0x2e, 0x44, 0x6f, 0x77, 0x6e, 0x6c,
0x6f, 0x61, 0x64, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x16, 0x2e, 0x67, 0x6f, 0x6f,
0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70,
0x74, 0x79, 0x22, 0x00, 0x12, 0x3d, 0x0a, 0x06, 0x56, 0x65, 0x72, 0x69, 0x66, 0x79, 0x12, 0x19,
0x2e, 0x64, 0x6f, 0x77, 0x6e, 0x6c, 0x6f, 0x61, 0x64, 0x65, 0x72, 0x2e, 0x56, 0x65, 0x72, 0x69,
0x66, 0x79, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67,
0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74,
0x79, 0x22, 0x00, 0x12, 0x3b, 0x0a, 0x05, 0x53, 0x74, 0x61, 0x74, 0x73, 0x12, 0x18, 0x2e, 0x64,
0x22, 0x25, 0x0a, 0x0d, 0x44, 0x65, 0x6c, 0x65, 0x74, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73,
0x74, 0x12, 0x14, 0x0a, 0x05, 0x70, 0x61, 0x74, 0x68, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x09,
0x52, 0x05, 0x70, 0x61, 0x74, 0x68, 0x73, 0x22, 0x0f, 0x0a, 0x0d, 0x56, 0x65, 0x72, 0x69, 0x66,
0x79, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x22, 0x0e, 0x0a, 0x0c, 0x53, 0x74, 0x61, 0x74,
0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x22, 0xee, 0x02, 0x0a, 0x0a, 0x53, 0x74, 0x61,
0x74, 0x73, 0x52, 0x65, 0x70, 0x6c, 0x79, 0x12, 0x25, 0x0a, 0x0e, 0x6d, 0x65, 0x74, 0x61, 0x64,
0x61, 0x74, 0x61, 0x5f, 0x72, 0x65, 0x61, 0x64, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x05, 0x52,
0x0d, 0x6d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0x52, 0x65, 0x61, 0x64, 0x79, 0x12, 0x1f,
0x0a, 0x0b, 0x66, 0x69, 0x6c, 0x65, 0x73, 0x5f, 0x74, 0x6f, 0x74, 0x61, 0x6c, 0x18, 0x02, 0x20,
0x01, 0x28, 0x05, 0x52, 0x0a, 0x66, 0x69, 0x6c, 0x65, 0x73, 0x54, 0x6f, 0x74, 0x61, 0x6c, 0x12,
0x21, 0x0a, 0x0c, 0x70, 0x65, 0x65, 0x72, 0x73, 0x5f, 0x75, 0x6e, 0x69, 0x71, 0x75, 0x65, 0x18,
0x04, 0x20, 0x01, 0x28, 0x05, 0x52, 0x0b, 0x70, 0x65, 0x65, 0x72, 0x73, 0x55, 0x6e, 0x69, 0x71,
0x75, 0x65, 0x12, 0x2b, 0x0a, 0x11, 0x63, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x69, 0x6f, 0x6e,
0x73, 0x5f, 0x74, 0x6f, 0x74, 0x61, 0x6c, 0x18, 0x05, 0x20, 0x01, 0x28, 0x04, 0x52, 0x10, 0x63,
0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x54, 0x6f, 0x74, 0x61, 0x6c, 0x12,
0x1c, 0x0a, 0x09, 0x63, 0x6f, 0x6d, 0x70, 0x6c, 0x65, 0x74, 0x65, 0x64, 0x18, 0x06, 0x20, 0x01,
0x28, 0x08, 0x52, 0x09, 0x63, 0x6f, 0x6d, 0x70, 0x6c, 0x65, 0x74, 0x65, 0x64, 0x12, 0x1a, 0x0a,
0x08, 0x70, 0x72, 0x6f, 0x67, 0x72, 0x65, 0x73, 0x73, 0x18, 0x07, 0x20, 0x01, 0x28, 0x02, 0x52,
0x08, 0x70, 0x72, 0x6f, 0x67, 0x72, 0x65, 0x73, 0x73, 0x12, 0x27, 0x0a, 0x0f, 0x62, 0x79, 0x74,
0x65, 0x73, 0x5f, 0x63, 0x6f, 0x6d, 0x70, 0x6c, 0x65, 0x74, 0x65, 0x64, 0x18, 0x08, 0x20, 0x01,
0x28, 0x04, 0x52, 0x0e, 0x62, 0x79, 0x74, 0x65, 0x73, 0x43, 0x6f, 0x6d, 0x70, 0x6c, 0x65, 0x74,
0x65, 0x64, 0x12, 0x1f, 0x0a, 0x0b, 0x62, 0x79, 0x74, 0x65, 0x73, 0x5f, 0x74, 0x6f, 0x74, 0x61,
0x6c, 0x18, 0x09, 0x20, 0x01, 0x28, 0x04, 0x52, 0x0a, 0x62, 0x79, 0x74, 0x65, 0x73, 0x54, 0x6f,
0x74, 0x61, 0x6c, 0x12, 0x1f, 0x0a, 0x0b, 0x75, 0x70, 0x6c, 0x6f, 0x61, 0x64, 0x5f, 0x72, 0x61,
0x74, 0x65, 0x18, 0x0a, 0x20, 0x01, 0x28, 0x04, 0x52, 0x0a, 0x75, 0x70, 0x6c, 0x6f, 0x61, 0x64,
0x52, 0x61, 0x74, 0x65, 0x12, 0x23, 0x0a, 0x0d, 0x64, 0x6f, 0x77, 0x6e, 0x6c, 0x6f, 0x61, 0x64,
0x5f, 0x72, 0x61, 0x74, 0x65, 0x18, 0x0b, 0x20, 0x01, 0x28, 0x04, 0x52, 0x0c, 0x64, 0x6f, 0x77,
0x6e, 0x6c, 0x6f, 0x61, 0x64, 0x52, 0x61, 0x74, 0x65, 0x32, 0x8a, 0x02, 0x0a, 0x0a, 0x44, 0x6f,
0x77, 0x6e, 0x6c, 0x6f, 0x61, 0x64, 0x65, 0x72, 0x12, 0x41, 0x0a, 0x08, 0x44, 0x6f, 0x77, 0x6e,
0x6c, 0x6f, 0x61, 0x64, 0x12, 0x1b, 0x2e, 0x64, 0x6f, 0x77, 0x6e, 0x6c, 0x6f, 0x61, 0x64, 0x65,
0x72, 0x2e, 0x44, 0x6f, 0x77, 0x6e, 0x6c, 0x6f, 0x61, 0x64, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73,
0x74, 0x1a, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f,
0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x22, 0x00, 0x12, 0x3d, 0x0a, 0x06, 0x44,
0x65, 0x6c, 0x65, 0x74, 0x65, 0x12, 0x19, 0x2e, 0x64, 0x6f, 0x77, 0x6e, 0x6c, 0x6f, 0x61, 0x64,
0x65, 0x72, 0x2e, 0x44, 0x65, 0x6c, 0x65, 0x74, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74,
0x1a, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62,
0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x22, 0x00, 0x12, 0x3d, 0x0a, 0x06, 0x56, 0x65,
0x72, 0x69, 0x66, 0x79, 0x12, 0x19, 0x2e, 0x64, 0x6f, 0x77, 0x6e, 0x6c, 0x6f, 0x61, 0x64, 0x65,
0x72, 0x2e, 0x56, 0x65, 0x72, 0x69, 0x66, 0x79, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a,
0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75,
0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x22, 0x00, 0x12, 0x3b, 0x0a, 0x05, 0x53, 0x74, 0x61,
0x74, 0x73, 0x12, 0x18, 0x2e, 0x64, 0x6f, 0x77, 0x6e, 0x6c, 0x6f, 0x61, 0x64, 0x65, 0x72, 0x2e,
0x53, 0x74, 0x61, 0x74, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x16, 0x2e, 0x64,
0x6f, 0x77, 0x6e, 0x6c, 0x6f, 0x61, 0x64, 0x65, 0x72, 0x2e, 0x53, 0x74, 0x61, 0x74, 0x73, 0x52,
0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x16, 0x2e, 0x64, 0x6f, 0x77, 0x6e, 0x6c, 0x6f, 0x61,
0x64, 0x65, 0x72, 0x2e, 0x53, 0x74, 0x61, 0x74, 0x73, 0x52, 0x65, 0x70, 0x6c, 0x79, 0x22, 0x00,
0x42, 0x19, 0x5a, 0x17, 0x2e, 0x2f, 0x64, 0x6f, 0x77, 0x6e, 0x6c, 0x6f, 0x61, 0x64, 0x65, 0x72,
0x3b, 0x64, 0x6f, 0x77, 0x6e, 0x6c, 0x6f, 0x61, 0x64, 0x65, 0x72, 0x62, 0x06, 0x70, 0x72, 0x6f,
0x74, 0x6f, 0x33,
0x65, 0x70, 0x6c, 0x79, 0x22, 0x00, 0x42, 0x19, 0x5a, 0x17, 0x2e, 0x2f, 0x64, 0x6f, 0x77, 0x6e,
0x6c, 0x6f, 0x61, 0x64, 0x65, 0x72, 0x3b, 0x64, 0x6f, 0x77, 0x6e, 0x6c, 0x6f, 0x61, 0x64, 0x65,
0x72, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
}
var (
@ -400,27 +454,30 @@ func file_downloader_downloader_proto_rawDescGZIP() []byte {
return file_downloader_downloader_proto_rawDescData
}
var file_downloader_downloader_proto_msgTypes = make([]protoimpl.MessageInfo, 5)
var file_downloader_downloader_proto_msgTypes = make([]protoimpl.MessageInfo, 6)
var file_downloader_downloader_proto_goTypes = []interface{}{
(*DownloadItem)(nil), // 0: downloader.DownloadItem
(*DownloadRequest)(nil), // 1: downloader.DownloadRequest
(*VerifyRequest)(nil), // 2: downloader.VerifyRequest
(*StatsRequest)(nil), // 3: downloader.StatsRequest
(*StatsReply)(nil), // 4: downloader.StatsReply
(*types.H160)(nil), // 5: types.H160
(*emptypb.Empty)(nil), // 6: google.protobuf.Empty
(*DeleteRequest)(nil), // 2: downloader.DeleteRequest
(*VerifyRequest)(nil), // 3: downloader.VerifyRequest
(*StatsRequest)(nil), // 4: downloader.StatsRequest
(*StatsReply)(nil), // 5: downloader.StatsReply
(*types.H160)(nil), // 6: types.H160
(*emptypb.Empty)(nil), // 7: google.protobuf.Empty
}
var file_downloader_downloader_proto_depIdxs = []int32{
5, // 0: downloader.DownloadItem.torrent_hash:type_name -> types.H160
6, // 0: downloader.DownloadItem.torrent_hash:type_name -> types.H160
0, // 1: downloader.DownloadRequest.items:type_name -> downloader.DownloadItem
1, // 2: downloader.Downloader.Download:input_type -> downloader.DownloadRequest
2, // 3: downloader.Downloader.Verify:input_type -> downloader.VerifyRequest
3, // 4: downloader.Downloader.Stats:input_type -> downloader.StatsRequest
6, // 5: downloader.Downloader.Download:output_type -> google.protobuf.Empty
6, // 6: downloader.Downloader.Verify:output_type -> google.protobuf.Empty
4, // 7: downloader.Downloader.Stats:output_type -> downloader.StatsReply
5, // [5:8] is the sub-list for method output_type
2, // [2:5] is the sub-list for method input_type
2, // 3: downloader.Downloader.Delete:input_type -> downloader.DeleteRequest
3, // 4: downloader.Downloader.Verify:input_type -> downloader.VerifyRequest
4, // 5: downloader.Downloader.Stats:input_type -> downloader.StatsRequest
7, // 6: downloader.Downloader.Download:output_type -> google.protobuf.Empty
7, // 7: downloader.Downloader.Delete:output_type -> google.protobuf.Empty
7, // 8: downloader.Downloader.Verify:output_type -> google.protobuf.Empty
5, // 9: downloader.Downloader.Stats:output_type -> downloader.StatsReply
6, // [6:10] is the sub-list for method output_type
2, // [2:6] is the sub-list for method input_type
2, // [2:2] is the sub-list for extension type_name
2, // [2:2] is the sub-list for extension extendee
0, // [0:2] is the sub-list for field type_name
@ -457,7 +514,7 @@ func file_downloader_downloader_proto_init() {
}
}
file_downloader_downloader_proto_msgTypes[2].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*VerifyRequest); i {
switch v := v.(*DeleteRequest); i {
case 0:
return &v.state
case 1:
@ -469,7 +526,7 @@ func file_downloader_downloader_proto_init() {
}
}
file_downloader_downloader_proto_msgTypes[3].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*StatsRequest); i {
switch v := v.(*VerifyRequest); i {
case 0:
return &v.state
case 1:
@ -481,6 +538,18 @@ func file_downloader_downloader_proto_init() {
}
}
file_downloader_downloader_proto_msgTypes[4].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*StatsRequest); i {
case 0:
return &v.state
case 1:
return &v.sizeCache
case 2:
return &v.unknownFields
default:
return nil
}
}
file_downloader_downloader_proto_msgTypes[5].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*StatsReply); i {
case 0:
return &v.state
@ -499,7 +568,7 @@ func file_downloader_downloader_proto_init() {
GoPackagePath: reflect.TypeOf(x{}).PkgPath(),
RawDescriptor: file_downloader_downloader_proto_rawDesc,
NumEnums: 0,
NumMessages: 5,
NumMessages: 6,
NumExtensions: 0,
NumServices: 1,
},

View File

@ -21,6 +21,7 @@ const _ = grpc.SupportPackageIsVersion7
const (
Downloader_Download_FullMethodName = "/downloader.Downloader/Download"
Downloader_Delete_FullMethodName = "/downloader.Downloader/Delete"
Downloader_Verify_FullMethodName = "/downloader.Downloader/Verify"
Downloader_Stats_FullMethodName = "/downloader.Downloader/Stats"
)
@ -30,6 +31,7 @@ const (
// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream.
type DownloaderClient interface {
Download(ctx context.Context, in *DownloadRequest, opts ...grpc.CallOption) (*emptypb.Empty, error)
Delete(ctx context.Context, in *DeleteRequest, opts ...grpc.CallOption) (*emptypb.Empty, error)
Verify(ctx context.Context, in *VerifyRequest, opts ...grpc.CallOption) (*emptypb.Empty, error)
Stats(ctx context.Context, in *StatsRequest, opts ...grpc.CallOption) (*StatsReply, error)
}
@ -51,6 +53,15 @@ func (c *downloaderClient) Download(ctx context.Context, in *DownloadRequest, op
return out, nil
}
// Delete invokes the Downloader.Delete unary RPC over the client connection.
// NOTE(review): grpc-generated stub — regenerate from the .proto, do not hand-edit.
func (c *downloaderClient) Delete(ctx context.Context, in *DeleteRequest, opts ...grpc.CallOption) (*emptypb.Empty, error) {
	out := new(emptypb.Empty)
	err := c.cc.Invoke(ctx, Downloader_Delete_FullMethodName, in, out, opts...)
	if err != nil {
		return nil, err
	}
	return out, nil
}
func (c *downloaderClient) Verify(ctx context.Context, in *VerifyRequest, opts ...grpc.CallOption) (*emptypb.Empty, error) {
out := new(emptypb.Empty)
err := c.cc.Invoke(ctx, Downloader_Verify_FullMethodName, in, out, opts...)
@ -74,6 +85,7 @@ func (c *downloaderClient) Stats(ctx context.Context, in *StatsRequest, opts ...
// for forward compatibility
type DownloaderServer interface {
Download(context.Context, *DownloadRequest) (*emptypb.Empty, error)
Delete(context.Context, *DeleteRequest) (*emptypb.Empty, error)
Verify(context.Context, *VerifyRequest) (*emptypb.Empty, error)
Stats(context.Context, *StatsRequest) (*StatsReply, error)
mustEmbedUnimplementedDownloaderServer()
@ -86,6 +98,9 @@ type UnimplementedDownloaderServer struct {
func (UnimplementedDownloaderServer) Download(context.Context, *DownloadRequest) (*emptypb.Empty, error) {
return nil, status.Errorf(codes.Unimplemented, "method Download not implemented")
}
// Delete returns codes.Unimplemented; servers embed UnimplementedDownloaderServer
// so they keep compiling when new RPCs are added to the service (grpc-generated).
func (UnimplementedDownloaderServer) Delete(context.Context, *DeleteRequest) (*emptypb.Empty, error) {
	return nil, status.Errorf(codes.Unimplemented, "method Delete not implemented")
}
func (UnimplementedDownloaderServer) Verify(context.Context, *VerifyRequest) (*emptypb.Empty, error) {
return nil, status.Errorf(codes.Unimplemented, "method Verify not implemented")
}
@ -123,6 +138,24 @@ func _Downloader_Download_Handler(srv interface{}, ctx context.Context, dec func
return interceptor(ctx, in, info, handler)
}
// _Downloader_Delete_Handler is the grpc-generated server-side dispatcher for
// Downloader.Delete: it decodes the incoming DeleteRequest and invokes the
// service method, routing through the unary interceptor when one is installed.
func _Downloader_Delete_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
	in := new(DeleteRequest)
	if err := dec(in); err != nil {
		return nil, err
	}
	if interceptor == nil {
		// no interceptor: call the implementation directly
		return srv.(DownloaderServer).Delete(ctx, in)
	}
	info := &grpc.UnaryServerInfo{
		Server:     srv,
		FullMethod: Downloader_Delete_FullMethodName,
	}
	handler := func(ctx context.Context, req interface{}) (interface{}, error) {
		return srv.(DownloaderServer).Delete(ctx, req.(*DeleteRequest))
	}
	return interceptor(ctx, in, info, handler)
}
func _Downloader_Verify_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(VerifyRequest)
if err := dec(in); err != nil {
@ -170,6 +203,10 @@ var Downloader_ServiceDesc = grpc.ServiceDesc{
MethodName: "Download",
Handler: _Downloader_Download_Handler,
},
{
MethodName: "Delete",
Handler: _Downloader_Delete_Handler,
},
{
MethodName: "Verify",
Handler: _Downloader_Verify_Handler,

View File

@ -621,7 +621,7 @@ func New(ctx context.Context, stack *node.Node, config *ethconfig.Config, logger
// intiialize engine backend
var engine *execution_client.ExecutionClientDirect
blockRetire := freezeblocks.NewBlockRetire(1, dirs, blockReader, blockWriter, freezeblocks.MergeSteps, backend.chainDB, backend.notifications.Events, logger)
blockRetire := freezeblocks.NewBlockRetire(1, dirs, blockReader, blockWriter, backend.chainDB, backend.notifications.Events, logger)
miningRPC = privateapi.NewMiningServer(ctx, backend, ethashApi, logger)

View File

@ -314,12 +314,16 @@ func SnapshotsPrune(s *PruneState, initialCycle bool, cfg SnapshotsCfg, ctx cont
}
cfg.blockRetire.RetireBlocksInBackground(ctx, s.ForwardProgress, cfg.chainConfig.Bor != nil, log.LvlInfo, func(downloadRequest []services.DownloadRequest) error {
if cfg.snapshotDownloader != nil && !reflect.ValueOf(cfg.snapshotDownloader).IsNil() {
if err := snapshotsync.RequestSnapshotsDownload(ctx, downloadRequest, cfg.snapshotDownloader); err != nil {
return err
}
if cfg.snapshotDownloader == nil || reflect.ValueOf(cfg.snapshotDownloader).IsNil() {
return nil
}
return nil
return snapshotsync.RequestSnapshotsDownload(ctx, downloadRequest, cfg.snapshotDownloader)
}, func(l []string) error {
if cfg.snapshotDownloader == nil || reflect.ValueOf(cfg.snapshotDownloader).IsNil() {
return nil
}
_, err := cfg.snapshotDownloader.Delete(ctx, &proto_downloader.DeleteRequest{Paths: l})
return err
})
//cfg.agg.BuildFilesInBackground()
}

2
go.mod
View File

@ -186,7 +186,7 @@ require (
github.com/koron/go-ssdp v0.0.4 // indirect
github.com/kr/pretty v0.3.1 // indirect
github.com/kr/text v0.2.0 // indirect
github.com/ledgerwatch/erigon-snapshot v1.3.1-0.20231027092055-45ee9d86a6cb // indirect
github.com/ledgerwatch/erigon-snapshot v1.3.1-0.20231101135659-d85154191b66 // indirect
github.com/libp2p/go-buffer-pool v0.1.0 // indirect
github.com/libp2p/go-cidranger v1.1.0 // indirect
github.com/libp2p/go-flow-metrics v0.1.0 // indirect

4
go.sum
View File

@ -539,8 +539,8 @@ github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/kylelemons/godebug v0.0.0-20170224010052-a616ab194758 h1:0D5M2HQSGD3PYPwICLl+/9oulQauOuETfgFvhBDffs0=
github.com/leanovate/gopter v0.2.9 h1:fQjYxZaynp97ozCzfOyOuAGOU4aU/z37zf/tOujFk7c=
github.com/leanovate/gopter v0.2.9/go.mod h1:U2L/78B+KVFIx2VmW6onHJQzXtFb+p5y3y2Sh+Jxxv8=
github.com/ledgerwatch/erigon-snapshot v1.3.1-0.20231027092055-45ee9d86a6cb h1:Y6eZ4D8rMrAdDoy2za2Lkf8qSHbNNSPGnSdl/B78G4Y=
github.com/ledgerwatch/erigon-snapshot v1.3.1-0.20231027092055-45ee9d86a6cb/go.mod h1:3AuPxZc85jkehh/HA9h8gabv5MSi3kb/ddtzBsTVJFo=
github.com/ledgerwatch/erigon-snapshot v1.3.1-0.20231101135659-d85154191b66 h1:0GY0mXeC6ThOw1x50tF1dDWso0kL6a5SS7k/zrsIF60=
github.com/ledgerwatch/erigon-snapshot v1.3.1-0.20231101135659-d85154191b66/go.mod h1:3AuPxZc85jkehh/HA9h8gabv5MSi3kb/ddtzBsTVJFo=
github.com/ledgerwatch/log/v3 v3.9.0 h1:iDwrXe0PVwBC68Dd94YSsHbMgQ3ufsgjzXtFNFVZFRk=
github.com/ledgerwatch/log/v3 v3.9.0/go.mod h1:EiAY6upmI/6LkNhOVxb4eVsmsP11HZCnZ3PlJMjYiqE=
github.com/ledgerwatch/secp256k1 v1.0.0 h1:Usvz87YoTG0uePIV8woOof5cQnLXGYa162rFf3YnwaQ=

View File

@ -404,7 +404,7 @@ func doRetireCommand(cliCtx *cli.Context) error {
blockReader := freezeblocks.NewBlockReader(blockSnapshots, borSnapshots)
blockWriter := blockio.NewBlockWriter(fromdb.HistV3(db))
br := freezeblocks.NewBlockRetire(estimate.CompressSnapshot.Workers(), dirs, blockReader, blockWriter, freezeblocks.MergeSteps, db, nil, logger)
br := freezeblocks.NewBlockRetire(estimate.CompressSnapshot.Workers(), dirs, blockReader, blockWriter, db, nil, logger)
agg, err := libstate.NewAggregatorV3(ctx, dirs.SnapHistory, dirs.Tmp, ethconfig.HistoryV3AggregationStep, db, logger)
if err != nil {
return err
@ -460,7 +460,7 @@ func doRetireCommand(cliCtx *cli.Context) error {
}
for i := from; i < to; i += every {
if err := br.RetireBlocks(ctx, i, i+every, log.LvlInfo, nil); err != nil {
if err := br.RetireBlocks(ctx, i, i+every, log.LvlInfo, nil, nil); err != nil {
panic(err)
}
if err := db.Update(ctx, func(tx kv.RwTx) error {

View File

@ -93,7 +93,7 @@ type BlockSnapshots interface {
// BlockRetire - freezing blocks: moving old data from DB to snapshot files
type BlockRetire interface {
PruneAncientBlocks(tx kv.RwTx, limit int, includeBor bool) error
RetireBlocksInBackground(ctx context.Context, maxBlockNumInDB uint64, includeBor bool, lvl log.Lvl, seedNewSnapshots func(downloadRequest []DownloadRequest) error)
RetireBlocksInBackground(ctx context.Context, maxBlockNumInDB uint64, includeBor bool, lvl log.Lvl, seedNewSnapshots func(downloadRequest []DownloadRequest) error, onDelete func(l []string) error)
HasNewFrozenFiles() bool
BuildMissedIndicesIfNeed(ctx context.Context, logPrefix string, notifier DBEventNotifier, cc *chain.Config) error
}

View File

@ -953,7 +953,7 @@ func BuildMissedIndices(logPrefix string, ctx context.Context, dirs datadir.Dirs
}
}()
for _, t := range snaptype.AllSnapshotTypes {
for _, t := range snaptype.BlockSnapshotTypes {
for index := range segments {
segment := segments[index]
if segment.T != t {
@ -1059,7 +1059,7 @@ MainLoop:
if f.From == f.To {
continue
}
for _, t := range snaptype.AllSnapshotTypes {
for _, t := range snaptype.BlockSnapshotTypes {
p := filepath.Join(dir, snaptype.SegmentFileName(f.From, f.To, t))
if !dir2.FileExist(p) {
continue MainLoop
@ -1202,12 +1202,10 @@ type BlockRetire struct {
blockReader services.FullBlockReader
blockWriter *blockio.BlockWriter
dirs datadir.Dirs
mergeSteps []uint64
}
func NewBlockRetire(workers int, dirs datadir.Dirs, blockReader services.FullBlockReader, blockWriter *blockio.BlockWriter, mergeSteps []uint64, db kv.RoDB, notifier services.DBEventNotifier, logger log.Logger) *BlockRetire {
return &BlockRetire{workers: workers, tmpDir: dirs.Tmp, dirs: dirs, blockReader: blockReader, blockWriter: blockWriter, mergeSteps: mergeSteps, db: db, notifier: notifier, logger: logger}
func NewBlockRetire(workers int, dirs datadir.Dirs, blockReader services.FullBlockReader, blockWriter *blockio.BlockWriter, db kv.RoDB, notifier services.DBEventNotifier, logger log.Logger) *BlockRetire {
return &BlockRetire{workers: workers, tmpDir: dirs.Tmp, dirs: dirs, blockReader: blockReader, blockWriter: blockWriter, db: db, notifier: notifier, logger: logger}
}
func (br *BlockRetire) snapshots() *RoSnapshots { return br.blockReader.Snapshots().(*RoSnapshots) }
@ -1268,7 +1266,7 @@ func CanDeleteTo(curBlockNum uint64, blocksInSnapshots uint64) (blockTo uint64)
return cmp.Min(hardLimit, blocksInSnapshots+1)
}
func (br *BlockRetire) RetireBlocks(ctx context.Context, blockFrom, blockTo uint64, lvl log.Lvl, seedNewSnapshots func(downloadRequest []services.DownloadRequest) error) error {
func (br *BlockRetire) RetireBlocks(ctx context.Context, blockFrom, blockTo uint64, lvl log.Lvl, seedNewSnapshots func(downloadRequest []services.DownloadRequest) error, onDelete func(l []string) error) error {
chainConfig := fromdb.ChainConfig(br.db)
notifier, logger, blockReader, tmpDir, db, workers := br.notifier, br.logger, br.blockReader, br.tmpDir, br.db, br.workers
logger.Log(lvl, "[snapshots] Retire Blocks", "range", fmt.Sprintf("%dk-%dk", blockFrom/1000, blockTo/1000))
@ -1286,34 +1284,31 @@ func (br *BlockRetire) RetireBlocks(ctx context.Context, blockFrom, blockTo uint
if notifier != nil && !reflect.ValueOf(notifier).IsNil() { // notify about new snapshots of any size
notifier.OnNewSnapshot()
}
merger := NewMerger(tmpDir, workers, lvl, br.mergeSteps, db, chainConfig, notifier, logger)
rangesToMerge := merger.FindMergeRanges(snapshots.Ranges())
merger := NewMerger(tmpDir, workers, lvl, db, chainConfig, logger)
rangesToMerge := merger.FindMergeRanges(snapshots.Ranges(), snapshots.BlocksAvailable())
if len(rangesToMerge) == 0 {
return nil
}
err := merger.Merge(ctx, snapshots, rangesToMerge, snapshots.Dir(), true /* doIndex */)
onMerge := func(r Range) error {
if notifier != nil && !reflect.ValueOf(notifier).IsNil() { // notify about new snapshots of any size
notifier.OnNewSnapshot()
}
if seedNewSnapshots != nil {
downloadRequest := []services.DownloadRequest{
services.NewDownloadRequest(&services.Range{From: r.from, To: r.to}, "", "", false /* Bor */),
}
if err := seedNewSnapshots(downloadRequest); err != nil {
return err
}
}
return nil
}
err := merger.Merge(ctx, snapshots, rangesToMerge, snapshots.Dir(), true /* doIndex */, onMerge, onDelete)
if err != nil {
return err
}
if err := snapshots.ReopenFolder(); err != nil {
return fmt.Errorf("reopen: %w", err)
}
snapshots.LogStat()
if notifier != nil && !reflect.ValueOf(notifier).IsNil() { // notify about new snapshots of any size
notifier.OnNewSnapshot()
}
downloadRequest := make([]services.DownloadRequest, 0, len(rangesToMerge))
for i := range rangesToMerge {
r := &services.Range{From: rangesToMerge[i].from, To: rangesToMerge[i].to}
downloadRequest = append(downloadRequest, services.NewDownloadRequest(r, "", "", false /* Bor */))
}
if seedNewSnapshots != nil {
if err := seedNewSnapshots(downloadRequest); err != nil {
return err
}
}
return nil
}
@ -1338,7 +1333,7 @@ func (br *BlockRetire) PruneAncientBlocks(tx kv.RwTx, limit int, includeBor bool
return nil
}
func (br *BlockRetire) RetireBlocksInBackground(ctx context.Context, forwardProgress uint64, includeBor bool, lvl log.Lvl, seedNewSnapshots func(downloadRequest []services.DownloadRequest) error) {
func (br *BlockRetire) RetireBlocksInBackground(ctx context.Context, forwardProgress uint64, includeBor bool, lvl log.Lvl, seedNewSnapshots func(downloadRequest []services.DownloadRequest) error, onDeleteSnapshots func(l []string) error) {
ok := br.working.CompareAndSwap(false, true)
if !ok {
// go-routine is still working
@ -1349,7 +1344,7 @@ func (br *BlockRetire) RetireBlocksInBackground(ctx context.Context, forwardProg
blockFrom, blockTo, ok := CanRetire(forwardProgress, br.blockReader.FrozenBlocks())
if ok {
if err := br.RetireBlocks(ctx, blockFrom, blockTo, lvl, seedNewSnapshots); err != nil {
if err := br.RetireBlocks(ctx, blockFrom, blockTo, lvl, seedNewSnapshots, onDeleteSnapshots); err != nil {
br.logger.Warn("[snapshots] retire blocks", "err", err, "fromBlock", blockFrom, "toBlock", blockTo)
}
}
@ -2172,32 +2167,43 @@ type Merger struct {
tmpDir string
chainConfig *chain.Config
chainDB kv.RoDB
notifier services.DBEventNotifier
logger log.Logger
mergeSteps []uint64
}
func NewMerger(tmpDir string, compressWorkers int, lvl log.Lvl, mergeSteps []uint64, chainDB kv.RoDB, chainConfig *chain.Config, notifier services.DBEventNotifier, logger log.Logger) *Merger {
return &Merger{tmpDir: tmpDir, compressWorkers: compressWorkers, lvl: lvl, mergeSteps: mergeSteps, chainDB: chainDB, chainConfig: chainConfig, notifier: notifier, logger: logger}
func NewMerger(tmpDir string, compressWorkers int, lvl log.Lvl, chainDB kv.RoDB, chainConfig *chain.Config, logger log.Logger) *Merger {
return &Merger{tmpDir: tmpDir, compressWorkers: compressWorkers, lvl: lvl, chainDB: chainDB, chainConfig: chainConfig, logger: logger}
}
type Range struct {
from, to uint64
}
func (r Range) From() uint64 { return r.from }
func (r Range) To() uint64 { return r.to }
func (r Range) From() uint64 { return r.from }
func (r Range) To() uint64 { return r.to }
func (r Range) IsRecent(max uint64) bool { return max-r.to < snaptype.Erigon2MergeLimit }
type Ranges []Range
func (r Ranges) String() string {
return fmt.Sprintf("%d", r)
}
var MergeSteps = []uint64{500_000, 100_000, 10_000}
var RecentMergeSteps = []uint64{100_000, 10_000}
func (m *Merger) FindMergeRanges(currentRanges []Range) (toMerge []Range) {
func (m *Merger) FindMergeRanges(currentRanges []Range, maxBlockNum uint64) (toMerge []Range) {
for i := len(currentRanges) - 1; i > 0; i-- {
r := currentRanges[i]
if r.to-r.from >= snaptype.Erigon2MergeLimit { // is complete .seg
continue
isRecent := r.IsRecent(maxBlockNum)
mergeLimit, mergeSteps := uint64(snaptype.Erigon2MergeLimit), MergeSteps
if isRecent {
mergeLimit, mergeSteps = snaptype.Erigon2RecentMergeLimit, RecentMergeSteps
}
for _, span := range m.mergeSteps {
if r.to-r.from >= mergeLimit {
continue
}
for _, span := range mergeSteps {
if r.to%span != 0 {
continue
}
@ -2294,7 +2300,7 @@ func (m *Merger) filesByRange(snapshots *RoSnapshots, from, to uint64) (map[snap
}
// Merge does merge segments in given ranges
func (m *Merger) Merge(ctx context.Context, snapshots *RoSnapshots, mergeRanges []Range, snapDir string, doIndex bool) error {
func (m *Merger) Merge(ctx context.Context, snapshots *RoSnapshots, mergeRanges []Range, snapDir string, doIndex bool, onMerge func(r Range) error, onDelete func(l []string) error) error {
if len(mergeRanges) == 0 {
return nil
}
@ -2306,7 +2312,7 @@ func (m *Merger) Merge(ctx context.Context, snapshots *RoSnapshots, mergeRanges
return err
}
for _, t := range snaptype.AllSnapshotTypes {
for _, t := range snaptype.BlockSnapshotTypes {
segName := snaptype.SegmentFileName(r.from, r.to, t)
f, ok := snaptype.ParseFileName(snapDir, segName)
if !ok {
@ -2326,11 +2332,20 @@ func (m *Merger) Merge(ctx context.Context, snapshots *RoSnapshots, mergeRanges
return fmt.Errorf("ReopenSegments: %w", err)
}
snapshots.LogStat()
if m.notifier != nil { // notify about new snapshots of any size
m.notifier.OnNewSnapshot()
time.Sleep(1 * time.Second) // I'm working on a blocking API - to ensure the client does not use old snapshots - and then delete them
if err := onMerge(r); err != nil {
return err
}
for _, t := range snaptype.AllSnapshotTypes {
for _, t := range snaptype.BlockSnapshotTypes {
if len(toMerge[t]) == 0 {
continue
}
if err := onDelete(toMerge[t]); err != nil {
return err
}
}
time.Sleep(1 * time.Second) // I'm working on a blocking API - to ensure the client does not use old snapshots - and then delete them
for _, t := range snaptype.BlockSnapshotTypes {
m.removeOldFiles(toMerge[t], snapDir)
}
}
@ -2358,6 +2373,9 @@ func (m *Merger) merge(ctx context.Context, toMerge []string, targetFile string,
}
defer f.Close()
_, fName := filepath.Split(targetFile)
m.logger.Debug("[snapshots] merge", "file", fName)
for _, d := range cList {
if err := d.WithReadAhead(func() error {
g := d.MakeGetter()

View File

@ -58,19 +58,55 @@ func createTestSegmentFile(t *testing.T, from, to uint64, name snaptype.Type, di
}
func TestFindMergeRange(t *testing.T) {
merger := NewMerger("x", 1, log.LvlInfo, nil, params.MainnetChainConfig, nil)
t.Run("big", func(t *testing.T) {
merger := NewMerger("x", 1, log.LvlInfo, MergeSteps, nil, params.MainnetChainConfig, nil, nil)
var ranges []Range
for i := 0; i < 24; i++ {
ranges = append(ranges, Range{from: uint64(i * 100_000), to: uint64((i + 1) * 100_000)})
}
found := merger.FindMergeRanges(ranges)
found := merger.FindMergeRanges(ranges, uint64(24*100_000))
var expect []Range
for i := 0; i < 4; i++ {
expect = append(expect, Range{from: uint64(i * snaptype.Erigon2MergeLimit), to: uint64((i + 1) * snaptype.Erigon2MergeLimit)})
expect := []Range{
{0, 500_000},
{500_000, 1_000_000},
{1_000_000, 1_500_000},
}
require.Equal(t, expect, found)
require.Equal(t, Ranges(expect).String(), Ranges(found).String())
})
t.Run("small", func(t *testing.T) {
var ranges Ranges
for i := 0; i < 240; i++ {
ranges = append(ranges, Range{from: uint64(i * 10_000), to: uint64((i + 1) * 10_000)})
}
found := merger.FindMergeRanges(ranges, uint64(240*10_000))
expect := Ranges{
{0, 500_000},
{500_000, 1_000_000},
{1_000_000, 1_500_000},
{1_500_000, 1_600_000},
{1_600_000, 1_700_000},
{1_700_000, 1_800_000},
{1_800_000, 1_900_000},
{1_900_000, 2_000_000},
{2_000_000, 2_100_000},
{2_100_000, 2_200_000},
{2_200_000, 2_300_000},
{2_300_000, 2_400_000},
}
require.Equal(t, expect.String(), Ranges(found).String())
})
t.Run("IsRecent", func(t *testing.T) {
require.True(t, Range{500_000, 599_000}.IsRecent(1_000_000))
require.True(t, Range{500_000, 501_000}.IsRecent(1_000_000))
require.False(t, Range{499_000, 500_000}.IsRecent(1_000_000))
require.False(t, Range{400_000, 500_000}.IsRecent(1_000_000))
require.False(t, Range{400_000, 401_000}.IsRecent(1_000_000))
require.False(t, Range{500_000, 501_000}.IsRecent(1_100_000))
})
}
@ -79,12 +115,12 @@ func TestMergeSnapshots(t *testing.T) {
logger := log.New()
dir, require := t.TempDir(), require.New(t)
createFile := func(from, to uint64) {
for _, snT := range snaptype.AllSnapshotTypes {
for _, snT := range snaptype.BlockSnapshotTypes {
createTestSegmentFile(t, from, to, snT, dir, logger)
}
}
N := uint64(7)
N := uint64(17)
createFile(0, snaptype.Erigon2MergeLimit)
for i := uint64(snaptype.Erigon2MergeLimit); i < snaptype.Erigon2MergeLimit+N*100_000; i += 100_000 {
createFile(i, i+100_000)
@ -93,10 +129,14 @@ func TestMergeSnapshots(t *testing.T) {
defer s.Close()
require.NoError(s.ReopenFolder())
{
merger := NewMerger(dir, 1, log.LvlInfo, MergeSteps, nil, params.MainnetChainConfig, nil, logger)
ranges := merger.FindMergeRanges(s.Ranges())
merger := NewMerger(dir, 1, log.LvlInfo, nil, params.MainnetChainConfig, logger)
ranges := merger.FindMergeRanges(s.Ranges(), s.SegmentsMax())
require.True(len(ranges) > 0)
err := merger.Merge(context.Background(), s, ranges, s.Dir(), false)
err := merger.Merge(context.Background(), s, ranges, s.Dir(), false, func(r Range) error {
return nil
}, func(l []string) error {
return nil
})
require.NoError(err)
}
@ -108,14 +148,18 @@ func TestMergeSnapshots(t *testing.T) {
require.Equal(5, a)
{
merger := NewMerger(dir, 1, log.LvlInfo, MergeSteps, nil, params.MainnetChainConfig, nil, logger)
ranges := merger.FindMergeRanges(s.Ranges())
merger := NewMerger(dir, 1, log.LvlInfo, nil, params.MainnetChainConfig, logger)
ranges := merger.FindMergeRanges(s.Ranges(), s.SegmentsMax())
require.True(len(ranges) == 0)
err := merger.Merge(context.Background(), s, ranges, s.Dir(), false)
err := merger.Merge(context.Background(), s, ranges, s.Dir(), false, func(r Range) error {
return nil
}, func(l []string) error {
return nil
})
require.NoError(err)
}
expectedFileName = snaptype.SegmentFileName(1_100_000, 1_200_000, snaptype.Transactions)
expectedFileName = snaptype.SegmentFileName(1_800_000, 1_900_000, snaptype.Transactions)
d, err = compress.NewDecompressor(filepath.Join(dir, expectedFileName))
require.NoError(err)
defer d.Close()

View File

@ -191,33 +191,34 @@ func (br *BlockRetire) RetireBorBlocks(ctx context.Context, blockFrom, blockTo u
if notifier != nil && !reflect.ValueOf(notifier).IsNil() { // notify about new snapshots of any size
notifier.OnNewSnapshot()
}
merger := NewBorMerger(tmpDir, workers, lvl, br.mergeSteps, db, chainConfig, notifier, logger)
rangesToMerge := merger.FindMergeRanges(snapshots.Ranges())
merger := NewBorMerger(tmpDir, workers, lvl, db, chainConfig, notifier, logger)
rangesToMerge := merger.FindMergeRanges(snapshots.Ranges(), snapshots.BlocksAvailable())
if len(rangesToMerge) == 0 {
return nil
}
err := merger.Merge(ctx, snapshots, rangesToMerge, snapshots.Dir(), true /* doIndex */)
onMerge := func(r Range) error {
if notifier != nil && !reflect.ValueOf(notifier).IsNil() { // notify about new snapshots of any size
notifier.OnNewSnapshot()
}
if seedNewSnapshots != nil {
downloadRequest := []services.DownloadRequest{
services.NewDownloadRequest(&services.Range{From: r.from, To: r.to}, "", "", true /* Bor */),
}
if err := seedNewSnapshots(downloadRequest); err != nil {
return err
}
}
return nil
}
onDelete := func(files []string) error {
//TODO: add Downloader API to delete files
return nil
}
err := merger.Merge(ctx, snapshots, rangesToMerge, snapshots.Dir(), true /* doIndex */, onMerge, onDelete)
if err != nil {
return err
}
if err := snapshots.ReopenFolder(); err != nil {
return fmt.Errorf("reopen: %w", err)
}
snapshots.LogStat()
if notifier != nil && !reflect.ValueOf(notifier).IsNil() { // notify about new snapshots of any size
notifier.OnNewSnapshot()
}
downloadRequest := make([]services.DownloadRequest, 0, len(rangesToMerge))
for i := range rangesToMerge {
r := &services.Range{From: rangesToMerge[i].from, To: rangesToMerge[i].to}
downloadRequest = append(downloadRequest, services.NewDownloadRequest(r, "", "", true /* Bor */))
}
if seedNewSnapshots != nil {
if err := seedNewSnapshots(downloadRequest); err != nil {
return err
}
}
return nil
}
@ -1059,21 +1060,25 @@ type BorMerger struct {
chainDB kv.RoDB
notifier services.DBEventNotifier
logger log.Logger
mergeSteps []uint64
}
func NewBorMerger(tmpDir string, compressWorkers int, lvl log.Lvl, mergeSteps []uint64, chainDB kv.RoDB, chainConfig *chain.Config, notifier services.DBEventNotifier, logger log.Logger) *BorMerger {
return &BorMerger{tmpDir: tmpDir, compressWorkers: compressWorkers, lvl: lvl, mergeSteps: mergeSteps, chainDB: chainDB, chainConfig: chainConfig, notifier: notifier, logger: logger}
func NewBorMerger(tmpDir string, compressWorkers int, lvl log.Lvl, chainDB kv.RoDB, chainConfig *chain.Config, notifier services.DBEventNotifier, logger log.Logger) *BorMerger {
return &BorMerger{tmpDir: tmpDir, compressWorkers: compressWorkers, lvl: lvl, chainDB: chainDB, chainConfig: chainConfig, notifier: notifier, logger: logger}
}
func (m *BorMerger) FindMergeRanges(currentRanges []Range) (toMerge []Range) {
func (m *BorMerger) FindMergeRanges(currentRanges []Range, maxBlockNum uint64) (toMerge []Range) {
for i := len(currentRanges) - 1; i > 0; i-- {
r := currentRanges[i]
if r.to-r.from >= snaptype.Erigon2MergeLimit { // is complete .seg
continue
isRecent := r.IsRecent(maxBlockNum)
mergeLimit, mergeSteps := uint64(snaptype.Erigon2RecentMergeLimit), MergeSteps
if isRecent {
mergeLimit, mergeSteps = snaptype.Erigon2MergeLimit, RecentMergeSteps
}
for _, span := range m.mergeSteps {
if r.to-r.from >= mergeLimit {
continue
}
for _, span := range mergeSteps {
if r.to%span != 0 {
continue
}
@ -1115,7 +1120,7 @@ func (m *BorMerger) filesByRange(snapshots *BorRoSnapshots, from, to uint64) (ma
}
// Merge does merge segments in given ranges
func (m *BorMerger) Merge(ctx context.Context, snapshots *BorRoSnapshots, mergeRanges []Range, snapDir string, doIndex bool) error {
func (m *BorMerger) Merge(ctx context.Context, snapshots *BorRoSnapshots, mergeRanges []Range, snapDir string, doIndex bool, onMerge func(r Range) error, onDelete func(l []string) error) error {
if len(mergeRanges) == 0 {
return nil
}
@ -1147,11 +1152,19 @@ func (m *BorMerger) Merge(ctx context.Context, snapshots *BorRoSnapshots, mergeR
return fmt.Errorf("ReopenSegments: %w", err)
}
snapshots.LogStat()
if m.notifier != nil { // notify about new snapshots of any size
m.notifier.OnNewSnapshot()
time.Sleep(1 * time.Second) // I'm working on a blocking API - to ensure the client does not use old snapshots - and then delete them
if err := onMerge(r); err != nil {
return err
}
for _, t := range []snaptype.Type{snaptype.BorEvents} {
for _, t := range snaptype.BlockSnapshotTypes {
if len(toMerge[t]) == 0 {
continue
}
if err := onDelete(toMerge[t]); err != nil {
return err
}
}
time.Sleep(1 * time.Second) // I'm working on a blocking API - to ensure the client does not use old snapshots - and then delete them
for _, t := range snaptype.BlockSnapshotTypes {
m.removeOldFiles(toMerge[t], snapDir)
}
}

View File

@ -35,7 +35,7 @@ const (
)
func BuildProtoRequest(downloadRequest []services.DownloadRequest) *proto_downloader.DownloadRequest {
req := &proto_downloader.DownloadRequest{Items: make([]*proto_downloader.DownloadItem, 0, len(snaptype.AllSnapshotTypes))}
req := &proto_downloader.DownloadRequest{Items: make([]*proto_downloader.DownloadItem, 0, len(snaptype.BlockSnapshotTypes))}
for _, r := range downloadRequest {
if r.Path != "" {
if r.TorrentHash != "" {
@ -50,13 +50,13 @@ func BuildProtoRequest(downloadRequest []services.DownloadRequest) *proto_downlo
}
} else {
if r.Bor {
for _, t := range []snaptype.Type{snaptype.BorEvents, snaptype.BorSpans} {
for _, t := range snaptype.BorSnapshotTypes {
req.Items = append(req.Items, &proto_downloader.DownloadItem{
Path: snaptype.SegmentFileName(r.Ranges.From, r.Ranges.To, t),
})
}
} else {
for _, t := range snaptype.AllSnapshotTypes {
for _, t := range snaptype.BlockSnapshotTypes {
req.Items = append(req.Items, &proto_downloader.DownloadItem{
Path: snaptype.SegmentFileName(r.Ranges.From, r.Ranges.To, t),
})

View File

@ -417,7 +417,7 @@ func MockWithEverything(tb testing.TB, gspec *types.Genesis, key *ecdsa.PrivateK
return block, nil
}
blockRetire := freezeblocks.NewBlockRetire(1, dirs, mock.BlockReader, blockWriter, freezeblocks.MergeSteps, mock.DB, mock.Notifications.Events, logger)
blockRetire := freezeblocks.NewBlockRetire(1, dirs, mock.BlockReader, blockWriter, mock.DB, mock.Notifications.Events, logger)
mock.Sync = stagedsync.New(
stagedsync.DefaultStages(mock.Ctx,
stagedsync.StageSnapshotsCfg(mock.DB, *mock.ChainConfig, dirs, blockRetire, snapshotsDownloader, mock.BlockReader, mock.Notifications.Events, mock.HistoryV3, mock.agg, nil),