mirror of
https://gitlab.com/pulsechaincom/go-pulse.git
synced 2025-01-17 23:58:46 +00:00
737 lines
27 KiB
Go
737 lines
27 KiB
Go
// Copyright 2019 The go-ethereum Authors
|
|
// This file is part of the go-ethereum library.
|
|
//
|
|
// The go-ethereum library is free software: you can redistribute it and/or modify
|
|
// it under the terms of the GNU Lesser General Public License as published by
|
|
// the Free Software Foundation, either version 3 of the License, or
|
|
// (at your option) any later version.
|
|
//
|
|
// The go-ethereum library is distributed in the hope that it will be useful,
|
|
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
// GNU Lesser General Public License for more details.
|
|
//
|
|
// You should have received a copy of the GNU Lesser General Public License
|
|
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
|
|
|
|
// Package snapshot implements a journalled, dynamic state dump.
|
|
package snapshot
|
|
|
|
import (
|
|
"bytes"
|
|
"errors"
|
|
"fmt"
|
|
"sync"
|
|
"sync/atomic"
|
|
|
|
"github.com/ethereum/go-ethereum/common"
|
|
"github.com/ethereum/go-ethereum/core/rawdb"
|
|
"github.com/ethereum/go-ethereum/ethdb"
|
|
"github.com/ethereum/go-ethereum/log"
|
|
"github.com/ethereum/go-ethereum/metrics"
|
|
"github.com/ethereum/go-ethereum/rlp"
|
|
"github.com/ethereum/go-ethereum/trie"
|
|
)
|
|
|
|
var (
|
|
snapshotCleanAccountHitMeter = metrics.NewRegisteredMeter("state/snapshot/clean/account/hit", nil)
|
|
snapshotCleanAccountMissMeter = metrics.NewRegisteredMeter("state/snapshot/clean/account/miss", nil)
|
|
snapshotCleanAccountInexMeter = metrics.NewRegisteredMeter("state/snapshot/clean/account/inex", nil)
|
|
snapshotCleanAccountReadMeter = metrics.NewRegisteredMeter("state/snapshot/clean/account/read", nil)
|
|
snapshotCleanAccountWriteMeter = metrics.NewRegisteredMeter("state/snapshot/clean/account/write", nil)
|
|
|
|
snapshotCleanStorageHitMeter = metrics.NewRegisteredMeter("state/snapshot/clean/storage/hit", nil)
|
|
snapshotCleanStorageMissMeter = metrics.NewRegisteredMeter("state/snapshot/clean/storage/miss", nil)
|
|
snapshotCleanStorageInexMeter = metrics.NewRegisteredMeter("state/snapshot/clean/storage/inex", nil)
|
|
snapshotCleanStorageReadMeter = metrics.NewRegisteredMeter("state/snapshot/clean/storage/read", nil)
|
|
snapshotCleanStorageWriteMeter = metrics.NewRegisteredMeter("state/snapshot/clean/storage/write", nil)
|
|
|
|
snapshotDirtyAccountHitMeter = metrics.NewRegisteredMeter("state/snapshot/dirty/account/hit", nil)
|
|
snapshotDirtyAccountMissMeter = metrics.NewRegisteredMeter("state/snapshot/dirty/account/miss", nil)
|
|
snapshotDirtyAccountInexMeter = metrics.NewRegisteredMeter("state/snapshot/dirty/account/inex", nil)
|
|
snapshotDirtyAccountReadMeter = metrics.NewRegisteredMeter("state/snapshot/dirty/account/read", nil)
|
|
snapshotDirtyAccountWriteMeter = metrics.NewRegisteredMeter("state/snapshot/dirty/account/write", nil)
|
|
|
|
snapshotDirtyStorageHitMeter = metrics.NewRegisteredMeter("state/snapshot/dirty/storage/hit", nil)
|
|
snapshotDirtyStorageMissMeter = metrics.NewRegisteredMeter("state/snapshot/dirty/storage/miss", nil)
|
|
snapshotDirtyStorageInexMeter = metrics.NewRegisteredMeter("state/snapshot/dirty/storage/inex", nil)
|
|
snapshotDirtyStorageReadMeter = metrics.NewRegisteredMeter("state/snapshot/dirty/storage/read", nil)
|
|
snapshotDirtyStorageWriteMeter = metrics.NewRegisteredMeter("state/snapshot/dirty/storage/write", nil)
|
|
|
|
snapshotDirtyAccountHitDepthHist = metrics.NewRegisteredHistogram("state/snapshot/dirty/account/hit/depth", nil, metrics.NewExpDecaySample(1028, 0.015))
|
|
snapshotDirtyStorageHitDepthHist = metrics.NewRegisteredHistogram("state/snapshot/dirty/storage/hit/depth", nil, metrics.NewExpDecaySample(1028, 0.015))
|
|
|
|
snapshotFlushAccountItemMeter = metrics.NewRegisteredMeter("state/snapshot/flush/account/item", nil)
|
|
snapshotFlushAccountSizeMeter = metrics.NewRegisteredMeter("state/snapshot/flush/account/size", nil)
|
|
snapshotFlushStorageItemMeter = metrics.NewRegisteredMeter("state/snapshot/flush/storage/item", nil)
|
|
snapshotFlushStorageSizeMeter = metrics.NewRegisteredMeter("state/snapshot/flush/storage/size", nil)
|
|
|
|
snapshotBloomIndexTimer = metrics.NewRegisteredResettingTimer("state/snapshot/bloom/index", nil)
|
|
snapshotBloomErrorGauge = metrics.NewRegisteredGaugeFloat64("state/snapshot/bloom/error", nil)
|
|
|
|
snapshotBloomAccountTrueHitMeter = metrics.NewRegisteredMeter("state/snapshot/bloom/account/truehit", nil)
|
|
snapshotBloomAccountFalseHitMeter = metrics.NewRegisteredMeter("state/snapshot/bloom/account/falsehit", nil)
|
|
snapshotBloomAccountMissMeter = metrics.NewRegisteredMeter("state/snapshot/bloom/account/miss", nil)
|
|
|
|
snapshotBloomStorageTrueHitMeter = metrics.NewRegisteredMeter("state/snapshot/bloom/storage/truehit", nil)
|
|
snapshotBloomStorageFalseHitMeter = metrics.NewRegisteredMeter("state/snapshot/bloom/storage/falsehit", nil)
|
|
snapshotBloomStorageMissMeter = metrics.NewRegisteredMeter("state/snapshot/bloom/storage/miss", nil)
|
|
|
|
// ErrSnapshotStale is returned from data accessors if the underlying snapshot
|
|
// layer had been invalidated due to the chain progressing forward far enough
|
|
// to not maintain the layer's original state.
|
|
ErrSnapshotStale = errors.New("snapshot stale")
|
|
|
|
// ErrNotCoveredYet is returned from data accessors if the underlying snapshot
|
|
// is being generated currently and the requested data item is not yet in the
|
|
// range of accounts covered.
|
|
ErrNotCoveredYet = errors.New("not covered yet")
|
|
|
|
// ErrNotConstructed is returned if the callers want to iterate the snapshot
|
|
// while the generation is not finished yet.
|
|
ErrNotConstructed = errors.New("snapshot is not constructed")
|
|
|
|
// errSnapshotCycle is returned if a snapshot is attempted to be inserted
|
|
// that forms a cycle in the snapshot tree.
|
|
errSnapshotCycle = errors.New("snapshot cycle")
|
|
)
|
|
|
|
// Snapshot represents the functionality supported by a snapshot storage layer.
|
|
type Snapshot interface {
|
|
// Root returns the root hash for which this snapshot was made.
|
|
Root() common.Hash
|
|
|
|
// Account directly retrieves the account associated with a particular hash in
|
|
// the snapshot slim data format.
|
|
Account(hash common.Hash) (*Account, error)
|
|
|
|
// AccountRLP directly retrieves the account RLP associated with a particular
|
|
// hash in the snapshot slim data format.
|
|
AccountRLP(hash common.Hash) ([]byte, error)
|
|
|
|
// Storage directly retrieves the storage data associated with a particular hash,
|
|
// within a particular account.
|
|
Storage(accountHash, storageHash common.Hash) ([]byte, error)
|
|
}
|
|
|
|
// snapshot is the internal version of the snapshot data layer that supports some
|
|
// additional methods compared to the public API.
|
|
type snapshot interface {
|
|
Snapshot
|
|
|
|
// Parent returns the subsequent layer of a snapshot, or nil if the base was
|
|
// reached.
|
|
//
|
|
// Note, the method is an internal helper to avoid type switching between the
|
|
// disk and diff layers. There is no locking involved.
|
|
Parent() snapshot
|
|
|
|
// Update creates a new layer on top of the existing snapshot diff tree with
|
|
// the specified data items.
|
|
//
|
|
// Note, the maps are retained by the method to avoid copying everything.
|
|
Update(blockRoot common.Hash, destructs map[common.Hash]struct{}, accounts map[common.Hash][]byte, storage map[common.Hash]map[common.Hash][]byte) *diffLayer
|
|
|
|
// Journal commits an entire diff hierarchy to disk into a single journal entry.
|
|
// This is meant to be used during shutdown to persist the snapshot without
|
|
// flattening everything down (bad for reorgs).
|
|
Journal(buffer *bytes.Buffer) (common.Hash, error)
|
|
|
|
// LegacyJournal is basically identical to Journal. it's the legacy version for
|
|
// flushing legacy journal. Now the only purpose of this function is for testing.
|
|
LegacyJournal(buffer *bytes.Buffer) (common.Hash, error)
|
|
|
|
// Stale return whether this layer has become stale (was flattened across) or
|
|
// if it's still live.
|
|
Stale() bool
|
|
|
|
// AccountIterator creates an account iterator over an arbitrary layer.
|
|
AccountIterator(seek common.Hash) AccountIterator
|
|
|
|
// StorageIterator creates a storage iterator over an arbitrary layer.
|
|
StorageIterator(account common.Hash, seek common.Hash) (StorageIterator, bool)
|
|
}
|
|
|
|
// SnapshotTree is an Ethereum state snapshot tree. It consists of one persistent
|
|
// base layer backed by a key-value store, on top of which arbitrarily many in-
|
|
// memory diff layers are topped. The memory diffs can form a tree with branching,
|
|
// but the disk layer is singleton and common to all. If a reorg goes deeper than
|
|
// the disk layer, everything needs to be deleted.
|
|
//
|
|
// The goal of a state snapshot is twofold: to allow direct access to account and
|
|
// storage data to avoid expensive multi-level trie lookups; and to allow sorted,
|
|
// cheap iteration of the account/storage tries for sync aid.
|
|
type Tree struct {
|
|
diskdb ethdb.KeyValueStore // Persistent database to store the snapshot
|
|
triedb *trie.Database // In-memory cache to access the trie through
|
|
cache int // Megabytes permitted to use for read caches
|
|
layers map[common.Hash]snapshot // Collection of all known layers
|
|
lock sync.RWMutex
|
|
}
|
|
|
|
// New attempts to load an already existing snapshot from a persistent key-value
|
|
// store (with a number of memory layers from a journal), ensuring that the head
|
|
// of the snapshot matches the expected one.
|
|
//
|
|
// If the snapshot is missing or the disk layer is broken, the entire is deleted
|
|
// and will be reconstructed from scratch based on the tries in the key-value
|
|
// store, on a background thread. If the memory layers from the journal is not
|
|
// continuous with disk layer or the journal is missing, all diffs will be discarded
|
|
// iff it's in "recovery" mode, otherwise rebuild is mandatory.
|
|
func New(diskdb ethdb.KeyValueStore, triedb *trie.Database, cache int, root common.Hash, async bool, recovery bool) *Tree {
|
|
// Create a new, empty snapshot tree
|
|
snap := &Tree{
|
|
diskdb: diskdb,
|
|
triedb: triedb,
|
|
cache: cache,
|
|
layers: make(map[common.Hash]snapshot),
|
|
}
|
|
if !async {
|
|
defer snap.waitBuild()
|
|
}
|
|
// Attempt to load a previously persisted snapshot and rebuild one if failed
|
|
head, err := loadSnapshot(diskdb, triedb, cache, root, recovery)
|
|
if err != nil {
|
|
log.Warn("Failed to load snapshot, regenerating", "err", err)
|
|
snap.Rebuild(root)
|
|
return snap
|
|
}
|
|
// Existing snapshot loaded, seed all the layers
|
|
for head != nil {
|
|
snap.layers[head.Root()] = head
|
|
head = head.Parent()
|
|
}
|
|
return snap
|
|
}
|
|
|
|
// waitBuild blocks until the snapshot finishes rebuilding. This method is meant
|
|
// to be used by tests to ensure we're testing what we believe we are.
|
|
func (t *Tree) waitBuild() {
|
|
// Find the rebuild termination channel
|
|
var done chan struct{}
|
|
|
|
t.lock.RLock()
|
|
for _, layer := range t.layers {
|
|
if layer, ok := layer.(*diskLayer); ok {
|
|
done = layer.genPending
|
|
break
|
|
}
|
|
}
|
|
t.lock.RUnlock()
|
|
|
|
// Wait until the snapshot is generated
|
|
if done != nil {
|
|
<-done
|
|
}
|
|
}
|
|
|
|
// Snapshot retrieves a snapshot belonging to the given block root, or nil if no
|
|
// snapshot is maintained for that block.
|
|
func (t *Tree) Snapshot(blockRoot common.Hash) Snapshot {
|
|
t.lock.RLock()
|
|
defer t.lock.RUnlock()
|
|
|
|
return t.layers[blockRoot]
|
|
}
|
|
|
|
// Update adds a new snapshot into the tree, if that can be linked to an existing
|
|
// old parent. It is disallowed to insert a disk layer (the origin of all).
|
|
func (t *Tree) Update(blockRoot common.Hash, parentRoot common.Hash, destructs map[common.Hash]struct{}, accounts map[common.Hash][]byte, storage map[common.Hash]map[common.Hash][]byte) error {
|
|
// Reject noop updates to avoid self-loops in the snapshot tree. This is a
|
|
// special case that can only happen for Clique networks where empty blocks
|
|
// don't modify the state (0 block subsidy).
|
|
//
|
|
// Although we could silently ignore this internally, it should be the caller's
|
|
// responsibility to avoid even attempting to insert such a snapshot.
|
|
if blockRoot == parentRoot {
|
|
return errSnapshotCycle
|
|
}
|
|
// Generate a new snapshot on top of the parent
|
|
parent := t.Snapshot(parentRoot).(snapshot)
|
|
if parent == nil {
|
|
return fmt.Errorf("parent [%#x] snapshot missing", parentRoot)
|
|
}
|
|
snap := parent.Update(blockRoot, destructs, accounts, storage)
|
|
|
|
// Save the new snapshot for later
|
|
t.lock.Lock()
|
|
defer t.lock.Unlock()
|
|
|
|
t.layers[snap.root] = snap
|
|
return nil
|
|
}
|
|
|
|
// Cap traverses downwards the snapshot tree from a head block hash until the
|
|
// number of allowed layers are crossed. All layers beyond the permitted number
|
|
// are flattened downwards.
|
|
func (t *Tree) Cap(root common.Hash, layers int) error {
|
|
// Retrieve the head snapshot to cap from
|
|
snap := t.Snapshot(root)
|
|
if snap == nil {
|
|
return fmt.Errorf("snapshot [%#x] missing", root)
|
|
}
|
|
diff, ok := snap.(*diffLayer)
|
|
if !ok {
|
|
return fmt.Errorf("snapshot [%#x] is disk layer", root)
|
|
}
|
|
// If the generator is still running, use a more aggressive cap
|
|
diff.origin.lock.RLock()
|
|
if diff.origin.genMarker != nil && layers > 8 {
|
|
layers = 8
|
|
}
|
|
diff.origin.lock.RUnlock()
|
|
|
|
// Run the internal capping and discard all stale layers
|
|
t.lock.Lock()
|
|
defer t.lock.Unlock()
|
|
|
|
// Flattening the bottom-most diff layer requires special casing since there's
|
|
// no child to rewire to the grandparent. In that case we can fake a temporary
|
|
// child for the capping and then remove it.
|
|
var persisted *diskLayer
|
|
|
|
switch layers {
|
|
case 0:
|
|
// If full commit was requested, flatten the diffs and merge onto disk
|
|
diff.lock.RLock()
|
|
base := diffToDisk(diff.flatten().(*diffLayer))
|
|
diff.lock.RUnlock()
|
|
|
|
// Replace the entire snapshot tree with the flat base
|
|
t.layers = map[common.Hash]snapshot{base.root: base}
|
|
return nil
|
|
|
|
case 1:
|
|
// If full flattening was requested, flatten the diffs but only merge if the
|
|
// memory limit was reached
|
|
var (
|
|
bottom *diffLayer
|
|
base *diskLayer
|
|
)
|
|
diff.lock.RLock()
|
|
bottom = diff.flatten().(*diffLayer)
|
|
if bottom.memory >= aggregatorMemoryLimit {
|
|
base = diffToDisk(bottom)
|
|
}
|
|
diff.lock.RUnlock()
|
|
|
|
// If all diff layers were removed, replace the entire snapshot tree
|
|
if base != nil {
|
|
t.layers = map[common.Hash]snapshot{base.root: base}
|
|
return nil
|
|
}
|
|
// Merge the new aggregated layer into the snapshot tree, clean stales below
|
|
t.layers[bottom.root] = bottom
|
|
|
|
default:
|
|
// Many layers requested to be retained, cap normally
|
|
persisted = t.cap(diff, layers)
|
|
}
|
|
// Remove any layer that is stale or links into a stale layer
|
|
children := make(map[common.Hash][]common.Hash)
|
|
for root, snap := range t.layers {
|
|
if diff, ok := snap.(*diffLayer); ok {
|
|
parent := diff.parent.Root()
|
|
children[parent] = append(children[parent], root)
|
|
}
|
|
}
|
|
var remove func(root common.Hash)
|
|
remove = func(root common.Hash) {
|
|
delete(t.layers, root)
|
|
for _, child := range children[root] {
|
|
remove(child)
|
|
}
|
|
delete(children, root)
|
|
}
|
|
for root, snap := range t.layers {
|
|
if snap.Stale() {
|
|
remove(root)
|
|
}
|
|
}
|
|
// If the disk layer was modified, regenerate all the cumulative blooms
|
|
if persisted != nil {
|
|
var rebloom func(root common.Hash)
|
|
rebloom = func(root common.Hash) {
|
|
if diff, ok := t.layers[root].(*diffLayer); ok {
|
|
diff.rebloom(persisted)
|
|
}
|
|
for _, child := range children[root] {
|
|
rebloom(child)
|
|
}
|
|
}
|
|
rebloom(persisted.root)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// cap traverses downwards the diff tree until the number of allowed layers are
|
|
// crossed. All diffs beyond the permitted number are flattened downwards. If the
|
|
// layer limit is reached, memory cap is also enforced (but not before).
|
|
//
|
|
// The method returns the new disk layer if diffs were persistend into it.
|
|
func (t *Tree) cap(diff *diffLayer, layers int) *diskLayer {
|
|
// Dive until we run out of layers or reach the persistent database
|
|
for ; layers > 2; layers-- {
|
|
// If we still have diff layers below, continue down
|
|
if parent, ok := diff.parent.(*diffLayer); ok {
|
|
diff = parent
|
|
} else {
|
|
// Diff stack too shallow, return without modifications
|
|
return nil
|
|
}
|
|
}
|
|
// We're out of layers, flatten anything below, stopping if it's the disk or if
|
|
// the memory limit is not yet exceeded.
|
|
switch parent := diff.parent.(type) {
|
|
case *diskLayer:
|
|
return nil
|
|
|
|
case *diffLayer:
|
|
// Flatten the parent into the grandparent. The flattening internally obtains a
|
|
// write lock on grandparent.
|
|
flattened := parent.flatten().(*diffLayer)
|
|
t.layers[flattened.root] = flattened
|
|
|
|
diff.lock.Lock()
|
|
defer diff.lock.Unlock()
|
|
|
|
diff.parent = flattened
|
|
if flattened.memory < aggregatorMemoryLimit {
|
|
// Accumulator layer is smaller than the limit, so we can abort, unless
|
|
// there's a snapshot being generated currently. In that case, the trie
|
|
// will move fron underneath the generator so we **must** merge all the
|
|
// partial data down into the snapshot and restart the generation.
|
|
if flattened.parent.(*diskLayer).genAbort == nil {
|
|
return nil
|
|
}
|
|
}
|
|
default:
|
|
panic(fmt.Sprintf("unknown data layer: %T", parent))
|
|
}
|
|
// If the bottom-most layer is larger than our memory cap, persist to disk
|
|
bottom := diff.parent.(*diffLayer)
|
|
|
|
bottom.lock.RLock()
|
|
base := diffToDisk(bottom)
|
|
bottom.lock.RUnlock()
|
|
|
|
t.layers[base.root] = base
|
|
diff.parent = base
|
|
return base
|
|
}
|
|
|
|
// diffToDisk merges a bottom-most diff into the persistent disk layer underneath
|
|
// it. The method will panic if called onto a non-bottom-most diff layer.
|
|
//
|
|
// The disk layer persistence should be operated in an atomic way. All updates should
|
|
// be discarded if the whole transition if not finished.
|
|
func diffToDisk(bottom *diffLayer) *diskLayer {
|
|
var (
|
|
base = bottom.parent.(*diskLayer)
|
|
batch = base.diskdb.NewBatch()
|
|
stats *generatorStats
|
|
)
|
|
// If the disk layer is running a snapshot generator, abort it
|
|
if base.genAbort != nil {
|
|
abort := make(chan *generatorStats)
|
|
base.genAbort <- abort
|
|
stats = <-abort
|
|
}
|
|
// Put the deletion in the batch writer, flush all updates in the final step.
|
|
rawdb.DeleteSnapshotRoot(batch)
|
|
|
|
// Mark the original base as stale as we're going to create a new wrapper
|
|
base.lock.Lock()
|
|
if base.stale {
|
|
panic("parent disk layer is stale") // we've committed into the same base from two children, boo
|
|
}
|
|
base.stale = true
|
|
base.lock.Unlock()
|
|
|
|
// Destroy all the destructed accounts from the database
|
|
for hash := range bottom.destructSet {
|
|
// Skip any account not covered yet by the snapshot
|
|
if base.genMarker != nil && bytes.Compare(hash[:], base.genMarker) > 0 {
|
|
continue
|
|
}
|
|
// Remove all storage slots
|
|
rawdb.DeleteAccountSnapshot(batch, hash)
|
|
base.cache.Set(hash[:], nil)
|
|
|
|
it := rawdb.IterateStorageSnapshots(base.diskdb, hash)
|
|
for it.Next() {
|
|
if key := it.Key(); len(key) == 65 { // TODO(karalabe): Yuck, we should move this into the iterator
|
|
batch.Delete(key)
|
|
base.cache.Del(key[1:])
|
|
|
|
snapshotFlushStorageItemMeter.Mark(1)
|
|
}
|
|
}
|
|
it.Release()
|
|
}
|
|
// Push all updated accounts into the database
|
|
for hash, data := range bottom.accountData {
|
|
// Skip any account not covered yet by the snapshot
|
|
if base.genMarker != nil && bytes.Compare(hash[:], base.genMarker) > 0 {
|
|
continue
|
|
}
|
|
// Push the account to disk
|
|
rawdb.WriteAccountSnapshot(batch, hash, data)
|
|
base.cache.Set(hash[:], data)
|
|
snapshotCleanAccountWriteMeter.Mark(int64(len(data)))
|
|
|
|
snapshotFlushAccountItemMeter.Mark(1)
|
|
snapshotFlushAccountSizeMeter.Mark(int64(len(data)))
|
|
}
|
|
// Push all the storage slots into the database
|
|
for accountHash, storage := range bottom.storageData {
|
|
// Skip any account not covered yet by the snapshot
|
|
if base.genMarker != nil && bytes.Compare(accountHash[:], base.genMarker) > 0 {
|
|
continue
|
|
}
|
|
// Generation might be mid-account, track that case too
|
|
midAccount := base.genMarker != nil && bytes.Equal(accountHash[:], base.genMarker[:common.HashLength])
|
|
|
|
for storageHash, data := range storage {
|
|
// Skip any slot not covered yet by the snapshot
|
|
if midAccount && bytes.Compare(storageHash[:], base.genMarker[common.HashLength:]) > 0 {
|
|
continue
|
|
}
|
|
if len(data) > 0 {
|
|
rawdb.WriteStorageSnapshot(batch, accountHash, storageHash, data)
|
|
base.cache.Set(append(accountHash[:], storageHash[:]...), data)
|
|
snapshotCleanStorageWriteMeter.Mark(int64(len(data)))
|
|
} else {
|
|
rawdb.DeleteStorageSnapshot(batch, accountHash, storageHash)
|
|
base.cache.Set(append(accountHash[:], storageHash[:]...), nil)
|
|
}
|
|
snapshotFlushStorageItemMeter.Mark(1)
|
|
snapshotFlushStorageSizeMeter.Mark(int64(len(data)))
|
|
}
|
|
}
|
|
// Update the snapshot block marker and write any remainder data
|
|
rawdb.WriteSnapshotRoot(batch, bottom.root)
|
|
|
|
// Write out the generator progress marker and report
|
|
journalProgress(batch, base.genMarker, stats)
|
|
|
|
// Flush all the updates in the single db operation. Ensure the
|
|
// disk layer transition is atomic.
|
|
if err := batch.Write(); err != nil {
|
|
log.Crit("Failed to write leftover snapshot", "err", err)
|
|
}
|
|
log.Debug("Journalled disk layer", "root", bottom.root, "complete", base.genMarker == nil)
|
|
res := &diskLayer{
|
|
root: bottom.root,
|
|
cache: base.cache,
|
|
diskdb: base.diskdb,
|
|
triedb: base.triedb,
|
|
genMarker: base.genMarker,
|
|
genPending: base.genPending,
|
|
}
|
|
// If snapshot generation hasn't finished yet, port over all the starts and
|
|
// continue where the previous round left off.
|
|
//
|
|
// Note, the `base.genAbort` comparison is not used normally, it's checked
|
|
// to allow the tests to play with the marker without triggering this path.
|
|
if base.genMarker != nil && base.genAbort != nil {
|
|
res.genMarker = base.genMarker
|
|
res.genAbort = make(chan chan *generatorStats)
|
|
go res.generate(stats)
|
|
}
|
|
return res
|
|
}
|
|
|
|
// Journal commits an entire diff hierarchy to disk into a single journal entry.
|
|
// This is meant to be used during shutdown to persist the snapshot without
|
|
// flattening everything down (bad for reorgs).
|
|
//
|
|
// The method returns the root hash of the base layer that needs to be persisted
|
|
// to disk as a trie too to allow continuing any pending generation op.
|
|
func (t *Tree) Journal(root common.Hash) (common.Hash, error) {
|
|
// Retrieve the head snapshot to journal from var snap snapshot
|
|
snap := t.Snapshot(root)
|
|
if snap == nil {
|
|
return common.Hash{}, fmt.Errorf("snapshot [%#x] missing", root)
|
|
}
|
|
// Run the journaling
|
|
t.lock.Lock()
|
|
defer t.lock.Unlock()
|
|
|
|
// Firstly write out the metadata of journal
|
|
journal := new(bytes.Buffer)
|
|
if err := rlp.Encode(journal, journalVersion); err != nil {
|
|
return common.Hash{}, err
|
|
}
|
|
diskroot := t.diskRoot()
|
|
if diskroot == (common.Hash{}) {
|
|
return common.Hash{}, errors.New("invalid disk root")
|
|
}
|
|
// Secondly write out the disk layer root, ensure the
|
|
// diff journal is continuous with disk.
|
|
if err := rlp.Encode(journal, diskroot); err != nil {
|
|
return common.Hash{}, err
|
|
}
|
|
// Finally write out the journal of each layer in reverse order.
|
|
base, err := snap.(snapshot).Journal(journal)
|
|
if err != nil {
|
|
return common.Hash{}, err
|
|
}
|
|
// Store the journal into the database and return
|
|
rawdb.WriteSnapshotJournal(t.diskdb, journal.Bytes())
|
|
return base, nil
|
|
}
|
|
|
|
// LegacyJournal is basically identical to Journal. it's the legacy
|
|
// version for flushing legacy journal. Now the only purpose of this
|
|
// function is for testing.
|
|
func (t *Tree) LegacyJournal(root common.Hash) (common.Hash, error) {
|
|
// Retrieve the head snapshot to journal from var snap snapshot
|
|
snap := t.Snapshot(root)
|
|
if snap == nil {
|
|
return common.Hash{}, fmt.Errorf("snapshot [%#x] missing", root)
|
|
}
|
|
// Run the journaling
|
|
t.lock.Lock()
|
|
defer t.lock.Unlock()
|
|
|
|
journal := new(bytes.Buffer)
|
|
base, err := snap.(snapshot).LegacyJournal(journal)
|
|
if err != nil {
|
|
return common.Hash{}, err
|
|
}
|
|
// Store the journal into the database and return
|
|
rawdb.WriteSnapshotJournal(t.diskdb, journal.Bytes())
|
|
return base, nil
|
|
}
|
|
|
|
// Rebuild wipes all available snapshot data from the persistent database and
|
|
// discard all caches and diff layers. Afterwards, it starts a new snapshot
|
|
// generator with the given root hash.
|
|
func (t *Tree) Rebuild(root common.Hash) {
|
|
t.lock.Lock()
|
|
defer t.lock.Unlock()
|
|
|
|
// Firstly delete any recovery flag in the database. Because now we are
|
|
// building a brand new snapshot.
|
|
rawdb.DeleteSnapshotRecoveryNumber(t.diskdb)
|
|
|
|
// Track whether there's a wipe currently running and keep it alive if so
|
|
var wiper chan struct{}
|
|
|
|
// Iterate over and mark all layers stale
|
|
for _, layer := range t.layers {
|
|
switch layer := layer.(type) {
|
|
case *diskLayer:
|
|
// If the base layer is generating, abort it and save
|
|
if layer.genAbort != nil {
|
|
abort := make(chan *generatorStats)
|
|
layer.genAbort <- abort
|
|
|
|
if stats := <-abort; stats != nil {
|
|
wiper = stats.wiping
|
|
}
|
|
}
|
|
// Layer should be inactive now, mark it as stale
|
|
layer.lock.Lock()
|
|
layer.stale = true
|
|
layer.lock.Unlock()
|
|
|
|
case *diffLayer:
|
|
// If the layer is a simple diff, simply mark as stale
|
|
layer.lock.Lock()
|
|
atomic.StoreUint32(&layer.stale, 1)
|
|
layer.lock.Unlock()
|
|
|
|
default:
|
|
panic(fmt.Sprintf("unknown layer type: %T", layer))
|
|
}
|
|
}
|
|
// Start generating a new snapshot from scratch on a backgroung thread. The
|
|
// generator will run a wiper first if there's not one running right now.
|
|
log.Info("Rebuilding state snapshot")
|
|
t.layers = map[common.Hash]snapshot{
|
|
root: generateSnapshot(t.diskdb, t.triedb, t.cache, root, wiper),
|
|
}
|
|
}
|
|
|
|
// AccountIterator creates a new account iterator for the specified root hash and
|
|
// seeks to a starting account hash.
|
|
func (t *Tree) AccountIterator(root common.Hash, seek common.Hash) (AccountIterator, error) {
|
|
ok, err := t.generating()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if ok {
|
|
return nil, ErrNotConstructed
|
|
}
|
|
return newFastAccountIterator(t, root, seek)
|
|
}
|
|
|
|
// StorageIterator creates a new storage iterator for the specified root hash and
|
|
// account. The iterator will be move to the specific start position.
|
|
func (t *Tree) StorageIterator(root common.Hash, account common.Hash, seek common.Hash) (StorageIterator, error) {
|
|
ok, err := t.generating()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if ok {
|
|
return nil, ErrNotConstructed
|
|
}
|
|
return newFastStorageIterator(t, root, account, seek)
|
|
}
|
|
|
|
// disklayer is an internal helper function to return the disk layer.
|
|
// The lock of snapTree is assumed to be held already.
|
|
func (t *Tree) disklayer() *diskLayer {
|
|
var snap snapshot
|
|
for _, s := range t.layers {
|
|
snap = s
|
|
break
|
|
}
|
|
if snap == nil {
|
|
return nil
|
|
}
|
|
switch layer := snap.(type) {
|
|
case *diskLayer:
|
|
return layer
|
|
case *diffLayer:
|
|
return layer.origin
|
|
default:
|
|
panic(fmt.Sprintf("%T: undefined layer", snap))
|
|
}
|
|
}
|
|
|
|
// diskRoot is a internal helper function to return the disk layer root.
|
|
// The lock of snapTree is assumed to be held already.
|
|
func (t *Tree) diskRoot() common.Hash {
|
|
disklayer := t.disklayer()
|
|
if disklayer == nil {
|
|
return common.Hash{}
|
|
}
|
|
return disklayer.Root()
|
|
}
|
|
|
|
// generating is an internal helper function which reports whether the snapshot
|
|
// is still under the construction.
|
|
func (t *Tree) generating() (bool, error) {
|
|
t.lock.Lock()
|
|
defer t.lock.Unlock()
|
|
|
|
layer := t.disklayer()
|
|
if layer == nil {
|
|
return false, errors.New("disk layer is missing")
|
|
}
|
|
layer.lock.RLock()
|
|
defer layer.lock.RUnlock()
|
|
return layer.genMarker != nil, nil
|
|
}
|
|
|
|
// diskRoot is a external helper function to return the disk layer root.
|
|
func (t *Tree) DiskRoot() common.Hash {
|
|
t.lock.Lock()
|
|
defer t.lock.Unlock()
|
|
|
|
return t.diskRoot()
|
|
}
|