diff --git a/cmd/downloader/main.go b/cmd/downloader/main.go index 7ae1bcdf6..4d702b102 100644 --- a/cmd/downloader/main.go +++ b/cmd/downloader/main.go @@ -7,6 +7,7 @@ import ( "net" "os" "path/filepath" + "runtime" "strings" "time" @@ -177,6 +178,7 @@ func Downloader(ctx context.Context, logger log.Logger) error { return err } + cfg.ClientConfig.PieceHashersPerTorrent = 4 * runtime.NumCPU() cfg.ClientConfig.DisableIPv6 = disableIPV6 cfg.ClientConfig.DisableIPv4 = disableIPV4 diff --git a/cmd/integration/commands/stages.go b/cmd/integration/commands/stages.go index 9849a7bad..8896ae7b2 100644 --- a/cmd/integration/commands/stages.go +++ b/cmd/integration/commands/stages.go @@ -1551,7 +1551,7 @@ func newSync(ctx context.Context, db kv.RwDB, miningConfig *params.MiningConfig, } notifications := &shards.Notifications{} - blockRetire := freezeblocks.NewBlockRetire(1, dirs, blockReader, blockWriter, freezeblocks.MergeSteps, db, notifications.Events, logger) + blockRetire := freezeblocks.NewBlockRetire(1, dirs, blockReader, blockWriter, db, notifications.Events, logger) stages := stages2.NewDefaultStages(context.Background(), db, p2p.Config{}, &cfg, sentryControlServer, notifications, nil, blockReader, blockRetire, agg, nil, nil, heimdallClient, logger) sync := stagedsync.New(stages, stagedsync.DefaultUnwindOrder, stagedsync.DefaultPruneOrder, logger) diff --git a/erigon-lib/direct/downloader_client.go b/erigon-lib/direct/downloader_client.go index a6924a1eb..abb85adc8 100644 --- a/erigon-lib/direct/downloader_client.go +++ b/erigon-lib/direct/downloader_client.go @@ -35,6 +35,9 @@ func NewDownloaderClient(server proto_downloader.DownloaderServer) *DownloaderCl func (c *DownloaderClient) Download(ctx context.Context, in *proto_downloader.DownloadRequest, opts ...grpc.CallOption) (*emptypb.Empty, error) { return c.server.Download(ctx, in) } +func (c *DownloaderClient) Delete(ctx context.Context, in *proto_downloader.DeleteRequest, opts ...grpc.CallOption) (*emptypb.Empty, error) { + return c.server.Delete(ctx, in) +} func (c *DownloaderClient) Verify(ctx context.Context, in *proto_downloader.VerifyRequest, opts ...grpc.CallOption) (*emptypb.Empty, error) { return c.server.Verify(ctx, in) } diff --git a/erigon-lib/downloader/downloader_grpc_server.go b/erigon-lib/downloader/downloader_grpc_server.go index 8ac525128..61398d74b 100644 --- a/erigon-lib/downloader/downloader_grpc_server.go +++ b/erigon-lib/downloader/downloader_grpc_server.go @@ -19,6 +19,8 @@ package downloader import ( "context" "fmt" + "os" + "path/filepath" "time" "github.com/anacrolix/torrent/metainfo" @@ -44,6 +46,7 @@ type GrpcServer struct { // Download - create new .torrent ONLY if initialSync, everything else Erigon can generate by itself func (s *GrpcServer) Download(ctx context.Context, request *proto_downloader.DownloadRequest) (*emptypb.Empty, error) { + defer s.d.ReCalcStats(10 * time.Second) // immediately call ReCalc to set stat.Complete flag logEvery := time.NewTicker(20 * time.Second) defer logEvery.Stop() @@ -70,7 +73,33 @@ func (s *GrpcServer) Download(ctx context.Context, request *proto_downloader.Dow return nil, err } } - s.d.ReCalcStats(10 * time.Second) // immediately call ReCalc to set stat.Complete flag + return &emptypb.Empty{}, nil +} + +// Delete - stop seeding, remove file, remove .torrent +func (s *GrpcServer) Delete(ctx context.Context, request *proto_downloader.DeleteRequest) (*emptypb.Empty, error) { + defer s.d.ReCalcStats(10 * time.Second) // immediately call ReCalc to set stat.Complete flag + torrents := s.d.torrentClient.Torrents() + for _, name := range request.Paths { + if name == "" { + return nil, fmt.Errorf("field 'path' is required") + } + for _, t := range torrents { + select { + case <-t.GotInfo(): + continue + default: + } + if t.Name() == name { + t.Drop() + break + } + } + + fPath := filepath.Join(s.d.SnapDir(), name) + _ = os.Remove(fPath) + _ = os.Remove(fPath + ".torrent") + } return &emptypb.Empty{}, nil } diff --git a/erigon-lib/downloader/downloadercfg/downloadercfg.go b/erigon-lib/downloader/downloadercfg/downloadercfg.go index dadc4d337..b9e1d0dc2 100644 --- a/erigon-lib/downloader/downloadercfg/downloadercfg.go +++ b/erigon-lib/downloader/downloadercfg/downloadercfg.go @@ -29,7 +29,6 @@ import ( lg "github.com/anacrolix/log" "github.com/anacrolix/torrent" "github.com/c2h5oh/datasize" - "github.com/ledgerwatch/erigon-lib/common/cmp" "github.com/ledgerwatch/erigon-lib/common/datadir" "github.com/ledgerwatch/erigon-lib/common/dir" "github.com/ledgerwatch/log/v3" @@ -60,7 +59,10 @@ type Cfg struct { func Default() *torrent.ClientConfig { torrentConfig := torrent.NewDefaultClientConfig() - torrentConfig.PieceHashersPerTorrent = cmp.Max(1, runtime.NumCPU()-1) + // better don't increase because erigon periodically producing "new seedable files" - and adding them to downloader. + // it must not impact chain tip sync - so, limit resources to minimum by default. + // but when downloader is started as a separated process - rise it to max + //torrentConfig.PieceHashersPerTorrent = cmp.Max(1, runtime.NumCPU()-1) torrentConfig.MinDialTimeout = 6 * time.Second //default: 3s torrentConfig.HandshakesTimeout = 8 * time.Second //default: 4s diff --git a/erigon-lib/downloader/snaptype/files.go b/erigon-lib/downloader/snaptype/files.go index 562fe159f..069707cfe 100644 --- a/erigon-lib/downloader/snaptype/files.go +++ b/erigon-lib/downloader/snaptype/files.go @@ -44,6 +44,8 @@ const ( BeaconBlocks ) +var BorSnapshotTypes = []Type{BorEvents, BorSpans} + func (ft Type) String() string { switch ft { case Headers: @@ -90,7 +92,7 @@ const ( func (it IdxType) String() string { return string(it) } -var AllSnapshotTypes = []Type{Headers, Bodies, Transactions} +var BlockSnapshotTypes = []Type{Headers, Bodies, Transactions} var ( ErrInvalidFileName = fmt.Errorf("invalid compressed file name") @@ -175,8 +177,10 @@ type FileInfo struct { } func (f FileInfo) TorrentFileExists() bool { return dir.FileExist(f.Path + ".torrent") } -func (f FileInfo) Seedable() bool { return f.To-f.From == Erigon2MergeLimit } -func (f FileInfo) NeedTorrentFile() bool { return f.Seedable() && !f.TorrentFileExists() } +func (f FileInfo) Seedable() bool { + return f.To-f.From == Erigon2MergeLimit || f.To-f.From == Erigon2RecentMergeLimit +} +func (f FileInfo) NeedTorrentFile() bool { return f.Seedable() && !f.TorrentFileExists() } func IdxFiles(dir string) (res []FileInfo, err error) { return FilesWithExt(dir, ".idx") } func Segments(dir string) (res []FileInfo, err error) { return FilesWithExt(dir, ".seg") } diff --git a/erigon-lib/go.mod b/erigon-lib/go.mod index 109b39233..0b664a647 100644 --- a/erigon-lib/go.mod +++ b/erigon-lib/go.mod @@ -4,8 +4,8 @@ go 1.20 require ( github.com/erigontech/mdbx-go v0.35.2-0.20231101074031-9f999220e9ed - github.com/ledgerwatch/erigon-snapshot v1.3.1-0.20231019214918-3eb2303a41f3 - github.com/ledgerwatch/interfaces v0.0.0-20231011121315-f58b806039f0 + github.com/ledgerwatch/erigon-snapshot v1.3.1-0.20231101135659-d85154191b66 + github.com/ledgerwatch/interfaces v0.0.0-20231031050643-c86352e41520 github.com/ledgerwatch/log/v3 v3.9.0 github.com/ledgerwatch/secp256k1 v1.0.0 ) diff --git a/erigon-lib/go.sum b/erigon-lib/go.sum index e99a7f6a4..b979ec8ea 100644 --- a/erigon-lib/go.sum +++ b/erigon-lib/go.sum @@ -291,10 +291,10 @@ github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= github.com/leanovate/gopter v0.2.9 h1:fQjYxZaynp97ozCzfOyOuAGOU4aU/z37zf/tOujFk7c= -github.com/ledgerwatch/erigon-snapshot v1.3.1-0.20231019214918-3eb2303a41f3 h1:59TKwBsS+Fn4iGh+PCxw7s73+/0WmDeK6bskDe3tFzY= -github.com/ledgerwatch/erigon-snapshot v1.3.1-0.20231019214918-3eb2303a41f3/go.mod h1:3AuPxZc85jkehh/HA9h8gabv5MSi3kb/ddtzBsTVJFo= -github.com/ledgerwatch/interfaces v0.0.0-20231011121315-f58b806039f0 h1:7z6cyoCKP6qxtKSO74eAY6XiHWKaOi+melvPeMCXLl8= -github.com/ledgerwatch/interfaces v0.0.0-20231011121315-f58b806039f0/go.mod h1:ugQv1QllJzBny3cKZKxUrSnykkjkBgm27eQM6dnGAcc= +github.com/ledgerwatch/erigon-snapshot v1.3.1-0.20231101135659-d85154191b66 h1:0GY0mXeC6ThOw1x50tF1dDWso0kL6a5SS7k/zrsIF60= +github.com/ledgerwatch/erigon-snapshot v1.3.1-0.20231101135659-d85154191b66/go.mod h1:3AuPxZc85jkehh/HA9h8gabv5MSi3kb/ddtzBsTVJFo= +github.com/ledgerwatch/interfaces v0.0.0-20231031050643-c86352e41520 h1:j/PRJWbPrbk8wpVjU77SWS8xJ/N+dcxPs1relNSolUs= +github.com/ledgerwatch/interfaces v0.0.0-20231031050643-c86352e41520/go.mod h1:ugQv1QllJzBny3cKZKxUrSnykkjkBgm27eQM6dnGAcc= github.com/ledgerwatch/log/v3 v3.9.0 h1:iDwrXe0PVwBC68Dd94YSsHbMgQ3ufsgjzXtFNFVZFRk= github.com/ledgerwatch/log/v3 v3.9.0/go.mod h1:EiAY6upmI/6LkNhOVxb4eVsmsP11HZCnZ3PlJMjYiqE= github.com/ledgerwatch/secp256k1 v1.0.0 h1:Usvz87YoTG0uePIV8woOof5cQnLXGYa162rFf3YnwaQ= diff --git a/erigon-lib/gointerfaces/downloader/downloader.pb.go b/erigon-lib/gointerfaces/downloader/downloader.pb.go index 773282e31..e7dfe2f04 100644 --- a/erigon-lib/gointerfaces/downloader/downloader.pb.go +++ b/erigon-lib/gointerfaces/downloader/downloader.pb.go @@ -127,6 +127,54 @@ func (x *DownloadRequest) GetItems() []*DownloadItem { return nil } +// DeleteRequest: stop seeding, delete file, delete .torrent +type DeleteRequest struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Paths []string `protobuf:"bytes,1,rep,name=paths,proto3" json:"paths,omitempty"` +} + +func (x *DeleteRequest) Reset() { + *x = DeleteRequest{} + if protoimpl.UnsafeEnabled { + mi := &file_downloader_downloader_proto_msgTypes[2] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *DeleteRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*DeleteRequest) ProtoMessage() {} + +func (x *DeleteRequest) ProtoReflect() protoreflect.Message { + mi := &file_downloader_downloader_proto_msgTypes[2] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use DeleteRequest.ProtoReflect.Descriptor instead. +func (*DeleteRequest) Descriptor() ([]byte, []int) { + return file_downloader_downloader_proto_rawDescGZIP(), []int{2} +} + +func (x *DeleteRequest) GetPaths() []string { + if x != nil { + return x.Paths + } + return nil +} + type VerifyRequest struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache @@ -136,7 +184,7 @@ type VerifyRequest struct { func (x *VerifyRequest) Reset() { *x = VerifyRequest{} if protoimpl.UnsafeEnabled { - mi := &file_downloader_downloader_proto_msgTypes[2] + mi := &file_downloader_downloader_proto_msgTypes[3] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -149,7 +197,7 @@ func (x *VerifyRequest) String() string { func (*VerifyRequest) ProtoMessage() {} func (x *VerifyRequest) ProtoReflect() protoreflect.Message { - mi := &file_downloader_downloader_proto_msgTypes[2] + mi := &file_downloader_downloader_proto_msgTypes[3] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -162,7 +210,7 @@ func (x *VerifyRequest) ProtoReflect() protoreflect.Message { // Deprecated: Use VerifyRequest.ProtoReflect.Descriptor instead. func (*VerifyRequest) Descriptor() ([]byte, []int) { - return file_downloader_downloader_proto_rawDescGZIP(), []int{2} + return file_downloader_downloader_proto_rawDescGZIP(), []int{3} } type StatsRequest struct { @@ -174,7 +222,7 @@ type StatsRequest struct { func (x *StatsRequest) Reset() { *x = StatsRequest{} if protoimpl.UnsafeEnabled { - mi := &file_downloader_downloader_proto_msgTypes[3] + mi := &file_downloader_downloader_proto_msgTypes[4] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -187,7 +235,7 @@ func (x *StatsRequest) String() string { func (*StatsRequest) ProtoMessage() {} func (x *StatsRequest) ProtoReflect() protoreflect.Message { - mi := &file_downloader_downloader_proto_msgTypes[3] + mi := &file_downloader_downloader_proto_msgTypes[4] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -200,7 +248,7 @@ func (x *StatsRequest) ProtoReflect() protoreflect.Message { // Deprecated: Use StatsRequest.ProtoReflect.Descriptor instead. func (*StatsRequest) Descriptor() ([]byte, []int) { - return file_downloader_downloader_proto_rawDescGZIP(), []int{3} + return file_downloader_downloader_proto_rawDescGZIP(), []int{4} } type StatsReply struct { @@ -228,7 +276,7 @@ type StatsReply struct { func (x *StatsReply) Reset() { *x = StatsReply{} if protoimpl.UnsafeEnabled { - mi := &file_downloader_downloader_proto_msgTypes[4] + mi := &file_downloader_downloader_proto_msgTypes[5] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -241,7 +289,7 @@ func (x *StatsReply) String() string { func (*StatsReply) ProtoMessage() {} func (x *StatsReply) ProtoReflect() protoreflect.Message { - mi := &file_downloader_downloader_proto_msgTypes[4] + mi := &file_downloader_downloader_proto_msgTypes[5] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -254,7 +302,7 @@ func (x *StatsReply) ProtoReflect() protoreflect.Message { // Deprecated: Use StatsReply.ProtoReflect.Descriptor instead. func (*StatsReply) Descriptor() ([]byte, []int) { - return file_downloader_downloader_proto_rawDescGZIP(), []int{4} + return file_downloader_downloader_proto_rawDescGZIP(), []int{5} } func (x *StatsReply) GetMetadataReady() int32 { @@ -345,47 +393,53 @@ var file_downloader_downloader_proto_rawDesc = []byte{ 0x12, 0x2e, 0x0a, 0x05, 0x69, 0x74, 0x65, 0x6d, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x18, 0x2e, 0x64, 0x6f, 0x77, 0x6e, 0x6c, 0x6f, 0x61, 0x64, 0x65, 0x72, 0x2e, 0x44, 0x6f, 0x77, 0x6e, 0x6c, 0x6f, 0x61, 0x64, 0x49, 0x74, 0x65, 0x6d, 0x52, 0x05, 0x69, 0x74, 0x65, 0x6d, 0x73, - 0x22, 0x0f, 0x0a, 0x0d, 0x56, 0x65, 0x72, 0x69, 0x66, 0x79, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, - 0x74, 0x22, 0x0e, 0x0a, 0x0c, 0x53, 0x74, 0x61, 0x74, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, - 0x74, 0x22, 0xee, 0x02, 0x0a, 0x0a, 0x53, 0x74, 0x61, 0x74, 0x73, 0x52, 0x65, 0x70, 0x6c, 0x79, - 0x12, 0x25, 0x0a, 0x0e, 0x6d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0x5f, 0x72, 0x65, 0x61, - 0x64, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x05, 0x52, 0x0d, 0x6d, 0x65, 0x74, 0x61, 0x64, 0x61, - 0x74, 0x61, 0x52, 0x65, 0x61, 0x64, 0x79, 0x12, 0x1f, 0x0a, 0x0b, 0x66, 0x69, 0x6c, 0x65, 0x73, - 0x5f, 0x74, 0x6f, 0x74, 0x61, 0x6c, 0x18, 0x02, 0x20, 0x01, 0x28, 0x05, 0x52, 0x0a, 0x66, 0x69, - 0x6c, 0x65, 0x73, 0x54, 0x6f, 0x74, 0x61, 0x6c, 0x12, 0x21, 0x0a, 0x0c, 0x70, 0x65, 0x65, 0x72, - 0x73, 0x5f, 0x75, 0x6e, 0x69, 0x71, 0x75, 0x65, 0x18, 0x04, 0x20, 0x01, 0x28, 0x05, 0x52, 0x0b, - 0x70, 0x65, 0x65, 0x72, 0x73, 0x55, 0x6e, 0x69, 0x71, 0x75, 0x65, 0x12, 0x2b, 0x0a, 0x11, 0x63, - 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x5f, 0x74, 0x6f, 0x74, 0x61, 0x6c, - 0x18, 0x05, 0x20, 0x01, 0x28, 0x04, 0x52, 0x10, 0x63, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x69, - 0x6f, 0x6e, 0x73, 0x54, 0x6f, 0x74, 0x61, 0x6c, 0x12, 0x1c, 0x0a, 0x09, 0x63, 0x6f, 0x6d, 0x70, - 0x6c, 0x65, 0x74, 0x65, 0x64, 0x18, 0x06, 0x20, 0x01, 0x28, 0x08, 0x52, 0x09, 0x63, 0x6f, 0x6d, - 0x70, 0x6c, 0x65, 0x74, 0x65, 0x64, 0x12, 0x1a, 0x0a, 0x08, 0x70, 0x72, 0x6f, 0x67, 0x72, 0x65, - 0x73, 0x73, 0x18, 0x07, 0x20, 0x01, 0x28, 0x02, 0x52, 0x08, 0x70, 0x72, 0x6f, 0x67, 0x72, 0x65, - 0x73, 0x73, 0x12, 0x27, 0x0a, 0x0f, 0x62, 0x79, 0x74, 0x65, 0x73, 0x5f, 0x63, 0x6f, 0x6d, 0x70, - 0x6c, 0x65, 0x74, 0x65, 0x64, 0x18, 0x08, 0x20, 0x01, 0x28, 0x04, 0x52, 0x0e, 0x62, 0x79, 0x74, - 0x65, 0x73, 0x43, 0x6f, 0x6d, 0x70, 0x6c, 0x65, 0x74, 0x65, 0x64, 0x12, 0x1f, 0x0a, 0x0b, 0x62, - 0x79, 0x74, 0x65, 0x73, 0x5f, 0x74, 0x6f, 0x74, 0x61, 0x6c, 0x18, 0x09, 0x20, 0x01, 0x28, 0x04, - 0x52, 0x0a, 0x62, 0x79, 0x74, 0x65, 0x73, 0x54, 0x6f, 0x74, 0x61, 0x6c, 0x12, 0x1f, 0x0a, 0x0b, - 0x75, 0x70, 0x6c, 0x6f, 0x61, 0x64, 0x5f, 0x72, 0x61, 0x74, 0x65, 0x18, 0x0a, 0x20, 0x01, 0x28, - 0x04, 0x52, 0x0a, 0x75, 0x70, 0x6c, 0x6f, 0x61, 0x64, 0x52, 0x61, 0x74, 0x65, 0x12, 0x23, 0x0a, - 0x0d, 0x64, 0x6f, 0x77, 0x6e, 0x6c, 0x6f, 0x61, 0x64, 0x5f, 0x72, 0x61, 0x74, 0x65, 0x18, 0x0b, - 0x20, 0x01, 0x28, 0x04, 0x52, 0x0c, 0x64, 0x6f, 0x77, 0x6e, 0x6c, 0x6f, 0x61, 0x64, 0x52, 0x61, - 0x74, 0x65, 0x32, 0xcb, 0x01, 0x0a, 0x0a, 0x44, 0x6f, 0x77, 0x6e, 0x6c, 0x6f, 0x61, 0x64, 0x65, - 0x72, 0x12, 0x41, 0x0a, 0x08, 0x44, 0x6f, 0x77, 0x6e, 0x6c, 0x6f, 0x61, 0x64, 0x12, 0x1b, 0x2e, - 0x64, 0x6f, 0x77, 0x6e, 0x6c, 0x6f, 0x61, 0x64, 0x65, 0x72, 0x2e, 0x44, 0x6f, 0x77, 0x6e, 0x6c, - 0x6f, 0x61, 0x64, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x16, 0x2e, 0x67, 0x6f, 0x6f, - 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, - 0x74, 0x79, 0x22, 0x00, 0x12, 0x3d, 0x0a, 0x06, 0x56, 0x65, 0x72, 0x69, 0x66, 0x79, 0x12, 0x19, - 0x2e, 0x64, 0x6f, 0x77, 0x6e, 0x6c, 0x6f, 0x61, 0x64, 0x65, 0x72, 0x2e, 0x56, 0x65, 0x72, 0x69, - 0x66, 0x79, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, - 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, - 0x79, 0x22, 0x00, 0x12, 0x3b, 0x0a, 0x05, 0x53, 0x74, 0x61, 0x74, 0x73, 0x12, 0x18, 0x2e, 0x64, + 0x22, 0x25, 0x0a, 0x0d, 0x44, 0x65, 0x6c, 0x65, 0x74, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, + 0x74, 0x12, 0x14, 0x0a, 0x05, 0x70, 0x61, 0x74, 0x68, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x09, + 0x52, 0x05, 0x70, 0x61, 0x74, 0x68, 0x73, 0x22, 0x0f, 0x0a, 0x0d, 0x56, 0x65, 0x72, 0x69, 0x66, + 0x79, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x22, 0x0e, 0x0a, 0x0c, 0x53, 0x74, 0x61, 0x74, + 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x22, 0xee, 0x02, 0x0a, 0x0a, 0x53, 0x74, 0x61, + 0x74, 0x73, 0x52, 0x65, 0x70, 0x6c, 0x79, 0x12, 0x25, 0x0a, 0x0e, 0x6d, 0x65, 0x74, 0x61, 0x64, + 0x61, 0x74, 0x61, 0x5f, 0x72, 0x65, 0x61, 0x64, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x05, 0x52, + 0x0d, 0x6d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0x52, 0x65, 0x61, 0x64, 0x79, 0x12, 0x1f, + 0x0a, 0x0b, 0x66, 0x69, 0x6c, 0x65, 0x73, 0x5f, 0x74, 0x6f, 0x74, 0x61, 0x6c, 0x18, 0x02, 0x20, + 0x01, 0x28, 0x05, 0x52, 0x0a, 0x66, 0x69, 0x6c, 0x65, 0x73, 0x54, 0x6f, 0x74, 0x61, 0x6c, 0x12, + 0x21, 0x0a, 0x0c, 0x70, 0x65, 0x65, 0x72, 0x73, 0x5f, 0x75, 0x6e, 0x69, 0x71, 0x75, 0x65, 0x18, + 0x04, 0x20, 0x01, 0x28, 0x05, 0x52, 0x0b, 0x70, 0x65, 0x65, 0x72, 0x73, 0x55, 0x6e, 0x69, 0x71, + 0x75, 0x65, 0x12, 0x2b, 0x0a, 0x11, 0x63, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x69, 0x6f, 0x6e, + 0x73, 0x5f, 0x74, 0x6f, 0x74, 0x61, 0x6c, 0x18, 0x05, 0x20, 0x01, 0x28, 0x04, 0x52, 0x10, 0x63, + 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x54, 0x6f, 0x74, 0x61, 0x6c, 0x12, + 0x1c, 0x0a, 0x09, 0x63, 0x6f, 0x6d, 0x70, 0x6c, 0x65, 0x74, 0x65, 0x64, 0x18, 0x06, 0x20, 0x01, + 0x28, 0x08, 0x52, 0x09, 0x63, 0x6f, 0x6d, 0x70, 0x6c, 0x65, 0x74, 0x65, 0x64, 0x12, 0x1a, 0x0a, + 0x08, 0x70, 0x72, 0x6f, 0x67, 0x72, 0x65, 0x73, 0x73, 0x18, 0x07, 0x20, 0x01, 0x28, 0x02, 0x52, + 0x08, 0x70, 0x72, 0x6f, 0x67, 0x72, 0x65, 0x73, 0x73, 0x12, 0x27, 0x0a, 0x0f, 0x62, 0x79, 0x74, + 0x65, 0x73, 0x5f, 0x63, 0x6f, 0x6d, 0x70, 0x6c, 0x65, 0x74, 0x65, 0x64, 0x18, 0x08, 0x20, 0x01, + 0x28, 0x04, 0x52, 0x0e, 0x62, 0x79, 0x74, 0x65, 0x73, 0x43, 0x6f, 0x6d, 0x70, 0x6c, 0x65, 0x74, + 0x65, 0x64, 0x12, 0x1f, 0x0a, 0x0b, 0x62, 0x79, 0x74, 0x65, 0x73, 0x5f, 0x74, 0x6f, 0x74, 0x61, + 0x6c, 0x18, 0x09, 0x20, 0x01, 0x28, 0x04, 0x52, 0x0a, 0x62, 0x79, 0x74, 0x65, 0x73, 0x54, 0x6f, + 0x74, 0x61, 0x6c, 0x12, 0x1f, 0x0a, 0x0b, 0x75, 0x70, 0x6c, 0x6f, 0x61, 0x64, 0x5f, 0x72, 0x61, + 0x74, 0x65, 0x18, 0x0a, 0x20, 0x01, 0x28, 0x04, 0x52, 0x0a, 0x75, 0x70, 0x6c, 0x6f, 0x61, 0x64, + 0x52, 0x61, 0x74, 0x65, 0x12, 0x23, 0x0a, 0x0d, 0x64, 0x6f, 0x77, 0x6e, 0x6c, 0x6f, 0x61, 0x64, + 0x5f, 0x72, 0x61, 0x74, 0x65, 0x18, 0x0b, 0x20, 0x01, 0x28, 0x04, 0x52, 0x0c, 0x64, 0x6f, 0x77, + 0x6e, 0x6c, 0x6f, 0x61, 0x64, 0x52, 0x61, 0x74, 0x65, 0x32, 0x8a, 0x02, 0x0a, 0x0a, 0x44, 0x6f, + 0x77, 0x6e, 0x6c, 0x6f, 0x61, 0x64, 0x65, 0x72, 0x12, 0x41, 0x0a, 0x08, 0x44, 0x6f, 0x77, 0x6e, + 0x6c, 0x6f, 0x61, 0x64, 0x12, 0x1b, 0x2e, 0x64, 0x6f, 0x77, 0x6e, 0x6c, 0x6f, 0x61, 0x64, 0x65, + 0x72, 0x2e, 0x44, 0x6f, 0x77, 0x6e, 0x6c, 0x6f, 0x61, 0x64, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, + 0x74, 0x1a, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, + 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x22, 0x00, 0x12, 0x3d, 0x0a, 0x06, 0x44, + 0x65, 0x6c, 0x65, 0x74, 0x65, 0x12, 0x19, 0x2e, 0x64, 0x6f, 0x77, 0x6e, 0x6c, 0x6f, 0x61, 0x64, + 0x65, 0x72, 0x2e, 0x44, 0x65, 0x6c, 0x65, 0x74, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, + 0x1a, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, + 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x22, 0x00, 0x12, 0x3d, 0x0a, 0x06, 0x56, 0x65, + 0x72, 0x69, 0x66, 0x79, 0x12, 0x19, 0x2e, 0x64, 0x6f, 0x77, 0x6e, 0x6c, 0x6f, 0x61, 0x64, 0x65, + 0x72, 0x2e, 0x56, 0x65, 0x72, 0x69, 0x66, 0x79, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, + 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, + 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x22, 0x00, 0x12, 0x3b, 0x0a, 0x05, 0x53, 0x74, 0x61, + 0x74, 0x73, 0x12, 0x18, 0x2e, 0x64, 0x6f, 0x77, 0x6e, 0x6c, 0x6f, 0x61, 0x64, 0x65, 0x72, 0x2e, + 0x53, 0x74, 0x61, 0x74, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x16, 0x2e, 0x64, 0x6f, 0x77, 0x6e, 0x6c, 0x6f, 0x61, 0x64, 0x65, 0x72, 0x2e, 0x53, 0x74, 0x61, 0x74, 0x73, 0x52, - 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x16, 0x2e, 0x64, 0x6f, 0x77, 0x6e, 0x6c, 0x6f, 0x61, - 0x64, 0x65, 0x72, 0x2e, 0x53, 0x74, 0x61, 0x74, 0x73, 0x52, 0x65, 0x70, 0x6c, 0x79, 0x22, 0x00, - 0x42, 0x19, 0x5a, 0x17, 0x2e, 0x2f, 0x64, 0x6f, 0x77, 0x6e, 0x6c, 0x6f, 0x61, 0x64, 0x65, 0x72, - 0x3b, 0x64, 0x6f, 0x77, 0x6e, 0x6c, 0x6f, 0x61, 0x64, 0x65, 0x72, 0x62, 0x06, 0x70, 0x72, 0x6f, - 0x74, 0x6f, 0x33, + 0x65, 0x70, 0x6c, 0x79, 0x22, 0x00, 0x42, 0x19, 0x5a, 0x17, 0x2e, 0x2f, 0x64, 0x6f, 0x77, 0x6e, + 0x6c, 0x6f, 0x61, 0x64, 0x65, 0x72, 0x3b, 0x64, 0x6f, 0x77, 0x6e, 0x6c, 0x6f, 0x61, 0x64, 0x65, + 0x72, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( @@ -400,27 +454,30 @@ func file_downloader_downloader_proto_rawDescGZIP() []byte { return file_downloader_downloader_proto_rawDescData } -var file_downloader_downloader_proto_msgTypes = make([]protoimpl.MessageInfo, 5) +var file_downloader_downloader_proto_msgTypes = make([]protoimpl.MessageInfo, 6) var file_downloader_downloader_proto_goTypes = []interface{}{ (*DownloadItem)(nil), // 0: downloader.DownloadItem (*DownloadRequest)(nil), // 1: downloader.DownloadRequest - (*VerifyRequest)(nil), // 2: downloader.VerifyRequest - (*StatsRequest)(nil), // 3: downloader.StatsRequest - (*StatsReply)(nil), // 4: downloader.StatsReply - (*types.H160)(nil), // 5: types.H160 - (*emptypb.Empty)(nil), // 6: google.protobuf.Empty + (*DeleteRequest)(nil), // 2: downloader.DeleteRequest + (*VerifyRequest)(nil), // 3: downloader.VerifyRequest + (*StatsRequest)(nil), // 4: downloader.StatsRequest + (*StatsReply)(nil), // 5: downloader.StatsReply + (*types.H160)(nil), // 6: types.H160 + (*emptypb.Empty)(nil), // 7: google.protobuf.Empty } var file_downloader_downloader_proto_depIdxs = []int32{ - 5, // 0: downloader.DownloadItem.torrent_hash:type_name -> types.H160 + 6, // 0: downloader.DownloadItem.torrent_hash:type_name -> types.H160 0, // 1: downloader.DownloadRequest.items:type_name -> downloader.DownloadItem 1, // 2: downloader.Downloader.Download:input_type -> downloader.DownloadRequest - 2, // 3: downloader.Downloader.Verify:input_type -> downloader.VerifyRequest - 3, // 4: downloader.Downloader.Stats:input_type -> downloader.StatsRequest - 6, // 5: downloader.Downloader.Download:output_type -> google.protobuf.Empty - 6, // 6: downloader.Downloader.Verify:output_type -> google.protobuf.Empty - 4, // 7: downloader.Downloader.Stats:output_type -> downloader.StatsReply - 5, // [5:8] is the sub-list for method output_type - 2, // [2:5] is the sub-list for method input_type + 2, // 3: downloader.Downloader.Delete:input_type -> downloader.DeleteRequest + 3, // 4: downloader.Downloader.Verify:input_type -> downloader.VerifyRequest + 4, // 5: downloader.Downloader.Stats:input_type -> downloader.StatsRequest + 7, // 6: downloader.Downloader.Download:output_type -> google.protobuf.Empty + 7, // 7: downloader.Downloader.Delete:output_type -> google.protobuf.Empty + 7, // 8: downloader.Downloader.Verify:output_type -> google.protobuf.Empty + 5, // 9: downloader.Downloader.Stats:output_type -> downloader.StatsReply + 6, // [6:10] is the sub-list for method output_type + 2, // [2:6] is the sub-list for method input_type 2, // [2:2] is the sub-list for extension type_name 2, // [2:2] is the sub-list for extension extendee 0, // [0:2] is the sub-list for field type_name @@ -457,7 +514,7 @@ func file_downloader_downloader_proto_init() { } } file_downloader_downloader_proto_msgTypes[2].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*VerifyRequest); i { + switch v := v.(*DeleteRequest); i { case 0: return &v.state case 1: @@ -469,7 +526,7 @@ func file_downloader_downloader_proto_init() { } } file_downloader_downloader_proto_msgTypes[3].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*StatsRequest); i { + switch v := v.(*VerifyRequest); i { case 0: return &v.state case 1: @@ -481,6 +538,18 @@ func file_downloader_downloader_proto_init() { } } file_downloader_downloader_proto_msgTypes[4].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*StatsRequest); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_downloader_downloader_proto_msgTypes[5].Exporter = func(v interface{}, i int) interface{} { switch v := v.(*StatsReply); i { case 0: return &v.state @@ -499,7 +568,7 @@ func file_downloader_downloader_proto_init() { GoPackagePath: reflect.TypeOf(x{}).PkgPath(), RawDescriptor: file_downloader_downloader_proto_rawDesc, NumEnums: 0, - NumMessages: 5, + NumMessages: 6, NumExtensions: 0, NumServices: 1, }, diff --git a/erigon-lib/gointerfaces/downloader/downloader_grpc.pb.go b/erigon-lib/gointerfaces/downloader/downloader_grpc.pb.go index 831743bbc..d4520105f 100644 --- a/erigon-lib/gointerfaces/downloader/downloader_grpc.pb.go +++ b/erigon-lib/gointerfaces/downloader/downloader_grpc.pb.go @@ -21,6 +21,7 @@ const _ = grpc.SupportPackageIsVersion7 const ( Downloader_Download_FullMethodName = "/downloader.Downloader/Download" + Downloader_Delete_FullMethodName = "/downloader.Downloader/Delete" Downloader_Verify_FullMethodName = "/downloader.Downloader/Verify" Downloader_Stats_FullMethodName = "/downloader.Downloader/Stats" ) @@ -30,6 +31,7 @@ const ( // For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream. type DownloaderClient interface { Download(ctx context.Context, in *DownloadRequest, opts ...grpc.CallOption) (*emptypb.Empty, error) + Delete(ctx context.Context, in *DeleteRequest, opts ...grpc.CallOption) (*emptypb.Empty, error) Verify(ctx context.Context, in *VerifyRequest, opts ...grpc.CallOption) (*emptypb.Empty, error) Stats(ctx context.Context, in *StatsRequest, opts ...grpc.CallOption) (*StatsReply, error) } @@ -51,6 +53,15 @@ func (c *downloaderClient) Download(ctx context.Context, in *DownloadRequest, op return out, nil } +func (c *downloaderClient) Delete(ctx context.Context, in *DeleteRequest, opts ...grpc.CallOption) (*emptypb.Empty, error) { + out := new(emptypb.Empty) + err := c.cc.Invoke(ctx, Downloader_Delete_FullMethodName, in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + func (c *downloaderClient) Verify(ctx context.Context, in *VerifyRequest, opts ...grpc.CallOption) (*emptypb.Empty, error) { out := new(emptypb.Empty) err := c.cc.Invoke(ctx, Downloader_Verify_FullMethodName, in, out, opts...) @@ -74,6 +85,7 @@ func (c *downloaderClient) Stats(ctx context.Context, in *StatsRequest, opts ... // for forward compatibility type DownloaderServer interface { Download(context.Context, *DownloadRequest) (*emptypb.Empty, error) + Delete(context.Context, *DeleteRequest) (*emptypb.Empty, error) Verify(context.Context, *VerifyRequest) (*emptypb.Empty, error) Stats(context.Context, *StatsRequest) (*StatsReply, error) mustEmbedUnimplementedDownloaderServer() @@ -86,6 +98,9 @@ type UnimplementedDownloaderServer struct { func (UnimplementedDownloaderServer) Download(context.Context, *DownloadRequest) (*emptypb.Empty, error) { return nil, status.Errorf(codes.Unimplemented, "method Download not implemented") } +func (UnimplementedDownloaderServer) Delete(context.Context, *DeleteRequest) (*emptypb.Empty, error) { + return nil, status.Errorf(codes.Unimplemented, "method Delete not implemented") +} func (UnimplementedDownloaderServer) Verify(context.Context, *VerifyRequest) (*emptypb.Empty, error) { return nil, status.Errorf(codes.Unimplemented, "method Verify not implemented") } @@ -123,6 +138,24 @@ func _Downloader_Download_Handler(srv interface{}, ctx context.Context, dec func return interceptor(ctx, in, info, handler) } +func _Downloader_Delete_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(DeleteRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(DownloaderServer).Delete(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: Downloader_Delete_FullMethodName, + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(DownloaderServer).Delete(ctx, req.(*DeleteRequest)) + } + return interceptor(ctx, in, info, handler) +} + func _Downloader_Verify_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { in := new(VerifyRequest) if err := dec(in); err != nil { @@ -170,6 +203,10 @@ var Downloader_ServiceDesc = grpc.ServiceDesc{ MethodName: "Download", Handler: _Downloader_Download_Handler, }, + { + MethodName: "Delete", + Handler: _Downloader_Delete_Handler, + }, { MethodName: "Verify", Handler: _Downloader_Verify_Handler, diff --git a/eth/backend.go b/eth/backend.go index 288f84238..5e5a80624 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -621,7 +621,7 @@ func New(ctx context.Context, stack *node.Node, config *ethconfig.Config, logger // intiialize engine backend var engine *execution_client.ExecutionClientDirect - blockRetire := freezeblocks.NewBlockRetire(1, dirs, blockReader, blockWriter, freezeblocks.MergeSteps, backend.chainDB, backend.notifications.Events, logger) + blockRetire := freezeblocks.NewBlockRetire(1, dirs, blockReader, blockWriter, backend.chainDB, backend.notifications.Events, logger) miningRPC = privateapi.NewMiningServer(ctx, backend, ethashApi, logger) diff --git a/eth/stagedsync/stage_snapshots.go b/eth/stagedsync/stage_snapshots.go index 5332ae54c..3da1d60ba 100644 --- a/eth/stagedsync/stage_snapshots.go +++ b/eth/stagedsync/stage_snapshots.go @@ -314,12 +314,16 @@ func SnapshotsPrune(s *PruneState, initialCycle bool, cfg SnapshotsCfg, ctx cont } cfg.blockRetire.RetireBlocksInBackground(ctx, s.ForwardProgress, cfg.chainConfig.Bor != nil, log.LvlInfo, func(downloadRequest []services.DownloadRequest) error { - if cfg.snapshotDownloader != nil && !reflect.ValueOf(cfg.snapshotDownloader).IsNil() { - if err := snapshotsync.RequestSnapshotsDownload(ctx, downloadRequest, cfg.snapshotDownloader); err != nil { - return err - } + if cfg.snapshotDownloader == nil || reflect.ValueOf(cfg.snapshotDownloader).IsNil() { + return nil } - return nil + return snapshotsync.RequestSnapshotsDownload(ctx, downloadRequest, cfg.snapshotDownloader) + }, func(l []string) error { + if cfg.snapshotDownloader == nil || reflect.ValueOf(cfg.snapshotDownloader).IsNil() { + return nil + } + _, err := cfg.snapshotDownloader.Delete(ctx, &proto_downloader.DeleteRequest{Paths: l}) + return err }) //cfg.agg.BuildFilesInBackground() } diff --git a/go.mod b/go.mod index b7b2c0a28..da91f15ba 100644 --- a/go.mod +++ b/go.mod @@ -186,7 +186,7 @@ require ( github.com/koron/go-ssdp v0.0.4 // indirect github.com/kr/pretty v0.3.1 // indirect github.com/kr/text v0.2.0 // indirect - github.com/ledgerwatch/erigon-snapshot v1.3.1-0.20231027092055-45ee9d86a6cb // indirect + github.com/ledgerwatch/erigon-snapshot v1.3.1-0.20231101135659-d85154191b66 // indirect github.com/libp2p/go-buffer-pool v0.1.0 // indirect github.com/libp2p/go-cidranger v1.1.0 // indirect github.com/libp2p/go-flow-metrics v0.1.0 // indirect diff --git a/go.sum b/go.sum index 02969b329..749effc2a 100644 --- a/go.sum +++ b/go.sum @@ -539,8 +539,8 @@ github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= github.com/kylelemons/godebug v0.0.0-20170224010052-a616ab194758 h1:0D5M2HQSGD3PYPwICLl+/9oulQauOuETfgFvhBDffs0= github.com/leanovate/gopter v0.2.9 h1:fQjYxZaynp97ozCzfOyOuAGOU4aU/z37zf/tOujFk7c= github.com/leanovate/gopter v0.2.9/go.mod h1:U2L/78B+KVFIx2VmW6onHJQzXtFb+p5y3y2Sh+Jxxv8= -github.com/ledgerwatch/erigon-snapshot v1.3.1-0.20231027092055-45ee9d86a6cb h1:Y6eZ4D8rMrAdDoy2za2Lkf8qSHbNNSPGnSdl/B78G4Y= -github.com/ledgerwatch/erigon-snapshot v1.3.1-0.20231027092055-45ee9d86a6cb/go.mod h1:3AuPxZc85jkehh/HA9h8gabv5MSi3kb/ddtzBsTVJFo= +github.com/ledgerwatch/erigon-snapshot v1.3.1-0.20231101135659-d85154191b66 h1:0GY0mXeC6ThOw1x50tF1dDWso0kL6a5SS7k/zrsIF60= +github.com/ledgerwatch/erigon-snapshot v1.3.1-0.20231101135659-d85154191b66/go.mod h1:3AuPxZc85jkehh/HA9h8gabv5MSi3kb/ddtzBsTVJFo= github.com/ledgerwatch/log/v3 v3.9.0 h1:iDwrXe0PVwBC68Dd94YSsHbMgQ3ufsgjzXtFNFVZFRk= github.com/ledgerwatch/log/v3 v3.9.0/go.mod h1:EiAY6upmI/6LkNhOVxb4eVsmsP11HZCnZ3PlJMjYiqE= github.com/ledgerwatch/secp256k1 v1.0.0 h1:Usvz87YoTG0uePIV8woOof5cQnLXGYa162rFf3YnwaQ= diff --git a/turbo/app/snapshots_cmd.go b/turbo/app/snapshots_cmd.go index 47f10f625..5b9060880 100644 --- a/turbo/app/snapshots_cmd.go +++ b/turbo/app/snapshots_cmd.go @@ -404,7 +404,7 @@ func doRetireCommand(cliCtx *cli.Context) error { blockReader := freezeblocks.NewBlockReader(blockSnapshots, borSnapshots) blockWriter := blockio.NewBlockWriter(fromdb.HistV3(db)) - br := freezeblocks.NewBlockRetire(estimate.CompressSnapshot.Workers(), dirs, blockReader, blockWriter, freezeblocks.MergeSteps, db, nil, logger) + br := freezeblocks.NewBlockRetire(estimate.CompressSnapshot.Workers(), dirs, blockReader, blockWriter, db, nil, logger) agg, err := libstate.NewAggregatorV3(ctx, dirs.SnapHistory, dirs.Tmp, ethconfig.HistoryV3AggregationStep, db, logger) if err != nil { return err @@ -460,7 +460,7 @@ func doRetireCommand(cliCtx *cli.Context) error { } for i := from; i < to; i += every { - if err := br.RetireBlocks(ctx, i, i+every, log.LvlInfo, nil); err != nil { + if err := br.RetireBlocks(ctx, i, i+every, log.LvlInfo, nil, nil); err != nil { panic(err) } if err := db.Update(ctx, func(tx kv.RwTx) error { diff --git a/turbo/services/interfaces.go b/turbo/services/interfaces.go index 592006477..2b86e736f 100644 --- a/turbo/services/interfaces.go +++ b/turbo/services/interfaces.go @@ -93,7 +93,7 @@ type BlockSnapshots interface { // BlockRetire - freezing blocks: moving old data from DB to snapshot files type BlockRetire interface { PruneAncientBlocks(tx kv.RwTx, limit int, includeBor bool) error - RetireBlocksInBackground(ctx context.Context, maxBlockNumInDB uint64, includeBor bool, lvl log.Lvl, seedNewSnapshots func(downloadRequest []DownloadRequest) error) + RetireBlocksInBackground(ctx context.Context, maxBlockNumInDB uint64, includeBor bool, lvl log.Lvl, seedNewSnapshots func(downloadRequest []DownloadRequest) error, onDelete func(l []string) error) HasNewFrozenFiles() bool BuildMissedIndicesIfNeed(ctx context.Context, logPrefix string, notifier DBEventNotifier, cc *chain.Config) error } diff --git a/turbo/snapshotsync/freezeblocks/block_snapshots.go b/turbo/snapshotsync/freezeblocks/block_snapshots.go index dd36b7946..dc21c29f2 100644 --- a/turbo/snapshotsync/freezeblocks/block_snapshots.go +++ b/turbo/snapshotsync/freezeblocks/block_snapshots.go @@ -953,7 +953,7 @@ func BuildMissedIndices(logPrefix string, ctx context.Context, dirs datadir.Dirs } }() - for _, t := range snaptype.AllSnapshotTypes { + for _, t := range snaptype.BlockSnapshotTypes { for index := range segments { segment := segments[index] if segment.T != t { @@ -1059,7 +1059,7 @@ MainLoop: if f.From == f.To { continue } - for _, t := range snaptype.AllSnapshotTypes { + for _, t := range snaptype.BlockSnapshotTypes { p := filepath.Join(dir, snaptype.SegmentFileName(f.From, f.To, t)) if !dir2.FileExist(p) { continue MainLoop @@ -1202,12 +1202,10 @@ type BlockRetire struct { blockReader services.FullBlockReader blockWriter *blockio.BlockWriter dirs datadir.Dirs - - mergeSteps []uint64 } -func NewBlockRetire(workers int, dirs datadir.Dirs, blockReader services.FullBlockReader, blockWriter *blockio.BlockWriter, mergeSteps []uint64, db kv.RoDB, notifier services.DBEventNotifier, logger log.Logger) *BlockRetire { - return &BlockRetire{workers: workers, tmpDir: dirs.Tmp, dirs: dirs, blockReader: blockReader, blockWriter: blockWriter, mergeSteps: mergeSteps, db: db, notifier: notifier, logger: logger} +func NewBlockRetire(workers int, dirs datadir.Dirs, blockReader services.FullBlockReader, blockWriter *blockio.BlockWriter, db kv.RoDB, notifier services.DBEventNotifier, logger log.Logger) *BlockRetire { + return &BlockRetire{workers: workers, tmpDir: dirs.Tmp, dirs: dirs, blockReader: blockReader, blockWriter: blockWriter, db: db, notifier: notifier, logger: logger} } func (br *BlockRetire) snapshots() *RoSnapshots { return br.blockReader.Snapshots().(*RoSnapshots) } @@ -1268,7 +1266,7 @@ func CanDeleteTo(curBlockNum uint64, blocksInSnapshots uint64) (blockTo uint64) return cmp.Min(hardLimit, blocksInSnapshots+1) } -func (br *BlockRetire) RetireBlocks(ctx context.Context, blockFrom, blockTo uint64, lvl log.Lvl, seedNewSnapshots func(downloadRequest []services.DownloadRequest) error) error { +func (br *BlockRetire) RetireBlocks(ctx context.Context, blockFrom, blockTo uint64, lvl log.Lvl, seedNewSnapshots func(downloadRequest []services.DownloadRequest) error, onDelete func(l []string) error) error { chainConfig := fromdb.ChainConfig(br.db) notifier, logger, blockReader, tmpDir, db, workers := br.notifier, br.logger, br.blockReader, br.tmpDir, br.db, br.workers logger.Log(lvl, "[snapshots] Retire Blocks", "range", fmt.Sprintf("%dk-%dk", blockFrom/1000, blockTo/1000)) @@ -1286,34 +1284,31 @@ func (br *BlockRetire) RetireBlocks(ctx context.Context, blockFrom, blockTo uint if notifier != nil && !reflect.ValueOf(notifier).IsNil() { // notify about new snapshots of any size notifier.OnNewSnapshot() } - merger := NewMerger(tmpDir, workers, lvl, br.mergeSteps, db, chainConfig, notifier, logger) - rangesToMerge := merger.FindMergeRanges(snapshots.Ranges()) + merger := NewMerger(tmpDir, workers, lvl, db, chainConfig, logger) + rangesToMerge := merger.FindMergeRanges(snapshots.Ranges(), snapshots.BlocksAvailable()) if len(rangesToMerge) == 0 { return nil } - err := merger.Merge(ctx, snapshots, rangesToMerge, snapshots.Dir(), true /* doIndex */) + onMerge := func(r Range) error { + if notifier != nil && !reflect.ValueOf(notifier).IsNil() { // notify about new snapshots of any size + notifier.OnNewSnapshot() + } + + if seedNewSnapshots != nil { + downloadRequest := []services.DownloadRequest{ + services.NewDownloadRequest(&services.Range{From: r.from, To: r.to}, "", "", false /* Bor */), + } + if err := seedNewSnapshots(downloadRequest); err != nil { + return err + } + } + return nil + } + err := merger.Merge(ctx, snapshots, rangesToMerge, snapshots.Dir(), true /* doIndex */, onMerge, onDelete) if err != nil { return err } - if err := snapshots.ReopenFolder(); err != nil { - return fmt.Errorf("reopen: %w", err) - } - snapshots.LogStat() - if notifier != nil && !reflect.ValueOf(notifier).IsNil() { // notify about new snapshots of any size - notifier.OnNewSnapshot() - } - downloadRequest := make([]services.DownloadRequest, 0, len(rangesToMerge)) - for i := range rangesToMerge { - r := &services.Range{From: rangesToMerge[i].from, To: rangesToMerge[i].to} - downloadRequest = append(downloadRequest, services.NewDownloadRequest(r, "", "", false /* Bor */)) - } - - if seedNewSnapshots != nil { - if err := seedNewSnapshots(downloadRequest); err != nil { - return err - } - } return nil } @@ -1338,7 +1333,7 @@ func (br *BlockRetire) PruneAncientBlocks(tx kv.RwTx, limit int, includeBor bool return nil } -func (br *BlockRetire) RetireBlocksInBackground(ctx context.Context, forwardProgress uint64, includeBor bool, lvl log.Lvl, seedNewSnapshots func(downloadRequest []services.DownloadRequest) error) { +func (br *BlockRetire) RetireBlocksInBackground(ctx context.Context, forwardProgress uint64, includeBor bool, lvl log.Lvl, seedNewSnapshots func(downloadRequest []services.DownloadRequest) error, onDeleteSnapshots func(l []string) error) { ok := br.working.CompareAndSwap(false, true) if !ok { // go-routine is still working @@ -1349,7 +1344,7 @@ func (br *BlockRetire) RetireBlocksInBackground(ctx context.Context, forwardProg blockFrom, blockTo, ok := CanRetire(forwardProgress, br.blockReader.FrozenBlocks()) if ok { - if err := br.RetireBlocks(ctx, blockFrom, blockTo, lvl, seedNewSnapshots); err != nil { + if err := br.RetireBlocks(ctx, blockFrom, blockTo, lvl, seedNewSnapshots, onDeleteSnapshots); err != nil { br.logger.Warn("[snapshots] retire blocks", "err", err, "fromBlock", blockFrom, "toBlock", blockTo) } } @@ -2172,32 +2167,43 @@ type Merger struct { tmpDir string chainConfig *chain.Config chainDB kv.RoDB - notifier services.DBEventNotifier logger log.Logger - mergeSteps []uint64 } -func NewMerger(tmpDir string, compressWorkers int, lvl log.Lvl, mergeSteps []uint64, chainDB kv.RoDB, chainConfig *chain.Config, notifier services.DBEventNotifier, logger log.Logger) *Merger { - return &Merger{tmpDir: tmpDir, compressWorkers: compressWorkers, lvl: lvl, mergeSteps: mergeSteps, chainDB: chainDB, chainConfig: chainConfig, notifier: notifier, logger: logger} +func NewMerger(tmpDir string, compressWorkers int, lvl log.Lvl, chainDB kv.RoDB, chainConfig *chain.Config, logger log.Logger) *Merger { + return &Merger{tmpDir: tmpDir, compressWorkers: compressWorkers, lvl: lvl, chainDB: chainDB, chainConfig: chainConfig, logger: logger} } type Range struct { from, to uint64 } -func (r Range) From() uint64 { return r.from } -func (r Range) To() uint64 { return r.to } +func (r Range) From() uint64 { return r.from } +func (r Range) To() uint64 { return r.to } +func (r Range) IsRecent(max uint64) bool { return max-r.to < snaptype.Erigon2MergeLimit } + +type Ranges []Range + +func (r Ranges) String() string { + return fmt.Sprintf("%d", r) +} var MergeSteps = []uint64{500_000, 100_000, 10_000} +var RecentMergeSteps = []uint64{100_000, 10_000} -func (m *Merger) FindMergeRanges(currentRanges []Range) (toMerge []Range) { +func (m *Merger) FindMergeRanges(currentRanges []Range, maxBlockNum uint64) (toMerge []Range) { for i := len(currentRanges) - 1; i > 0; i-- { r := currentRanges[i] - if r.to-r.from >= snaptype.Erigon2MergeLimit { // is complete .seg - continue + isRecent := r.IsRecent(maxBlockNum) + mergeLimit, mergeSteps := uint64(snaptype.Erigon2MergeLimit), MergeSteps + if isRecent { + mergeLimit, mergeSteps = snaptype.Erigon2RecentMergeLimit, RecentMergeSteps } - for _, span := range m.mergeSteps { + if r.to-r.from >= mergeLimit { + continue + } + for _, span := range mergeSteps { if r.to%span != 0 { continue } @@ -2294,7 +2300,7 @@ func (m *Merger) filesByRange(snapshots *RoSnapshots, from, to uint64) (map[snap } // Merge does merge segments in given ranges -func (m *Merger) Merge(ctx context.Context, snapshots *RoSnapshots, mergeRanges []Range, snapDir string, doIndex bool) error { +func (m *Merger) Merge(ctx context.Context, snapshots *RoSnapshots, mergeRanges []Range, snapDir string, doIndex bool, onMerge func(r Range) error, onDelete func(l []string) error) error { if len(mergeRanges) == 0 { return nil } @@ -2306,7 +2312,7 @@ func (m *Merger) Merge(ctx context.Context, snapshots *RoSnapshots, mergeRanges return err } - for _, t := range snaptype.AllSnapshotTypes { + for _, t := range snaptype.BlockSnapshotTypes { segName := snaptype.SegmentFileName(r.from, r.to, t) f, ok := snaptype.ParseFileName(snapDir, segName) if !ok { @@ -2326,11 +2332,20 @@ func (m *Merger) Merge(ctx context.Context, snapshots *RoSnapshots, mergeRanges return fmt.Errorf("ReopenSegments: %w", err) } snapshots.LogStat() - if m.notifier != nil { // notify about new snapshots of any size - m.notifier.OnNewSnapshot() - time.Sleep(1 * time.Second) // i working on blocking API - to ensure client does not use old snapsthos - and then delete them + + if err := onMerge(r); err != nil { + return err } - for _, t := range snaptype.AllSnapshotTypes { + for _, t := range snaptype.BlockSnapshotTypes { + if len(toMerge[t]) == 0 { + continue + } + if err := onDelete(toMerge[t]); err != nil { + return err + } + } + time.Sleep(1 * time.Second) // i working on blocking API - to ensure client does not use old snapsthos - and then delete them + for _, t := range snaptype.BlockSnapshotTypes { m.removeOldFiles(toMerge[t], snapDir) } } @@ -2358,6 +2373,9 @@ func (m *Merger) merge(ctx context.Context, toMerge []string, targetFile string, } defer f.Close() + _, fName := filepath.Split(targetFile) + m.logger.Debug("[snapshots] merge", "file", fName) + for _, d := range cList { if err := d.WithReadAhead(func() error { g := d.MakeGetter() diff --git a/turbo/snapshotsync/freezeblocks/block_snapshots_test.go b/turbo/snapshotsync/freezeblocks/block_snapshots_test.go index eb663df09..bfb051dbb 100644 --- a/turbo/snapshotsync/freezeblocks/block_snapshots_test.go +++ b/turbo/snapshotsync/freezeblocks/block_snapshots_test.go @@ -58,19 +58,55 @@ func createTestSegmentFile(t *testing.T, from, to uint64, name snaptype.Type, di } func TestFindMergeRange(t *testing.T) { + merger := NewMerger("x", 1, log.LvlInfo, nil, params.MainnetChainConfig, nil) t.Run("big", func(t *testing.T) { - merger := NewMerger("x", 1, log.LvlInfo, MergeSteps, nil, params.MainnetChainConfig, nil, nil) var ranges []Range for i := 0; i < 24; i++ { ranges = append(ranges, Range{from: uint64(i * 100_000), to: uint64((i + 1) * 100_000)}) } - found := merger.FindMergeRanges(ranges) + found := merger.FindMergeRanges(ranges, uint64(24*100_000)) - var expect []Range - for i := 0; i < 4; i++ { - expect = append(expect, Range{from: uint64(i * snaptype.Erigon2MergeLimit), to: uint64((i + 1) * snaptype.Erigon2MergeLimit)}) + expect := []Range{ + {0, 500_000}, + {500_000, 1_000_000}, + {1_000_000, 1_500_000}, } - require.Equal(t, expect, found) + require.Equal(t, Ranges(expect).String(), Ranges(found).String()) + }) + + t.Run("small", func(t *testing.T) { + var ranges Ranges + for i := 0; i < 240; i++ { + ranges = append(ranges, Range{from: uint64(i * 10_000), to: uint64((i + 1) * 10_000)}) + } + found := merger.FindMergeRanges(ranges, uint64(240*10_000)) + + expect := Ranges{ + {0, 500_000}, + {500_000, 1_000_000}, + {1_000_000, 1_500_000}, + {1_500_000, 1_600_000}, + {1_600_000, 1_700_000}, + {1_700_000, 1_800_000}, + {1_800_000, 1_900_000}, + {1_900_000, 2_000_000}, + {2_000_000, 2_100_000}, + {2_100_000, 2_200_000}, + {2_200_000, 2_300_000}, + {2_300_000, 2_400_000}, + } + + require.Equal(t, expect.String(), Ranges(found).String()) + }) + + t.Run("IsRecent", func(t *testing.T) { + require.True(t, Range{500_000, 599_000}.IsRecent(1_000_000)) + require.True(t, Range{500_000, 501_000}.IsRecent(1_000_000)) + require.False(t, Range{499_000, 500_000}.IsRecent(1_000_000)) + require.False(t, Range{400_000, 500_000}.IsRecent(1_000_000)) + require.False(t, Range{400_000, 401_000}.IsRecent(1_000_000)) + + require.False(t, Range{500_000, 501_000}.IsRecent(1_100_000)) }) } @@ -79,12 +115,12 @@ func TestMergeSnapshots(t *testing.T) { logger := log.New() dir, require := t.TempDir(), require.New(t) createFile := func(from, to uint64) { - for _, snT := range snaptype.AllSnapshotTypes { + for _, snT := range snaptype.BlockSnapshotTypes { createTestSegmentFile(t, from, to, snT, dir, logger) } } - N := uint64(7) + N := uint64(17) createFile(0, snaptype.Erigon2MergeLimit) for i := uint64(snaptype.Erigon2MergeLimit); i < snaptype.Erigon2MergeLimit+N*100_000; i += 100_000 { createFile(i, i+100_000) @@ -93,10 +129,14 @@ func TestMergeSnapshots(t *testing.T) { defer s.Close() require.NoError(s.ReopenFolder()) { - merger := NewMerger(dir, 1, log.LvlInfo, MergeSteps, nil, params.MainnetChainConfig, nil, logger) - ranges := merger.FindMergeRanges(s.Ranges()) + merger := NewMerger(dir, 1, log.LvlInfo, nil, params.MainnetChainConfig, logger) + ranges := merger.FindMergeRanges(s.Ranges(), s.SegmentsMax()) require.True(len(ranges) > 0) - err := merger.Merge(context.Background(), s, ranges, s.Dir(), false) + err := merger.Merge(context.Background(), s, ranges, s.Dir(), false, func(r Range) error { + return nil + }, func(l []string) error { + return nil + }) require.NoError(err) } @@ -108,14 +148,18 @@ func TestMergeSnapshots(t *testing.T) { require.Equal(5, a) { - merger := NewMerger(dir, 1, log.LvlInfo, MergeSteps, nil, params.MainnetChainConfig, nil, logger) - ranges := merger.FindMergeRanges(s.Ranges()) + merger := NewMerger(dir, 1, log.LvlInfo, nil, params.MainnetChainConfig, logger) + ranges := merger.FindMergeRanges(s.Ranges(), s.SegmentsMax()) require.True(len(ranges) == 0) - err := merger.Merge(context.Background(), s, ranges, s.Dir(), false) + err := merger.Merge(context.Background(), s, ranges, s.Dir(), false, func(r Range) error { + return nil + }, func(l []string) error { + return nil + }) require.NoError(err) } - expectedFileName = snaptype.SegmentFileName(1_100_000, 1_200_000, snaptype.Transactions) + expectedFileName = snaptype.SegmentFileName(1_800_000, 1_900_000, snaptype.Transactions) d, err = compress.NewDecompressor(filepath.Join(dir, expectedFileName)) require.NoError(err) defer d.Close() diff --git a/turbo/snapshotsync/freezeblocks/bor_snapshots.go b/turbo/snapshotsync/freezeblocks/bor_snapshots.go index 26846a64e..a1240ccb5 100644 --- a/turbo/snapshotsync/freezeblocks/bor_snapshots.go +++ b/turbo/snapshotsync/freezeblocks/bor_snapshots.go @@ -191,33 +191,34 @@ func (br *BlockRetire) RetireBorBlocks(ctx context.Context, blockFrom, blockTo u if notifier != nil && !reflect.ValueOf(notifier).IsNil() { // notify about new snapshots of any size notifier.OnNewSnapshot() } - merger := NewBorMerger(tmpDir, workers, lvl, br.mergeSteps, db, chainConfig, notifier, logger) - rangesToMerge := merger.FindMergeRanges(snapshots.Ranges()) + merger := NewBorMerger(tmpDir, workers, lvl, db, chainConfig, notifier, logger) + rangesToMerge := merger.FindMergeRanges(snapshots.Ranges(), snapshots.BlocksAvailable()) if len(rangesToMerge) == 0 { return nil } - err := merger.Merge(ctx, snapshots, rangesToMerge, snapshots.Dir(), true /* doIndex */) + onMerge := func(r Range) error { + if notifier != nil && !reflect.ValueOf(notifier).IsNil() { // notify about new snapshots of any size + notifier.OnNewSnapshot() + } + + if seedNewSnapshots != nil { + downloadRequest := []services.DownloadRequest{ + services.NewDownloadRequest(&services.Range{From: r.from, To: r.to}, "", "", true /* Bor */), + } + if err := seedNewSnapshots(downloadRequest); err != nil { + return err + } + } + return nil + } + onDelete := func(files []string) error { + //TODO: add Downloader API to delete files + return nil + } + err := merger.Merge(ctx, snapshots, rangesToMerge, snapshots.Dir(), true /* doIndex */, onMerge, onDelete) if err != nil { return err } - if err := snapshots.ReopenFolder(); err != nil { - return fmt.Errorf("reopen: %w", err) - } - snapshots.LogStat() - if notifier != nil && !reflect.ValueOf(notifier).IsNil() { // notify about new snapshots of any size - notifier.OnNewSnapshot() - } - downloadRequest := make([]services.DownloadRequest, 0, len(rangesToMerge)) - for i := range rangesToMerge { - r := &services.Range{From: rangesToMerge[i].from, To: rangesToMerge[i].to} - downloadRequest = append(downloadRequest, services.NewDownloadRequest(r, "", "", true /* Bor */)) - } - - if seedNewSnapshots != nil { - if err := seedNewSnapshots(downloadRequest); err != nil { - return err - } - } return nil } @@ -1059,21 +1060,25 @@ type BorMerger struct { chainDB kv.RoDB notifier services.DBEventNotifier logger log.Logger - mergeSteps []uint64 } -func NewBorMerger(tmpDir string, compressWorkers int, lvl log.Lvl, mergeSteps []uint64, chainDB kv.RoDB, chainConfig *chain.Config, notifier services.DBEventNotifier, logger log.Logger) *BorMerger { - return &BorMerger{tmpDir: tmpDir, compressWorkers: compressWorkers, lvl: lvl, mergeSteps: mergeSteps, chainDB: chainDB, chainConfig: chainConfig, notifier: notifier, logger: logger} +func NewBorMerger(tmpDir string, compressWorkers int, lvl log.Lvl, chainDB kv.RoDB, chainConfig *chain.Config, notifier services.DBEventNotifier, logger log.Logger) *BorMerger { + return &BorMerger{tmpDir: tmpDir, compressWorkers: compressWorkers, lvl: lvl, chainDB: chainDB, chainConfig: chainConfig, notifier: notifier, logger: logger} } -func (m *BorMerger) FindMergeRanges(currentRanges []Range) (toMerge []Range) { +func (m *BorMerger) FindMergeRanges(currentRanges []Range, maxBlockNum uint64) (toMerge []Range) { for i := len(currentRanges) - 1; i > 0; i-- { r := currentRanges[i] - if r.to-r.from >= snaptype.Erigon2MergeLimit { // is complete .seg - continue + isRecent := r.IsRecent(maxBlockNum) + mergeLimit, mergeSteps := uint64(snaptype.Erigon2RecentMergeLimit), MergeSteps + if isRecent { + mergeLimit, mergeSteps = snaptype.Erigon2MergeLimit, RecentMergeSteps } - for _, span := range m.mergeSteps { + if r.to-r.from >= mergeLimit { + continue + } + for _, span := range mergeSteps { if r.to%span != 0 { continue } @@ -1115,7 +1120,7 @@ func (m *BorMerger) filesByRange(snapshots *BorRoSnapshots, from, to uint64) (ma } // Merge does merge segments in given ranges -func (m *BorMerger) Merge(ctx context.Context, snapshots *BorRoSnapshots, mergeRanges []Range, snapDir string, doIndex bool) error { +func (m *BorMerger) Merge(ctx context.Context, snapshots *BorRoSnapshots, mergeRanges []Range, snapDir string, doIndex bool, onMerge func(r Range) error, onDelete func(l []string) error) error { if len(mergeRanges) == 0 { return nil } @@ -1147,11 +1152,19 @@ func (m *BorMerger) Merge(ctx context.Context, snapshots *BorRoSnapshots, mergeR return fmt.Errorf("ReopenSegments: %w", err) } snapshots.LogStat() - if m.notifier != nil { // notify about new snapshots of any size - m.notifier.OnNewSnapshot() - time.Sleep(1 * time.Second) // i working on blocking API - to ensure client does not use old snapsthos - and then delete them + if err := onMerge(r); err != nil { + return err } - for _, t := range []snaptype.Type{snaptype.BorEvents} { + for _, t := range snaptype.BlockSnapshotTypes { + if len(toMerge[t]) == 0 { + continue + } + if err := onDelete(toMerge[t]); err != nil { + return err + } + } + time.Sleep(1 * time.Second) // i working on blocking API - to ensure client does not use old snapsthos - and then delete them + for _, t := range snaptype.BlockSnapshotTypes { m.removeOldFiles(toMerge[t], snapDir) } } diff --git a/turbo/snapshotsync/snapshotsync.go b/turbo/snapshotsync/snapshotsync.go index 06c0a4a6f..2298ba8ed 100644 --- a/turbo/snapshotsync/snapshotsync.go +++ b/turbo/snapshotsync/snapshotsync.go @@ -35,7 +35,7 @@ const ( ) func BuildProtoRequest(downloadRequest []services.DownloadRequest) *proto_downloader.DownloadRequest { - req := &proto_downloader.DownloadRequest{Items: make([]*proto_downloader.DownloadItem, 0, len(snaptype.AllSnapshotTypes))} + req := &proto_downloader.DownloadRequest{Items: make([]*proto_downloader.DownloadItem, 0, len(snaptype.BlockSnapshotTypes))} for _, r := range downloadRequest { if r.Path != "" { if r.TorrentHash != "" { @@ -50,13 +50,13 @@ func BuildProtoRequest(downloadRequest []services.DownloadRequest) *proto_downlo } } else { if r.Bor { - for _, t := range []snaptype.Type{snaptype.BorEvents, snaptype.BorSpans} { + for _, t := range snaptype.BorSnapshotTypes { req.Items = append(req.Items, &proto_downloader.DownloadItem{ Path: snaptype.SegmentFileName(r.Ranges.From, r.Ranges.To, t), }) } } else { - for _, t := range snaptype.AllSnapshotTypes { + for _, t := range snaptype.BlockSnapshotTypes { req.Items = append(req.Items, &proto_downloader.DownloadItem{ Path: snaptype.SegmentFileName(r.Ranges.From, r.Ranges.To, t), }) diff --git a/turbo/stages/mock/mock_sentry.go b/turbo/stages/mock/mock_sentry.go index ada1746a5..df9e7df5f 100644 --- a/turbo/stages/mock/mock_sentry.go +++ b/turbo/stages/mock/mock_sentry.go @@ -417,7 +417,7 @@ func MockWithEverything(tb testing.TB, gspec *types.Genesis, key *ecdsa.PrivateK return block, nil } - blockRetire := freezeblocks.NewBlockRetire(1, dirs, mock.BlockReader, blockWriter, freezeblocks.MergeSteps, mock.DB, mock.Notifications.Events, logger) + blockRetire := freezeblocks.NewBlockRetire(1, dirs, mock.BlockReader, blockWriter, mock.DB, mock.Notifications.Events, logger) mock.Sync = stagedsync.New( stagedsync.DefaultStages(mock.Ctx, stagedsync.StageSnapshotsCfg(mock.DB, *mock.ChainConfig, dirs, blockRetire, snapshotsDownloader, mock.BlockReader, mock.Notifications.Events, mock.HistoryV3, mock.agg, nil),