erigon-pulse/turbo/snapshotsync/wrapdb.go
b00ris 57473175ff
Body snapshot (#2100)
* fix test

* get rid of ObjectDatabase

* sn_builder_prototype2

* save state

* save state

* integration step1

* fix lint

* fix

* fix test

* integrate migrator.finish

* fix lint

* fix build

* fix typo

* save state

* body snapshot test

* unique tx

* body snapshot generation using walk

* move methods out of test

* block data verification added

* fix lint

* test with remove works correctly

* fix lint

* remove experiment test

* fix test

* add comment

* add second layer of remove test

* rename test

* fix typos

* fix lint

* revert testdata

* body snapshot migration save state

* fix body snapshot migration

* fix after merge

* remove debug test

* debug windows build

* fix build

* fix

* fix lint

* debug

* fix

* fix windows build

* simplify snapshot management&&get rid of lazy tx

* fix lint

* fix windows path

* debug

* debug

* debug

* debug

* remove geometry experiments

* skip windows tests

* clean

* fix

* fix ;int
2021-07-06 23:33:26 +01:00

216 lines
6.1 KiB
Go

package snapshotsync
import (
"context"
"encoding/binary"
"errors"
"time"
"github.com/ledgerwatch/erigon/common/dbutils"
"github.com/ledgerwatch/erigon/ethdb"
kv2 "github.com/ledgerwatch/erigon/ethdb/kv"
"github.com/ledgerwatch/erigon/log"
)
// BucketConfigs holds, per snapshot type, the bucket configuration that is
// handed to the snapshot database when it is opened. Snapshot types without
// an entry here resolve to a nil configuration on lookup.
var BucketConfigs = map[SnapshotType]dbutils.BucketsCfg{
	// Block bodies together with the transactions they reference.
	SnapshotType_bodies: {
		dbutils.BlockBodyPrefix: {},
		dbutils.EthTx:           {},
	},
	// Block headers only.
	SnapshotType_headers: {
		dbutils.HeadersBucket: {},
	},
	// Plain state plus both contract code buckets.
	SnapshotType_state: {
		dbutils.PlainStateBucket: {
			Flags:                     dbutils.DupSort,
			AutoDupSortKeysConversion: true,
			DupFromLen:                60,
			DupToLen:                  28,
		},
		dbutils.PlainContractCodeBucket: {},
		dbutils.CodeBucket:              {},
	},
}
// WrapBySnapshotsFromDir is a retired entry point kept only so existing
// references keep compiling. It ignores all of its arguments and always
// returns a nil database together with a "deprecated" error.
//
// Deprecated: this function performs no work and always fails.
//nolint
func WrapBySnapshotsFromDir(kv ethdb.RwKV, snapshotDir string, mode SnapshotMode) (ethdb.RwKV, error) {
	// TODO: remove this function.
	var wrapped ethdb.RwKV // stays nil: nothing is ever wrapped
	return wrapped, errors.New("deprecated") //nolint
}
// WrapBySnapshotsFromDownloader builds a snapshot-aware database on top of kv.
//
// For every entry in snapshots it opens the MDBX database located at the
// entry's Dbpath in read-only mode, using the bucket configuration registered
// for that snapshot type in BucketConfigs, and attaches it as the headers,
// bodies or state snapshot. Entries of any other type are opened but not
// attached.
//
// It returns the opened snapshot database, or nil and the error of the first
// snapshot that could not be opened. On that error path every snapshot
// database opened by earlier iterations is closed again, so a failure does
// not leak open database handles.
func WrapBySnapshotsFromDownloader(kv ethdb.RwKV, snapshots map[SnapshotType]*SnapshotsInfo) (ethdb.RwKV, error) {
	snKV := kv2.NewSnapshotKV().DB(kv)

	// Snapshot databases opened so far; released if a later one fails.
	opened := make([]interface{ Close() }, 0, len(snapshots))

	for k, v := range snapshots {
		log.Info("Wrap db by", "snapshot", k.String(), "dir", v.Dbpath)
		cfg := BucketConfigs[k]
		snapshotKV, err := kv2.NewMDBX().Readonly().Path(v.Dbpath).WithBucketsConfig(func(defaultBuckets dbutils.BucketsCfg) dbutils.BucketsCfg {
			// The snapshot only contains its own buckets, so the defaults
			// are replaced entirely by the per-type configuration.
			return cfg
		}).Open()
		if err != nil {
			log.Error("Can't open snapshot", "err", err)
			for _, db := range opened {
				db.Close()
			}
			return nil, err
		}
		opened = append(opened, snapshotKV)

		switch k {
		case SnapshotType_headers:
			snKV = snKV.HeadersSnapshot(snapshotKV)
		case SnapshotType_bodies:
			snKV = snKV.BodiesSnapshot(snapshotKV)
		case SnapshotType_state:
			snKV = snKV.StateSnapshot(snapshotKV)
		}
	}
	return snKV.Open(), nil
}
// WrapSnapshots wraps chainDb into a snapshot-aware database.
//
// It looks up the current headers snapshot block number stored under
// dbutils.CurrentHeadersSnapshotBlock in dbutils.BittorrentInfoBucket. When an
// 8-byte value is present, the headers snapshot named after that block inside
// snapshotsDir is opened and attached; otherwise chainDb is wrapped without
// any snapshot.
//
// If reading the block number or opening the headers snapshot fails, the
// original chainDb is returned together with the error.
func WrapSnapshots(chainDb ethdb.RwKV, snapshotsDir string) (ethdb.RwKV, error) {
	var (
		headersBlock uint64
		found        bool
	)

	// readBlock decodes the stored big-endian block number, if there is one.
	readBlock := func(tx ethdb.Tx) error {
		raw, getErr := tx.GetOne(dbutils.BittorrentInfoBucket, dbutils.CurrentHeadersSnapshotBlock)
		if getErr != nil {
			return getErr
		}
		if len(raw) != 8 {
			// Anything that is not exactly a uint64 counts as "no snapshot".
			return nil
		}
		found = true
		headersBlock = binary.BigEndian.Uint64(raw)
		return nil
	}
	if err := chainDb.View(context.Background(), readBlock); err != nil {
		return chainDb, err
	}

	opts := kv2.NewSnapshotKV().DB(chainDb)
	if !found {
		return opts.Open(), nil
	}

	headersKV, err := OpenHeadersSnapshot(SnapshotName(snapshotsDir, "headers", headersBlock))
	if err != nil {
		return chainDb, err
	}
	return opts.HeadersSnapshot(headersKV).Open(), nil
}
// DownloadSnapshots obtains the snapshots selected by snapshotMode, layers
// them over chainDb's key-value store and runs PostProcessing on the result.
//
// When ExternalSnapshotDownloaderAddr is non-empty, an external downloader is
// contacted through NewClient: the download is requested and the downloader
// is polled every 10 seconds until every snapshot type enabled in
// snapshotMode reports a readiness of 100. For each type the entry with the
// highest SnapshotBlock is used.
//
// Otherwise torrentClient is used directly. If adding the snapshot torrents
// fails, the error is only logged and nil is returned.
//
// chainDb must implement ethdb.HasRwKV; the unchecked type assertions panic
// otherwise. Any other failure is returned as an error.
func DownloadSnapshots(torrentClient *Client, ExternalSnapshotDownloaderAddr string, networkID uint64, snapshotMode SnapshotMode, chainDb ethdb.Database) error {
	if ExternalSnapshotDownloaderAddr != "" {
		cli, cl, innerErr := NewClient(ExternalSnapshotDownloaderAddr)
		if innerErr != nil {
			return innerErr
		}
		defer cl() //nolint

		_, innerErr = cli.Download(context.Background(), &DownloadSnapshotRequest{
			NetworkId: networkID,
			Type:      snapshotMode.ToSnapshotTypes(),
		})
		if innerErr != nil {
			return innerErr
		}

		// snapshotReady reports whether the snapshot of type tp is fully
		// downloaded. A type the downloader has not reported yet is treated
		// as 0% ready instead of dereferencing a nil entry.
		snapshotReady := func(mp map[SnapshotType]*SnapshotsInfo, tp SnapshotType) bool {
			var readiness int32
			if info := mp[tp]; info != nil {
				readiness = info.Readiness
			}
			if readiness != int32(100) {
				log.Info("Downloading", "snapshot", tp, "%", readiness)
				return false
			}
			return true
		}

		// waitDownload blocks until all requested snapshots are ready and
		// returns the newest snapshot info per type.
		waitDownload := func() (map[SnapshotType]*SnapshotsInfo, error) {
			wanted := []struct {
				enabled bool
				tp      SnapshotType
			}{
				{snapshotMode.Headers, SnapshotType_headers},
				{snapshotMode.Bodies, SnapshotType_bodies},
				{snapshotMode.State, SnapshotType_state},
				{snapshotMode.Receipts, SnapshotType_receipts},
			}
			for {
				snapshots, err1 := cli.Snapshots(context.Background(), &SnapshotsRequest{NetworkId: networkID})
				if err1 != nil {
					return nil, err1
				}

				latest := make(map[SnapshotType]*SnapshotsInfo)
				for _, info := range snapshots.Info {
					if info == nil {
						continue
					}
					// Keep the first entry of a type, then only replace it
					// with one built at a higher block.
					if cur, ok := latest[info.Type]; !ok || cur.SnapshotBlock < info.SnapshotBlock {
						latest[info.Type] = info
					}
				}

				downloaded := true
				for _, w := range wanted {
					// No short-circuit: every pending snapshot logs its
					// progress on each polling round.
					if w.enabled && !snapshotReady(latest, w.tp) {
						downloaded = false
					}
				}
				if downloaded {
					return latest, nil
				}
				time.Sleep(time.Second * 10)
			}
		}

		downloadedSnapshots, innerErr := waitDownload()
		if innerErr != nil {
			return innerErr
		}
		snapshotKV := chainDb.(ethdb.HasRwKV).RwKV()
		snapshotKV, innerErr = WrapBySnapshotsFromDownloader(snapshotKV, downloadedSnapshots)
		if innerErr != nil {
			return innerErr
		}
		chainDb.(ethdb.HasRwKV).SetRwKV(snapshotKV)
		return PostProcessing(chainDb, downloadedSnapshots)
	}

	// Built-in torrent client path.
	err := torrentClient.Load(chainDb)
	if err != nil {
		return err
	}
	err = torrentClient.AddSnapshotsTorrents(context.Background(), chainDb, networkID, snapshotMode)
	if err != nil {
		// Best effort: snapshot sync is optional, so the caller proceeds
		// without snapshots.
		log.Error("There was an error in snapshot init. Swithing to regular sync", "err", err)
		return nil
	}
	torrentClient.Download()

	snapshotKV := chainDb.(ethdb.HasRwKV).RwKV()
	downloadedSnapshots, innerErr := torrentClient.GetSnapshots(chainDb, networkID)
	if innerErr != nil {
		return innerErr
	}
	snapshotKV, innerErr = WrapBySnapshotsFromDownloader(snapshotKV, downloadedSnapshots)
	if innerErr != nil {
		return innerErr
	}
	chainDb.(ethdb.HasRwKV).SetRwKV(snapshotKV)

	// NOTE(review): PostProcessing receives chainDb rather than tx, and the
	// transaction is committed even when PostProcessing failed. The original
	// ordering is preserved here; confirm whether that is intended.
	tx, err := chainDb.Begin(context.Background(), ethdb.RW)
	if err != nil {
		return err
	}
	defer tx.Rollback()
	innerErr = PostProcessing(chainDb, downloadedSnapshots)
	if err = tx.Commit(); err != nil {
		return err
	}
	if innerErr != nil {
		return innerErr
	}
	return nil
}