mirror of
https://gitlab.com/pulsechaincom/go-pulse.git
synced 2025-01-16 15:18:46 +00:00
26d271dfbb
* core/state/snapshot: implement storage iterator * core/state/snapshot, tests: implement helper function * core/state/snapshot: fix storage issue If an account is deleted in the tx_1 but recreated in the tx_2, the it can happen that in this diff layer, both destructedSet and storageData records this account. In this case, the storage iterator should be able to iterate the slots belong to new account but disable further iteration in deeper layers(belong to old account) * core/state/snapshot: address peter and martin's comment * core/state: address comments * core/state/snapshot: fix test
613 lines
23 KiB
Go
613 lines
23 KiB
Go
// Copyright 2019 The go-ethereum Authors
|
|
// This file is part of the go-ethereum library.
|
|
//
|
|
// The go-ethereum library is free software: you can redistribute it and/or modify
|
|
// it under the terms of the GNU Lesser General Public License as published by
|
|
// the Free Software Foundation, either version 3 of the License, or
|
|
// (at your option) any later version.
|
|
//
|
|
// The go-ethereum library is distributed in the hope that it will be useful,
|
|
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
// GNU Lesser General Public License for more details.
|
|
//
|
|
// You should have received a copy of the GNU Lesser General Public License
|
|
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
|
|
|
|
// Package snapshot implements a journalled, dynamic state dump.
|
|
package snapshot
|
|
|
|
import (
|
|
"bytes"
|
|
"errors"
|
|
"fmt"
|
|
"sync"
|
|
"sync/atomic"
|
|
|
|
"github.com/ethereum/go-ethereum/common"
|
|
"github.com/ethereum/go-ethereum/core/rawdb"
|
|
"github.com/ethereum/go-ethereum/ethdb"
|
|
"github.com/ethereum/go-ethereum/log"
|
|
"github.com/ethereum/go-ethereum/metrics"
|
|
"github.com/ethereum/go-ethereum/trie"
|
|
)
|
|
|
|
var (
|
|
snapshotCleanAccountHitMeter = metrics.NewRegisteredMeter("state/snapshot/clean/account/hit", nil)
|
|
snapshotCleanAccountMissMeter = metrics.NewRegisteredMeter("state/snapshot/clean/account/miss", nil)
|
|
snapshotCleanAccountInexMeter = metrics.NewRegisteredMeter("state/snapshot/clean/account/inex", nil)
|
|
snapshotCleanAccountReadMeter = metrics.NewRegisteredMeter("state/snapshot/clean/account/read", nil)
|
|
snapshotCleanAccountWriteMeter = metrics.NewRegisteredMeter("state/snapshot/clean/account/write", nil)
|
|
|
|
snapshotCleanStorageHitMeter = metrics.NewRegisteredMeter("state/snapshot/clean/storage/hit", nil)
|
|
snapshotCleanStorageMissMeter = metrics.NewRegisteredMeter("state/snapshot/clean/storage/miss", nil)
|
|
snapshotCleanStorageInexMeter = metrics.NewRegisteredMeter("state/snapshot/clean/storage/inex", nil)
|
|
snapshotCleanStorageReadMeter = metrics.NewRegisteredMeter("state/snapshot/clean/storage/read", nil)
|
|
snapshotCleanStorageWriteMeter = metrics.NewRegisteredMeter("state/snapshot/clean/storage/write", nil)
|
|
|
|
snapshotDirtyAccountHitMeter = metrics.NewRegisteredMeter("state/snapshot/dirty/account/hit", nil)
|
|
snapshotDirtyAccountMissMeter = metrics.NewRegisteredMeter("state/snapshot/dirty/account/miss", nil)
|
|
snapshotDirtyAccountInexMeter = metrics.NewRegisteredMeter("state/snapshot/dirty/account/inex", nil)
|
|
snapshotDirtyAccountReadMeter = metrics.NewRegisteredMeter("state/snapshot/dirty/account/read", nil)
|
|
snapshotDirtyAccountWriteMeter = metrics.NewRegisteredMeter("state/snapshot/dirty/account/write", nil)
|
|
|
|
snapshotDirtyStorageHitMeter = metrics.NewRegisteredMeter("state/snapshot/dirty/storage/hit", nil)
|
|
snapshotDirtyStorageMissMeter = metrics.NewRegisteredMeter("state/snapshot/dirty/storage/miss", nil)
|
|
snapshotDirtyStorageInexMeter = metrics.NewRegisteredMeter("state/snapshot/dirty/storage/inex", nil)
|
|
snapshotDirtyStorageReadMeter = metrics.NewRegisteredMeter("state/snapshot/dirty/storage/read", nil)
|
|
snapshotDirtyStorageWriteMeter = metrics.NewRegisteredMeter("state/snapshot/dirty/storage/write", nil)
|
|
|
|
snapshotDirtyAccountHitDepthHist = metrics.NewRegisteredHistogram("state/snapshot/dirty/account/hit/depth", nil, metrics.NewExpDecaySample(1028, 0.015))
|
|
snapshotDirtyStorageHitDepthHist = metrics.NewRegisteredHistogram("state/snapshot/dirty/storage/hit/depth", nil, metrics.NewExpDecaySample(1028, 0.015))
|
|
|
|
snapshotFlushAccountItemMeter = metrics.NewRegisteredMeter("state/snapshot/flush/account/item", nil)
|
|
snapshotFlushAccountSizeMeter = metrics.NewRegisteredMeter("state/snapshot/flush/account/size", nil)
|
|
snapshotFlushStorageItemMeter = metrics.NewRegisteredMeter("state/snapshot/flush/storage/item", nil)
|
|
snapshotFlushStorageSizeMeter = metrics.NewRegisteredMeter("state/snapshot/flush/storage/size", nil)
|
|
|
|
snapshotBloomIndexTimer = metrics.NewRegisteredResettingTimer("state/snapshot/bloom/index", nil)
|
|
snapshotBloomErrorGauge = metrics.NewRegisteredGaugeFloat64("state/snapshot/bloom/error", nil)
|
|
|
|
snapshotBloomAccountTrueHitMeter = metrics.NewRegisteredMeter("state/snapshot/bloom/account/truehit", nil)
|
|
snapshotBloomAccountFalseHitMeter = metrics.NewRegisteredMeter("state/snapshot/bloom/account/falsehit", nil)
|
|
snapshotBloomAccountMissMeter = metrics.NewRegisteredMeter("state/snapshot/bloom/account/miss", nil)
|
|
|
|
snapshotBloomStorageTrueHitMeter = metrics.NewRegisteredMeter("state/snapshot/bloom/storage/truehit", nil)
|
|
snapshotBloomStorageFalseHitMeter = metrics.NewRegisteredMeter("state/snapshot/bloom/storage/falsehit", nil)
|
|
snapshotBloomStorageMissMeter = metrics.NewRegisteredMeter("state/snapshot/bloom/storage/miss", nil)
|
|
|
|
// ErrSnapshotStale is returned from data accessors if the underlying snapshot
|
|
// layer had been invalidated due to the chain progressing forward far enough
|
|
// to not maintain the layer's original state.
|
|
ErrSnapshotStale = errors.New("snapshot stale")
|
|
|
|
// ErrNotCoveredYet is returned from data accessors if the underlying snapshot
|
|
// is being generated currently and the requested data item is not yet in the
|
|
// range of accounts covered.
|
|
ErrNotCoveredYet = errors.New("not covered yet")
|
|
|
|
// errSnapshotCycle is returned if a snapshot is attempted to be inserted
|
|
// that forms a cycle in the snapshot tree.
|
|
errSnapshotCycle = errors.New("snapshot cycle")
|
|
)
|
|
|
|
// Snapshot represents the functionality supported by a snapshot storage layer.
|
|
type Snapshot interface {
|
|
// Root returns the root hash for which this snapshot was made.
|
|
Root() common.Hash
|
|
|
|
// Account directly retrieves the account associated with a particular hash in
|
|
// the snapshot slim data format.
|
|
Account(hash common.Hash) (*Account, error)
|
|
|
|
// AccountRLP directly retrieves the account RLP associated with a particular
|
|
// hash in the snapshot slim data format.
|
|
AccountRLP(hash common.Hash) ([]byte, error)
|
|
|
|
// Storage directly retrieves the storage data associated with a particular hash,
|
|
// within a particular account.
|
|
Storage(accountHash, storageHash common.Hash) ([]byte, error)
|
|
}
|
|
|
|
// snapshot is the internal version of the snapshot data layer that supports some
|
|
// additional methods compared to the public API.
|
|
type snapshot interface {
|
|
Snapshot
|
|
|
|
// Parent returns the subsequent layer of a snapshot, or nil if the base was
|
|
// reached.
|
|
//
|
|
// Note, the method is an internal helper to avoid type switching between the
|
|
// disk and diff layers. There is no locking involved.
|
|
Parent() snapshot
|
|
|
|
// Update creates a new layer on top of the existing snapshot diff tree with
|
|
// the specified data items.
|
|
//
|
|
// Note, the maps are retained by the method to avoid copying everything.
|
|
Update(blockRoot common.Hash, destructs map[common.Hash]struct{}, accounts map[common.Hash][]byte, storage map[common.Hash]map[common.Hash][]byte) *diffLayer
|
|
|
|
// Journal commits an entire diff hierarchy to disk into a single journal entry.
|
|
// This is meant to be used during shutdown to persist the snapshot without
|
|
// flattening everything down (bad for reorgs).
|
|
Journal(buffer *bytes.Buffer) (common.Hash, error)
|
|
|
|
// Stale return whether this layer has become stale (was flattened across) or
|
|
// if it's still live.
|
|
Stale() bool
|
|
|
|
// AccountIterator creates an account iterator over an arbitrary layer.
|
|
AccountIterator(seek common.Hash) AccountIterator
|
|
|
|
// StorageIterator creates a storage iterator over an arbitrary layer.
|
|
StorageIterator(account common.Hash, seek common.Hash) (StorageIterator, bool)
|
|
}
|
|
|
|
// SnapshotTree is an Ethereum state snapshot tree. It consists of one persistent
|
|
// base layer backed by a key-value store, on top of which arbitrarily many in-
|
|
// memory diff layers are topped. The memory diffs can form a tree with branching,
|
|
// but the disk layer is singleton and common to all. If a reorg goes deeper than
|
|
// the disk layer, everything needs to be deleted.
|
|
//
|
|
// The goal of a state snapshot is twofold: to allow direct access to account and
|
|
// storage data to avoid expensive multi-level trie lookups; and to allow sorted,
|
|
// cheap iteration of the account/storage tries for sync aid.
|
|
type Tree struct {
|
|
diskdb ethdb.KeyValueStore // Persistent database to store the snapshot
|
|
triedb *trie.Database // In-memory cache to access the trie through
|
|
cache int // Megabytes permitted to use for read caches
|
|
layers map[common.Hash]snapshot // Collection of all known layers
|
|
lock sync.RWMutex
|
|
}
|
|
|
|
// New attempts to load an already existing snapshot from a persistent key-value
|
|
// store (with a number of memory layers from a journal), ensuring that the head
|
|
// of the snapshot matches the expected one.
|
|
//
|
|
// If the snapshot is missing or inconsistent, the entirety is deleted and will
|
|
// be reconstructed from scratch based on the tries in the key-value store, on a
|
|
// background thread.
|
|
func New(diskdb ethdb.KeyValueStore, triedb *trie.Database, cache int, root common.Hash, async bool) *Tree {
|
|
// Create a new, empty snapshot tree
|
|
snap := &Tree{
|
|
diskdb: diskdb,
|
|
triedb: triedb,
|
|
cache: cache,
|
|
layers: make(map[common.Hash]snapshot),
|
|
}
|
|
if !async {
|
|
defer snap.waitBuild()
|
|
}
|
|
// Attempt to load a previously persisted snapshot and rebuild one if failed
|
|
head, err := loadSnapshot(diskdb, triedb, cache, root)
|
|
if err != nil {
|
|
log.Warn("Failed to load snapshot, regenerating", "err", err)
|
|
snap.Rebuild(root)
|
|
return snap
|
|
}
|
|
// Existing snapshot loaded, seed all the layers
|
|
for head != nil {
|
|
snap.layers[head.Root()] = head
|
|
head = head.Parent()
|
|
}
|
|
return snap
|
|
}
|
|
|
|
// waitBuild blocks until the snapshot finishes rebuilding. This method is meant
|
|
// to be used by tests to ensure we're testing what we believe we are.
|
|
func (t *Tree) waitBuild() {
|
|
// Find the rebuild termination channel
|
|
var done chan struct{}
|
|
|
|
t.lock.RLock()
|
|
for _, layer := range t.layers {
|
|
if layer, ok := layer.(*diskLayer); ok {
|
|
done = layer.genPending
|
|
break
|
|
}
|
|
}
|
|
t.lock.RUnlock()
|
|
|
|
// Wait until the snapshot is generated
|
|
if done != nil {
|
|
<-done
|
|
}
|
|
}
|
|
|
|
// Snapshot retrieves a snapshot belonging to the given block root, or nil if no
|
|
// snapshot is maintained for that block.
|
|
func (t *Tree) Snapshot(blockRoot common.Hash) Snapshot {
|
|
t.lock.RLock()
|
|
defer t.lock.RUnlock()
|
|
|
|
return t.layers[blockRoot]
|
|
}
|
|
|
|
// Update adds a new snapshot into the tree, if that can be linked to an existing
|
|
// old parent. It is disallowed to insert a disk layer (the origin of all).
|
|
func (t *Tree) Update(blockRoot common.Hash, parentRoot common.Hash, destructs map[common.Hash]struct{}, accounts map[common.Hash][]byte, storage map[common.Hash]map[common.Hash][]byte) error {
|
|
// Reject noop updates to avoid self-loops in the snapshot tree. This is a
|
|
// special case that can only happen for Clique networks where empty blocks
|
|
// don't modify the state (0 block subsidy).
|
|
//
|
|
// Although we could silently ignore this internally, it should be the caller's
|
|
// responsibility to avoid even attempting to insert such a snapshot.
|
|
if blockRoot == parentRoot {
|
|
return errSnapshotCycle
|
|
}
|
|
// Generate a new snapshot on top of the parent
|
|
parent := t.Snapshot(parentRoot).(snapshot)
|
|
if parent == nil {
|
|
return fmt.Errorf("parent [%#x] snapshot missing", parentRoot)
|
|
}
|
|
snap := parent.Update(blockRoot, destructs, accounts, storage)
|
|
|
|
// Save the new snapshot for later
|
|
t.lock.Lock()
|
|
defer t.lock.Unlock()
|
|
|
|
t.layers[snap.root] = snap
|
|
return nil
|
|
}
|
|
|
|
// Cap traverses downwards the snapshot tree from a head block hash until the
|
|
// number of allowed layers are crossed. All layers beyond the permitted number
|
|
// are flattened downwards.
|
|
func (t *Tree) Cap(root common.Hash, layers int) error {
|
|
// Retrieve the head snapshot to cap from
|
|
snap := t.Snapshot(root)
|
|
if snap == nil {
|
|
return fmt.Errorf("snapshot [%#x] missing", root)
|
|
}
|
|
diff, ok := snap.(*diffLayer)
|
|
if !ok {
|
|
return fmt.Errorf("snapshot [%#x] is disk layer", root)
|
|
}
|
|
// Run the internal capping and discard all stale layers
|
|
t.lock.Lock()
|
|
defer t.lock.Unlock()
|
|
|
|
// Flattening the bottom-most diff layer requires special casing since there's
|
|
// no child to rewire to the grandparent. In that case we can fake a temporary
|
|
// child for the capping and then remove it.
|
|
var persisted *diskLayer
|
|
|
|
switch layers {
|
|
case 0:
|
|
// If full commit was requested, flatten the diffs and merge onto disk
|
|
diff.lock.RLock()
|
|
base := diffToDisk(diff.flatten().(*diffLayer))
|
|
diff.lock.RUnlock()
|
|
|
|
// Replace the entire snapshot tree with the flat base
|
|
t.layers = map[common.Hash]snapshot{base.root: base}
|
|
return nil
|
|
|
|
case 1:
|
|
// If full flattening was requested, flatten the diffs but only merge if the
|
|
// memory limit was reached
|
|
var (
|
|
bottom *diffLayer
|
|
base *diskLayer
|
|
)
|
|
diff.lock.RLock()
|
|
bottom = diff.flatten().(*diffLayer)
|
|
if bottom.memory >= aggregatorMemoryLimit {
|
|
base = diffToDisk(bottom)
|
|
}
|
|
diff.lock.RUnlock()
|
|
|
|
// If all diff layers were removed, replace the entire snapshot tree
|
|
if base != nil {
|
|
t.layers = map[common.Hash]snapshot{base.root: base}
|
|
return nil
|
|
}
|
|
// Merge the new aggregated layer into the snapshot tree, clean stales below
|
|
t.layers[bottom.root] = bottom
|
|
|
|
default:
|
|
// Many layers requested to be retained, cap normally
|
|
persisted = t.cap(diff, layers)
|
|
}
|
|
// Remove any layer that is stale or links into a stale layer
|
|
children := make(map[common.Hash][]common.Hash)
|
|
for root, snap := range t.layers {
|
|
if diff, ok := snap.(*diffLayer); ok {
|
|
parent := diff.parent.Root()
|
|
children[parent] = append(children[parent], root)
|
|
}
|
|
}
|
|
var remove func(root common.Hash)
|
|
remove = func(root common.Hash) {
|
|
delete(t.layers, root)
|
|
for _, child := range children[root] {
|
|
remove(child)
|
|
}
|
|
delete(children, root)
|
|
}
|
|
for root, snap := range t.layers {
|
|
if snap.Stale() {
|
|
remove(root)
|
|
}
|
|
}
|
|
// If the disk layer was modified, regenerate all the cummulative blooms
|
|
if persisted != nil {
|
|
var rebloom func(root common.Hash)
|
|
rebloom = func(root common.Hash) {
|
|
if diff, ok := t.layers[root].(*diffLayer); ok {
|
|
diff.rebloom(persisted)
|
|
}
|
|
for _, child := range children[root] {
|
|
rebloom(child)
|
|
}
|
|
}
|
|
rebloom(persisted.root)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// cap traverses downwards the diff tree until the number of allowed layers are
|
|
// crossed. All diffs beyond the permitted number are flattened downwards. If the
|
|
// layer limit is reached, memory cap is also enforced (but not before).
|
|
//
|
|
// The method returns the new disk layer if diffs were persistend into it.
|
|
func (t *Tree) cap(diff *diffLayer, layers int) *diskLayer {
|
|
// Dive until we run out of layers or reach the persistent database
|
|
for ; layers > 2; layers-- {
|
|
// If we still have diff layers below, continue down
|
|
if parent, ok := diff.parent.(*diffLayer); ok {
|
|
diff = parent
|
|
} else {
|
|
// Diff stack too shallow, return without modifications
|
|
return nil
|
|
}
|
|
}
|
|
// We're out of layers, flatten anything below, stopping if it's the disk or if
|
|
// the memory limit is not yet exceeded.
|
|
switch parent := diff.parent.(type) {
|
|
case *diskLayer:
|
|
return nil
|
|
|
|
case *diffLayer:
|
|
// Flatten the parent into the grandparent. The flattening internally obtains a
|
|
// write lock on grandparent.
|
|
flattened := parent.flatten().(*diffLayer)
|
|
t.layers[flattened.root] = flattened
|
|
|
|
diff.lock.Lock()
|
|
defer diff.lock.Unlock()
|
|
|
|
diff.parent = flattened
|
|
if flattened.memory < aggregatorMemoryLimit {
|
|
// Accumulator layer is smaller than the limit, so we can abort, unless
|
|
// there's a snapshot being generated currently. In that case, the trie
|
|
// will move fron underneath the generator so we **must** merge all the
|
|
// partial data down into the snapshot and restart the generation.
|
|
if flattened.parent.(*diskLayer).genAbort == nil {
|
|
return nil
|
|
}
|
|
}
|
|
default:
|
|
panic(fmt.Sprintf("unknown data layer: %T", parent))
|
|
}
|
|
// If the bottom-most layer is larger than our memory cap, persist to disk
|
|
bottom := diff.parent.(*diffLayer)
|
|
|
|
bottom.lock.RLock()
|
|
base := diffToDisk(bottom)
|
|
bottom.lock.RUnlock()
|
|
|
|
t.layers[base.root] = base
|
|
diff.parent = base
|
|
return base
|
|
}
|
|
|
|
// diffToDisk merges a bottom-most diff into the persistent disk layer underneath
|
|
// it. The method will panic if called onto a non-bottom-most diff layer.
|
|
func diffToDisk(bottom *diffLayer) *diskLayer {
|
|
var (
|
|
base = bottom.parent.(*diskLayer)
|
|
batch = base.diskdb.NewBatch()
|
|
stats *generatorStats
|
|
)
|
|
// If the disk layer is running a snapshot generator, abort it
|
|
if base.genAbort != nil {
|
|
abort := make(chan *generatorStats)
|
|
base.genAbort <- abort
|
|
stats = <-abort
|
|
}
|
|
// Start by temporarily deleting the current snapshot block marker. This
|
|
// ensures that in the case of a crash, the entire snapshot is invalidated.
|
|
rawdb.DeleteSnapshotRoot(batch)
|
|
|
|
// Mark the original base as stale as we're going to create a new wrapper
|
|
base.lock.Lock()
|
|
if base.stale {
|
|
panic("parent disk layer is stale") // we've committed into the same base from two children, boo
|
|
}
|
|
base.stale = true
|
|
base.lock.Unlock()
|
|
|
|
// Destroy all the destructed accounts from the database
|
|
for hash := range bottom.destructSet {
|
|
// Skip any account not covered yet by the snapshot
|
|
if base.genMarker != nil && bytes.Compare(hash[:], base.genMarker) > 0 {
|
|
continue
|
|
}
|
|
// Remove all storage slots
|
|
rawdb.DeleteAccountSnapshot(batch, hash)
|
|
base.cache.Set(hash[:], nil)
|
|
|
|
it := rawdb.IterateStorageSnapshots(base.diskdb, hash)
|
|
for it.Next() {
|
|
if key := it.Key(); len(key) == 65 { // TODO(karalabe): Yuck, we should move this into the iterator
|
|
batch.Delete(key)
|
|
base.cache.Del(key[1:])
|
|
|
|
snapshotFlushStorageItemMeter.Mark(1)
|
|
}
|
|
}
|
|
it.Release()
|
|
}
|
|
// Push all updated accounts into the database
|
|
for hash, data := range bottom.accountData {
|
|
// Skip any account not covered yet by the snapshot
|
|
if base.genMarker != nil && bytes.Compare(hash[:], base.genMarker) > 0 {
|
|
continue
|
|
}
|
|
// Push the account to disk
|
|
rawdb.WriteAccountSnapshot(batch, hash, data)
|
|
base.cache.Set(hash[:], data)
|
|
snapshotCleanAccountWriteMeter.Mark(int64(len(data)))
|
|
|
|
if batch.ValueSize() > ethdb.IdealBatchSize {
|
|
if err := batch.Write(); err != nil {
|
|
log.Crit("Failed to write account snapshot", "err", err)
|
|
}
|
|
batch.Reset()
|
|
}
|
|
snapshotFlushAccountItemMeter.Mark(1)
|
|
snapshotFlushAccountSizeMeter.Mark(int64(len(data)))
|
|
}
|
|
// Push all the storage slots into the database
|
|
for accountHash, storage := range bottom.storageData {
|
|
// Skip any account not covered yet by the snapshot
|
|
if base.genMarker != nil && bytes.Compare(accountHash[:], base.genMarker) > 0 {
|
|
continue
|
|
}
|
|
// Generation might be mid-account, track that case too
|
|
midAccount := base.genMarker != nil && bytes.Equal(accountHash[:], base.genMarker[:common.HashLength])
|
|
|
|
for storageHash, data := range storage {
|
|
// Skip any slot not covered yet by the snapshot
|
|
if midAccount && bytes.Compare(storageHash[:], base.genMarker[common.HashLength:]) > 0 {
|
|
continue
|
|
}
|
|
if len(data) > 0 {
|
|
rawdb.WriteStorageSnapshot(batch, accountHash, storageHash, data)
|
|
base.cache.Set(append(accountHash[:], storageHash[:]...), data)
|
|
snapshotCleanStorageWriteMeter.Mark(int64(len(data)))
|
|
} else {
|
|
rawdb.DeleteStorageSnapshot(batch, accountHash, storageHash)
|
|
base.cache.Set(append(accountHash[:], storageHash[:]...), nil)
|
|
}
|
|
snapshotFlushStorageItemMeter.Mark(1)
|
|
snapshotFlushStorageSizeMeter.Mark(int64(len(data)))
|
|
}
|
|
if batch.ValueSize() > ethdb.IdealBatchSize {
|
|
if err := batch.Write(); err != nil {
|
|
log.Crit("Failed to write storage snapshot", "err", err)
|
|
}
|
|
batch.Reset()
|
|
}
|
|
}
|
|
// Update the snapshot block marker and write any remainder data
|
|
rawdb.WriteSnapshotRoot(batch, bottom.root)
|
|
if err := batch.Write(); err != nil {
|
|
log.Crit("Failed to write leftover snapshot", "err", err)
|
|
}
|
|
res := &diskLayer{
|
|
root: bottom.root,
|
|
cache: base.cache,
|
|
diskdb: base.diskdb,
|
|
triedb: base.triedb,
|
|
genMarker: base.genMarker,
|
|
genPending: base.genPending,
|
|
}
|
|
// If snapshot generation hasn't finished yet, port over all the starts and
|
|
// continue where the previous round left off.
|
|
//
|
|
// Note, the `base.genAbort` comparison is not used normally, it's checked
|
|
// to allow the tests to play with the marker without triggering this path.
|
|
if base.genMarker != nil && base.genAbort != nil {
|
|
res.genMarker = base.genMarker
|
|
res.genAbort = make(chan chan *generatorStats)
|
|
go res.generate(stats)
|
|
}
|
|
return res
|
|
}
|
|
|
|
// Journal commits an entire diff hierarchy to disk into a single journal entry.
|
|
// This is meant to be used during shutdown to persist the snapshot without
|
|
// flattening everything down (bad for reorgs).
|
|
//
|
|
// The method returns the root hash of the base layer that needs to be persisted
|
|
// to disk as a trie too to allow continuing any pending generation op.
|
|
func (t *Tree) Journal(root common.Hash) (common.Hash, error) {
|
|
// Retrieve the head snapshot to journal from var snap snapshot
|
|
snap := t.Snapshot(root)
|
|
if snap == nil {
|
|
return common.Hash{}, fmt.Errorf("snapshot [%#x] missing", root)
|
|
}
|
|
// Run the journaling
|
|
t.lock.Lock()
|
|
defer t.lock.Unlock()
|
|
|
|
journal := new(bytes.Buffer)
|
|
base, err := snap.(snapshot).Journal(journal)
|
|
if err != nil {
|
|
return common.Hash{}, err
|
|
}
|
|
// Store the journal into the database and return
|
|
rawdb.WriteSnapshotJournal(t.diskdb, journal.Bytes())
|
|
return base, nil
|
|
}
|
|
|
|
// Rebuild wipes all available snapshot data from the persistent database and
|
|
// discard all caches and diff layers. Afterwards, it starts a new snapshot
|
|
// generator with the given root hash.
|
|
func (t *Tree) Rebuild(root common.Hash) {
|
|
t.lock.Lock()
|
|
defer t.lock.Unlock()
|
|
|
|
// Track whether there's a wipe currently running and keep it alive if so
|
|
var wiper chan struct{}
|
|
|
|
// Iterate over and mark all layers stale
|
|
for _, layer := range t.layers {
|
|
switch layer := layer.(type) {
|
|
case *diskLayer:
|
|
// If the base layer is generating, abort it and save
|
|
if layer.genAbort != nil {
|
|
abort := make(chan *generatorStats)
|
|
layer.genAbort <- abort
|
|
|
|
if stats := <-abort; stats != nil {
|
|
wiper = stats.wiping
|
|
}
|
|
}
|
|
// Layer should be inactive now, mark it as stale
|
|
layer.lock.Lock()
|
|
layer.stale = true
|
|
layer.lock.Unlock()
|
|
|
|
case *diffLayer:
|
|
// If the layer is a simple diff, simply mark as stale
|
|
layer.lock.Lock()
|
|
atomic.StoreUint32(&layer.stale, 1)
|
|
layer.lock.Unlock()
|
|
|
|
default:
|
|
panic(fmt.Sprintf("unknown layer type: %T", layer))
|
|
}
|
|
}
|
|
// Start generating a new snapshot from scratch on a backgroung thread. The
|
|
// generator will run a wiper first if there's not one running right now.
|
|
log.Info("Rebuilding state snapshot")
|
|
t.layers = map[common.Hash]snapshot{
|
|
root: generateSnapshot(t.diskdb, t.triedb, t.cache, root, wiper),
|
|
}
|
|
}
|
|
|
|
// AccountIterator creates a new account iterator for the specified root hash and
|
|
// seeks to a starting account hash.
|
|
func (t *Tree) AccountIterator(root common.Hash, seek common.Hash) (AccountIterator, error) {
|
|
return newFastAccountIterator(t, root, seek)
|
|
}
|
|
|
|
// StorageIterator creates a new storage iterator for the specified root hash and
|
|
// account. The iterator will be move to the specific start position.
|
|
func (t *Tree) StorageIterator(root common.Hash, account common.Hash, seek common.Hash) (StorageIterator, error) {
|
|
return newFastStorageIterator(t, root, account, seek)
|
|
}
|