erigon-pulse/turbo/snapshotsync/snapshot_builder.go
b00ris 66c7c669fb
Snapshot builder simplification (#2064)
* fix test

* get rid of ObjectDatabase

* sn_builder_prototype2

* save state

* save state

* integration step1

* fix lint

* fix

* fix test

* integrate migrator.finish

* fix lint

* fix build

* fix typo

* fix
2021-06-04 13:30:02 +01:00

421 lines
13 KiB
Go

package snapshotsync
import (
"context"
"encoding/binary"
"errors"
"fmt"
"github.com/ledgerwatch/erigon/params"
"io/ioutil"
"os"
"path"
"strconv"
"strings"
"sync/atomic"
"time"
"github.com/anacrolix/torrent/metainfo"
"github.com/ledgerwatch/erigon/common"
"github.com/ledgerwatch/erigon/common/dbutils"
"github.com/ledgerwatch/erigon/core/rawdb"
"github.com/ledgerwatch/erigon/ethdb"
"github.com/ledgerwatch/erigon/log"
)
// NewMigrator builds a SnapshotMigrator that keeps its snapshots under
// snapshotDir.
//
// currentSnapshotBlock seeds HeadersCurrentSnapshot and
// currentSnapshotInfohash seeds HeadersNewSnapshotInfohash; useMdbx is stored
// as given. A fresh unbuffered replaceChan is allocated and every other field
// keeps its zero value.
func NewMigrator(snapshotDir string, currentSnapshotBlock uint64, currentSnapshotInfohash []byte, useMdbx bool) *SnapshotMigrator {
	migrator := new(SnapshotMigrator)
	migrator.snapshotsDir = snapshotDir
	migrator.HeadersCurrentSnapshot = currentSnapshotBlock
	migrator.HeadersNewSnapshotInfohash = currentSnapshotInfohash
	migrator.useMdbx = useMdbx
	migrator.replaceChan = make(chan struct{})
	return migrator
}
// SnapshotMigrator holds the state of a headers snapshot migration: which
// snapshot is currently in use, which one is being built, and how far the
// migration has progressed.
type SnapshotMigrator struct {
	// The 64-bit words below are accessed through sync/atomic. They are kept
	// at the very start of the struct because on 32-bit platforms only the
	// first word of an allocated struct is guaranteed to be 64-bit aligned.

	// HeadersCurrentSnapshot is the block number of the snapshot in use.
	HeadersCurrentSnapshot uint64
	// HeadersNewSnapshot is the block number of the snapshot being migrated
	// to; it is reset to 0 when a migration stage fails.
	HeadersNewSnapshot uint64
	// started is the migration progress marker: 0 means idle, 1 is set when
	// AsyncStages begins, 2 once seeding of the new snapshot has started and
	// 3 after SyncStages has finished. Final resets it to 0.
	started uint64
	// replaced is set to 1 once a signal has been received on replaceChan.
	replaced uint64

	// snapshotsDir is the directory that contains the snapshot databases.
	snapshotsDir string
	// HeadersNewSnapshotInfohash is the torrent infohash recorded for the
	// headers snapshot.
	HeadersNewSnapshotInfohash []byte
	// useMdbx selects MDBX (true) or LMDB (false) for snapshot databases.
	useMdbx bool
	// replaceChan is handed to SnapshotUpdater.UpdateSnapshots and polled by
	// Replaced.
	replaceChan chan struct{}
}
// AsyncStages starts the creation and publication of a headers snapshot that
// covers blocks up to migrateToBlock.
//
// It does nothing and returns nil when the current or the pending snapshot
// already reaches migrateToBlock, or when another migration is in flight
// (started != 0).
//
// The stages run in order:
//  1. CreateHeadersSnapshot writes the snapshot database to
//     SnapshotName(sm.snapshotsDir, "headers", migrateToBlock).
//  2. The snapshot is opened and handed to dbi through
//     ethdb.SnapshotUpdater.UpdateSnapshots together with sm.replaceChan.
//  3. If a 20-byte infohash is stored under CurrentHeadersSnapshotHash,
//     seeding of that torrent is stopped.
//  4. The new snapshot is seeded, its infohash is recorded and started is
//     set to 2.
//
// When async is true the stages run in a new goroutine on a read-only
// transaction opened from dbi; nil is returned immediately and failures are
// only logged. Otherwise the stages run on rwTX and the first stage error is
// returned. On any failure HeadersNewSnapshot and started are reset to 0.
func (sm *SnapshotMigrator) AsyncStages(migrateToBlock uint64, dbi ethdb.RwKV, rwTX ethdb.Tx, bittorrent *Client, async bool) error {
	if sm.HeadersCurrentSnapshot >= migrateToBlock || atomic.LoadUint64(&sm.HeadersNewSnapshot) >= migrateToBlock {
		return nil
	}
	// Claim the migrator with a single compare-and-swap so that two
	// concurrent callers cannot both pass an "is it idle?" check.
	if !atomic.CompareAndSwapUint64(&sm.started, 0, 1) {
		return nil
	}

	snapshotPath := SnapshotName(sm.snapshotsDir, "headers", migrateToBlock)
	// Every other access to HeadersNewSnapshot is atomic, so the write has
	// to be atomic as well.
	atomic.StoreUint64(&sm.HeadersNewSnapshot, migrateToBlock)
	atomic.StoreUint64(&sm.replaced, 0)

	// rollback returns the migrator to the idle state. started is released
	// last so that a caller claiming the migrator right afterwards cannot
	// have its freshly stored HeadersNewSnapshot overwritten.
	rollback := func(cause error) {
		atomic.StoreUint64(&sm.HeadersNewSnapshot, 0)
		atomic.StoreUint64(&sm.started, 0)
		log.Error("Error on stage. Rollback", "err", cause)
	}

	stages := []func(db ethdb.RoKV, tx ethdb.Tx, toBlock uint64) error{
		// Stage 1: build the snapshot database.
		func(db ethdb.RoKV, tx ethdb.Tx, toBlock uint64) error {
			return CreateHeadersSnapshot(context.Background(), tx, toBlock, snapshotPath, sm.useMdbx)
		},
		// Stage 2: hand the new snapshot over to the database.
		func(db ethdb.RoKV, tx ethdb.Tx, toBlock uint64) error {
			updater, ok := db.(ethdb.SnapshotUpdater)
			if !ok {
				return errors.New("db don't implement snapshotUpdater interface")
			}
			snapshotKV, err := OpenHeadersSnapshot(snapshotPath, sm.useMdbx)
			if err != nil {
				return err
			}
			updater.UpdateSnapshots([]string{dbutils.HeadersBucket}, snapshotKV, sm.replaceChan)
			return nil
		},
		// Stage 3: stop seeding the previously recorded snapshot, if any.
		func(db ethdb.RoKV, tx ethdb.Tx, toBlock uint64) error {
			//todo headers infohash
			infohash, err := tx.GetOne(dbutils.BittorrentInfoBucket, dbutils.CurrentHeadersSnapshotHash)
			if err != nil && !errors.Is(err, ethdb.ErrKeyNotFound) {
				log.Error("Get infohash", "err", err, "block", toBlock)
				return err
			}
			if len(infohash) != 20 {
				log.Warn("Hasn't stopped snapshot", "infohash", common.Bytes2Hex(infohash))
				return nil
			}
			var hash metainfo.Hash
			copy(hash[:], infohash)
			log.Info("Stop seeding snapshot", "type", "headers", "infohash", hash.String())
			if err = bittorrent.StopSeeding(hash); err != nil {
				log.Error("Stop seeding", "err", err, "block", toBlock)
				return err
			}
			log.Info("Stopped seeding snapshot", "type", "headers", "infohash", hash.String())
			return nil
		},
		// Stage 4: seed the new snapshot and remember its infohash.
		func(db ethdb.RoKV, tx ethdb.Tx, toBlock uint64) error {
			log.Info("Start seeding snapshot", "type", "headers")
			seedingInfoHash, err := bittorrent.SeedSnapshot("headers", snapshotPath)
			if err != nil {
				log.Error("Seeding", "err", err)
				return err
			}
			sm.HeadersNewSnapshotInfohash = seedingInfoHash[:]
			log.Info("Started seeding snapshot", "type", "headers", "infohash", seedingInfoHash.String())
			atomic.StoreUint64(&sm.started, 2)
			return nil
		},
	}

	// runStages executes the stages in order and rolls the migrator back on
	// the first failure.
	runStages := func(tx ethdb.Tx) error {
		for _, stage := range stages {
			if err := stage(dbi, tx, migrateToBlock); err != nil {
				rollback(err)
				return err
			}
		}
		return nil
	}

	if !async {
		return runStages(rwTX)
	}

	go func() {
		//@todo think about possibility that write tx has uncommited data that we don't have in readTXs
		readTX, err := dbi.BeginRo(context.Background())
		if err != nil {
			// Without the rollback the migrator would stay in state 1
			// forever and never be able to start another migration.
			rollback(fmt.Errorf("begin err: %w", err))
			return
		}
		defer readTX.Rollback()
		// runStages already logs and rolls back; nobody is left to receive
		// the error in the asynchronous case.
		_ = runStages(readTX)
	}()
	return nil
}
// Replaced reports whether a replacement signal has been observed.
//
// It polls replaceChan without blocking; when a receive succeeds the replaced
// flag is latched to 1. The result is the flag's value at the time of the
// final load.
func (sm *SnapshotMigrator) Replaced() bool {
	select {
	case <-sm.replaceChan:
		log.Info("Snapshot replaced")
		atomic.StoreUint64(&sm.replaced, 1)
	default:
		// Nothing pending on the channel: report the latched flag as is.
	}
	latched := atomic.LoadUint64(&sm.replaced)
	return latched == 1
}
// SyncStages runs the part of the migration that needs the write transaction
// rwTX. It only acts when AsyncStages has completed (started == 2) and
// Replaced reports true; otherwise it returns nil without touching the
// database.
//
// The stages run in order:
//  1. RemoveHeadersData is called with the current and the new snapshot
//     block numbers.
//  2. The new infohash (only when it is 20 bytes long) and the new snapshot
//     block number are written to BittorrentInfoBucket.
//
// After both stages succeed started is set to 3. The first stage error is
// returned as is and leaves started unchanged.
func (sm *SnapshotMigrator) SyncStages(migrateToBlock uint64, dbi ethdb.RwKV, rwTX ethdb.RwTx) error {
	log.Info("SyncStages", "started", atomic.LoadUint64(&sm.started))
	if atomic.LoadUint64(&sm.started) != 2 || !sm.Replaced() {
		return nil
	}

	syncStages := []func(db ethdb.RoKV, tx ethdb.RwTx, toBlock uint64) error{
		func(db ethdb.RoKV, tx ethdb.RwTx, toBlock uint64) error {
			newSnapshot := atomic.LoadUint64(&sm.HeadersNewSnapshot)
			log.Info("Prune db", "current", sm.HeadersCurrentSnapshot, "new", newSnapshot)
			return RemoveHeadersData(db, tx, sm.HeadersCurrentSnapshot, newSnapshot)
		},
		func(db ethdb.RoKV, tx ethdb.RwTx, toBlock uint64) error {
			newSnapshot := atomic.LoadUint64(&sm.HeadersNewSnapshot)
			// The block number used to be logged under a second "new" key,
			// which made the two values indistinguishable in the log line.
			log.Info("Save CurrentHeadersSnapshotHash", "new", common.Bytes2Hex(sm.HeadersNewSnapshotInfohash), "block", newSnapshot)
			c, err := tx.RwCursor(dbutils.BittorrentInfoBucket)
			if err != nil {
				return err
			}
			defer c.Close()
			if len(sm.HeadersNewSnapshotInfohash) == 20 {
				if err = c.Put(dbutils.CurrentHeadersSnapshotHash, sm.HeadersNewSnapshotInfohash); err != nil {
					return err
				}
			}
			return c.Put(dbutils.CurrentHeadersSnapshotBlock, dbutils.EncodeBlockNumber(newSnapshot))
		},
	}

	for _, stage := range syncStages {
		if err := stage(dbi, rwTX, migrateToBlock); err != nil {
			return err
		}
	}
	atomic.StoreUint64(&sm.started, 3)
	return nil
}
// Final completes a migration once SyncStages has finished (started >= 3).
//
// It reads CurrentHeadersSnapshotBlock through tx and, only when the stored
// block number equals HeadersNewSnapshot, removes the directory of the old
// snapshot (if there was one and it is older than the new one), promotes the
// new snapshot to HeadersCurrentSnapshot and resets started and replaced.
//
// It returns nil without doing anything when the migration has not reached
// state 3, when the key is missing or not 8 bytes long, or when the stored
// block number does not match the new snapshot yet. A failure to read the key
// or to remove the old snapshot directory is returned; in the latter case the
// state is left untouched so that a later call can retry.
func (sm *SnapshotMigrator) Final(tx ethdb.Tx) error {
	if atomic.LoadUint64(&sm.started) < 3 {
		return nil
	}

	v, err := tx.GetOne(dbutils.BittorrentInfoBucket, dbutils.CurrentHeadersSnapshotBlock)
	if errors.Is(err, ethdb.ErrKeyNotFound) {
		return nil
	}
	if err != nil {
		return err
	}
	if len(v) != 8 {
		log.Error("Incorrect length", "ln", len(v))
		return nil
	}

	committedBlock := binary.BigEndian.Uint64(v)
	newSnapshot := atomic.LoadUint64(&sm.HeadersNewSnapshot)
	if committedBlock != newSnapshot {
		// The database does not reference the new snapshot (yet). The old
		// snapshot must stay on disk: deleting it here would leave the
		// stored snapshot block pointing at files that no longer exist.
		return nil
	}

	currentSnapshot := atomic.LoadUint64(&sm.HeadersCurrentSnapshot)
	if currentSnapshot < newSnapshot && currentSnapshot != 0 {
		oldSnapshotPath := SnapshotName(sm.snapshotsDir, "headers", currentSnapshot)
		log.Info("Removing old snapshot", "path", oldSnapshotPath)
		removalStart := time.Now()
		if err = os.RemoveAll(oldSnapshotPath); err != nil {
			log.Error("Remove snapshot", "err", err)
			return err
		}
		log.Info("Removed old snapshot", "path", oldSnapshotPath, "t", time.Since(removalStart))
	}

	atomic.StoreUint64(&sm.HeadersCurrentSnapshot, newSnapshot)
	atomic.StoreUint64(&sm.started, 0)
	atomic.StoreUint64(&sm.replaced, 0)
	log.Info("CurrentHeadersSnapshotBlock commited", "block", committedBlock)
	return nil
}
// RemoveNonCurrentSnapshots deletes every directory in sm.snapshotsDir whose
// name is "headers" followed by a decimal block number different from
// sm.HeadersCurrentSnapshot.
//
// Entries that are not directories or do not start with "headers" are
// ignored; directories whose suffix is not a valid number are logged and
// skipped. A failed removal is logged as a warning and does not stop the
// scan. The only error returned is a failure to list the directory.
func (sm *SnapshotMigrator) RemoveNonCurrentSnapshots() error {
	files, err := ioutil.ReadDir(sm.snapshotsDir)
	if err != nil {
		return err
	}

	for _, entry := range files {
		snapshotName := entry.Name()
		if !entry.IsDir() || !strings.HasPrefix(snapshotName, "headers") {
			continue
		}

		snapshotBlock, parseErr := strconv.ParseUint(strings.TrimPrefix(snapshotName, "headers"), 10, 64)
		if parseErr != nil {
			log.Warn("unknown snapshot", "name", snapshotName, "err", parseErr)
			continue
		}
		if snapshotBlock == sm.HeadersCurrentSnapshot {
			continue
		}

		snapshotPath := path.Join(sm.snapshotsDir, snapshotName)
		if removeErr := os.RemoveAll(snapshotPath); removeErr != nil {
			log.Warn("useless snapshot has't removed", "path", snapshotPath, "err", removeErr)
			// Do not claim success for a directory that is still there.
			continue
		}
		log.Info("removed useless snapshot", "path", snapshotPath)
	}
	return nil
}
// CalculateEpoch returns the latest block for which a snapshot can be
// created. The result is block minus the remainder of
// (block + params.FullImmutabilityThreshold) divided by epochSize.
//
// epochSize must be non-zero, otherwise the modulo panics. The subtraction is
// unsigned, so it wraps around when the remainder is larger than block.
func CalculateEpoch(block, epochSize uint64) uint64 {
	remainder := (block + params.FullImmutabilityThreshold) % epochSize
	return block - remainder
}
// SnapshotName returns the path of the snapshot called name for block
// blockNum inside baseDir: baseDir and name are joined (and cleaned) with
// path.Join, then the decimal block number is appended directly, without a
// separator.
func SnapshotName(baseDir, name string, blockNum uint64) string {
	prefix := path.Join(baseDir, name)
	suffix := strconv.FormatUint(blockNum, 10)
	return prefix + suffix
}
// GetSnapshotInfo reads the recorded headers snapshot block number and
// torrent infohash from BittorrentInfoBucket.
//
// A missing key (ethdb.ErrKeyNotFound) is not an error. The block number is 0
// unless the stored value is exactly 8 bytes, in which case it is decoded as
// a big-endian uint64. The infohash is returned as stored. Any other read
// error yields (0, nil, err).
func GetSnapshotInfo(db ethdb.Database) (uint64, []byte, error) {
	// tolerated reports whether a lookup either succeeded or merely found
	// nothing.
	tolerated := func(e error) bool {
		return e == nil || errors.Is(e, ethdb.ErrKeyNotFound)
	}

	rawBlock, err := db.Get(dbutils.BittorrentInfoBucket, dbutils.CurrentHeadersSnapshotBlock)
	if !tolerated(err) {
		return 0, nil, err
	}

	infohash, err := db.Get(dbutils.BittorrentInfoBucket, dbutils.CurrentHeadersSnapshotHash)
	if !tolerated(err) {
		return 0, nil, err
	}

	if len(rawBlock) != 8 {
		return 0, infohash, nil
	}
	return binary.BigEndian.Uint64(rawBlock), infohash, nil
}
// OpenHeadersSnapshot opens the snapshot database at dbPath in read-only mode
// with a bucket configuration that contains only HeadersBucket. MDBX is used
// when useMdbx is true, LMDB otherwise.
func OpenHeadersSnapshot(dbPath string, useMdbx bool) (ethdb.RwKV, error) {
	// headersOnly ignores the default buckets and exposes HeadersBucket
	// alone.
	headersOnly := func(defaultBuckets dbutils.BucketsCfg) dbutils.BucketsCfg {
		return dbutils.BucketsCfg{
			dbutils.HeadersBucket: dbutils.BucketsConfigs[dbutils.HeadersBucket],
		}
	}

	if !useMdbx {
		return ethdb.NewLMDB().WithBucketsConfig(headersOnly).Readonly().Path(dbPath).Open()
	}
	return ethdb.NewMDBX().WithBucketsConfig(headersOnly).Readonly().Path(dbPath).Open()
}
// CreateHeadersSnapshot builds a headers snapshot database at snapshotPath
// with the headers of blocks 0..toBlock read through readTX.
//
// Anything already present at snapshotPath is removed first. The database
// (MDBX when useMdbx is true, LMDB otherwise) is created with HeadersBucket
// only, filled by GenerateHeadersSnapshot inside one write transaction and
// committed. The snapshot database is closed before returning, on success and
// on every error path.
func CreateHeadersSnapshot(ctx context.Context, readTX ethdb.Tx, toBlock uint64, snapshotPath string, useMdbx bool) error {
	// remove created snapshot if it's not saved in main db(to avoid append error)
	if err := os.RemoveAll(snapshotPath); err != nil {
		return err
	}

	headersOnly := func(defaultBuckets dbutils.BucketsCfg) dbutils.BucketsCfg {
		return dbutils.BucketsCfg{
			dbutils.HeadersBucket: dbutils.BucketsConfigs[dbutils.HeadersBucket],
		}
	}

	var (
		snKV ethdb.RwKV
		err  error
	)
	if useMdbx {
		snKV, err = ethdb.NewMDBX().WithBucketsConfig(headersOnly).Path(snapshotPath).Open()
	} else {
		snKV, err = ethdb.NewLMDB().WithBucketsConfig(headersOnly).Path(snapshotPath).Open()
	}
	if err != nil {
		return err
	}
	// Registered before the transaction's Rollback so that, in LIFO order,
	// the transaction is finished first and the database is closed last.
	// Previously the database stayed open whenever a later step failed.
	defer snKV.Close()

	sntx, err := snKV.BeginRw(context.Background())
	if err != nil {
		return fmt.Errorf("begin err: %w", err)
	}
	defer sntx.Rollback()

	if err = GenerateHeadersSnapshot(ctx, readTX, sntx, toBlock); err != nil {
		return fmt.Errorf("generate err: %w", err)
	}
	if err = sntx.Commit(); err != nil {
		return fmt.Errorf("commit err: %w", err)
	}
	return nil
}
// GenerateHeadersSnapshot copies the canonical headers of blocks 0..toBlock
// (inclusive) from db into HeadersBucket of sntx.
//
// For every block the canonical hash and the header RLP are read from db and
// appended under dbutils.HeaderKey(block, hash). Progress is logged at most
// once every 30 seconds. The function does not commit sntx.
//
// It returns common.ErrStopped when common.IsCanceled(ctx) reports true, an
// error when a header RLP is shorter than 2 bytes, and otherwise the first
// read or append error.
func GenerateHeadersSnapshot(ctx context.Context, db ethdb.Tx, sntx ethdb.RwTx, toBlock uint64) error {
	headerCursor, err := sntx.RwCursor(dbutils.HeadersBucket)
	if err != nil {
		return err
	}
	// Release the cursor as soon as the copy is done instead of leaving it
	// open until the transaction ends.
	defer headerCursor.Close()

	progressTicker := time.NewTicker(time.Second * 30)
	defer progressTicker.Stop()
	startedAt := time.Now()

	for i := uint64(0); i <= toBlock; i++ {
		if common.IsCanceled(ctx) {
			return common.ErrStopped
		}
		// Non-blocking check: log only when the ticker has fired.
		select {
		case <-progressTicker.C:
			log.Info("Headers snapshot generation", "t", time.Since(startedAt), "block", i)
		default:
		}

		hash, err := rawdb.ReadCanonicalHash(db, i)
		if err != nil {
			return err
		}
		header := rawdb.ReadHeaderRLP(db, hash, i)
		if len(header) < 2 {
			return fmt.Errorf("header %d is empty, %v", i, header)
		}
		if err = headerCursor.Append(dbutils.HeaderKey(i, hash), header); err != nil {
			return err
		}
	}
	return nil
}
// RemoveHeadersData deletes from the main database the header entries that
// are present in the headers snapshot.
//
// db must implement ethdb.SnapshotUpdater and tx must implement ethdb.DBTX;
// an error is returned otherwise. When db has no snapshot for HeadersBucket
// the function logs that and returns nil. Keys are taken from a walk over the
// snapshot's HeadersBucket that starts at the encoded currentSnapshot block
// number, and each one is deleted through a cursor on the transaction
// returned by tx.DBTX().
//
// newSnapshot is only used in the log message.
// NOTE(review): the walk has no upper bound in this function; presumably it
// runs to the end of the snapshot bucket - confirm against ethdb.Walk.
func RemoveHeadersData(db ethdb.RoKV, tx ethdb.RwTx, currentSnapshot, newSnapshot uint64) (err error) {
	log.Info("Remove data", "from", currentSnapshot, "to", newSnapshot)

	updater, ok := db.(ethdb.SnapshotUpdater)
	if !ok {
		return errors.New("db don't implement snapshotUpdater interface")
	}
	headerSnapshot := updater.SnapshotKV(dbutils.HeadersBucket)
	if headerSnapshot == nil {
		log.Info("headerSnapshot is empty")
		return nil
	}

	// A checked assertion turns an unsupported transaction type into an
	// error instead of a panic.
	dbtx, ok := tx.(ethdb.DBTX)
	if !ok {
		return errors.New("tx don't implement DBTX interface")
	}
	mainCursor, err := dbtx.DBTX().RwCursor(dbutils.HeadersBucket)
	if err != nil {
		return fmt.Errorf("get headers cursor %w", err)
	}
	defer mainCursor.Close()

	return headerSnapshot.View(context.Background(), func(snapshotTx ethdb.Tx) error {
		snapshotCursor, cursorErr := snapshotTx.Cursor(dbutils.HeadersBucket)
		if cursorErr != nil {
			return cursorErr
		}
		defer snapshotCursor.Close()

		return ethdb.Walk(snapshotCursor, dbutils.EncodeBlockNumber(currentSnapshot), 0, func(k, v []byte) (bool, error) {
			if deleteErr := mainCursor.Delete(k, nil); deleteErr != nil {
				return false, fmt.Errorf("remove %v err:%w", common.Bytes2Hex(k), deleteErr)
			}
			return true, nil
		})
	})
}