Mirror of https://gitlab.com/pulsechaincom/erigon-pulse.git (synced 2025-01-03 09:37:38 +00:00)
Snapshots: retire testing tool (#3684)

parent aed679c8a0, commit f03d08c5ce
@@ -1,375 +0,0 @@
package debug

import (
    "context"
    "encoding/binary"
    "fmt"
    "os"
    "testing"
    "time"

    "github.com/holiman/uint256"
    "github.com/ledgerwatch/erigon-lib/kv"
    "github.com/ledgerwatch/erigon-lib/kv/mdbx"
    "github.com/ledgerwatch/erigon/common"
    "github.com/ledgerwatch/erigon/common/dbutils"
    "github.com/ledgerwatch/erigon/consensus/ethash"
    "github.com/ledgerwatch/erigon/core"
    "github.com/ledgerwatch/erigon/core/rawdb"
    "github.com/ledgerwatch/erigon/core/state"
    "github.com/ledgerwatch/erigon/core/types"
    "github.com/ledgerwatch/erigon/core/types/accounts"
    "github.com/ledgerwatch/erigon/core/vm"
    "github.com/ledgerwatch/erigon/ethdb"
    "github.com/ledgerwatch/erigon/ethdb/snapshotdb"
    "github.com/ledgerwatch/erigon/rlp"
    "github.com/ledgerwatch/log/v3"
)

const (
    AccountDiff  = "accdiff"
    StorageDiff  = "stdiff"
    ContractDiff = "contractdiff"
    Deleted      = "it is deleted"
)
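// WithBlock prefixes key with the 8-byte big-endian block number, so entries in
// the diff buckets sort by block first and a single block's changes can be
// range-scanned.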
func WithBlock(block uint64, key []byte) []byte {
    b := make([]byte, 8)
    binary.BigEndian.PutUint64(b, block)
    return append(b, key...)
}

func TestMatreshkaStream(t *testing.T) {
    t.Skip()
    chaindatadir := "/media/b00ris/nvme/fresh_sync/tg/chaindata"
    tmpDbDir := "/home/b00ris/event_stream"

    chaindata, err := mdbx.Open(chaindatadir, log.New(), true)
    if err != nil {
        t.Fatal(err)
    }
    //tmpDb:=ethdb.NewMemDatabase()
    os.RemoveAll(tmpDbDir)

    db, err := mdbx.NewMDBX(log.New()).Path(tmpDbDir).WithTablessCfg(func(defaultBuckets kv.TableCfg) kv.TableCfg {
        defaultBuckets[AccountDiff] = kv.TableCfgItem{}
        defaultBuckets[StorageDiff] = kv.TableCfgItem{}
        defaultBuckets[ContractDiff] = kv.TableCfgItem{}
        return defaultBuckets
    }).Open()
    if err != nil {
        t.Fatal(err)
    }

    chainConfig, _, genesisErr := core.CommitGenesisBlock(db, core.DefaultGenesisBlock())
    if genesisErr != nil {
        t.Fatal(genesisErr)
    }
    if err := db.Update(context.Background(), func(tx kv.RwTx) error {
        return tx.ClearBucket(kv.HeadHeaderKey)
    }); err != nil {
        t.Fatal(err)
    }

    snkv := snapshotdb.NewSnapshotKV().DB(db).
        //broken
        //SnapshotDB([]string{dbutils.Headers, dbutils.HeaderCanonical, dbutils.HeaderTD, dbutils.HeaderNumber, dbutils.BlockBody, dbutils.HeadHeaderKey, dbutils.Senders}, chaindata.RwDB()).
        Open()
    _ = chaindata
    defer snkv.Close()

    tx, err := snkv.BeginRw(context.Background())
    if err != nil {
        t.Fatal(err)
    }
    defer tx.Rollback()
    //
    //tx, err := db.Begin(context.Background(), ethdb.RW)
    //if err != nil {
    //	t.Fatal(err)
    //}
    psCursor, err := tx.Cursor(kv.PlainState)
    if err != nil {
        t.Fatal(err)
    }

    i := 5
    err = ethdb.Walk(psCursor, []byte{}, 0, func(k, v []byte) (bool, error) {
        fmt.Println(common.Bytes2Hex(k))
        i--
        if i == 0 {
            return false, nil
        }
        return true, nil
    })
    if err != nil {
        t.Fatal(err)
    }

    currentBlock := rawdb.ReadCurrentHeader(tx)
    fmt.Println("currentBlock", currentBlock.Number.Uint64())
    blockNum := uint64(1)
    limit := currentBlock.Number.Uint64()
    getHeader := func(hash common.Hash, number uint64) *types.Header { return rawdb.ReadHeader(tx, hash, number) }

    stateReaderWriter := NewDebugReaderWriter(state.NewPlainStateReader(tx), state.NewPlainStateWriter(tx, tx, blockNum))
    tt := time.Now()
    ttt := time.Now()
    for currentBlock := blockNum; currentBlock < blockNum+limit; currentBlock++ {
        stateReaderWriter.UpdateWriter(state.NewPlainStateWriter(tx, tx, currentBlock))
        block, err := rawdb.ReadBlockByNumber(tx, currentBlock)
        if err != nil {
            t.Fatal(err, currentBlock)
        }

        contractHasTEVM := ethdb.GetHasTEVM(tx)

        _, _, err = core.ExecuteBlockEphemerally(chainConfig, &vm.Config{NoReceipts: true}, getHeader, ethash.NewFaker(), block, stateReaderWriter, stateReaderWriter, nil, nil, contractHasTEVM)
        if err != nil {
            t.Fatal(err, currentBlock)
        }
        cs := stateReaderWriter.UpdatedAccouts()
        accDiffLen := len(cs)
        for i := range cs {
            if len(cs[i].Value) == 0 {
                cs[i].Value = []byte(Deleted)
            }
            err = tx.Put(AccountDiff, WithBlock(currentBlock, cs[i].Key), cs[i].Value)
            if err != nil {
                t.Fatal(err, cs[i].Key, currentBlock)
            }
        }
        cs = stateReaderWriter.UpdatedStorage()
        stDiffLen := len(cs)
        for i := range cs {
            if len(cs[i].Value) == 0 {
                cs[i].Value = []byte(Deleted)
            }
            err = tx.Put(StorageDiff, WithBlock(currentBlock, cs[i].Key), cs[i].Value)
            if err != nil {
                t.Fatal(err, cs[i].Key, currentBlock)
            }
        }
        cs = stateReaderWriter.UpdatedCodes()
        codesDiffLen := len(cs)
        for i := range cs {
            if len(cs[i].Value) == 0 {
                cs[i].Value = []byte(Deleted)
            }
            err = tx.Put(ContractDiff, WithBlock(currentBlock, cs[i].Key), cs[i].Value)
            if err != nil {
                t.Fatal(err, cs[i].Key, currentBlock)
            }
        }

        stateReaderWriter.Reset()
        if currentBlock%10000 == 0 {
            err = tx.Commit()
            if err != nil {
                t.Fatal(err, currentBlock)
            }
            tx, err = snkv.BeginRw(context.Background())
            if err != nil {
                t.Fatal(err, currentBlock)
            }
            defer tx.Rollback()

            dr := time.Since(ttt)
            fmt.Println(currentBlock, "finished", "acc-", accDiffLen, "st-", stDiffLen, "codes - ", codesDiffLen, "all -", time.Since(tt), "chunk - ", dr, "blocks/s", 10000/dr.Seconds())
            ttt = time.Now()
        }
    }
    err = tx.Commit()
    if err != nil {
        t.Fatal(err)
    }

    fmt.Println("End")
    //spew.Dump("readAcc",len(stateReaderWriter.readAcc))
    //spew.Dump("readStr",len(stateReaderWriter.readStorage))
    //spew.Dump("createdContracts", len(stateReaderWriter.createdContracts))
    //spew.Dump("deleted",len(stateReaderWriter.deletedAcc))
}

var _ state.StateReader = &DebugReaderWriter{}
var _ state.WriterWithChangeSets = &DebugReaderWriter{}

func NewDebugReaderWriter(r state.StateReader, w state.WriterWithChangeSets) *DebugReaderWriter {
    return &DebugReaderWriter{
        r: r,
        w: w,
        //readAcc: make(map[common.Address]struct{}),
        //readStorage: make(map[string]struct{}),
        //readCodes: make(map[common.Hash]struct{}),
        //readIncarnations: make(map[common.Address]struct{}),
        updatedAcc:     make(map[common.Address][]byte),
        updatedStorage: make(map[string][]byte),
        updatedCodes:   make(map[common.Hash][]byte),
        //deletedAcc: make(map[common.Address]struct{}),
        //createdContracts: make(map[common.Address]struct{}),
    }
}

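// DebugReaderWriter passes every call through to the wrapped reader/writer while
// recording each updated account, storage slot, and code blob, so the test above
// can persist them as per-block diffs.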
type DebugReaderWriter struct {
    r state.StateReader
    w state.WriterWithChangeSets
    //readAcc map[common.Address]struct{}
    //readStorage map[string]struct{}
    //readCodes map[common.Hash] struct{}
    //readIncarnations map[common.Address] struct{}
    updatedAcc     map[common.Address][]byte
    updatedStorage map[string][]byte
    updatedCodes   map[common.Hash][]byte
    //deletedAcc map[common.Address]struct{}
    //createdContracts map[common.Address]struct{}
}

func (d *DebugReaderWriter) Reset() {
    d.updatedAcc = map[common.Address][]byte{}
    d.updatedStorage = map[string][]byte{}
    d.updatedCodes = map[common.Hash][]byte{}
}
func (d *DebugReaderWriter) UpdateWriter(w state.WriterWithChangeSets) {
    d.w = w
}

func (d *DebugReaderWriter) ReadAccountData(address common.Address) (*accounts.Account, error) {
    //d.readAcc[address] = struct{}{}
    return d.r.ReadAccountData(address)
}

func (d *DebugReaderWriter) ReadAccountStorage(address common.Address, incarnation uint64, key *common.Hash) ([]byte, error) {
    //d.readStorage[string(dbutils.PlainGenerateCompositeStorageKey(address.Bytes(),incarnation, key.Bytes()))] = struct{}{}
    return d.r.ReadAccountStorage(address, incarnation, key)
}

func (d *DebugReaderWriter) ReadAccountCode(address common.Address, incarnation uint64, codeHash common.Hash) ([]byte, error) {
    //d.readCodes[codeHash] = struct{}{}
    return d.r.ReadAccountCode(address, incarnation, codeHash)
}

func (d *DebugReaderWriter) ReadAccountCodeSize(address common.Address, incarnation uint64, codeHash common.Hash) (int, error) {
    return d.r.ReadAccountCodeSize(address, incarnation, codeHash)
}

func (d *DebugReaderWriter) ReadAccountIncarnation(address common.Address) (uint64, error) {
    //d.readIncarnations[address] = struct{}{}
    return d.r.ReadAccountIncarnation(address)
}

func (d *DebugReaderWriter) WriteChangeSets() error {
    return d.w.WriteChangeSets()
}

func (d *DebugReaderWriter) WriteHistory() error {
    return d.w.WriteHistory()
}

func (d *DebugReaderWriter) UpdateAccountData(address common.Address, original, account *accounts.Account) error {
    b, err := rlp.EncodeToBytes(account)
    if err != nil {
        return err
    }
    d.updatedAcc[address] = b
    return d.w.UpdateAccountData(address, original, account)
}

func (d *DebugReaderWriter) UpdateAccountCode(address common.Address, incarnation uint64, codeHash common.Hash, code []byte) error {
    d.updatedCodes[codeHash] = code
    return d.w.UpdateAccountCode(address, incarnation, codeHash, code)
}

func (d *DebugReaderWriter) DeleteAccount(address common.Address, original *accounts.Account) error {
    d.updatedAcc[address] = nil
    //d.deletedAcc[address]= struct{}{}
    return d.w.DeleteAccount(address, original)
}

func (d *DebugReaderWriter) WriteAccountStorage(address common.Address, incarnation uint64, key *common.Hash, original, value *uint256.Int) error {
    d.updatedStorage[string(dbutils.PlainGenerateCompositeStorageKey(address.Bytes(), incarnation, key.Bytes()))] = value.Bytes()
    return d.w.WriteAccountStorage(address, incarnation, key, original, value)
}

func (d *DebugReaderWriter) CreateContract(address common.Address) error {
    //d.createdContracts[address] = struct{}{}
    return d.w.CreateContract(address)
}

type Change struct {
    Key   []byte
    Value []byte
}

func (d *DebugReaderWriter) UpdatedAccouts() []Change {
    ch := make([]Change, 0, len(d.updatedAcc))
    for k, v := range d.updatedAcc {
        ch = append(ch, Change{
            Key:   common.CopyBytes(k.Bytes()),
            Value: common.CopyBytes(v),
        })
    }
    return ch
}
func (d *DebugReaderWriter) UpdatedStorage() []Change {
    ch := make([]Change, 0, len(d.updatedStorage))
    for k, v := range d.updatedStorage {
        ch = append(ch, Change{
            Key:   common.CopyBytes([]byte(k)),
            Value: common.CopyBytes(v),
        })
    }
    return ch
}
func (d *DebugReaderWriter) UpdatedCodes() []Change {
    ch := make([]Change, 0, len(d.updatedCodes))
    for k, v := range d.updatedCodes {
        ch = append(ch, Change{
            Key:   common.CopyBytes(k.Bytes()),
            Value: common.CopyBytes(v),
        })
    }
    return ch
}

//func (d *DebugReaderWriter) AllAccounts() map[common.Address]struct{} {
//	accs:=make(map[common.Address]struct{})
//	for i:=range d.readAcc {
//		accs[i]=struct{}{}
//	}
//	for i:=range d.updatedAcc {
//		accs[i]=struct{}{}
//	}
//	for i:=range d.readIncarnations {
//		accs[i]=struct{}{}
//	}
//	for i:=range d.deletedAcc {
//		accs[i]=struct{}{}
//	}
//	for i:=range d.createdContracts {
//		accs[i]=struct{}{}
//	}
//	return accs
//}
//func (d *DebugReaderWriter) AllStorage() map[string]struct{} {
//	st:=make(map[string]struct{})
//	for i:=range d.readStorage {
//		st[i]=struct{}{}
//	}
//	for i:=range d.updatedStorage {
//		st[i]=struct{}{}
//	}
//	return st
//}
//func (d *DebugReaderWriter) AllCodes() map[common.Hash]struct{} {
//	c:=make(map[common.Hash]struct{})
//	for i:=range d.readCodes {
//		c[i]=struct{}{}
//	}
//	for i:=range d.updatedCodes {
//		c[i]=struct{}{}
//	}
//	return c
//}
@@ -243,11 +243,11 @@ func AddTorrentFiles(ctx context.Context, snapshotsDir *dir.Rw, torrentClient *t
 // ResolveAbsentTorrents - add hard-coded hashes (if client doesn't have) as magnet links and download everything
 func ResolveAbsentTorrents(ctx context.Context, torrentClient *torrent.Client, preverifiedHashes []metainfo.Hash, snapshotDir *dir.Rw, silent bool) error {
 	mi := &metainfo.MetaInfo{AnnounceList: Trackers}
-	for _, infoHash := range preverifiedHashes {
-		if _, ok := torrentClient.Torrent(infoHash); ok {
+	for i := range preverifiedHashes {
+		if _, ok := torrentClient.Torrent(preverifiedHashes[i]); ok {
 			continue
 		}
-		magnet := mi.Magnet(&infoHash, nil)
+		magnet := mi.Magnet(&preverifiedHashes[i], nil)
 		t, err := torrentClient.AddMagnet(magnet.String())
 		if err != nil {
 			return err
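The switch to index-based access avoids taking the address of the range variable: before Go 1.22 a range loop reuses one variable, so &infoHash is the same pointer on every iteration, which linters flag and which corrupts results whenever the pointer outlives the iteration. A minimal standalone illustration (not repo code):

package main

import "fmt"

func main() {
	hashes := []int{1, 2, 3}

	// Taking the address of the range variable: before Go 1.22 the loop
	// reuses one variable, so all three pointers alias it and observe
	// the last element once the loop ends.
	var bad []*int
	for _, h := range hashes {
		bad = append(bad, &h)
	}
	fmt.Println(*bad[0], *bad[1], *bad[2]) // 3 3 3

	// Indexing yields a distinct, stable address per element.
	var good []*int
	for i := range hashes {
		good = append(good, &hashes[i])
	}
	fmt.Println(*good[0], *good[1], *good[2]) // 1 2 3
}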
@@ -3,7 +3,7 @@ package downloader
 import (
 	"bytes"
 	"context"
-	"crypto/sha1"
+	"crypto/sha1" //nolint:gosec
 	"errors"
 	"fmt"
 	"io"
@@ -206,7 +206,7 @@ func verifyTorrent(info *metainfo.Info, root string, consumer func(i int, good b
 	span.InitIndex()
 	for i, numPieces := 0, info.NumPieces(); i < numPieces; i += 1 {
 		p := info.Piece(i)
-		hash := sha1.New()
+		hash := sha1.New() //nolint:gosec
 		_, err := io.Copy(hash, io.NewSectionReader(span, p.Offset(), p.Length()))
 		if err != nil {
 			return err
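BitTorrent v1 defines piece integrity in terms of SHA-1 digests, so crypto/sha1 is mandated by the protocol here; the added //nolint:gosec annotations silence gosec's weak-hash warning rather than endorse SHA-1 for new designs.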
@@ -403,7 +403,6 @@ func retireBlocks(s *PruneState, tx kv.RwTx, cfg SendersCfg, ctx context.Context
 	}
 	//TODO: avoid too large deletes

-	log.Info("[snapshots] Retire blocks", "from", blockFrom, "to", blockTo)
 	chainID, _ := uint256.FromBig(cfg.chainConfig.ChainID)
 	wg := sync.WaitGroup{}
 	wg.Add(1)
@@ -432,6 +431,5 @@ func retireBlocks(s *PruneState, tx kv.RwTx, cfg SendersCfg, ctx context.Context
 		defer wg.Done()
 	}()
 	wg.Wait()
-	fmt.Printf("sn runtime dump: %d-%d\n", blockFrom, blockTo)
 	return nil
 }
File diff suppressed because it is too large

go.mod (2 lines changed)
@@ -40,7 +40,7 @@ require (
 	github.com/json-iterator/go v1.1.12
 	github.com/julienschmidt/httprouter v1.3.0
 	github.com/kevinburke/go-bindata v3.21.0+incompatible
-	github.com/ledgerwatch/erigon-lib v0.0.0-20220310121515-3123b6d895c5
+	github.com/ledgerwatch/erigon-lib v0.0.0-20220312093458-c1f1365f9224
 	github.com/ledgerwatch/log/v3 v3.4.1
 	github.com/ledgerwatch/secp256k1 v1.0.0
 	github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e // indirect
go.sum (4 lines changed)
@@ -641,8 +641,8 @@ github.com/kylelemons/godebug v0.0.0-20170224010052-a616ab194758 h1:0D5M2HQSGD3P
 github.com/kylelemons/godebug v0.0.0-20170224010052-a616ab194758/go.mod h1:B69LEHPfb2qLo0BaaOLcbitczOKLWTsrBG9LczfCD4k=
 github.com/leanovate/gopter v0.2.9 h1:fQjYxZaynp97ozCzfOyOuAGOU4aU/z37zf/tOujFk7c=
 github.com/leanovate/gopter v0.2.9/go.mod h1:U2L/78B+KVFIx2VmW6onHJQzXtFb+p5y3y2Sh+Jxxv8=
-github.com/ledgerwatch/erigon-lib v0.0.0-20220310121515-3123b6d895c5 h1:f81JYvbRxP0T//AG+wmfZNxiwz2mDPK1cVJuhwcAiYc=
-github.com/ledgerwatch/erigon-lib v0.0.0-20220310121515-3123b6d895c5/go.mod h1:mag5WaGOUTVOLvFkT4wpjR5YHMmm4hynWJ3YfQ44Elg=
+github.com/ledgerwatch/erigon-lib v0.0.0-20220312093458-c1f1365f9224 h1:GqWMSF+pwVuW7cAosXMOzH/h3sIrEQ3TM4Ivwdl+D4M=
+github.com/ledgerwatch/erigon-lib v0.0.0-20220312093458-c1f1365f9224/go.mod h1:mag5WaGOUTVOLvFkT4wpjR5YHMmm4hynWJ3YfQ44Elg=
 github.com/ledgerwatch/log/v3 v3.4.1 h1:/xGwlVulXnsO9Uq+tzaExc8OWmXXHU0dnLalpbnY5Bc=
 github.com/ledgerwatch/log/v3 v3.4.1/go.mod h1:VXcz6Ssn6XEeU92dCMc39/g1F0OYAjw1Mt+dGP5DjXY=
 github.com/ledgerwatch/secp256k1 v1.0.0 h1:Usvz87YoTG0uePIV8woOof5cQnLXGYa162rFf3YnwaQ=
@@ -72,6 +72,7 @@ var snapshotCommand = cli.Command{
 			utils.DataDirFlag,
 			SnapshotFromFlag,
 			SnapshotToFlag,
+			SnapshotEveryFlag,
 		}, debug.Flags...),
 	},
 },
@@ -88,6 +89,11 @@ var (
 		Usage: "To block number. Zero - means unlimited.",
 		Value: 0,
 	}
+	SnapshotEveryFlag = cli.Uint64Flag{
+		Name:  "every",
+		Usage: "Do operation every N blocks",
+		Value: 1_000,
+	}
 	SnapshotSegmentSizeFlag = cli.Uint64Flag{
 		Name:  "segment.size",
 		Usage: "Amount of blocks in each segment",
@@ -135,8 +141,9 @@ func doRetireCommand(cliCtx *cli.Context) error {
 	tmpDir := filepath.Join(datadir, etl.TmpDirName)
 	from := cliCtx.Uint64(SnapshotFromFlag.Name)
 	to := cliCtx.Uint64(SnapshotToFlag.Name)
+	every := cliCtx.Uint64(SnapshotEveryFlag.Name)

-	chainDB := mdbx.NewMDBX(log.New()).Path(path.Join(datadir, "chaindata")).Readonly().MustOpen()
+	chainDB := mdbx.NewMDBX(log.New()).Label(kv.ChainDB).Path(path.Join(datadir, "chaindata")).Readonly().MustOpen()
 	defer chainDB.Close()

 	cfg := ethconfig.NewSnapshotCfg(true, true)
@@ -148,10 +155,13 @@ func doRetireCommand(cliCtx *cli.Context) error {
 	chainConfig := tool.ChainConfigFromDB(chainDB)
 	chainID, _ := uint256.FromBig(chainConfig.ChainID)
 	snapshots := snapshotsync.NewRoSnapshots(cfg, snapshotDir)
+	snapshots.ReopenSegments()

-	if err := snapshotsync.RetireBlocks(ctx, from, to, *chainID, tmpDir, snapshots, chainDB, 1, log.LvlInfo); err != nil {
-		panic(err)
-		//return err
+	for i := from; i < to; i += every {
+		if err := snapshotsync.RetireBlocks(ctx, i, i+every, *chainID, tmpDir, snapshots, chainDB, runtime.NumCPU()/2, log.LvlInfo); err != nil {
+			panic(err)
+			//return err
+		}
 	}
 	return nil
 }
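The retire command now walks [from, to) in batches of `every` blocks instead of one huge call. A sketch of that pattern under hypothetical names (not the repo's API; note the commit passes i+every directly, while this variant also clamps the final batch at to):

package main

import "fmt"

// retireInBatches processes [from, to) in fixed-size steps of `every` blocks.
func retireInBatches(from, to, every uint64, retire func(lo, hi uint64) error) error {
	for i := from; i < to; i += every {
		hi := i + every
		if hi > to {
			hi = to // clamp the final, possibly short, batch
		}
		if err := retire(i, hi); err != nil {
			return err
		}
	}
	return nil
}

func main() {
	_ = retireInBatches(0, 2500, 1000, func(lo, hi uint64) error {
		fmt.Printf("retire blocks %d-%d\n", lo, hi) // 0-1000, 1000-2000, 2000-2500
		return nil
	})
}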
@@ -171,7 +181,7 @@ func doSnapshotCommand(cliCtx *cli.Context) error {
 	tmpDir := filepath.Join(datadir, etl.TmpDirName)
 	dir.MustExist(tmpDir)

-	chainDB := mdbx.NewMDBX(log.New()).Path(filepath.Join(datadir, "chaindata")).Readonly().MustOpen()
+	chainDB := mdbx.NewMDBX(log.New()).Label(kv.ChainDB).Path(filepath.Join(datadir, "chaindata")).Readonly().MustOpen()
 	defer chainDB.Close()

 	if err := snapshotBlocks(ctx, chainDB, fromBlock, toBlock, segmentSize, snapshotDir, tmpDir); err != nil {
@@ -666,7 +666,7 @@ func min(a, b uint64) uint64 {
 }

 func RetireBlocks(ctx context.Context, blockFrom, blockTo uint64, chainID uint256.Int, tmpDir string, snapshots *RoSnapshots, db kv.RoDB, workers int, lvl log.Lvl) error {
-	log.Log(lvl, "[snapshots] Retire Blocks", "from", blockFrom, "to", blockTo)
+	log.Log(lvl, "[snapshots] Retire Blocks", "range", fmt.Sprintf("%dk-%dk", blockFrom/1000, blockTo/1000))
 	// in future we will do it in background
 	if err := DumpBlocks(ctx, blockFrom, blockTo, DEFAULT_SEGMENT_SIZE, tmpDir, snapshots.Dir(), db, workers, lvl); err != nil {
 		return fmt.Errorf("DumpBlocks: %w", err)
@@ -679,7 +679,6 @@ func RetireBlocks(ctx context.Context, blockFrom, blockTo uint64, chainID uint25
 	if len(ranges) == 0 {
 		return nil
 	}
-
 	if err := merger.Merge(ctx, snapshots, ranges, &dir.Rw{Path: snapshots.Dir()}); err != nil {
 		return err
 	}
@@ -878,7 +877,8 @@ func DumpTxs(ctx context.Context, db kv.RoDB, segmentFile, tmpDir string, blockF
 	}

 	_, fileName := filepath.Split(segmentFile)
-	log.Log(lvl, "[snapshots] Compression", "ratio", f.Ratio.String(), "file", fileName)
+	ext := filepath.Ext(fileName)
+	log.Log(lvl, "[snapshots] Compression", "ratio", f.Ratio.String(), "file", fileName[:len(fileName)-len(ext)])

 	return firstTxID, nil
 }
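The two-step trim logs the segment name without its extension; the slice expression is equivalent to strings.TrimSuffix(fileName, ext). A minimal check with an illustrative file name (not necessarily the repo's naming scheme):

package main

import (
	"fmt"
	"path/filepath"
	"strings"
)

func main() {
	fileName := "transactions-000500-001000.seg" // illustrative segment name
	ext := filepath.Ext(fileName)                // ".seg"
	fmt.Println(fileName[:len(fileName)-len(ext)]) // transactions-000500-001000
	fmt.Println(strings.TrimSuffix(fileName, ext)) // identical result
}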
@@ -1427,6 +1427,8 @@ type mergeRange struct {
 	from, to uint64
 }

+func (r mergeRange) String() string { return fmt.Sprintf("%dk-%dk", r.from/1000, r.to/1000) }
+
 func (*Merger) FindMergeRanges(snapshots *RoSnapshots) (res []mergeRange) {
 	for i := len(snapshots.blocks) - 1; i > 0; i-- {
 		sn := snapshots.blocks[i]
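String() implements fmt.Stringer, which is what makes the %v of []mergeRange in the Merge log line added in the next hunk print compact "0k-100k"-style ranges. A self-contained demonstration:

package main

import "fmt"

type mergeRange struct{ from, to uint64 }

func (r mergeRange) String() string { return fmt.Sprintf("%dk-%dk", r.from/1000, r.to/1000) }

func main() {
	ranges := []mergeRange{{0, 100_000}, {100_000, 200_000}}
	// fmt's %v consults the Stringer, so slices log compactly:
	fmt.Printf("%v\n", ranges) // [0k-100k 100k-200k]
}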
@@ -1469,6 +1471,7 @@ func (m *Merger) filesByRange(snapshots *RoSnapshots, from, to uint64) (toMergeH
 }

 func (m *Merger) Merge(ctx context.Context, snapshots *RoSnapshots, mergeRanges []mergeRange, snapshotDir *dir.Rw) error {
+	log.Log(m.lvl, "[snapshots] Merge segments", "ranges", fmt.Sprintf("%v", mergeRanges))
 	for _, r := range mergeRanges {
 		toMergeHeaders, toMergeBodies, toMergeTxs := m.filesByRange(snapshots, r.from, r.to)
 		if err := m.merge(ctx, toMergeBodies, filepath.Join(snapshotDir.Path, SegmentFileName(r.from, r.to, Bodies))); err != nil {
@@ -1497,8 +1500,6 @@ func (m *Merger) merge(ctx context.Context, toMerge []string, targetFile string)
 		_, fName := filepath.Split(f)
 		fileNames[i] = fName
 	}
-	_, fName := filepath.Split(targetFile)
-	log.Log(m.lvl, "[snapshots] Merging", "files", fileNames, "to", fName)
 	f, err := compress.NewCompressor(ctx, "merge", targetFile, m.tmpDir, compress.MinPatternScore, m.workers)
 	if err != nil {
 		return err
@@ -1518,6 +1519,11 @@ func (m *Merger) merge(ctx context.Context, toMerge []string, targetFile string)
 			if err := f.AddWord(word); err != nil {
 				return err
 			}
+			select {
+			case <-ctx.Done():
+				return ctx.Err()
+			default:
+			}
 		}
 		return nil
 	}); err != nil {
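The added select is the standard non-blocking cancellation check: poll the context once per word without stalling the loop. A minimal sketch of the pattern (names here are illustrative):

package main

import (
	"context"
	"fmt"
)

func process(ctx context.Context, items []string) error {
	for range items {
		// ... work on one item ...
		select {
		case <-ctx.Done():
			return ctx.Err() // stop promptly once the caller cancels
		default: // not cancelled: fall through without blocking
		}
	}
	return nil
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	cancel()
	fmt.Println(process(ctx, []string{"a", "b"})) // context canceled
}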
@@ -1,60 +0,0 @@
package snapshotsync

/*
import (
    "github.com/ledgerwatch/erigon-lib/gointerfaces/snapshotsync"
    "github.com/ledgerwatch/erigon-lib/kv"
    kv2 "github.com/ledgerwatch/erigon-lib/kv/mdbx"
    "github.com/ledgerwatch/erigon/ethdb/snapshotdb"
    "github.com/ledgerwatch/log/v3"
)

var (
    BucketConfigs = map[snapshotsync.Type]kv.TableCfg{
        snapshotsync.SnapshotType_bodies: {
            kv.BlockBody: kv.TableCfgItem{},
            kv.EthTx:     kv.TableCfgItem{},
        },
        snapshotsync.SnapshotType_headers: {
            kv.Headers: kv.TableCfgItem{},
        },
        snapshotsync.SnapshotType_state: {
            kv.PlainState: kv.TableCfgItem{
                Flags:                     kv.DupSort,
                AutoDupSortKeysConversion: true,
                DupFromLen:                60,
                DupToLen:                  28,
            },
            kv.PlainContractCode: kv.TableCfgItem{},
            kv.Code:              kv.TableCfgItem{},
        },
    }
)

func WrapBySnapshotsFromDownloader(db kv.RwDB, snapshots map[snapshotsync.Type]*snapshotsync.SnapshotsInfo) (kv.RwDB, error) {
    snKV := snapshotdb.NewSnapshotKV().DB(db)
    for k, v := range snapshots {
        log.Info("Wrap db by", "snapshot", k.String(), "dir", v.Dbpath)
        chainSnapshotCfg := BucketConfigs[k]
        snapshotKV, err := kv2.NewMDBX(log.New()).Readonly().Path(v.Dbpath).WithTablessCfg(func(defaultBuckets kv.TableCfg) kv.TableCfg {
            return chainSnapshotCfg
        }).Open()

        if err != nil {
            log.Error("Can't open snapshot", "err", err)
            return nil, err
        } else { //nolint
            switch k {
            case snapshotsync.SnapshotType_headers:
                snKV = snKV.HeadersSnapshot(snapshotKV)
            case snapshotsync.SnapshotType_bodies:
                snKV = snKV.BodiesSnapshot(snapshotKV)
            case snapshotsync.SnapshotType_state:
                snKV = snKV.StateSnapshot(snapshotKV)
            }
        }
    }

    return snKV.Open(), nil
}
*/