e3: reverse/limited iterators, stream tooling ()

This commit is contained in:
Alex Sharov 2023-01-20 18:08:20 +07:00 committed by GitHub
parent d6c330f91b
commit b71725ecb3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 195 additions and 205 deletions

View File

@ -1288,7 +1288,7 @@ func iterate(filename string, prefix string) error {
fmt.Printf("[%x] =>", key)
cnt := 0
for efIt.HasNext() {
txNum := efIt.Next()
txNum, _ := efIt.Next()
var txKey [8]byte
binary.BigEndian.PutUint64(txKey[:], txNum)
offset := r.Lookup2(txKey[:], key)

View File

@ -2,7 +2,6 @@ package commands
import (
"bytes"
"context"
"encoding/json"
"reflect"
"testing"
@ -58,15 +57,15 @@ func TestTraceBlockByNumber(t *testing.T) {
for _, tt := range debugTraceTransactionTests {
var buf bytes.Buffer
stream := jsoniter.NewStream(jsoniter.ConfigDefault, &buf, 4096)
tx, err := ethApi.GetTransactionByHash(context.Background(), libcommon.HexToHash(tt.txHash))
tx, err := ethApi.GetTransactionByHash(m.Ctx, libcommon.HexToHash(tt.txHash))
if err != nil {
t.Errorf("traceBlock %s: %v", tt.txHash, err)
}
txcount, err := ethApi.GetBlockTransactionCountByHash(context.Background(), *tx.BlockHash)
txcount, err := ethApi.GetBlockTransactionCountByHash(m.Ctx, *tx.BlockHash)
if err != nil {
t.Errorf("traceBlock %s: %v", tt.txHash, err)
}
err = api.TraceBlockByNumber(context.Background(), rpc.BlockNumber(tx.BlockNumber.ToInt().Uint64()), &tracers.TraceConfig{}, stream)
err = api.TraceBlockByNumber(m.Ctx, rpc.BlockNumber(tx.BlockNumber.ToInt().Uint64()), &tracers.TraceConfig{}, stream)
if err != nil {
t.Errorf("traceBlock %s: %v", tt.txHash, err)
}
@ -83,7 +82,7 @@ func TestTraceBlockByNumber(t *testing.T) {
}
var buf bytes.Buffer
stream := jsoniter.NewStream(jsoniter.ConfigDefault, &buf, 4096)
err := api.TraceBlockByNumber(context.Background(), rpc.LatestBlockNumber, &tracers.TraceConfig{}, stream)
err := api.TraceBlockByNumber(m.Ctx, rpc.LatestBlockNumber, &tracers.TraceConfig{}, stream)
if err != nil {
t.Errorf("traceBlock %v: %v", rpc.LatestBlockNumber, err)
}
@ -143,7 +142,7 @@ func TestTraceTransaction(t *testing.T) {
for _, tt := range debugTraceTransactionTests {
var buf bytes.Buffer
stream := jsoniter.NewStream(jsoniter.ConfigDefault, &buf, 4096)
err := api.TraceTransaction(context.Background(), libcommon.HexToHash(tt.txHash), &tracers.TraceConfig{}, stream)
err := api.TraceTransaction(m.Ctx, libcommon.HexToHash(tt.txHash), &tracers.TraceConfig{}, stream)
if err != nil {
t.Errorf("traceTransaction %s: %v", tt.txHash, err)
}
@ -177,7 +176,7 @@ func TestTraceTransactionNoRefund(t *testing.T) {
var buf bytes.Buffer
stream := jsoniter.NewStream(jsoniter.ConfigDefault, &buf, 4096)
var norefunds = true
err := api.TraceTransaction(context.Background(), libcommon.HexToHash(tt.txHash), &tracers.TraceConfig{NoRefunds: &norefunds}, stream)
err := api.TraceTransaction(m.Ctx, libcommon.HexToHash(tt.txHash), &tracers.TraceConfig{NoRefunds: &norefunds}, stream)
if err != nil {
t.Errorf("traceTransaction %s: %v", tt.txHash, err)
}
@ -207,6 +206,19 @@ func TestStorageRangeAt(t *testing.T) {
api := NewPrivateDebugAPI(
NewBaseApi(nil, kvcache.New(kvcache.DefaultCoherentConfig), br, agg, false, rpccfg.DefaultEvmCallTimeout, m.Engine),
m.DB, 0)
t.Run("invalid addr", func(t *testing.T) {
var block4 *types.Block
err := m.DB.View(m.Ctx, func(tx kv.Tx) error {
block4, _ = rawdb.ReadBlockByNumber(tx, 4)
return nil
})
require.NoError(t, err)
addr := libcommon.HexToAddress("0x537e697c7ab75a26f9ecf0ce810e3154dfcaaf55")
expect := StorageRangeResult{storageMap{}, nil}
result, err := api.StorageRangeAt(m.Ctx, block4.Hash(), 0, addr, nil, 100)
require.NoError(t, err)
require.Equal(t, expect, result)
})
t.Run("block 4, addr 1", func(t *testing.T) {
var block4 *types.Block
err := m.DB.View(m.Ctx, func(tx kv.Tx) error {

View File

@ -394,7 +394,7 @@ func (api *ErigonImpl) GetBlockReceiptsByBlockHash(ctx context.Context, cannonic
result := make([]map[string]interface{}, 0, len(receipts))
for _, receipt := range receipts {
txn := block.Transactions()[receipt.TransactionIndex]
result = append(result, marshalReceipt(receipt, txn, chainConfig, block, txn.Hash(), true))
result = append(result, marshalReceipt(receipt, txn, chainConfig, block.HeaderNoCopy(), txn.Hash(), true))
}
if chainConfig.Bor != nil {
@ -405,7 +405,7 @@ func (api *ErigonImpl) GetBlockReceiptsByBlockHash(ctx context.Context, cannonic
return nil, err
}
if borReceipt != nil {
result = append(result, marshalReceipt(borReceipt, borTx, chainConfig, block, borReceipt.TxHash, false))
result = append(result, marshalReceipt(borReceipt, borTx, chainConfig, block.HeaderNoCopy(), borReceipt.TxHash, false))
}
}
}

View File

@ -414,7 +414,7 @@ func (api *APIImpl) getLogsV3(ctx context.Context, tx kv.TemporalTx, begin, end
if txn == nil {
continue
}
rawLogs, err := exec.execTx(txNum, txIndex, txn)
rawLogs, _, err := exec.execTx(txNum, txIndex, txn)
if err != nil {
return nil, err
}
@ -482,7 +482,7 @@ func (e *intraBlockExec) changeBlock(header *types.Header) {
e.vmConfig.SkipAnalysis = core.SkipAnalysis(e.chainConfig, e.blockNum)
}
func (e *intraBlockExec) execTx(txNum uint64, txIndex int, txn types.Transaction) ([]*types.Log, error) {
func (e *intraBlockExec) execTx(txNum uint64, txIndex int, txn types.Transaction) ([]*types.Log, *core.ExecutionResult, error) {
e.stateReader.SetTxNum(txNum)
txHash := txn.Hash()
e.ibs.Reset()
@ -490,14 +490,14 @@ func (e *intraBlockExec) execTx(txNum uint64, txIndex int, txn types.Transaction
gp := new(core.GasPool).AddGas(txn.GetGas())
msg, err := txn.AsMessage(*e.signer, e.header.BaseFee, e.rules)
if err != nil {
return nil, err
return nil, nil, err
}
e.evm.ResetBetweenBlocks(*e.blockCtx, core.NewEVMTxContext(msg), e.ibs, *e.vmConfig, e.rules)
_, err = core.ApplyMessage(e.evm, msg, gp, true /* refunds */, false /* gasBailout */)
res, err := core.ApplyMessage(e.evm, msg, gp, true /* refunds */, false /* gasBailout */)
if err != nil {
return nil, fmt.Errorf("%w: blockNum=%d, txNum=%d, %s", err, e.blockNum, txNum, e.ibs.Error())
return nil, nil, fmt.Errorf("%w: blockNum=%d, txNum=%d, %s", err, e.blockNum, txNum, e.ibs.Error())
}
return e.ibs.GetLogs(txHash), nil
return e.ibs.GetLogs(txHash), res, nil
}
// The Topic list restricts matches to particular event topics. Each event has a list
@ -518,11 +518,11 @@ func getTopicsBitmapV3(tx kv.TemporalTx, topics [][]libcommon.Hash, from, to uin
defer bitmapdb.ReturnToPool64(bitmapForORing)
for _, topic := range sub {
it, err := tx.IndexRange(temporal.LogTopicIdx, topic.Bytes(), from, to)
it, err := tx.IndexRange(temporal.LogTopicIdx, topic.Bytes(), from, to, true, -1)
if err != nil {
return nil, err
}
bm, err := it.ToBitmap()
bm, err := it.(bitmapdb.ToBitmap).ToBitmap()
if err != nil {
return nil, err
}
@ -552,11 +552,11 @@ func getAddrsBitmapV3(tx kv.TemporalTx, addrs []libcommon.Address, from, to uint
}
}()
for idx, addr := range addrs {
it, err := tx.IndexRange(temporal.LogAddrIdx, addr[:], from, to)
it, err := tx.IndexRange(temporal.LogAddrIdx, addr[:], from, to, true, -1)
if err != nil {
return nil, err
}
rx[idx], err = it.ToBitmap()
rx[idx], err = it.(bitmapdb.ToBitmap).ToBitmap()
if err != nil {
return nil, err
}
@ -645,10 +645,10 @@ func (api *APIImpl) GetTransactionReceipt(ctx context.Context, txnHash libcommon
if borReceipt == nil {
return nil, nil
}
return marshalReceipt(borReceipt, borTx, cc, block, txnHash, false), nil
return marshalReceipt(borReceipt, borTx, cc, block.HeaderNoCopy(), txnHash, false), nil
}
return marshalReceipt(receipts[txnIndex], block.Transactions()[txnIndex], cc, block, txnHash, true), nil
return marshalReceipt(receipts[txnIndex], block.Transactions()[txnIndex], cc, block.HeaderNoCopy(), txnHash, true), nil
}
// GetBlockReceipts - receipts for individual block
@ -682,7 +682,7 @@ func (api *APIImpl) GetBlockReceipts(ctx context.Context, number rpc.BlockNumber
result := make([]map[string]interface{}, 0, len(receipts))
for _, receipt := range receipts {
txn := block.Transactions()[receipt.TransactionIndex]
result = append(result, marshalReceipt(receipt, txn, chainConfig, block, txn.Hash(), true))
result = append(result, marshalReceipt(receipt, txn, chainConfig, block.HeaderNoCopy(), txn.Hash(), true))
}
if chainConfig.Bor != nil {
@ -693,7 +693,7 @@ func (api *APIImpl) GetBlockReceipts(ctx context.Context, number rpc.BlockNumber
return nil, err
}
if borReceipt != nil {
result = append(result, marshalReceipt(borReceipt, borTx, chainConfig, block, borReceipt.TxHash, false))
result = append(result, marshalReceipt(borReceipt, borTx, chainConfig, block.HeaderNoCopy(), borReceipt.TxHash, false))
}
}
}
@ -701,7 +701,7 @@ func (api *APIImpl) GetBlockReceipts(ctx context.Context, number rpc.BlockNumber
return result, nil
}
func marshalReceipt(receipt *types.Receipt, txn types.Transaction, chainConfig *chain.Config, block *types.Block, txnHash libcommon.Hash, signed bool) map[string]interface{} {
func marshalReceipt(receipt *types.Receipt, txn types.Transaction, chainConfig *chain.Config, header *types.Header, txnHash libcommon.Hash, signed bool) map[string]interface{} {
var chainId *big.Int
switch t := txn.(type) {
case *types.LegacyTx:
@ -735,11 +735,11 @@ func marshalReceipt(receipt *types.Receipt, txn types.Transaction, chainConfig *
"logsBloom": types.CreateBloom(types.Receipts{receipt}),
}
if !chainConfig.IsLondon(block.NumberU64()) {
if !chainConfig.IsLondon(header.Number.Uint64()) {
fields["effectiveGasPrice"] = hexutil.Uint64(txn.GetPrice().Uint64())
} else {
baseFee, _ := uint256.FromBig(block.BaseFee())
gasPrice := new(big.Int).Add(block.BaseFee(), txn.GetEffectiveGasTip(baseFee).ToBig())
baseFee, _ := uint256.FromBig(header.BaseFee)
gasPrice := new(big.Int).Add(header.BaseFee, txn.GetEffectiveGasTip(baseFee).ToBig())
fields["effectiveGasPrice"] = hexutil.Uint64(gasPrice.Uint64())
}
// Assign receipt status.

View File

@ -11,7 +11,6 @@ import (
"github.com/ledgerwatch/erigon-lib/chain"
libcommon "github.com/ledgerwatch/erigon-lib/common"
"github.com/ledgerwatch/erigon-lib/kv"
"github.com/ledgerwatch/erigon/common/hexutil"
"github.com/ledgerwatch/erigon/consensus/ethash"
"github.com/ledgerwatch/erigon/core"
@ -481,7 +480,7 @@ func (api *OtterscanAPIImpl) GetBlockTransactions(ctx context.Context, number rp
result := make([]map[string]interface{}, 0, len(receipts))
for _, receipt := range receipts {
txn := b.Transactions()[receipt.TransactionIndex]
marshalledRcpt := marshalReceipt(receipt, txn, chainConfig, b, txn.Hash(), true)
marshalledRcpt := marshalReceipt(receipt, txn, chainConfig, b.HeaderNoCopy(), txn.Hash(), true)
marshalledRcpt["logs"] = nil
marshalledRcpt["logsBloom"] = nil
result = append(result, marshalledRcpt)

View File

@ -5,6 +5,12 @@ import (
"testing"
"github.com/RoaringBitmap/roaring/roaring64"
libcommon "github.com/ledgerwatch/erigon-lib/common"
"github.com/ledgerwatch/erigon/cmd/rpcdaemon/rpcdaemontest"
"github.com/ledgerwatch/erigon/common/hexutil"
"github.com/ledgerwatch/erigon/rpc/rpccfg"
"github.com/ledgerwatch/erigon/turbo/snapshotsync"
"github.com/stretchr/testify/require"
)
func newMockBackwardChunkLocator(chunks [][]byte) ChunkLocator {
@ -141,3 +147,52 @@ func TestBackwardBlockProviderWithMultipleChunksBlockNotFound(t *testing.T) {
checkNext(t, blockProvider, 0, false)
}
func TestSearchTransactionsBefore(t *testing.T) {
m, _, _ := rpcdaemontest.CreateTestSentry(t)
agg, require := m.HistoryV3Components(), require.New(t)
br := snapshotsync.NewBlockReaderWithSnapshots(m.BlockSnapshots)
api := NewOtterscanAPI(NewBaseApi(nil, nil, br, agg, false, rpccfg.DefaultEvmCallTimeout, m.Engine), m.DB)
addr := libcommon.HexToAddress("0x537e697c7ab75a26f9ecf0ce810e3154dfcaaf44")
t.Run("small page size", func(t *testing.T) {
results, err := api.SearchTransactionsBefore(m.Ctx, addr, 10, 2)
require.NoError(err)
require.False(results.FirstPage)
require.False(results.LastPage)
require.Equal(2, len(results.Txs))
require.Equal(2, len(results.Receipts))
})
t.Run("big page size", func(t *testing.T) {
results, err := api.SearchTransactionsBefore(m.Ctx, addr, 10, 10)
require.NoError(err)
require.False(results.FirstPage)
require.True(results.LastPage)
require.Equal(3, len(results.Txs))
require.Equal(3, len(results.Receipts))
})
t.Run("filter last block", func(t *testing.T) {
results, err := api.SearchTransactionsBefore(m.Ctx, addr, 5, 10)
require.NoError(err)
require.False(results.FirstPage)
require.True(results.LastPage)
require.Equal(2, len(results.Txs))
require.Equal(2, len(results.Receipts))
require.Equal(4, int(results.Txs[0].BlockNumber.ToInt().Uint64()))
require.Equal(0, int(results.Txs[0].Nonce))
require.Equal(4, int(results.Receipts[0]["blockNumber"].(hexutil.Uint64)))
require.Equal(libcommon.HexToHash("0x79491e16fd1b1ceea44c46af850b2ef121683055cd579fd4d877beba22e77c1c"), results.Receipts[0]["transactionHash"].(libcommon.Hash))
require.Equal(libcommon.HexToAddress("0x0D3ab14BBaD3D99F4203bd7a11aCB94882050E7e"), results.Receipts[0]["from"].(libcommon.Address))
require.Equal(addr, *results.Receipts[0]["to"].(*libcommon.Address))
require.Equal(3, int(results.Txs[1].BlockNumber.ToInt().Uint64()))
require.Equal(2, int(results.Txs[1].Nonce))
require.Equal(3, int(results.Receipts[1]["blockNumber"].(hexutil.Uint64)))
require.Equal(libcommon.HexToHash("0x79491e16fd1b1ceea44c46af850b2ef121683055cd579fd4d877beba22e77c1c"), results.Receipts[0]["transactionHash"].(libcommon.Hash))
require.Equal(libcommon.HexToAddress("0x0D3ab14BBaD3D99F4203bd7a11aCB94882050E7e"), results.Receipts[0]["from"].(libcommon.Address))
require.Equal(addr, *results.Receipts[0]["to"].(*libcommon.Address))
})
}

View File

@ -93,7 +93,7 @@ func (api *OtterscanAPIImpl) traceBlock(dbtx kv.Tx, ctx context.Context, blockNu
if tracer.Found {
rpcTx := newRPCTransaction(tx, block.Hash(), blockNum, uint64(idx), block.BaseFee())
mReceipt := marshalReceipt(blockReceipts[idx], tx, chainConfig, block, tx.Hash(), true)
mReceipt := marshalReceipt(blockReceipts[idx], tx, chainConfig, block.HeaderNoCopy(), tx.Hash(), true)
mReceipt["timestamp"] = block.Time()
rpcTxs = append(rpcTxs, rpcTx)
receipts = append(receipts, mReceipt)

View File

@ -110,7 +110,7 @@ func TestReplayBlockTransactions(t *testing.T) {
// Call GetTransactionReceipt for transaction which is not in the database
n := rpc.BlockNumber(6)
results, err := api.ReplayBlockTransactions(context.Background(), rpc.BlockNumberOrHash{BlockNumber: &n}, []string{"stateDiff"})
results, err := api.ReplayBlockTransactions(m.Ctx, rpc.BlockNumberOrHash{BlockNumber: &n}, []string{"stateDiff"})
if err != nil {
t.Errorf("calling ReplayBlockTransactions: %v", err)
}

View File

@ -243,11 +243,11 @@ func traceFilterBitmaps(tx kv.Tx, req TraceFilterRequest, from, to uint64) (from
if ttx, casted := tx.(kv.TemporalTx); casted {
for _, addr := range req.FromAddress {
if addr != nil {
it, err := ttx.IndexRange(temporal.TracesFromIdx, addr.Bytes(), from, to)
it, err := ttx.IndexRange(temporal.TracesFromIdx, addr.Bytes(), from, to, true, -1)
if errors.Is(err, ethdb.ErrKeyNotFound) {
continue
}
b, err := it.ToBitmap()
b, err := it.(bitmapdb.ToBitmap).ToBitmap()
if err != nil {
return nil, nil, nil, err
}
@ -258,11 +258,11 @@ func traceFilterBitmaps(tx kv.Tx, req TraceFilterRequest, from, to uint64) (from
for _, addr := range req.ToAddress {
if addr != nil {
it, err := ttx.IndexRange(temporal.TracesToIdx, addr.Bytes(), from, to)
it, err := ttx.IndexRange(temporal.TracesToIdx, addr.Bytes(), from, to, true, -1)
if errors.Is(err, ethdb.ErrKeyNotFound) {
continue
}
b, err := it.ToBitmap()
b, err := it.(bitmapdb.ToBitmap).ToBitmap()
if err != nil {
return nil, nil, nil, err
}

View File

@ -72,7 +72,7 @@ func (s *SentinelServer) SendRequest(_ context.Context, req *sentinelrpc.Request
// Wait a bit to not exhaust CPU and skip.
continue
}
log.Debug("Sent request", "pid", pid)
log.Trace("[sentinel] Sent request", "pid", pid)
s.sentinel.Peers().PeerDoRequest(pid)
go func() {
data, isError, err := communication.SendRequestRawToPeer(s.ctx, s.sentinel.Host(), req.Data, req.Topic, pid)

View File

@ -394,7 +394,7 @@ func GenerateChain(config *chain.Config, parent *types.Block, engine consensus.E
c.Close()
if GenerateTrace {
fmt.Printf("State after %d================\n", b.header.Number)
it, err := tx.Range(kv.HashedAccounts, nil, nil)
it, err := tx.Stream(kv.HashedAccounts, nil, nil)
if err != nil {
return nil, nil, err
}
@ -406,7 +406,7 @@ func GenerateChain(config *chain.Config, parent *types.Block, engine consensus.E
fmt.Printf("%x: %x\n", k, v)
}
fmt.Printf("..................\n")
it, err = tx.Range(kv.HashedStorage, nil, nil)
it, err = tx.Stream(kv.HashedStorage, nil, nil)
if err != nil {
return nil, nil, err
}

View File

@ -5,14 +5,11 @@ import (
"encoding/binary"
"fmt"
"github.com/RoaringBitmap/roaring/roaring64"
"github.com/ledgerwatch/erigon-lib/common"
"github.com/ledgerwatch/erigon-lib/common/length"
"github.com/ledgerwatch/erigon-lib/kv"
"github.com/ledgerwatch/erigon-lib/kv/bitmapdb"
"github.com/ledgerwatch/erigon-lib/kv/kvcfg"
"github.com/ledgerwatch/erigon-lib/kv/stream"
"github.com/ledgerwatch/erigon-lib/kv/temporal/historyv2"
"github.com/ledgerwatch/erigon-lib/state"
)
@ -27,7 +24,7 @@ import (
//Methods Naming:
// Get: exact match of criterias
// Range: [from, to)
// Dual: [from, to)
// Each: [from, INF)
// Prefix: Has(k, prefix)
// Amount: [from, INF) AND maximum N records
@ -38,8 +35,7 @@ type tParseIncarnation func(v []byte) (uint64, error)
type DB struct {
kv.RwDB
agg *state.AggregatorV3
hitoryV3 bool
agg *state.AggregatorV3
convertV3toV2 tConvertV3toV2
restoreCodeHash tRestoreCodeHash
@ -47,23 +43,20 @@ type DB struct {
}
func New(kv kv.RwDB, agg *state.AggregatorV3, cb1 tConvertV3toV2, cb2 tRestoreCodeHash, cb3 tParseIncarnation) *DB {
return &DB{RwDB: kv, agg: agg, hitoryV3: kvcfg.HistoryV3.FromDB(kv), convertV3toV2: cb1, restoreCodeHash: cb2, parseInc: cb3}
if !kvcfg.HistoryV3.FromDB(kv) {
panic("not supported")
}
return &DB{RwDB: kv, agg: agg, convertV3toV2: cb1, restoreCodeHash: cb2, parseInc: cb3}
}
func (db *DB) BeginTemporalRo(ctx context.Context) (kv.TemporalTx, error) {
kvTx, err := db.RwDB.BeginRo(ctx)
if err != nil {
return nil, err
}
tx := &Tx{Tx: kvTx, hitoryV3: db.hitoryV3, db: db}
if db.hitoryV3 {
tx.agg = db.agg.MakeContext()
tx.agg.SetTx(kvTx)
} else {
tx.accHistoryC, _ = tx.Cursor(kv.AccountsHistory)
tx.storageHistoryC, _ = tx.Cursor(kv.StorageHistory)
tx.accChangesC, _ = tx.CursorDupSort(kv.AccountChangeSet)
tx.storageChangesC, _ = tx.CursorDupSort(kv.StorageChangeSet)
}
tx := &Tx{Tx: kvTx, db: db}
tx.agg = db.agg.MakeContext()
tx.agg.SetTx(kvTx)
return tx, nil
}
func (db *DB) ViewTemporal(ctx context.Context, f func(tx kv.TemporalTx) error) error {
@ -90,15 +83,8 @@ func (db *DB) View(ctx context.Context, f func(tx kv.Tx) error) error {
type Tx struct {
kv.Tx
db *DB
agg *state.Aggregator22Context
//HistoryV2 fields
accHistoryC, storageHistoryC kv.Cursor
accChangesC, storageChangesC kv.CursorDupSort
//HistoryV3 fields
hitoryV3 bool
db *DB
agg *state.Aggregator22Context
resourcesToClose []kv.Closer
}
@ -136,56 +122,47 @@ const (
TracesToIdx kv.InvertedIdx = "TracesToIdx"
)
func (tx *Tx) DomainRangeAscend(name kv.Domain, k1, fromKey []byte, asOfTs uint64, limit int) (pairs kv.Pairs, err error) {
if tx.hitoryV3 {
switch name {
case AccountsDomain:
panic("not implemented yet")
case StorageDomain:
//it := tx.agg.StorageHistoryRIterateChanged(asOfTs, math.MaxUint64, tx)
toKey, _ := kv.NextSubtree(k1)
fromKey2 := append(common.Copy(k1), fromKey...)
it := tx.agg.StorageHistoricalStateRange(asOfTs, fromKey2, toKey, limit, tx)
func (tx *Tx) DomainRangeAscend(name kv.Domain, k1, fromKey []byte, asOfTs uint64, limit int) (pairs stream.Kv, err error) {
switch name {
case AccountsDomain:
panic("not implemented yet")
case StorageDomain:
//it := tx.agg.StorageHistoryRIterateChanged(asOfTs, math.MaxUint64, tx)
toKey, _ := kv.NextSubtree(k1)
fromKey2 := append(common.Copy(k1), fromKey...)
it := tx.agg.StorageHistoricalStateRange(asOfTs, fromKey2, toKey, limit, tx)
accData, err := tx.GetOne(kv.PlainState, k1)
if err != nil {
return nil, err
}
inc, err := tx.db.parseInc(accData)
if err != nil {
return nil, err
}
startkey := make([]byte, length.Addr+length.Incarnation+length.Hash)
copy(startkey, k1)
binary.BigEndian.PutUint64(startkey[length.Addr:], inc)
copy(startkey[length.Addr+length.Incarnation:], fromKey)
toPrefix := make([]byte, length.Addr+length.Incarnation)
copy(toPrefix, k1)
binary.BigEndian.PutUint64(toPrefix[length.Addr:], inc+1)
it2, err := tx.RangeAscend(kv.PlainState, startkey, toPrefix, limit)
if err != nil {
return nil, err
}
it3 := stream.TransformPairs(it2, func(k, v []byte) ([]byte, []byte) {
return append(append([]byte{}, k[:20]...), k[28:]...), v
})
//for it3.HasNext() {
// k, v, err := it3.Next()
// fmt.Printf("PlainState: %x, %x, %s\n", k, v, err)
//}
//TODO: seems MergePairs can't handle "amount" request
return stream.MergePairs(it, it3), nil
case CodeDomain:
panic("not implemented yet")
default:
panic(fmt.Sprintf("unexpected: %s", name))
accData, err := tx.GetOne(kv.PlainState, k1)
if err != nil {
return nil, err
}
}
inc, err := tx.db.parseInc(accData)
if err != nil {
return nil, err
}
startkey := make([]byte, length.Addr+length.Incarnation+length.Hash)
copy(startkey, k1)
binary.BigEndian.PutUint64(startkey[length.Addr:], inc)
copy(startkey[length.Addr+length.Incarnation:], fromKey)
panic("not implemented yet")
toPrefix := make([]byte, length.Addr+length.Incarnation)
copy(toPrefix, k1)
binary.BigEndian.PutUint64(toPrefix[length.Addr:], inc+1)
it2, err := tx.StreamAscend(kv.PlainState, startkey, toPrefix, limit)
if err != nil {
return nil, err
}
it3 := stream.TransformPairs(it2, func(k, v []byte) ([]byte, []byte) {
return append(append([]byte{}, k[:20]...), k[28:]...), v
})
//TODO: seems MergePairs can't handle "amount" request
return stream.UnionPairs(it, it3), nil
case CodeDomain:
panic("not implemented yet")
default:
panic(fmt.Sprintf("unexpected: %s", name))
}
}
func (tx *Tx) DomainGet(name kv.Domain, key, key2 []byte, ts uint64) (v []byte, ok bool, err error) {
switch name {
@ -200,36 +177,22 @@ func (tx *Tx) DomainGet(name kv.Domain, key, key2 []byte, ts uint64) (v []byte,
v, err = tx.GetOne(kv.PlainState, key)
return v, v != nil, err
case StorageDomain:
if tx.hitoryV3 {
v, ok, err = tx.HistoryGet(StorageHistory, append(key[:20], key2...), ts)
if err != nil {
return nil, false, err
}
if ok {
return v, true, nil
}
} else {
v, ok, err = tx.HistoryGet(StorageHistory, append(key, key2...), ts)
if err != nil {
return nil, false, err
}
if ok {
return v, true, nil
}
v, ok, err = tx.HistoryGet(StorageHistory, append(key[:20], key2...), ts)
if err != nil {
return nil, false, err
}
if ok {
return v, true, nil
}
v, err = tx.GetOne(kv.PlainState, append(key, key2...))
return v, v != nil, err
case CodeDomain:
if tx.hitoryV3 {
v, ok, err = tx.HistoryGet(CodeHistory, key, ts)
if err != nil {
return nil, false, err
}
if ok {
return v, true, nil
}
v, err = tx.GetOne(kv.Code, key2)
return v, v != nil, err
v, ok, err = tx.HistoryGet(CodeHistory, key, ts)
if err != nil {
return nil, false, err
}
if ok {
return v, true, nil
}
v, err = tx.GetOne(kv.Code, key2)
return v, v != nil, err
@ -239,54 +202,30 @@ func (tx *Tx) DomainGet(name kv.Domain, key, key2 []byte, ts uint64) (v []byte,
}
func (tx *Tx) HistoryGet(name kv.History, key []byte, ts uint64) (v []byte, ok bool, err error) {
if tx.hitoryV3 {
switch name {
case AccountsHistory:
v, ok, err = tx.agg.ReadAccountDataNoStateWithRecent(key, ts)
if err != nil {
return nil, false, err
}
if !ok || len(v) == 0 {
return v, ok, nil
}
v, err = tx.db.convertV3toV2(v)
if err != nil {
return nil, false, err
}
v, err = tx.db.restoreCodeHash(tx.Tx, key, v)
if err != nil {
return nil, false, err
}
return v, true, nil
case StorageHistory:
return tx.agg.ReadAccountStorageNoStateWithRecent2(key, ts)
case CodeHistory:
return tx.agg.ReadAccountCodeNoStateWithRecent(key, ts)
default:
panic(fmt.Sprintf("unexpected: %s", name))
}
}
switch name {
case AccountsHistory:
v, ok, err = historyv2.FindByHistory(tx.accHistoryC, tx.accChangesC, false, key, ts)
v, ok, err = tx.agg.ReadAccountDataNoStateWithRecent(key, ts)
if err != nil {
return nil, false, err
}
if !ok || len(v) == 0 {
return v, ok, nil
}
v, err = tx.db.convertV3toV2(v)
if err != nil {
return nil, false, err
}
v, err = tx.db.restoreCodeHash(tx.Tx, key, v)
if err != nil {
return nil, false, err
}
return v, true, nil
case StorageHistory:
return historyv2.FindByHistory(tx.storageHistoryC, tx.storageChangesC, true, key, ts)
return tx.agg.ReadAccountStorageNoStateWithRecent2(key, ts)
case CodeHistory:
return nil, false, fmt.Errorf("ErigonV2 doesn't support CodeHistory")
return tx.agg.ReadAccountCodeNoStateWithRecent(key, ts)
default:
return nil, false, fmt.Errorf("unexpected history name: %s", name)
panic(fmt.Sprintf("unexpected: %s", name))
}
}
@ -302,57 +241,42 @@ type Cursor struct {
hitoryV3 bool
}
// [fromTs, toTs)
func (tx *Tx) IndexRange(name kv.InvertedIdx, key []byte, fromTs, toTs uint64) (timestamps kv.U64Stream, err error) {
if tx.hitoryV3 {
switch name {
case LogTopicIdx:
t := tx.agg.LogTopicIterator(key, fromTs, toTs, tx)
tx.resourcesToClose = append(tx.resourcesToClose, t)
return t, nil
case LogAddrIdx:
t := tx.agg.LogAddrIterator(key, fromTs, toTs, tx)
tx.resourcesToClose = append(tx.resourcesToClose, t)
return t, nil
case TracesFromIdx:
t := tx.agg.TraceFromIterator(key, fromTs, toTs, tx)
tx.resourcesToClose = append(tx.resourcesToClose, t)
return t, nil
case TracesToIdx:
t := tx.agg.TraceToIterator(key, fromTs, toTs, tx)
tx.resourcesToClose = append(tx.resourcesToClose, t)
return t, nil
default:
return nil, fmt.Errorf("unexpected history name: %s", name)
}
}
func (tx *Tx) IndexRange(name kv.InvertedIdx, key []byte, fromTs, toTs uint64, orderAscend bool, limit int) (timestamps stream.U64, err error) {
return tx.IndexStream(name, key, fromTs, toTs, orderAscend, limit)
}
var bm *roaring64.Bitmap
// [fromTs, toTs)
func (tx *Tx) IndexStream(name kv.InvertedIdx, key []byte, fromTs, toTs uint64, orderAscend bool, limit int) (timestamps stream.U64, err error) {
switch name {
case LogTopicIdx:
bm32, err := bitmapdb.Get(tx, kv.LogTopicIndex, key, uint32(fromTs), uint32(toTs))
t, err := tx.agg.LogTopicIterator(key, fromTs, toTs, tx)
if err != nil {
return nil, err
}
bm = bitmapdb.CastBitmapTo64(bm32)
tx.resourcesToClose = append(tx.resourcesToClose, t)
return t, nil
case LogAddrIdx:
bm32, err := bitmapdb.Get(tx, kv.LogAddressIndex, key, uint32(fromTs), uint32(toTs))
t, err := tx.agg.LogAddrIterator(key, fromTs, toTs, tx)
if err != nil {
return nil, err
}
bm = bitmapdb.CastBitmapTo64(bm32)
tx.resourcesToClose = append(tx.resourcesToClose, t)
return t, nil
case TracesFromIdx:
bm, err = bitmapdb.Get64(tx, kv.CallFromIndex, key, fromTs, toTs)
t, err := tx.agg.TraceFromIterator(key, fromTs, toTs, orderAscend, limit, tx)
if err != nil {
return nil, err
}
tx.resourcesToClose = append(tx.resourcesToClose, t)
return t, nil
case TracesToIdx:
bm, err = bitmapdb.Get64(tx, kv.CallToIndex, key, fromTs, toTs)
t, err := tx.agg.TraceToIterator(key, fromTs, toTs, orderAscend, limit, tx)
if err != nil {
return nil, err
}
tx.resourcesToClose = append(tx.resourcesToClose, t)
return t, nil
default:
return nil, fmt.Errorf("unexpected history name: %s", name)
}
return bitmapdb.NewBitmapStream(bm), nil
}

2
go.mod
View File

@ -3,7 +3,7 @@ module github.com/ledgerwatch/erigon
go 1.18
require (
github.com/ledgerwatch/erigon-lib v0.0.0-20230118232012-ab20c390190e
github.com/ledgerwatch/erigon-lib v0.0.0-20230120091748-2ea0cb82e919
github.com/ledgerwatch/erigon-snapshot v1.1.1-0.20230120022649-cd9409a200da
github.com/ledgerwatch/log/v3 v3.7.0
github.com/ledgerwatch/secp256k1 v1.0.0

4
go.sum
View File

@ -565,8 +565,8 @@ github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/kylelemons/godebug v0.0.0-20170224010052-a616ab194758 h1:0D5M2HQSGD3PYPwICLl+/9oulQauOuETfgFvhBDffs0=
github.com/leanovate/gopter v0.2.9 h1:fQjYxZaynp97ozCzfOyOuAGOU4aU/z37zf/tOujFk7c=
github.com/leanovate/gopter v0.2.9/go.mod h1:U2L/78B+KVFIx2VmW6onHJQzXtFb+p5y3y2Sh+Jxxv8=
github.com/ledgerwatch/erigon-lib v0.0.0-20230118232012-ab20c390190e h1:NSM1xDen45UITWjI5uZV3GahtqzFkgCRD/X6+OZyS1s=
github.com/ledgerwatch/erigon-lib v0.0.0-20230118232012-ab20c390190e/go.mod h1:1UHFnZQCpr37W397IJf68OxYv3iQmBTU9D7t3LUHbPo=
github.com/ledgerwatch/erigon-lib v0.0.0-20230120091748-2ea0cb82e919 h1:w9D+tV8V29cOk6v9iKNREb06cUtkqHB893wkoDw/GJo=
github.com/ledgerwatch/erigon-lib v0.0.0-20230120091748-2ea0cb82e919/go.mod h1:GAsj4lIZdXUGf0oSy6jSoG6nFZ+B0FK1KlcfR6JUuDY=
github.com/ledgerwatch/erigon-snapshot v1.1.1-0.20230120022649-cd9409a200da h1:lQQBOHzAUThkymfXJj/m07vAjyMx9XoMMy3OomaeOrA=
github.com/ledgerwatch/erigon-snapshot v1.1.1-0.20230120022649-cd9409a200da/go.mod h1:3AuPxZc85jkehh/HA9h8gabv5MSi3kb/ddtzBsTVJFo=
github.com/ledgerwatch/log/v3 v3.7.0 h1:aFPEZdwZx4jzA3+/Pf8wNDN5tCI0cIolq/kfvgcM+og=