diff --git a/eth/stagedsync/stage_senders.go b/eth/stagedsync/stage_senders.go index 98b81a912..c95888454 100644 --- a/eth/stagedsync/stage_senders.go +++ b/eth/stagedsync/stage_senders.go @@ -374,42 +374,8 @@ func PruneSendersStage(s *PruneState, tx kv.RwTx, cfg SendersCfg, ctx context.Co } if cfg.snapshots != nil && cfg.snapshots.Cfg().RetireEnabled { - if err := cfg.snapshots.EnsureExpectedBlocksAreAvailable(cfg.snapshotHashesCfg); err != nil { - return err - } - blockFrom := cfg.snapshots.BlocksAvailable() + 1 - blockTo := s.ForwardProgress - params.FullImmutabilityThreshold - if blockTo-blockFrom > 1000 { - log.Info("[snapshots] Retire blocks", "from", blockFrom, "to", blockTo) - chainID, _ := uint256.FromBig(cfg.chainConfig.ChainID) - wg := sync.WaitGroup{} - wg.Add(1) - go func() { // move to own goroutine, because in this goroutine already living RwTx - // in future we will do it in background - if err := snapshotsync.RetireBlocks(ctx, blockFrom, blockTo, *chainID, cfg.tmpdir, cfg.snapshots, cfg.db, 1, log.LvlDebug); err != nil { - panic(err) - //return err - } - if err := cfg.snapshots.ReopenSegments(); err != nil { - panic(err) - //return err - } - if err := cfg.snapshots.ReopenIndices(); err != nil { - panic(err) - - //return err - } - // RoSnapshots must be atomic? Or we can create new instance? - // seed new 500K files - - //if err := rawdb.DeleteAncientBlocks(tx, blockFrom, blockTo); err != nil { - // return nil - //} - - defer wg.Done() - }() - wg.Wait() - fmt.Printf("sn runtime dump: %d-%d\n", blockFrom, blockTo) + if err := retireBlocks(s, tx, cfg, ctx); err != nil { + return fmt.Errorf("retireBlocks: %w", err) } } @@ -425,3 +391,47 @@ func PruneSendersStage(s *PruneState, tx kv.RwTx, cfg SendersCfg, ctx context.Co } return nil } + +func retireBlocks(s *PruneState, tx kv.RwTx, cfg SendersCfg, ctx context.Context) (err error) { + if err := cfg.snapshots.EnsureExpectedBlocksAreAvailable(cfg.snapshotHashesCfg); err != nil { + return err + } + blockFrom := cfg.snapshots.BlocksAvailable() + 1 + blockTo := s.ForwardProgress - params.FullImmutabilityThreshold + if blockTo-blockFrom < 1000 { + return nil + } + //TODO: avoid too large deletes + + log.Info("[snapshots] Retire blocks", "from", blockFrom, "to", blockTo) + chainID, _ := uint256.FromBig(cfg.chainConfig.ChainID) + wg := sync.WaitGroup{} + wg.Add(1) + go func() { // move to own goroutine, because in this goroutine already living RwTx + // in future we will do it in background + if err := snapshotsync.RetireBlocks(ctx, blockFrom, blockTo, *chainID, cfg.tmpdir, cfg.snapshots, cfg.db, 1, log.LvlDebug); err != nil { + panic(err) + //return err + } + if err := cfg.snapshots.ReopenSegments(); err != nil { + panic(err) + //return err + } + if err := cfg.snapshots.ReopenIndices(); err != nil { + panic(err) + + //return err + } + // RoSnapshots must be atomic? Or we can create new instance? + // seed new 500K files + + //if err := rawdb.DeleteAncientBlocks(tx, blockFrom, blockTo); err != nil { + // return nil + //} + + defer wg.Done() + }() + wg.Wait() + fmt.Printf("sn runtime dump: %d-%d\n", blockFrom, blockTo) + return nil +} diff --git a/turbo/snapshotsync/block_snapshots.go b/turbo/snapshotsync/block_snapshots.go index f48ffe291..a41bc1a02 100644 --- a/turbo/snapshotsync/block_snapshots.go +++ b/turbo/snapshotsync/block_snapshots.go @@ -50,32 +50,62 @@ type BlocksSnapshot struct { From, To uint64 // [from,to) } -type Type string +type Type int const ( - Headers Type = "headers" - Bodies Type = "bodies" - Transactions Type = "transactions" + Headers Type = iota + Bodies + Transactions + NumberOfTypes ) +func (ft Type) String() string { + switch ft { + case Headers: + return "headers" + case Bodies: + return "bodies" + case Transactions: + return "transactions" + default: + panic(fmt.Sprintf("unknown file type: %d", ft)) + } +} + +func ParseFileType(s string) (Type, bool) { + switch s { + case "headers": + return Headers, true + case "bodies": + return Bodies, true + case "transactions": + return Transactions, true + default: + return NumberOfTypes, false + } +} + +type IdxType string + const ( - Transactions2Block Type = "transactions-to-block" - TransactionsId Type = "transactions-id" + Transactions2Block IdxType = "transactions-to-block" + TransactionsId IdxType = "transactions-id" ) +func (it IdxType) String() string { return string(it) } + var AllSnapshotTypes = []Type{Headers, Bodies, Transactions} -var AllIdxTypes = []Type{Headers, Bodies, Transactions, Transactions2Block} var ( ErrInvalidFileName = fmt.Errorf("invalid compressed file name") ) -func FileName(from, to uint64, t Type) string { - return fmt.Sprintf("v1-%06d-%06d-%s", from/1_000, to/1_000, t) +func FileName(from, to uint64, fileType string) string { + return fmt.Sprintf("v1-%06d-%06d-%s", from/1_000, to/1_000, fileType) } -func SegmentFileName(from, to uint64, t Type) string { return FileName(from, to, t) + ".seg" } -func DatFileName(from, to uint64, t Type) string { return FileName(from, to, t) + ".dat" } -func IdxFileName(from, to uint64, t Type) string { return FileName(from, to, t) + ".idx" } +func SegmentFileName(from, to uint64, t Type) string { return FileName(from, to, t.String()) + ".seg" } +func DatFileName(from, to uint64, fType string) string { return FileName(from, to, fType) + ".dat" } +func IdxFileName(from, to uint64, fType string) string { return FileName(from, to, fType) + ".idx" } func (s BlocksSnapshot) Has(block uint64) bool { return block >= s.From && block < s.To } @@ -151,7 +181,7 @@ func (s *RoSnapshots) ReopenSomeIndices(types ...Type) (err error) { bs.HeaderHashIdx.Close() bs.HeaderHashIdx = nil } - bs.HeaderHashIdx, err = recsplit.OpenIndex(path.Join(s.dir, IdxFileName(bs.From, bs.To, Headers))) + bs.HeaderHashIdx, err = recsplit.OpenIndex(path.Join(s.dir, IdxFileName(bs.From, bs.To, Headers.String()))) if err != nil { return err } @@ -160,7 +190,7 @@ func (s *RoSnapshots) ReopenSomeIndices(types ...Type) (err error) { bs.BodyNumberIdx.Close() bs.BodyNumberIdx = nil } - bs.BodyNumberIdx, err = recsplit.OpenIndex(path.Join(s.dir, IdxFileName(bs.From, bs.To, Bodies))) + bs.BodyNumberIdx, err = recsplit.OpenIndex(path.Join(s.dir, IdxFileName(bs.From, bs.To, Bodies.String()))) if err != nil { return err } @@ -169,7 +199,7 @@ func (s *RoSnapshots) ReopenSomeIndices(types ...Type) (err error) { bs.TxnHashIdx.Close() bs.TxnHashIdx = nil } - bs.TxnHashIdx, err = recsplit.OpenIndex(path.Join(s.dir, IdxFileName(bs.From, bs.To, Transactions))) + bs.TxnHashIdx, err = recsplit.OpenIndex(path.Join(s.dir, IdxFileName(bs.From, bs.To, Transactions.String()))) if err != nil { return err } @@ -178,7 +208,7 @@ func (s *RoSnapshots) ReopenSomeIndices(types ...Type) (err error) { bs.TxnIdsIdx.Close() bs.TxnIdsIdx = nil } - bs.TxnIdsIdx, err = recsplit.OpenIndex(path.Join(s.dir, IdxFileName(bs.From, bs.To, TransactionsId))) + bs.TxnIdsIdx, err = recsplit.OpenIndex(path.Join(s.dir, IdxFileName(bs.From, bs.To, TransactionsId.String()))) if err != nil { return err } @@ -187,7 +217,7 @@ func (s *RoSnapshots) ReopenSomeIndices(types ...Type) (err error) { bs.TxnHash2BlockNumIdx.Close() bs.TxnHash2BlockNumIdx = nil } - bs.TxnHash2BlockNumIdx, err = recsplit.OpenIndex(path.Join(s.dir, IdxFileName(bs.From, bs.To, Transactions2Block))) + bs.TxnHash2BlockNumIdx, err = recsplit.OpenIndex(path.Join(s.dir, IdxFileName(bs.From, bs.To, Transactions2Block.String()))) if err != nil { return err } @@ -601,7 +631,11 @@ func ParseFileName(dir string, f os.FileInfo) (res FileInfo, err error) { return } var snapshotType Type - switch Type(parts[3]) { + ft, ok := ParseFileType(parts[3]) + if !ok { + return res, fmt.Errorf("unexpected snapshot suffix: %s,%w", parts[2], ErrInvalidFileName) + } + switch ft { case Headers: snapshotType = Headers case Bodies: @@ -984,7 +1018,7 @@ func TransactionsHashIdx(ctx context.Context, chainID uint256.Int, sn *BlocksSna BucketSize: 2000, LeafSize: 8, TmpDir: tmpDir, - IndexFile: filepath.Join(dir, IdxFileName(sn.From, sn.To, Transactions)), + IndexFile: filepath.Join(dir, IdxFileName(sn.From, sn.To, Transactions.String())), BaseDataID: firstTxID, }) if err != nil { @@ -996,7 +1030,7 @@ func TransactionsHashIdx(ctx context.Context, chainID uint256.Int, sn *BlocksSna BucketSize: 2000, LeafSize: 8, TmpDir: tmpDir, - IndexFile: filepath.Join(dir, IdxFileName(sn.From, sn.To, TransactionsId)), + IndexFile: filepath.Join(dir, IdxFileName(sn.From, sn.To, TransactionsId.String())), BaseDataID: firstTxID, }) if err != nil { @@ -1008,7 +1042,7 @@ func TransactionsHashIdx(ctx context.Context, chainID uint256.Int, sn *BlocksSna BucketSize: 2000, LeafSize: 8, TmpDir: tmpDir, - IndexFile: filepath.Join(dir, IdxFileName(sn.From, sn.To, Transactions2Block)), + IndexFile: filepath.Join(dir, IdxFileName(sn.From, sn.To, Transactions2Block.String())), BaseDataID: firstBlockNum, }) if err != nil { diff --git a/turbo/snapshotsync/block_snapshots_test.go b/turbo/snapshotsync/block_snapshots_test.go index 0a40a2719..eb3f09948 100644 --- a/turbo/snapshotsync/block_snapshots_test.go +++ b/turbo/snapshotsync/block_snapshots_test.go @@ -31,7 +31,7 @@ func createTestSegmentFile(t *testing.T, from, to uint64, name Type, dir string) KeyCount: 1, BucketSize: 10, TmpDir: dir, - IndexFile: filepath.Join(dir, IdxFileName(from, to, name)), + IndexFile: filepath.Join(dir, IdxFileName(from, to, name.String())), LeafSize: 8, }) require.NoError(t, err) @@ -45,7 +45,7 @@ func createTestSegmentFile(t *testing.T, from, to uint64, name Type, dir string) KeyCount: 1, BucketSize: 10, TmpDir: dir, - IndexFile: filepath.Join(dir, IdxFileName(from, to, Transactions2Block)), + IndexFile: filepath.Join(dir, IdxFileName(from, to, Transactions2Block.String())), LeafSize: 8, }) require.NoError(t, err) @@ -57,7 +57,7 @@ func createTestSegmentFile(t *testing.T, from, to uint64, name Type, dir string) } } -func TestMerge(t *testing.T) { +func TestMergeSnapshots(t *testing.T) { dir, require := t.TempDir(), require.New(t) createFile := func(from, to uint64) { for _, snT := range AllSnapshotTypes { diff --git a/turbo/snapshotsync/client.go b/turbo/snapshotsync/client.go deleted file mode 100644 index 707640cfb..000000000 --- a/turbo/snapshotsync/client.go +++ /dev/null @@ -1,22 +0,0 @@ -package snapshotsync - -import ( - proto_downloader "github.com/ledgerwatch/erigon-lib/gointerfaces/downloader" - "google.golang.org/grpc" -) - -//go:generate ls ./../../interfaces/snapshot_downloader -//go:generate protoc --go_out=. --go-grpc_out=. --proto_path=./../../interfaces/snapshot_downloader "external_downloader.proto" -I=. -I=./../../build/include/google - -func NewClient(addr string) (proto_downloader.DownloaderClient, func() error, error) { - opts := []grpc.DialOption{ - grpc.WithInsecure(), - } - - conn, err := grpc.Dial(addr, opts...) - if err != nil { - return nil, nil, err - } - - return proto_downloader.NewDownloaderClient(conn), conn.Close, nil -} diff --git a/turbo/snapshotsync/headers_snapshot.go b/turbo/snapshotsync/headers_snapshot.go deleted file mode 100644 index adcd09ff1..000000000 --- a/turbo/snapshotsync/headers_snapshot.go +++ /dev/null @@ -1 +0,0 @@ -package snapshotsync diff --git a/turbo/snapshotsync/snapshot_builder.go b/turbo/snapshotsync/snapshot_builder.go deleted file mode 100644 index 33443659e..000000000 --- a/turbo/snapshotsync/snapshot_builder.go +++ /dev/null @@ -1,42 +0,0 @@ -package snapshotsync - -import ( - "context" - "encoding/binary" - "path/filepath" - "strconv" - - "github.com/ledgerwatch/erigon-lib/kv" -) - -func SnapshotName(baseDir, name string, blockNum uint64) string { - return filepath.Join(baseDir, name) + strconv.FormatUint(blockNum, 10) -} - -func GetSnapshotInfo(db kv.RwDB) (uint64, []byte, error) { - tx, err := db.BeginRo(context.Background()) - if err != nil { - return 0, nil, err - } - defer tx.Rollback() - v, err := tx.GetOne(kv.BittorrentInfo, kv.CurrentHeadersSnapshotBlock) - if err != nil { - return 0, nil, err - } - if v == nil { - return 0, nil, err - } - var snapshotBlock uint64 - if len(v) == 8 { - snapshotBlock = binary.BigEndian.Uint64(v) - } - - infohash, err := tx.GetOne(kv.BittorrentInfo, kv.CurrentHeadersSnapshotHash) - if err != nil { - return 0, nil, err - } - if infohash == nil { - return 0, nil, err - } - return snapshotBlock, infohash, nil -} diff --git a/turbo/snapshotsync/snapshot_builder_test.go b/turbo/snapshotsync/snapshot_builder_test.go deleted file mode 100644 index adcd09ff1..000000000 --- a/turbo/snapshotsync/snapshot_builder_test.go +++ /dev/null @@ -1 +0,0 @@ -package snapshotsync