mirror of
https://gitlab.com/pulsechaincom/erigon-pulse.git
synced 2024-12-22 03:30:37 +00:00
e3: kv/temporal prototype 3 (#6395)
This commit is contained in:
parent
c619413871
commit
130ab85bea
@ -15,12 +15,15 @@ import (
|
||||
|
||||
"github.com/ledgerwatch/erigon-lib/common/datadir"
|
||||
"github.com/ledgerwatch/erigon-lib/common/dir"
|
||||
"github.com/ledgerwatch/erigon-lib/kv/kvcfg"
|
||||
libstate "github.com/ledgerwatch/erigon-lib/state"
|
||||
|
||||
"github.com/ledgerwatch/erigon/core/state/temporal"
|
||||
"github.com/ledgerwatch/erigon/eth/ethconfig"
|
||||
"github.com/ledgerwatch/erigon/params"
|
||||
"github.com/ledgerwatch/erigon/rpc/rpccfg"
|
||||
"github.com/ledgerwatch/erigon/turbo/debug"
|
||||
"github.com/ledgerwatch/erigon/turbo/logging"
|
||||
"github.com/ledgerwatch/erigon/turbo/snapshotsync/snap"
|
||||
|
||||
"github.com/ledgerwatch/erigon-lib/direct"
|
||||
"github.com/ledgerwatch/erigon-lib/gointerfaces"
|
||||
@ -49,12 +52,10 @@ import (
|
||||
"github.com/ledgerwatch/erigon/core/rawdb"
|
||||
"github.com/ledgerwatch/erigon/node"
|
||||
"github.com/ledgerwatch/erigon/node/nodecfg"
|
||||
"github.com/ledgerwatch/erigon/params"
|
||||
"github.com/ledgerwatch/erigon/rpc"
|
||||
"github.com/ledgerwatch/erigon/turbo/rpchelper"
|
||||
"github.com/ledgerwatch/erigon/turbo/services"
|
||||
"github.com/ledgerwatch/erigon/turbo/snapshotsync"
|
||||
"github.com/ledgerwatch/erigon/turbo/snapshotsync/snap"
|
||||
)
|
||||
|
||||
var rootCmd = &cobra.Command{
|
||||
@ -268,12 +269,27 @@ func RemoteServices(ctx context.Context, cfg httpcfg.HttpCfg, logger log.Logger,
|
||||
if !cfg.WithDatadir && cfg.PrivateApiAddr == "" {
|
||||
return nil, nil, nil, nil, nil, nil, nil, ff, nil, fmt.Errorf("either remote db or local db must be specified")
|
||||
}
|
||||
creds, err := grpcutil.TLS(cfg.TLSCACert, cfg.TLSCertfile, cfg.TLSKeyFile)
|
||||
if err != nil {
|
||||
return nil, nil, nil, nil, nil, nil, nil, ff, nil, fmt.Errorf("open tls cert: %w", err)
|
||||
}
|
||||
conn, err := grpcutil.Connect(creds, cfg.PrivateApiAddr)
|
||||
if err != nil {
|
||||
return nil, nil, nil, nil, nil, nil, nil, ff, nil, fmt.Errorf("could not connect to execution service privateApi: %w", err)
|
||||
}
|
||||
|
||||
// Do not change the order of these checks. Chaindata needs to be checked first, because PrivateApiAddr has default value which is not ""
|
||||
// If PrivateApiAddr is checked first, the Chaindata option will never work
|
||||
kvClient := remote.NewKVClient(conn)
|
||||
remoteKv, err := remotedb.NewRemote(gointerfaces.VersionFromProto(remotedbserver.KvServiceAPIVersion), logger, kvClient).Open()
|
||||
if err != nil {
|
||||
return nil, nil, nil, nil, nil, nil, nil, ff, nil, fmt.Errorf("could not connect to remoteKv: %w", err)
|
||||
}
|
||||
|
||||
// Configure DB first
|
||||
var allSnapshots *snapshotsync.RoSnapshots
|
||||
onNewSnapshot := func() {}
|
||||
if cfg.WithDatadir {
|
||||
dir.MustExist(cfg.Dirs.SnapHistory)
|
||||
var rwKv kv.RwDB
|
||||
dir.MustExist(cfg.Dirs.SnapHistory)
|
||||
log.Trace("Creating chain db", "path", cfg.Dirs.Chaindata)
|
||||
limiter := semaphore.NewWeighted(int64(cfg.DBReadConcurrency))
|
||||
rwKv, err = kv2.NewMDBX(logger).RoTxsLimiter(limiter).Path(cfg.Dirs.Chaindata).Readonly().Open()
|
||||
@ -284,6 +300,101 @@ func RemoteServices(ctx context.Context, cfg httpcfg.HttpCfg, logger log.Logger,
|
||||
return nil, nil, nil, nil, nil, nil, nil, ff, nil, compatErr
|
||||
}
|
||||
db = rwKv
|
||||
|
||||
var cc *params.ChainConfig
|
||||
if err := db.View(context.Background(), func(tx kv.Tx) error {
|
||||
genesisBlock, err := rawdb.ReadBlockByNumber(tx, 0)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if genesisBlock == nil {
|
||||
return fmt.Errorf("genesis not found in DB. Likely Erigon was never started on this datadir")
|
||||
}
|
||||
cc, err = rawdb.ReadChainConfig(tx, genesisBlock.Hash())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
cfg.Snap.Enabled, err = snap.Enabled(tx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}); err != nil {
|
||||
return nil, nil, nil, nil, nil, nil, nil, ff, nil, err
|
||||
}
|
||||
if cc == nil {
|
||||
return nil, nil, nil, nil, nil, nil, nil, ff, nil, fmt.Errorf("chain config not found in db. Need start erigon at least once on this db")
|
||||
}
|
||||
cfg.Snap.Enabled = cfg.Snap.Enabled || cfg.Sync.UseSnapshots
|
||||
if !cfg.Snap.Enabled {
|
||||
log.Info("Use --snapshots=false")
|
||||
}
|
||||
|
||||
// Configure sapshots
|
||||
if cfg.Snap.Enabled {
|
||||
allSnapshots = snapshotsync.NewRoSnapshots(cfg.Snap, cfg.Dirs.Snap)
|
||||
// To povide good UX - immediatly can read snapshots after RPCDaemon start, even if Erigon is down
|
||||
// Erigon does store list of snapshots in db: means RPCDaemon can read this list now, but read by `kvClient.Snapshots` after establish grpc connection
|
||||
allSnapshots.OptimisticReopenWithDB(db)
|
||||
allSnapshots.LogStat()
|
||||
|
||||
if agg, err = libstate.NewAggregator22(cfg.Dirs.SnapHistory, cfg.Dirs.Tmp, ethconfig.HistoryV3AggregationStep, db); err != nil {
|
||||
return nil, nil, nil, nil, nil, nil, nil, ff, nil, fmt.Errorf("create aggregator: %w", err)
|
||||
}
|
||||
_ = agg.ReopenFiles()
|
||||
|
||||
db.View(context.Background(), func(tx kv.Tx) error {
|
||||
agg.LogStats(tx, func(endTxNumMinimax uint64) uint64 {
|
||||
_, histBlockNumProgress, _ := rawdb.TxNums.FindBlockNum(tx, endTxNumMinimax)
|
||||
return histBlockNumProgress
|
||||
})
|
||||
return nil
|
||||
})
|
||||
onNewSnapshot = func() {
|
||||
go func() { // don't block events processing by network communication
|
||||
reply, err := kvClient.Snapshots(ctx, &remote.SnapshotsRequest{}, grpc.WaitForReady(true))
|
||||
if err != nil {
|
||||
log.Warn("[Snapshots] reopen", "err", err)
|
||||
return
|
||||
}
|
||||
if err := allSnapshots.ReopenList(reply.Files, true); err != nil {
|
||||
log.Error("[Snapshots] reopen", "err", err)
|
||||
} else {
|
||||
allSnapshots.LogStat()
|
||||
}
|
||||
|
||||
if err = agg.ReopenFiles(); err != nil {
|
||||
log.Error("[Snapshots] reopen", "err", err)
|
||||
} else {
|
||||
db.View(context.Background(), func(tx kv.Tx) error {
|
||||
agg.LogStats(tx, func(endTxNumMinimax uint64) uint64 {
|
||||
_, histBlockNumProgress, _ := rawdb.TxNums.FindBlockNum(tx, endTxNumMinimax)
|
||||
return histBlockNumProgress
|
||||
})
|
||||
return nil
|
||||
})
|
||||
}
|
||||
}()
|
||||
}
|
||||
onNewSnapshot()
|
||||
blockReader = snapshotsync.NewBlockReaderWithSnapshots(allSnapshots)
|
||||
|
||||
var histV3Enabled bool
|
||||
_ = db.View(ctx, func(tx kv.Tx) error {
|
||||
histV3Enabled, _ = kvcfg.HistoryV3.Enabled(tx)
|
||||
return nil
|
||||
})
|
||||
if histV3Enabled {
|
||||
log.Info("HistoryV3", "enable", histV3Enabled)
|
||||
db = temporal.New(rwKv, agg)
|
||||
}
|
||||
}
|
||||
}
|
||||
// If DB can't be configured - used PrivateApiAddr as remote DB
|
||||
if db == nil {
|
||||
db = remoteKv
|
||||
}
|
||||
if cfg.WithDatadir {
|
||||
stateCache = kvcache.NewDummy()
|
||||
blockReader = snapshotsync.NewBlockReader()
|
||||
|
||||
@ -314,115 +425,8 @@ func RemoteServices(ctx context.Context, cfg httpcfg.HttpCfg, logger log.Logger,
|
||||
log.Info("if you run RPCDaemon on same machine with Erigon add --datadir option")
|
||||
}
|
||||
|
||||
if db != nil {
|
||||
var cc *params.ChainConfig
|
||||
if err := db.View(context.Background(), func(tx kv.Tx) error {
|
||||
genesisBlock, err := rawdb.ReadBlockByNumber(tx, 0)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if genesisBlock == nil {
|
||||
return fmt.Errorf("genesis not found in DB. Likely Erigon was never started on this datadir")
|
||||
}
|
||||
cc, err = rawdb.ReadChainConfig(tx, genesisBlock.Hash())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
cfg.Snap.Enabled, err = snap.Enabled(tx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}); err != nil {
|
||||
return nil, nil, nil, nil, nil, nil, nil, ff, nil, err
|
||||
}
|
||||
if cc == nil {
|
||||
return nil, nil, nil, nil, nil, nil, nil, ff, nil, fmt.Errorf("chain config not found in db. Need start erigon at least once on this db")
|
||||
}
|
||||
cfg.Snap.Enabled = cfg.Snap.Enabled || cfg.Sync.UseSnapshots
|
||||
}
|
||||
|
||||
creds, err := grpcutil.TLS(cfg.TLSCACert, cfg.TLSCertfile, cfg.TLSKeyFile)
|
||||
if err != nil {
|
||||
return nil, nil, nil, nil, nil, nil, nil, ff, nil, fmt.Errorf("open tls cert: %w", err)
|
||||
}
|
||||
conn, err := grpcutil.Connect(creds, cfg.PrivateApiAddr)
|
||||
if err != nil {
|
||||
return nil, nil, nil, nil, nil, nil, nil, ff, nil, fmt.Errorf("could not connect to execution service privateApi: %w", err)
|
||||
}
|
||||
|
||||
kvClient := remote.NewKVClient(conn)
|
||||
remoteKv, err := remotedb.NewRemote(gointerfaces.VersionFromProto(remotedbserver.KvServiceAPIVersion), logger, kvClient).Open()
|
||||
if err != nil {
|
||||
return nil, nil, nil, nil, nil, nil, nil, ff, nil, fmt.Errorf("could not connect to remoteKv: %w", err)
|
||||
}
|
||||
|
||||
subscribeToStateChangesLoop(ctx, kvClient, stateCache)
|
||||
|
||||
onNewSnapshot := func() {}
|
||||
if cfg.WithDatadir {
|
||||
if cfg.Snap.Enabled {
|
||||
|
||||
allSnapshots := snapshotsync.NewRoSnapshots(cfg.Snap, cfg.Dirs.Snap)
|
||||
// To povide good UX - immediatly can read snapshots after RPCDaemon start, even if Erigon is down
|
||||
// Erigon does store list of snapshots in db: means RPCDaemon can read this list now, but read by `kvClient.Snapshots` after establish grpc connection
|
||||
allSnapshots.OptimisticReopenWithDB(db)
|
||||
allSnapshots.LogStat()
|
||||
|
||||
if agg, err = libstate.NewAggregator22(cfg.Dirs.SnapHistory, cfg.Dirs.Tmp, ethconfig.HistoryV3AggregationStep, db); err != nil {
|
||||
return nil, nil, nil, nil, nil, nil, nil, ff, nil, fmt.Errorf("create aggregator: %w", err)
|
||||
}
|
||||
_ = agg.ReopenFiles()
|
||||
|
||||
db.View(context.Background(), func(tx kv.Tx) error {
|
||||
agg.LogStats(tx, func(endTxNumMinimax uint64) uint64 {
|
||||
_, histBlockNumProgress, _ := rawdb.TxNums.FindBlockNum(tx, endTxNumMinimax)
|
||||
return histBlockNumProgress
|
||||
})
|
||||
return nil
|
||||
})
|
||||
|
||||
onNewSnapshot = func() {
|
||||
go func() { // don't block events processing by network communication
|
||||
reply, err := kvClient.Snapshots(ctx, &remote.SnapshotsRequest{}, grpc.WaitForReady(true))
|
||||
if err != nil {
|
||||
log.Warn("[Snapshots] reopen", "err", err)
|
||||
return
|
||||
}
|
||||
if err := allSnapshots.ReopenList(reply.Files, true); err != nil {
|
||||
log.Error("[Snapshots] reopen", "err", err)
|
||||
} else {
|
||||
allSnapshots.LogStat()
|
||||
}
|
||||
|
||||
if err = agg.ReopenFiles(); err != nil {
|
||||
log.Error("[Snapshots] reopen", "err", err)
|
||||
} else {
|
||||
db.View(context.Background(), func(tx kv.Tx) error {
|
||||
agg.LogStats(tx, func(endTxNumMinimax uint64) uint64 {
|
||||
_, histBlockNumProgress, _ := rawdb.TxNums.FindBlockNum(tx, endTxNumMinimax)
|
||||
return histBlockNumProgress
|
||||
})
|
||||
return nil
|
||||
})
|
||||
}
|
||||
}()
|
||||
}
|
||||
onNewSnapshot()
|
||||
// TODO: how to don't block startup on remote RPCDaemon?
|
||||
// txNums = exec22.TxNumsFromDB(allSnapshots, db)
|
||||
blockReader = snapshotsync.NewBlockReaderWithSnapshots(allSnapshots)
|
||||
} else {
|
||||
log.Info("Use --snapshots=false")
|
||||
}
|
||||
}
|
||||
|
||||
if !cfg.WithDatadir {
|
||||
blockReader = snapshotsync.NewRemoteBlockReader(remote.NewETHBACKENDClient(conn))
|
||||
}
|
||||
remoteEth := rpcservices.NewRemoteBackend(remote.NewETHBACKENDClient(conn), db, blockReader)
|
||||
blockReader = remoteEth
|
||||
|
||||
txpoolConn := conn
|
||||
if cfg.TxPoolApiAddr != cfg.PrivateApiAddr {
|
||||
txpoolConn, err = grpcutil.Connect(creds, cfg.TxPoolApiAddr)
|
||||
@ -435,9 +439,12 @@ func RemoteServices(ctx context.Context, cfg httpcfg.HttpCfg, logger log.Logger,
|
||||
miningService := rpcservices.NewMiningService(mining)
|
||||
txPool = txpool.NewTxpoolClient(txpoolConn)
|
||||
txPoolService := rpcservices.NewTxPoolService(txPool)
|
||||
if db == nil {
|
||||
db = remoteKv
|
||||
if !cfg.WithDatadir {
|
||||
blockReader = snapshotsync.NewRemoteBlockReader(remote.NewETHBACKENDClient(conn))
|
||||
}
|
||||
|
||||
remoteEth := rpcservices.NewRemoteBackend(remote.NewETHBACKENDClient(conn), db, blockReader)
|
||||
blockReader = remoteEth
|
||||
eth = remoteEth
|
||||
go func() {
|
||||
if !remoteKv.EnsureVersionCompatibility() {
|
||||
|
@ -14,6 +14,7 @@ import (
|
||||
"github.com/ledgerwatch/erigon-lib/kv"
|
||||
"github.com/ledgerwatch/erigon-lib/kv/bitmapdb"
|
||||
libstate "github.com/ledgerwatch/erigon-lib/state"
|
||||
"github.com/ledgerwatch/erigon/core/state/temporal"
|
||||
"github.com/ledgerwatch/log/v3"
|
||||
|
||||
"github.com/ledgerwatch/erigon/common"
|
||||
@ -311,7 +312,11 @@ func (api *APIImpl) getLogsV3(ctx context.Context, tx kv.Tx, begin, end uint64,
|
||||
var bitmapForORing roaring64.Bitmap
|
||||
it := ac.LogAddrIterator(addr.Bytes(), fromTxNum, toTxNum, tx)
|
||||
for it.HasNext() {
|
||||
bitmapForORing.Add(it.Next())
|
||||
n, err := it.NextBatch()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
bitmapForORing.AddMany(n)
|
||||
}
|
||||
if addrBitmap == nil {
|
||||
addrBitmap = &bitmapForORing
|
||||
@ -333,7 +338,7 @@ func (api *APIImpl) getLogsV3(ctx context.Context, tx kv.Tx, begin, end uint64,
|
||||
var signer *types.Signer
|
||||
var rules *params.Rules
|
||||
var skipAnalysis bool
|
||||
stateReader := state.NewHistoryReader22(ac)
|
||||
stateReader := state.NewHistoryReaderV3(ac)
|
||||
stateReader.SetTx(tx)
|
||||
ibs := state.New(stateReader)
|
||||
|
||||
@ -458,9 +463,21 @@ func getTopicsBitmapV3(ac *libstate.Aggregator22Context, tx kv.Tx, topics [][]co
|
||||
for _, sub := range topics {
|
||||
var bitmapForORing roaring64.Bitmap
|
||||
for _, topic := range sub {
|
||||
it := ac.LogTopicIterator(topic.Bytes(), from, to, tx)
|
||||
for it.HasNext() {
|
||||
bitmapForORing.Add(it.Next())
|
||||
if ttx, casted := tx.(kv.TemporalTx); casted {
|
||||
it, err := ttx.InvertedIndexRange(temporal.LogTopic, topic.Bytes(), from, to)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
for it.HasNext() {
|
||||
n, err := it.NextBatch()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
bitmapForORing.AddMany(n)
|
||||
}
|
||||
} else {
|
||||
it := ac.LogTopicIterator(topic.Bytes(), from, to, tx)
|
||||
bitmapForORing.Or(it.ToBitamp())
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -554,7 +554,11 @@ func (api *TraceAPIImpl) filterV3(ctx context.Context, dbtx kv.Tx, fromBlock, to
|
||||
if addr != nil {
|
||||
it := ac.TraceFromIterator(addr.Bytes(), fromTxNum, toTxNum, dbtx)
|
||||
for it.HasNext() {
|
||||
allTxs.Add(it.Next())
|
||||
n, err := it.NextBatch()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
allTxs.AddMany(n)
|
||||
}
|
||||
fromAddresses[*addr] = struct{}{}
|
||||
}
|
||||
@ -564,7 +568,11 @@ func (api *TraceAPIImpl) filterV3(ctx context.Context, dbtx kv.Tx, fromBlock, to
|
||||
if addr != nil {
|
||||
it := ac.TraceToIterator(addr.Bytes(), fromTxNum, toTxNum, dbtx)
|
||||
for it.HasNext() {
|
||||
txsTo.Add(it.Next())
|
||||
n, err := it.NextBatch()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
txsTo.AddMany(n)
|
||||
}
|
||||
toAddresses[*addr] = struct{}{}
|
||||
}
|
||||
@ -616,7 +624,7 @@ func (api *TraceAPIImpl) filterV3(ctx context.Context, dbtx kv.Tx, fromBlock, to
|
||||
var lastHeader *types.Header
|
||||
var lastSigner *types.Signer
|
||||
var lastRules *params.Rules
|
||||
stateReader := state.NewHistoryReader22(ac)
|
||||
stateReader := state.NewHistoryReaderV3(ac)
|
||||
stateReader.SetTx(dbtx)
|
||||
noop := state.NewNoopWriter()
|
||||
for it.HasNext() {
|
||||
|
@ -14,19 +14,23 @@ Content-Type: application/json
|
||||
###
|
||||
|
||||
POST localhost:8545
|
||||
#POST https://goerli.infura.io/v3/e8e57acb3dde4a889e15107e796332aa
|
||||
Content-Type: application/json
|
||||
|
||||
{
|
||||
"jsonrpc": "2.0",
|
||||
"method": "eth_getBalance",
|
||||
"params": [
|
||||
"0xfffa4763f94f7ad191b366a343092a5d1a47ed08",
|
||||
"0xde84"
|
||||
"0xa6DD2974B96e959F2c8930024451a30aFEC24203",
|
||||
"0x27100"
|
||||
],
|
||||
"id": 1
|
||||
}
|
||||
|
||||
###
|
||||
# 0x68d39d934c85200
|
||||
# 0x68d39d934c85200
|
||||
# 0x68d39d934c85200
|
||||
|
||||
POST localhost:8545
|
||||
Content-Type: application/json
|
||||
|
@ -221,7 +221,7 @@ func History22(genesis *core.Genesis, logger log.Logger) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func runHistory22(trace bool, blockNum, txNumStart uint64, hw *state.HistoryReader23, ww state.StateWriter, chainConfig *params.ChainConfig, getHeader func(hash common.Hash, number uint64) *types.Header, block *types.Block, vmConfig vm.Config) (uint64, types.Receipts, error) {
|
||||
func runHistory22(trace bool, blockNum, txNumStart uint64, hw *state.HistoryReaderV4, ww state.StateWriter, chainConfig *params.ChainConfig, getHeader func(hash common.Hash, number uint64) *types.Header, block *types.Block, vmConfig vm.Config) (uint64, types.Receipts, error) {
|
||||
header := block.Header()
|
||||
vmConfig.TraceJumpDest = true
|
||||
engine := ethash.NewFullFaker()
|
||||
|
@ -7,27 +7,41 @@ import (
|
||||
libstate "github.com/ledgerwatch/erigon-lib/state"
|
||||
"github.com/ledgerwatch/erigon/common"
|
||||
"github.com/ledgerwatch/erigon/common/dbutils"
|
||||
"github.com/ledgerwatch/erigon/core/state/temporal"
|
||||
"github.com/ledgerwatch/erigon/core/types/accounts"
|
||||
)
|
||||
|
||||
// HistoryReader22 Implements StateReader and StateWriter
|
||||
type HistoryReader22 struct {
|
||||
// HistoryReaderV3 Implements StateReader and StateWriter
|
||||
type HistoryReaderV3 struct {
|
||||
ac *libstate.Aggregator22Context
|
||||
txNum uint64
|
||||
trace bool
|
||||
tx kv.Tx
|
||||
ttx kv.TemporalTx
|
||||
}
|
||||
|
||||
func NewHistoryReader22(ac *libstate.Aggregator22Context) *HistoryReader22 {
|
||||
return &HistoryReader22{ac: ac}
|
||||
func NewHistoryReaderV3(ac *libstate.Aggregator22Context) *HistoryReaderV3 {
|
||||
return &HistoryReaderV3{ac: ac}
|
||||
}
|
||||
|
||||
func (hr *HistoryReader22) SetTx(tx kv.Tx) { hr.tx = tx }
|
||||
func (hr *HistoryReader22) SetTxNum(txNum uint64) { hr.txNum = txNum }
|
||||
func (hr *HistoryReader22) SetTrace(trace bool) { hr.trace = trace }
|
||||
func (hr *HistoryReaderV3) SetTx(tx kv.Tx) {
|
||||
if ttx, casted := tx.(kv.TemporalTx); casted {
|
||||
hr.ttx = ttx
|
||||
}
|
||||
hr.tx = tx
|
||||
}
|
||||
func (hr *HistoryReaderV3) SetTxNum(txNum uint64) { hr.txNum = txNum }
|
||||
func (hr *HistoryReaderV3) SetTrace(trace bool) { hr.trace = trace }
|
||||
|
||||
func (hr *HistoryReader22) ReadAccountData(address common.Address) (*accounts.Account, error) {
|
||||
enc, ok, err := hr.ac.ReadAccountDataNoStateWithRecent(address.Bytes(), hr.txNum)
|
||||
func (hr *HistoryReaderV3) ReadAccountData(address common.Address) (*accounts.Account, error) {
|
||||
var enc []byte
|
||||
var ok bool
|
||||
var err error
|
||||
if hr.ttx != nil {
|
||||
enc, ok, err = hr.ttx.HistoryGetNoState(temporal.Accounts, address.Bytes(), hr.txNum)
|
||||
} else {
|
||||
enc, ok, err = hr.ac.ReadAccountDataNoStateWithRecent(address.Bytes(), hr.txNum)
|
||||
}
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@ -68,8 +82,15 @@ func (hr *HistoryReader22) ReadAccountData(address common.Address) (*accounts.Ac
|
||||
return &a, nil
|
||||
}
|
||||
|
||||
func (hr *HistoryReader22) ReadAccountStorage(address common.Address, incarnation uint64, key *common.Hash) ([]byte, error) {
|
||||
enc, ok, err := hr.ac.ReadAccountStorageNoStateWithRecent(address.Bytes(), key.Bytes(), hr.txNum)
|
||||
func (hr *HistoryReaderV3) ReadAccountStorage(address common.Address, incarnation uint64, key *common.Hash) ([]byte, error) {
|
||||
var enc []byte
|
||||
var ok bool
|
||||
var err error
|
||||
if hr.ttx != nil {
|
||||
enc, ok, err = hr.ttx.HistoryGetNoState(temporal.Storage, append(address.Bytes(), key.Bytes()...), hr.txNum)
|
||||
} else {
|
||||
enc, ok, err = hr.ac.ReadAccountStorageNoStateWithRecent(address.Bytes(), key.Bytes(), hr.txNum)
|
||||
}
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@ -93,11 +114,18 @@ func (hr *HistoryReader22) ReadAccountStorage(address common.Address, incarnatio
|
||||
return enc, nil
|
||||
}
|
||||
|
||||
func (hr *HistoryReader22) ReadAccountCode(address common.Address, incarnation uint64, codeHash common.Hash) ([]byte, error) {
|
||||
func (hr *HistoryReaderV3) ReadAccountCode(address common.Address, incarnation uint64, codeHash common.Hash) ([]byte, error) {
|
||||
if codeHash == emptyCodeHashH {
|
||||
return nil, nil
|
||||
}
|
||||
enc, ok, err := hr.ac.ReadAccountCodeNoStateWithRecent(address.Bytes(), hr.txNum)
|
||||
var enc []byte
|
||||
var ok bool
|
||||
var err error
|
||||
if hr.ttx != nil {
|
||||
enc, ok, err = hr.ttx.HistoryGetNoState(temporal.Code, address.Bytes(), hr.txNum)
|
||||
} else {
|
||||
enc, ok, err = hr.ac.ReadAccountCodeNoStateWithRecent(address.Bytes(), hr.txNum)
|
||||
}
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@ -113,8 +141,17 @@ func (hr *HistoryReader22) ReadAccountCode(address common.Address, incarnation u
|
||||
return enc, nil
|
||||
}
|
||||
|
||||
func (hr *HistoryReader22) ReadAccountCodeSize(address common.Address, incarnation uint64, codeHash common.Hash) (int, error) {
|
||||
size, ok, err := hr.ac.ReadAccountCodeSizeNoStateWithRecent(address.Bytes(), hr.txNum)
|
||||
func (hr *HistoryReaderV3) ReadAccountCodeSize(address common.Address, incarnation uint64, codeHash common.Hash) (int, error) {
|
||||
var enc []byte
|
||||
var ok bool
|
||||
var size int
|
||||
var err error
|
||||
if hr.ttx != nil {
|
||||
enc, ok, err = hr.ttx.HistoryGetNoState(temporal.Code, address.Bytes(), hr.txNum)
|
||||
} else {
|
||||
enc, ok, err = hr.ac.ReadAccountCodeNoStateWithRecent(address.Bytes(), hr.txNum)
|
||||
}
|
||||
size = len(enc)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
@ -134,12 +171,12 @@ func (hr *HistoryReader22) ReadAccountCodeSize(address common.Address, incarnati
|
||||
return size, nil
|
||||
}
|
||||
|
||||
func (hr *HistoryReader22) ReadAccountIncarnation(address common.Address) (uint64, error) {
|
||||
func (hr *HistoryReaderV3) ReadAccountIncarnation(address common.Address) (uint64, error) {
|
||||
return 0, nil
|
||||
}
|
||||
|
||||
/*
|
||||
func (s *HistoryReader22) ForEachStorage(addr common.Address, startLocation common.Hash, cb func(key, seckey common.Hash, value uint256.Int) bool, maxResults int) error {
|
||||
func (s *HistoryReaderV3) ForEachStorage(addr common.Address, startLocation common.Hash, cb func(key, seckey common.Hash, value uint256.Int) bool, maxResults int) error {
|
||||
acc, err := s.ReadAccountData(addr)
|
||||
if err != nil {
|
||||
return err
|
@ -19,8 +19,8 @@ func bytesToUint64(buf []byte) (x uint64) {
|
||||
return
|
||||
}
|
||||
|
||||
// HistoryReader23 Implements StateReader and StateWriter
|
||||
type HistoryReader23 struct {
|
||||
// HistoryReaderV4 Implements StateReader and StateWriter
|
||||
type HistoryReaderV4 struct {
|
||||
ac *libstate.AggregatorContext
|
||||
ri *libstate.ReadIndices
|
||||
txNum uint64
|
||||
@ -28,32 +28,32 @@ type HistoryReader23 struct {
|
||||
tx kv.Tx
|
||||
}
|
||||
|
||||
func NewHistoryReader23(ac *libstate.AggregatorContext, ri *libstate.ReadIndices) *HistoryReader23 {
|
||||
return &HistoryReader23{ac: ac, ri: ri}
|
||||
func NewHistoryReader23(ac *libstate.AggregatorContext, ri *libstate.ReadIndices) *HistoryReaderV4 {
|
||||
return &HistoryReaderV4{ac: ac, ri: ri}
|
||||
}
|
||||
|
||||
func (hr *HistoryReader23) SetTx(tx kv.Tx) { hr.tx = tx }
|
||||
func (hr *HistoryReaderV4) SetTx(tx kv.Tx) { hr.tx = tx }
|
||||
|
||||
func (hr *HistoryReader23) SetRwTx(tx kv.RwTx) {
|
||||
func (hr *HistoryReaderV4) SetRwTx(tx kv.RwTx) {
|
||||
hr.ri.SetTx(tx)
|
||||
}
|
||||
|
||||
func (hr *HistoryReader23) SetTxNum(txNum uint64) {
|
||||
func (hr *HistoryReaderV4) SetTxNum(txNum uint64) {
|
||||
hr.txNum = txNum
|
||||
if hr.ri != nil {
|
||||
hr.ri.SetTxNum(txNum)
|
||||
}
|
||||
}
|
||||
|
||||
func (hr *HistoryReader23) FinishTx() error {
|
||||
func (hr *HistoryReaderV4) FinishTx() error {
|
||||
return hr.ri.FinishTx()
|
||||
}
|
||||
|
||||
func (hr *HistoryReader23) SetTrace(trace bool) {
|
||||
func (hr *HistoryReaderV4) SetTrace(trace bool) {
|
||||
hr.trace = trace
|
||||
}
|
||||
|
||||
func (hr *HistoryReader23) ReadAccountData(address common.Address) (*accounts.Account, error) {
|
||||
func (hr *HistoryReaderV4) ReadAccountData(address common.Address) (*accounts.Account, error) {
|
||||
if hr.ri != nil {
|
||||
if err := hr.ri.ReadAccountData(address.Bytes()); err != nil {
|
||||
return nil, err
|
||||
@ -80,7 +80,7 @@ func (hr *HistoryReader23) ReadAccountData(address common.Address) (*accounts.Ac
|
||||
return &a, nil
|
||||
}
|
||||
|
||||
func (hr *HistoryReader23) ReadAccountStorage(address common.Address, incarnation uint64, key *common.Hash) ([]byte, error) {
|
||||
func (hr *HistoryReaderV4) ReadAccountStorage(address common.Address, incarnation uint64, key *common.Hash) ([]byte, error) {
|
||||
if hr.ri != nil {
|
||||
if err := hr.ri.ReadAccountStorage(address.Bytes(), key.Bytes()); err != nil {
|
||||
return nil, err
|
||||
@ -103,7 +103,7 @@ func (hr *HistoryReader23) ReadAccountStorage(address common.Address, incarnatio
|
||||
return enc, nil
|
||||
}
|
||||
|
||||
func (hr *HistoryReader23) ReadAccountCode(address common.Address, incarnation uint64, codeHash common.Hash) ([]byte, error) {
|
||||
func (hr *HistoryReaderV4) ReadAccountCode(address common.Address, incarnation uint64, codeHash common.Hash) ([]byte, error) {
|
||||
if hr.ri != nil {
|
||||
if err := hr.ri.ReadAccountCode(address.Bytes()); err != nil {
|
||||
return nil, err
|
||||
@ -119,7 +119,7 @@ func (hr *HistoryReader23) ReadAccountCode(address common.Address, incarnation u
|
||||
return enc, nil
|
||||
}
|
||||
|
||||
func (hr *HistoryReader23) ReadAccountCodeSize(address common.Address, incarnation uint64, codeHash common.Hash) (int, error) {
|
||||
func (hr *HistoryReaderV4) ReadAccountCodeSize(address common.Address, incarnation uint64, codeHash common.Hash) (int, error) {
|
||||
if hr.ri != nil {
|
||||
if err := hr.ri.ReadAccountCodeSize(address.Bytes()); err != nil {
|
||||
return 0, err
|
||||
@ -135,6 +135,6 @@ func (hr *HistoryReader23) ReadAccountCodeSize(address common.Address, incarnati
|
||||
return size, nil
|
||||
}
|
||||
|
||||
func (hr *HistoryReader23) ReadAccountIncarnation(address common.Address) (uint64, error) {
|
||||
func (hr *HistoryReaderV4) ReadAccountIncarnation(address common.Address) (uint64, error) {
|
||||
return 0, nil
|
||||
}
|
@ -12,29 +12,59 @@ import (
|
||||
|
||||
const DefaultIncarnation = uint64(1)
|
||||
|
||||
func GetAsOf2(tx *temporal.Tx, storage bool, key []byte, timestamp uint64) (v []byte, err error) {
|
||||
func GetAsOfV3(tx kv.TemporalTx, storage bool, key []byte, timestamp uint64, histV3 bool) (v []byte, err error) {
|
||||
var ok bool
|
||||
if storage {
|
||||
v, ok, err = tx.GetNoState(temporal.Storage, key, timestamp)
|
||||
v, ok, err = tx.HistoryGetNoState(temporal.Storage, key, timestamp)
|
||||
} else {
|
||||
v, ok, err = tx.GetNoState(temporal.Accounts, key, timestamp)
|
||||
v, ok, err = tx.HistoryGetNoState(temporal.Accounts, key, timestamp)
|
||||
}
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if ok {
|
||||
if v == nil {
|
||||
return nil, nil
|
||||
}
|
||||
if !storage {
|
||||
v, err = accounts.ConvertV3toV2(v)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if !ok {
|
||||
return tx.GetOne(kv.PlainState, key)
|
||||
}
|
||||
|
||||
if v == nil {
|
||||
return nil, nil
|
||||
}
|
||||
if storage {
|
||||
return v, nil
|
||||
}
|
||||
|
||||
if histV3 {
|
||||
v, err = accounts.ConvertV3toV2(v)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return v, nil
|
||||
}
|
||||
return tx.GetOne(kv.PlainState, key)
|
||||
|
||||
//restore codehash
|
||||
var acc accounts.Account
|
||||
if err := acc.DecodeForStorage(v); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if acc.Incarnation > 0 && acc.IsEmptyCodeHash() {
|
||||
var codeHash []byte
|
||||
var err error
|
||||
|
||||
prefix := make([]byte, length.Addr+length.BlockNum)
|
||||
copy(prefix, key)
|
||||
binary.BigEndian.PutUint64(prefix[length.Addr:], acc.Incarnation)
|
||||
|
||||
codeHash, err = tx.GetOne(kv.PlainContractCode, prefix)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if len(codeHash) > 0 {
|
||||
acc.CodeHash.SetBytes(codeHash)
|
||||
}
|
||||
v = make([]byte, acc.EncodingLengthForStorage())
|
||||
acc.EncodeForStorage(v)
|
||||
}
|
||||
return v, nil
|
||||
}
|
||||
|
||||
func GetAsOf(tx kv.Tx, indexC kv.Cursor, changesC kv.CursorDupSort, storage bool, key []byte, timestamp uint64) ([]byte, error) {
|
||||
|
@ -26,6 +26,7 @@ import (
|
||||
"github.com/holiman/uint256"
|
||||
"github.com/ledgerwatch/erigon-lib/common/length"
|
||||
"github.com/ledgerwatch/erigon-lib/kv"
|
||||
"github.com/ledgerwatch/erigon-lib/kv/kvcfg"
|
||||
"github.com/ledgerwatch/erigon/common"
|
||||
"github.com/ledgerwatch/erigon/common/dbutils"
|
||||
"github.com/ledgerwatch/erigon/core/state/historyv2read"
|
||||
@ -54,24 +55,35 @@ type PlainState struct {
|
||||
accChangesC, storageChangesC kv.CursorDupSort
|
||||
tx kv.Tx
|
||||
blockNr uint64
|
||||
histV3 bool
|
||||
storage map[common.Address]*btree.BTree
|
||||
trace bool
|
||||
systemContractLookup map[common.Address][]CodeRecord
|
||||
}
|
||||
|
||||
func NewPlainState(tx kv.Tx, blockNr uint64, systemContractLookup map[common.Address][]CodeRecord) *PlainState {
|
||||
c1, _ := tx.Cursor(kv.AccountsHistory)
|
||||
c2, _ := tx.Cursor(kv.StorageHistory)
|
||||
c3, _ := tx.CursorDupSort(kv.AccountChangeSet)
|
||||
c4, _ := tx.CursorDupSort(kv.StorageChangeSet)
|
||||
|
||||
return &PlainState{
|
||||
tx: tx,
|
||||
blockNr: blockNr,
|
||||
storage: make(map[common.Address]*btree.BTree),
|
||||
accHistoryC: c1, storageHistoryC: c2, accChangesC: c3, storageChangesC: c4,
|
||||
histV3, _ := kvcfg.HistoryV3.Enabled(tx)
|
||||
ps := &PlainState{
|
||||
tx: tx,
|
||||
blockNr: blockNr,
|
||||
histV3: histV3,
|
||||
storage: make(map[common.Address]*btree.BTree),
|
||||
systemContractLookup: systemContractLookup,
|
||||
}
|
||||
|
||||
if _, ok := tx.(kv.TemporalTx); !ok {
|
||||
c1, _ := tx.Cursor(kv.AccountsHistory)
|
||||
c2, _ := tx.Cursor(kv.StorageHistory)
|
||||
c3, _ := tx.CursorDupSort(kv.AccountChangeSet)
|
||||
c4, _ := tx.CursorDupSort(kv.StorageChangeSet)
|
||||
|
||||
ps.accHistoryC = c1
|
||||
ps.storageHistoryC = c2
|
||||
ps.accChangesC = c3
|
||||
ps.storageChangesC = c4
|
||||
}
|
||||
|
||||
return ps
|
||||
}
|
||||
|
||||
func (s *PlainState) SetTrace(trace bool) {
|
||||
@ -90,10 +102,20 @@ func (s *PlainState) ForEachStorage(addr common.Address, startLocation common.Ha
|
||||
st := btree.New(16)
|
||||
var k [length.Addr + length.Incarnation + length.Hash]byte
|
||||
copy(k[:], addr[:])
|
||||
accData, err := historyv2read.GetAsOf(s.tx, s.accHistoryC, s.accChangesC, false /* storage */, addr[:], s.blockNr)
|
||||
if err != nil {
|
||||
return err
|
||||
var accData []byte
|
||||
var err error
|
||||
if ttx, ok := s.tx.(kv.TemporalTx); ok {
|
||||
accData, err = historyv2read.GetAsOfV3(ttx, false /* storage */, addr[:], s.blockNr, s.histV3)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
} else {
|
||||
accData, err = historyv2read.GetAsOf(s.tx, s.accHistoryC, s.accChangesC, false /* storage */, addr[:], s.blockNr)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
var acc accounts.Account
|
||||
if err := acc.DecodeForStorage(accData); err != nil {
|
||||
log.Error("Error decoding account", "err", err)
|
||||
@ -162,9 +184,18 @@ func (s *PlainState) ForEachStorage(addr common.Address, startLocation common.Ha
|
||||
}
|
||||
|
||||
func (s *PlainState) ReadAccountData(address common.Address) (*accounts.Account, error) {
|
||||
enc, err := historyv2read.GetAsOf(s.tx, s.accHistoryC, s.accChangesC, false /* storage */, address[:], s.blockNr)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
var enc []byte
|
||||
var err error
|
||||
if ttx, ok := s.tx.(kv.TemporalTx); ok {
|
||||
enc, err = historyv2read.GetAsOfV3(ttx, false /* storage */, address[:], s.blockNr, s.histV3)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
} else {
|
||||
enc, err = historyv2read.GetAsOf(s.tx, s.accHistoryC, s.accChangesC, false /* storage */, address[:], s.blockNr)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
if len(enc) == 0 {
|
||||
if s.trace {
|
||||
@ -199,9 +230,18 @@ func (s *PlainState) ReadAccountData(address common.Address) (*accounts.Account,
|
||||
|
||||
func (s *PlainState) ReadAccountStorage(address common.Address, incarnation uint64, key *common.Hash) ([]byte, error) {
|
||||
compositeKey := dbutils.PlainGenerateCompositeStorageKey(address.Bytes(), incarnation, key.Bytes())
|
||||
enc, err := historyv2read.GetAsOf(s.tx, s.storageHistoryC, s.storageChangesC, true /* storage */, compositeKey, s.blockNr)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
var enc []byte
|
||||
var err error
|
||||
if ttx, ok := s.tx.(kv.TemporalTx); ok {
|
||||
enc, err = historyv2read.GetAsOfV3(ttx, true /* storage */, compositeKey, s.blockNr, s.histV3)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
} else {
|
||||
enc, err = historyv2read.GetAsOf(s.tx, s.storageHistoryC, s.storageChangesC, true /* storage */, compositeKey, s.blockNr)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
if s.trace {
|
||||
fmt.Printf("ReadAccountStorage [%x] [%x] => [%x]\n", address, *key, enc)
|
||||
@ -232,9 +272,18 @@ func (s *PlainState) ReadAccountCodeSize(address common.Address, incarnation uin
|
||||
}
|
||||
|
||||
func (s *PlainState) ReadAccountIncarnation(address common.Address) (uint64, error) {
|
||||
enc, err := historyv2read.GetAsOf(s.tx, s.accHistoryC, s.accChangesC, false /* storage */, address[:], s.blockNr+1)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
var enc []byte
|
||||
var err error
|
||||
if ttx, ok := s.tx.(kv.TemporalTx); ok {
|
||||
enc, err = historyv2read.GetAsOfV3(ttx, false /* storage */, address[:], s.blockNr+1, s.histV3)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
} else {
|
||||
enc, err = historyv2read.GetAsOf(s.tx, s.accHistoryC, s.accChangesC, false /* storage */, address[:], s.blockNr+1)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
}
|
||||
if len(enc) == 0 {
|
||||
return 0, nil
|
||||
|
@ -4,7 +4,6 @@ import (
|
||||
"context"
|
||||
"fmt"
|
||||
|
||||
"github.com/RoaringBitmap/roaring/roaring64"
|
||||
"github.com/ledgerwatch/erigon-lib/kv"
|
||||
"github.com/ledgerwatch/erigon-lib/kv/bitmapdb"
|
||||
"github.com/ledgerwatch/erigon-lib/kv/kvcfg"
|
||||
@ -22,22 +21,22 @@ import (
|
||||
// v - value
|
||||
|
||||
type DB struct {
|
||||
kv kv.RoDB
|
||||
kv.RwDB
|
||||
agg *state.Aggregator22
|
||||
hitoryV3 bool
|
||||
}
|
||||
|
||||
func New(kv kv.RoDB, agg *state.Aggregator22) *DB {
|
||||
return &DB{kv: kv, agg: agg, hitoryV3: kvcfg.HistoryV3.FromDB(kv)}
|
||||
func New(kv kv.RwDB, agg *state.Aggregator22) *DB {
|
||||
return &DB{RwDB: kv, agg: agg, hitoryV3: kvcfg.HistoryV3.FromDB(kv)}
|
||||
}
|
||||
func (db *DB) BeginTemporalRo(ctx context.Context) (*Tx, error) {
|
||||
kvTx, err := db.kv.BeginRo(ctx)
|
||||
func (db *DB) BeginTemporalRo(ctx context.Context) (kv.TemporalTx, error) {
|
||||
kvTx, err := db.RwDB.BeginRo(ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
tx := &Tx{Tx: kvTx, agg: db.agg.MakeContext()}
|
||||
tx := &Tx{Tx: kvTx, hitoryV3: db.hitoryV3}
|
||||
if db.hitoryV3 {
|
||||
|
||||
tx.agg = db.agg.MakeContext()
|
||||
} else {
|
||||
tx.accHistoryC, _ = tx.Cursor(kv.AccountsHistory)
|
||||
tx.storageHistoryC, _ = tx.Cursor(kv.StorageHistory)
|
||||
@ -46,6 +45,27 @@ func (db *DB) BeginTemporalRo(ctx context.Context) (*Tx, error) {
|
||||
}
|
||||
return tx, nil
|
||||
}
|
||||
func (db *DB) ViewTemporal(ctx context.Context, f func(tx kv.TemporalTx) error) error {
|
||||
tx, err := db.BeginTemporalRo(ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer tx.Rollback()
|
||||
return f(tx)
|
||||
}
|
||||
|
||||
// TODO: it's temporary method, allowing inject TemproalTx without changing code. But it's not type-safe.
|
||||
func (db *DB) BeginRo(ctx context.Context) (kv.Tx, error) {
|
||||
return db.BeginTemporalRo(ctx)
|
||||
}
|
||||
func (db *DB) View(ctx context.Context, f func(tx kv.Tx) error) error {
|
||||
tx, err := db.BeginTemporalRo(ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer tx.Rollback()
|
||||
return f(tx)
|
||||
}
|
||||
|
||||
type Tx struct {
|
||||
kv.Tx
|
||||
@ -59,24 +79,20 @@ type Tx struct {
|
||||
hitoryV3 bool
|
||||
}
|
||||
|
||||
type History string
|
||||
|
||||
const (
|
||||
Accounts History = "accounts"
|
||||
Storage History = "storage"
|
||||
Code History = "code"
|
||||
Accounts kv.History = "accounts"
|
||||
Storage kv.History = "storage"
|
||||
Code kv.History = "code"
|
||||
)
|
||||
|
||||
type InvertedIdx string
|
||||
|
||||
const (
|
||||
LogTopic InvertedIdx = "LogTopic"
|
||||
LogAddr InvertedIdx = "LogAddr"
|
||||
TracesFrom InvertedIdx = "TracesFrom"
|
||||
TracesTo InvertedIdx = "TracesTo"
|
||||
LogTopic kv.InvertedIdx = "LogTopic"
|
||||
LogAddr kv.InvertedIdx = "LogAddr"
|
||||
TracesFrom kv.InvertedIdx = "TracesFrom"
|
||||
TracesTo kv.InvertedIdx = "TracesTo"
|
||||
)
|
||||
|
||||
func (tx *Tx) GetNoState(name History, key []byte, ts uint64) (v []byte, ok bool, err error) {
|
||||
func (tx *Tx) HistoryGetNoState(name kv.History, key []byte, ts uint64) (v []byte, ok bool, err error) {
|
||||
if tx.hitoryV3 {
|
||||
switch name {
|
||||
case Accounts:
|
||||
@ -114,28 +130,22 @@ type Cursor struct {
|
||||
hitoryV3 bool
|
||||
}
|
||||
|
||||
type It interface {
|
||||
Next() uint64
|
||||
HasNext() bool
|
||||
Close()
|
||||
}
|
||||
|
||||
// [fromTs, toTs)
|
||||
func (tx *Tx) InvertedIndexRange(name InvertedIdx, key []byte, fromTs, toTs uint64) (bitmap *roaring64.Bitmap, err error) {
|
||||
func (tx *Tx) InvertedIndexRange(name kv.InvertedIdx, key []byte, fromTs, toTs uint64) (timestamps kv.Iter[uint64], err error) {
|
||||
if tx.hitoryV3 {
|
||||
switch name {
|
||||
case LogTopic:
|
||||
t := tx.agg.LogTopicIterator(key, fromTs, toTs, tx)
|
||||
return t.ToBitamp(), nil
|
||||
return t, nil
|
||||
case LogAddr:
|
||||
t := tx.agg.LogAddrIterator(key, fromTs, toTs, tx)
|
||||
return t.ToBitamp(), nil
|
||||
return t, nil
|
||||
case TracesFrom:
|
||||
t := tx.agg.TraceFromIterator(key, fromTs, toTs, tx)
|
||||
return t.ToBitamp(), nil
|
||||
return t, nil
|
||||
case TracesTo:
|
||||
t := tx.agg.TraceToIterator(key, fromTs, toTs, tx)
|
||||
return t.ToBitamp(), nil
|
||||
return t, nil
|
||||
default:
|
||||
panic(fmt.Sprintf("unexpected: %s", name))
|
||||
}
|
||||
@ -153,6 +163,10 @@ func (tx *Tx) InvertedIndexRange(name InvertedIdx, key []byte, fromTs, toTs uint
|
||||
default:
|
||||
panic(fmt.Sprintf("unexpected: %s", name))
|
||||
}
|
||||
return bitmapdb.Get64(tx, table, key, fromTs, toTs)
|
||||
bm, err := bitmapdb.Get64(tx, table, key, fromTs, toTs)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return kv.IterFromArray(bm.ToArray()), nil
|
||||
}
|
||||
}
|
||||
|
@ -54,6 +54,7 @@ import (
|
||||
txpool2 "github.com/ledgerwatch/erigon-lib/txpool"
|
||||
"github.com/ledgerwatch/erigon-lib/txpool/txpooluitl"
|
||||
types2 "github.com/ledgerwatch/erigon-lib/types"
|
||||
"github.com/ledgerwatch/erigon/core/state/temporal"
|
||||
"github.com/ledgerwatch/log/v3"
|
||||
"golang.org/x/exp/slices"
|
||||
"google.golang.org/grpc"
|
||||
@ -283,6 +284,10 @@ func New(stack *node.Node, config *ethconfig.Config, logger log.Logger) (*Ethere
|
||||
}
|
||||
backend.agg = agg
|
||||
|
||||
if config.HistoryV3 {
|
||||
backend.chainDB = temporal.New(backend.chainDB, agg)
|
||||
}
|
||||
|
||||
kvRPC := remotedbserver.NewKvServer(ctx, chainKv, allSnapshots, agg)
|
||||
backend.notifications.StateChangesConsumer = kvRPC
|
||||
|
||||
|
2
go.mod
2
go.mod
@ -3,7 +3,7 @@ module github.com/ledgerwatch/erigon
|
||||
go 1.18
|
||||
|
||||
require (
|
||||
github.com/ledgerwatch/erigon-lib v0.0.0-20221220013813-d4547e00f648
|
||||
github.com/ledgerwatch/erigon-lib v0.0.0-20221221105103-f352a64847f2
|
||||
github.com/ledgerwatch/erigon-snapshot v1.1.1-0.20221220153128-af4eb4c802af
|
||||
github.com/ledgerwatch/log/v3 v3.6.0
|
||||
github.com/ledgerwatch/secp256k1 v1.0.0
|
||||
|
4
go.sum
4
go.sum
@ -561,8 +561,8 @@ github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
|
||||
github.com/kylelemons/godebug v0.0.0-20170224010052-a616ab194758 h1:0D5M2HQSGD3PYPwICLl+/9oulQauOuETfgFvhBDffs0=
|
||||
github.com/leanovate/gopter v0.2.9 h1:fQjYxZaynp97ozCzfOyOuAGOU4aU/z37zf/tOujFk7c=
|
||||
github.com/leanovate/gopter v0.2.9/go.mod h1:U2L/78B+KVFIx2VmW6onHJQzXtFb+p5y3y2Sh+Jxxv8=
|
||||
github.com/ledgerwatch/erigon-lib v0.0.0-20221220013813-d4547e00f648 h1:cbA7jRfUrwExMn3C/B5jhiDvWuArPO3vWdQH+3tYFEM=
|
||||
github.com/ledgerwatch/erigon-lib v0.0.0-20221220013813-d4547e00f648/go.mod h1:5GCPOzxAshLF7f0wrMZu2Bdq0qqIiMcIubM9n+25gGo=
|
||||
github.com/ledgerwatch/erigon-lib v0.0.0-20221221105103-f352a64847f2 h1:u4XIy9PtcApTElPq6Of9icVt81bJiEyFqxYk430cnEY=
|
||||
github.com/ledgerwatch/erigon-lib v0.0.0-20221221105103-f352a64847f2/go.mod h1:5GCPOzxAshLF7f0wrMZu2Bdq0qqIiMcIubM9n+25gGo=
|
||||
github.com/ledgerwatch/erigon-snapshot v1.1.1-0.20221220153128-af4eb4c802af h1:lSsr//NdJ9+gJLmXOfMsKAZ0HGiK//MvDReqttLBk8M=
|
||||
github.com/ledgerwatch/erigon-snapshot v1.1.1-0.20221220153128-af4eb4c802af/go.mod h1:3AuPxZc85jkehh/HA9h8gabv5MSi3kb/ddtzBsTVJFo=
|
||||
github.com/ledgerwatch/log/v3 v3.6.0 h1:JBUSK1epPyutUrz7KYDTcJtQLEHnehECRpKbM1ugy5M=
|
||||
|
@ -120,7 +120,7 @@ func CreateHistoryStateReader(tx kv.Tx, blockNumber, txnIndex uint64, agg *state
|
||||
}
|
||||
aggCtx := agg.MakeContext()
|
||||
aggCtx.SetTx(tx)
|
||||
r := state.NewHistoryReader22(aggCtx)
|
||||
r := state.NewHistoryReaderV3(aggCtx)
|
||||
r.SetTx(tx)
|
||||
minTxNum, err := rawdb.TxNums.Min(tx, blockNumber)
|
||||
if err != nil {
|
||||
|
@ -26,6 +26,7 @@ import (
|
||||
libstate "github.com/ledgerwatch/erigon-lib/state"
|
||||
"github.com/ledgerwatch/erigon-lib/txpool"
|
||||
types2 "github.com/ledgerwatch/erigon-lib/types"
|
||||
"github.com/ledgerwatch/erigon/core/state/temporal"
|
||||
"github.com/ledgerwatch/log/v3"
|
||||
"google.golang.org/protobuf/types/known/emptypb"
|
||||
|
||||
@ -207,12 +208,40 @@ func MockWithEverything(t *testing.T, gspec *core.Genesis, key *ecdsa.PrivateKey
|
||||
dirs := datadir.New(tmpdir)
|
||||
var err error
|
||||
|
||||
cfg := ethconfig.Defaults
|
||||
cfg.HistoryV3 = ethconfig.EnableHistoryV3InTest
|
||||
cfg.StateStream = true
|
||||
cfg.BatchSize = 1 * datasize.MB
|
||||
cfg.Sync.BodyDownloadTimeoutSeconds = 10
|
||||
cfg.DeprecatedTxPool.Disable = !withTxPool
|
||||
cfg.DeprecatedTxPool.StartOnInit = true
|
||||
|
||||
db := memdb.New()
|
||||
ctx, ctxCancel := context.WithCancel(context.Background())
|
||||
_ = db.Update(ctx, func(tx kv.RwTx) error {
|
||||
_, _ = kvcfg.HistoryV3.WriteOnce(tx, cfg.HistoryV3)
|
||||
return nil
|
||||
})
|
||||
|
||||
var agg *libstate.Aggregator22
|
||||
if cfg.HistoryV3 {
|
||||
dir.MustExist(dirs.SnapHistory)
|
||||
agg, err = libstate.NewAggregator22(dirs.SnapHistory, dirs.Tmp, ethconfig.HistoryV3AggregationStep, db)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
if err := agg.ReopenFiles(); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
}
|
||||
|
||||
if cfg.HistoryV3 {
|
||||
db = temporal.New(db, agg)
|
||||
}
|
||||
|
||||
erigonGrpcServeer := remotedbserver.NewKvServer(ctx, db, nil, nil)
|
||||
mock := &MockSentry{
|
||||
Ctx: ctx, cancel: ctxCancel, DB: db,
|
||||
Ctx: ctx, cancel: ctxCancel, DB: db, agg: agg,
|
||||
t: t,
|
||||
Log: log.New(),
|
||||
Dirs: dirs,
|
||||
@ -228,8 +257,8 @@ func MockWithEverything(t *testing.T, gspec *core.Genesis, key *ecdsa.PrivateKey
|
||||
UpdateHead: func(Ctx context.Context, headHeight, headTime uint64, hash common.Hash, td *uint256.Int) {
|
||||
},
|
||||
PeerId: gointerfaces.ConvertHashToH512([64]byte{0x12, 0x34, 0x50}), // "12345"
|
||||
HistoryV3: ethconfig.EnableHistoryV3InTest,
|
||||
BlockSnapshots: snapshotsync.NewRoSnapshots(ethconfig.Defaults.Snapshot, dirs.Snap),
|
||||
HistoryV3: cfg.HistoryV3,
|
||||
}
|
||||
if t != nil {
|
||||
t.Cleanup(mock.Close)
|
||||
@ -241,30 +270,6 @@ func MockWithEverything(t *testing.T, gspec *core.Genesis, key *ecdsa.PrivateKey
|
||||
sendHeaderRequest := func(_ context.Context, r *headerdownload.HeaderRequest) ([64]byte, bool) { return [64]byte{}, false }
|
||||
propagateNewBlockHashes := func(context.Context, []headerdownload.Announce) {}
|
||||
penalize := func(context.Context, []headerdownload.PenaltyItem) {}
|
||||
cfg := ethconfig.Defaults
|
||||
cfg.HistoryV3 = mock.HistoryV3
|
||||
cfg.StateStream = true
|
||||
cfg.BatchSize = 1 * datasize.MB
|
||||
cfg.Sync.BodyDownloadTimeoutSeconds = 10
|
||||
cfg.DeprecatedTxPool.Disable = !withTxPool
|
||||
cfg.DeprecatedTxPool.StartOnInit = true
|
||||
|
||||
_ = db.Update(ctx, func(tx kv.RwTx) error {
|
||||
_, _ = kvcfg.HistoryV3.WriteOnce(tx, cfg.HistoryV3)
|
||||
return nil
|
||||
})
|
||||
|
||||
if cfg.HistoryV3 {
|
||||
dir.MustExist(dirs.SnapHistory)
|
||||
mock.agg, err = libstate.NewAggregator22(dirs.SnapHistory, dirs.Tmp, ethconfig.HistoryV3AggregationStep, db)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
if err := mock.agg.ReopenFiles(); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
mock.SentryClient = direct.NewSentryClientDirect(eth.ETH66, mock)
|
||||
sentries := []direct.SentryClient{mock.SentryClient}
|
||||
@ -646,7 +651,7 @@ func (ms *MockSentry) NewHistoricalStateReader(blockNum uint64, tx kv.Tx) state.
|
||||
if ms.HistoryV3 {
|
||||
aggCtx := ms.agg.MakeContext()
|
||||
aggCtx.SetTx(tx)
|
||||
r := state.NewHistoryReader22(aggCtx)
|
||||
r := state.NewHistoryReaderV3(aggCtx)
|
||||
r.SetTx(tx)
|
||||
minTxNum, err := rawdb.TxNums.Min(tx, blockNum)
|
||||
if err != nil {
|
||||
|
Loading…
Reference in New Issue
Block a user