snapshots: enum file types (#3683)

This commit is contained in:
Alex Sharov 2022-03-12 15:27:55 +07:00 committed by GitHub
parent f80ae03b46
commit aed679c8a0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 104 additions and 126 deletions

View File

@ -374,42 +374,8 @@ func PruneSendersStage(s *PruneState, tx kv.RwTx, cfg SendersCfg, ctx context.Co
}
if cfg.snapshots != nil && cfg.snapshots.Cfg().RetireEnabled {
if err := cfg.snapshots.EnsureExpectedBlocksAreAvailable(cfg.snapshotHashesCfg); err != nil {
return err
}
blockFrom := cfg.snapshots.BlocksAvailable() + 1
blockTo := s.ForwardProgress - params.FullImmutabilityThreshold
if blockTo-blockFrom > 1000 {
log.Info("[snapshots] Retire blocks", "from", blockFrom, "to", blockTo)
chainID, _ := uint256.FromBig(cfg.chainConfig.ChainID)
wg := sync.WaitGroup{}
wg.Add(1)
go func() { // move to own goroutine, because in this goroutine already living RwTx
// in future we will do it in background
if err := snapshotsync.RetireBlocks(ctx, blockFrom, blockTo, *chainID, cfg.tmpdir, cfg.snapshots, cfg.db, 1, log.LvlDebug); err != nil {
panic(err)
//return err
}
if err := cfg.snapshots.ReopenSegments(); err != nil {
panic(err)
//return err
}
if err := cfg.snapshots.ReopenIndices(); err != nil {
panic(err)
//return err
}
// RoSnapshots must be atomic? Or we can create new instance?
// seed new 500K files
//if err := rawdb.DeleteAncientBlocks(tx, blockFrom, blockTo); err != nil {
// return nil
//}
defer wg.Done()
}()
wg.Wait()
fmt.Printf("sn runtime dump: %d-%d\n", blockFrom, blockTo)
if err := retireBlocks(s, tx, cfg, ctx); err != nil {
return fmt.Errorf("retireBlocks: %w", err)
}
}
@ -425,3 +391,47 @@ func PruneSendersStage(s *PruneState, tx kv.RwTx, cfg SendersCfg, ctx context.Co
}
return nil
}
// retireBlocks moves fully-immutable blocks out of the DB into snapshot files,
// then reopens segments and indices so the new files become visible.
// It is a no-op when fewer than 1000 new blocks are eligible.
// NOTE: tx is currently unused but kept for interface stability with callers.
func retireBlocks(s *PruneState, tx kv.RwTx, cfg SendersCfg, ctx context.Context) (err error) {
	if err := cfg.snapshots.EnsureExpectedBlocksAreAvailable(cfg.snapshotHashesCfg); err != nil {
		return err
	}
	blockFrom := cfg.snapshots.BlocksAvailable() + 1
	blockTo := s.ForwardProgress - params.FullImmutabilityThreshold
	if blockTo-blockFrom < 1000 {
		return nil
	}
	//TODO: avoid too large deletes
	log.Info("[snapshots] Retire blocks", "from", blockFrom, "to", blockTo)
	chainID, _ := uint256.FromBig(cfg.chainConfig.ChainID)

	// Run in a separate goroutine because this goroutine already holds a live
	// RwTx; in the future this will move fully into the background.
	// wg.Wait() below gives the happens-before edge that makes retireErr safe
	// to read without extra synchronization.
	var retireErr error
	wg := sync.WaitGroup{}
	wg.Add(1)
	go func() {
		// Registered first so Wait() is always released, even on early return.
		defer wg.Done()
		if e := snapshotsync.RetireBlocks(ctx, blockFrom, blockTo, *chainID, cfg.tmpdir, cfg.snapshots, cfg.db, 1, log.LvlDebug); e != nil {
			// Propagate instead of panicking: a failed retire must not crash the node.
			retireErr = fmt.Errorf("RetireBlocks: %w", e)
			return
		}
		if e := cfg.snapshots.ReopenSegments(); e != nil {
			retireErr = fmt.Errorf("ReopenSegments: %w", e)
			return
		}
		if e := cfg.snapshots.ReopenIndices(); e != nil {
			retireErr = fmt.Errorf("ReopenIndices: %w", e)
			return
		}
		// RoSnapshots must be atomic? Or we can create new instance?
		// seed new 500K files
		//if err := rawdb.DeleteAncientBlocks(tx, blockFrom, blockTo); err != nil {
		//	return nil
		//}
	}()
	wg.Wait()
	if retireErr != nil {
		return retireErr
	}
	log.Debug("[snapshots] retire blocks done", "from", blockFrom, "to", blockTo)
	return nil
}

View File

@ -50,32 +50,62 @@ type BlocksSnapshot struct {
From, To uint64 // [from,to)
}
type Type string
type Type int
const (
Headers Type = "headers"
Bodies Type = "bodies"
Transactions Type = "transactions"
Headers Type = iota
Bodies
Transactions
NumberOfTypes
)
// String returns the canonical on-disk name for the file type.
// It panics on values outside the known enum range, matching the original
// contract for programmer errors.
func (ft Type) String() string {
	// Table indexed by enum value; valid entries are exactly the named types.
	names := [...]string{
		Headers:      "headers",
		Bodies:       "bodies",
		Transactions: "transactions",
	}
	if ft >= 0 && int(ft) < len(names) {
		return names[ft]
	}
	panic(fmt.Sprintf("unknown file type: %d", ft))
}
// ParseFileType maps an on-disk name back to its Type enum value.
// The second result is false (with NumberOfTypes as the value) when s is
// not a recognized file-type name.
func ParseFileType(s string) (Type, bool) {
	for _, t := range []Type{Headers, Bodies, Transactions} {
		if t.String() == s {
			return t, true
		}
	}
	return NumberOfTypes, false
}
type IdxType string
const (
Transactions2Block Type = "transactions-to-block"
TransactionsId Type = "transactions-id"
Transactions2Block IdxType = "transactions-to-block"
TransactionsId IdxType = "transactions-id"
)
func (it IdxType) String() string { return string(it) }
var AllSnapshotTypes = []Type{Headers, Bodies, Transactions}
var AllIdxTypes = []Type{Headers, Bodies, Transactions, Transactions2Block}
var (
ErrInvalidFileName = fmt.Errorf("invalid compressed file name")
)
func FileName(from, to uint64, t Type) string {
return fmt.Sprintf("v1-%06d-%06d-%s", from/1_000, to/1_000, t)
// FileName builds the extension-less snapshot file name:
// "v1-<from>-<to>-<fileType>", where the block bounds are expressed in
// thousands of blocks and zero-padded to six digits.
func FileName(from, to uint64, fileType string) string {
	const blocksPerStep = 1_000
	return fmt.Sprintf("v1-%06d-%06d-%s", from/blocksPerStep, to/blocksPerStep, fileType)
}
func SegmentFileName(from, to uint64, t Type) string { return FileName(from, to, t) + ".seg" }
func DatFileName(from, to uint64, t Type) string { return FileName(from, to, t) + ".dat" }
func IdxFileName(from, to uint64, t Type) string { return FileName(from, to, t) + ".idx" }
// SegmentFileName names a compressed segment (.seg) file for the range and type.
func SegmentFileName(from, to uint64, t Type) string {
	return FileName(from, to, t.String()) + ".seg"
}

// DatFileName names a .dat file for the range and file-type string.
func DatFileName(from, to uint64, fType string) string {
	return FileName(from, to, fType) + ".dat"
}

// IdxFileName names a recsplit index (.idx) file for the range and file-type string.
func IdxFileName(from, to uint64, fType string) string {
	return FileName(from, to, fType) + ".idx"
}
func (s BlocksSnapshot) Has(block uint64) bool { return block >= s.From && block < s.To }
@ -151,7 +181,7 @@ func (s *RoSnapshots) ReopenSomeIndices(types ...Type) (err error) {
bs.HeaderHashIdx.Close()
bs.HeaderHashIdx = nil
}
bs.HeaderHashIdx, err = recsplit.OpenIndex(path.Join(s.dir, IdxFileName(bs.From, bs.To, Headers)))
bs.HeaderHashIdx, err = recsplit.OpenIndex(path.Join(s.dir, IdxFileName(bs.From, bs.To, Headers.String())))
if err != nil {
return err
}
@ -160,7 +190,7 @@ func (s *RoSnapshots) ReopenSomeIndices(types ...Type) (err error) {
bs.BodyNumberIdx.Close()
bs.BodyNumberIdx = nil
}
bs.BodyNumberIdx, err = recsplit.OpenIndex(path.Join(s.dir, IdxFileName(bs.From, bs.To, Bodies)))
bs.BodyNumberIdx, err = recsplit.OpenIndex(path.Join(s.dir, IdxFileName(bs.From, bs.To, Bodies.String())))
if err != nil {
return err
}
@ -169,7 +199,7 @@ func (s *RoSnapshots) ReopenSomeIndices(types ...Type) (err error) {
bs.TxnHashIdx.Close()
bs.TxnHashIdx = nil
}
bs.TxnHashIdx, err = recsplit.OpenIndex(path.Join(s.dir, IdxFileName(bs.From, bs.To, Transactions)))
bs.TxnHashIdx, err = recsplit.OpenIndex(path.Join(s.dir, IdxFileName(bs.From, bs.To, Transactions.String())))
if err != nil {
return err
}
@ -178,7 +208,7 @@ func (s *RoSnapshots) ReopenSomeIndices(types ...Type) (err error) {
bs.TxnIdsIdx.Close()
bs.TxnIdsIdx = nil
}
bs.TxnIdsIdx, err = recsplit.OpenIndex(path.Join(s.dir, IdxFileName(bs.From, bs.To, TransactionsId)))
bs.TxnIdsIdx, err = recsplit.OpenIndex(path.Join(s.dir, IdxFileName(bs.From, bs.To, TransactionsId.String())))
if err != nil {
return err
}
@ -187,7 +217,7 @@ func (s *RoSnapshots) ReopenSomeIndices(types ...Type) (err error) {
bs.TxnHash2BlockNumIdx.Close()
bs.TxnHash2BlockNumIdx = nil
}
bs.TxnHash2BlockNumIdx, err = recsplit.OpenIndex(path.Join(s.dir, IdxFileName(bs.From, bs.To, Transactions2Block)))
bs.TxnHash2BlockNumIdx, err = recsplit.OpenIndex(path.Join(s.dir, IdxFileName(bs.From, bs.To, Transactions2Block.String())))
if err != nil {
return err
}
@ -601,7 +631,11 @@ func ParseFileName(dir string, f os.FileInfo) (res FileInfo, err error) {
return
}
var snapshotType Type
switch Type(parts[3]) {
ft, ok := ParseFileType(parts[3])
if !ok {
return res, fmt.Errorf("unexpected snapshot suffix: %s,%w", parts[2], ErrInvalidFileName)
}
switch ft {
case Headers:
snapshotType = Headers
case Bodies:
@ -984,7 +1018,7 @@ func TransactionsHashIdx(ctx context.Context, chainID uint256.Int, sn *BlocksSna
BucketSize: 2000,
LeafSize: 8,
TmpDir: tmpDir,
IndexFile: filepath.Join(dir, IdxFileName(sn.From, sn.To, Transactions)),
IndexFile: filepath.Join(dir, IdxFileName(sn.From, sn.To, Transactions.String())),
BaseDataID: firstTxID,
})
if err != nil {
@ -996,7 +1030,7 @@ func TransactionsHashIdx(ctx context.Context, chainID uint256.Int, sn *BlocksSna
BucketSize: 2000,
LeafSize: 8,
TmpDir: tmpDir,
IndexFile: filepath.Join(dir, IdxFileName(sn.From, sn.To, TransactionsId)),
IndexFile: filepath.Join(dir, IdxFileName(sn.From, sn.To, TransactionsId.String())),
BaseDataID: firstTxID,
})
if err != nil {
@ -1008,7 +1042,7 @@ func TransactionsHashIdx(ctx context.Context, chainID uint256.Int, sn *BlocksSna
BucketSize: 2000,
LeafSize: 8,
TmpDir: tmpDir,
IndexFile: filepath.Join(dir, IdxFileName(sn.From, sn.To, Transactions2Block)),
IndexFile: filepath.Join(dir, IdxFileName(sn.From, sn.To, Transactions2Block.String())),
BaseDataID: firstBlockNum,
})
if err != nil {

View File

@ -31,7 +31,7 @@ func createTestSegmentFile(t *testing.T, from, to uint64, name Type, dir string)
KeyCount: 1,
BucketSize: 10,
TmpDir: dir,
IndexFile: filepath.Join(dir, IdxFileName(from, to, name)),
IndexFile: filepath.Join(dir, IdxFileName(from, to, name.String())),
LeafSize: 8,
})
require.NoError(t, err)
@ -45,7 +45,7 @@ func createTestSegmentFile(t *testing.T, from, to uint64, name Type, dir string)
KeyCount: 1,
BucketSize: 10,
TmpDir: dir,
IndexFile: filepath.Join(dir, IdxFileName(from, to, Transactions2Block)),
IndexFile: filepath.Join(dir, IdxFileName(from, to, Transactions2Block.String())),
LeafSize: 8,
})
require.NoError(t, err)
@ -57,7 +57,7 @@ func createTestSegmentFile(t *testing.T, from, to uint64, name Type, dir string)
}
}
func TestMerge(t *testing.T) {
func TestMergeSnapshots(t *testing.T) {
dir, require := t.TempDir(), require.New(t)
createFile := func(from, to uint64) {
for _, snT := range AllSnapshotTypes {

View File

@ -1,22 +0,0 @@
package snapshotsync
import (
proto_downloader "github.com/ledgerwatch/erigon-lib/gointerfaces/downloader"
"google.golang.org/grpc"
)
//go:generate ls ./../../interfaces/snapshot_downloader
//go:generate protoc --go_out=. --go-grpc_out=. --proto_path=./../../interfaces/snapshot_downloader "external_downloader.proto" -I=. -I=./../../build/include/google
// NewClient dials the external snapshot-downloader gRPC service at addr.
// It returns the client, a close function for the underlying connection,
// and any dial error. The connection is plaintext (no transport security).
func NewClient(addr string) (proto_downloader.DownloaderClient, func() error, error) {
	conn, err := grpc.Dial(addr, grpc.WithInsecure())
	if err != nil {
		return nil, nil, err
	}
	return proto_downloader.NewDownloaderClient(conn), conn.Close, nil
}

View File

@ -1 +0,0 @@
package snapshotsync

View File

@ -1,42 +0,0 @@
package snapshotsync
import (
"context"
"encoding/binary"
"path/filepath"
"strconv"
"github.com/ledgerwatch/erigon-lib/kv"
)
// SnapshotName joins baseDir and name into a path and appends the decimal
// block number directly to it (no separator between name and number).
func SnapshotName(baseDir, name string, blockNum uint64) string {
	prefix := filepath.Join(baseDir, name)
	suffix := strconv.FormatUint(blockNum, 10)
	return prefix + suffix
}
// GetSnapshotInfo reads the current headers-snapshot block number and torrent
// infohash from the kv.BittorrentInfo bucket.
// It returns (0, nil, nil) when either record is absent — absence is not an error.
func GetSnapshotInfo(db kv.RwDB) (uint64, []byte, error) {
	tx, err := db.BeginRo(context.Background())
	if err != nil {
		return 0, nil, err
	}
	defer tx.Rollback()

	v, err := tx.GetOne(kv.BittorrentInfo, kv.CurrentHeadersSnapshotBlock)
	if err != nil {
		return 0, nil, err
	}
	if v == nil {
		// Fix: the original returned the stale `err` variable here, which is
		// provably nil at this point — return nil explicitly for clarity.
		return 0, nil, nil
	}
	var snapshotBlock uint64
	// Only an 8-byte value is a valid big-endian uint64; anything else
	// silently yields 0, preserving the original lenient behavior.
	if len(v) == 8 {
		snapshotBlock = binary.BigEndian.Uint64(v)
	}

	infohash, err := tx.GetOne(kv.BittorrentInfo, kv.CurrentHeadersSnapshotHash)
	if err != nil {
		return 0, nil, err
	}
	if infohash == nil {
		return 0, nil, nil
	}
	return snapshotBlock, infohash, nil
}

View File

@ -1 +0,0 @@
package snapshotsync