Store receipts separately - one record per tx (#1271)

* squash

* add --database flag to integration

* clean

* split to 2 buckets

* split to 2 buckets

* split to 2 buckets

* split to 2 buckets

* split to 2 buckets

* save progress

* save progress

* improve test

* improve test

* save progress

* change app logic

* change app logic

* return err from rawdb package

* don't clean automatically

* don't clean automatically

* clean

* clean

* clean

* don't rely on `make clean`

* improve cbor code

* clean

* clean

* clean

* fix tests

* rebase master

* stop on error: headers stage

* make TxDb walk and multiwalk safe

* Fix panics

Co-authored-by: Alexey Akhunov <akhounov@gmail.com>
Alex Sharov 2020-10-25 15:38:55 +07:00 committed by GitHub
parent e9572ae981
commit 331dcd45eb
80 changed files with 1245 additions and 1934 deletions
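
In outline, this commit splits receipt storage across two buckets: the per-block receipts record is now keyed by block number alone (no hash), and logs move out of the receipts into their own bucket, one record per transaction. A minimal sketch of the key encoding, mirroring the dbutils.ReceiptsKey and dbutils.LogKey helpers added in the diff below; dropping the hash makes keys strictly increasing by block number, which is presumably what enables the Append-based writes further down:

```go
// Sketch of the new key layout:
//   BlockReceiptsPrefix ("r"): blockN (8 bytes, big endian)          -> CBOR(receipts, logs stripped)
//   Log ("log"):               blockN (8 bytes) + txId (4 bytes, BE) -> CBOR(logs of one transaction)
package main

import (
	"encoding/binary"
	"fmt"
)

func receiptsKey(blockNumber uint64) []byte {
	k := make([]byte, 8)
	binary.BigEndian.PutUint64(k, blockNumber)
	return k
}

func logKey(blockNumber uint64, txId uint32) []byte {
	k := make([]byte, 8+4)
	binary.BigEndian.PutUint64(k, blockNumber)
	binary.BigEndian.PutUint32(k[8:], txId)
	return k
}

func main() {
	fmt.Printf("receipts key: %x\n", receiptsKey(12_000_000)) // 0000000000b71b00
	fmt.Printf("log key:      %x\n", logKey(12_000_000, 3))   // 0000000000b71b0000000003
}
```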

View File

@ -211,7 +211,7 @@ func (b *SimulatedBackend) CodeAt(ctx context.Context, contract common.Address,
b.mu.Lock()
defer b.mu.Unlock()
dbtx, err1 := b.kv.Begin(ctx, nil, false)
dbtx, err1 := b.kv.Begin(ctx, nil, ethdb.RO)
if err1 != nil {
return nil, err1
}
@ -225,7 +225,7 @@ func (b *SimulatedBackend) BalanceAt(ctx context.Context, contract common.Addres
b.mu.Lock()
defer b.mu.Unlock()
dbtx, err1 := b.kv.Begin(ctx, nil, false)
dbtx, err1 := b.kv.Begin(ctx, nil, ethdb.RO)
if err1 != nil {
return nil, err1
}
@ -239,7 +239,7 @@ func (b *SimulatedBackend) NonceAt(ctx context.Context, contract common.Address,
b.mu.Lock()
defer b.mu.Unlock()
dbtx, err1 := b.kv.Begin(ctx, nil, false)
dbtx, err1 := b.kv.Begin(ctx, nil, ethdb.RO)
if err1 != nil {
return 0, err1
}
@ -253,7 +253,7 @@ func (b *SimulatedBackend) StorageAt(ctx context.Context, contract common.Addres
b.mu.Lock()
defer b.mu.Unlock()
dbtx, err1 := b.kv.Begin(ctx, nil, false)
dbtx, err1 := b.kv.Begin(ctx, nil, ethdb.RO)
if err1 != nil {
return nil, err1
}
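
A recurring mechanical change throughout this commit: the boolean writable argument to Begin becomes an explicit ethdb.RO/ethdb.RW flag. A toy stand-in illustrating the calling pattern; the TxFlags type and constant values below are assumptions, only the RO/RW names appear in the diff:

```go
package main

import (
	"context"
	"fmt"
)

// TxFlags is a hypothetical flag type; only the names RO and RW are
// confirmed by the diff.
type TxFlags uint8

const (
	RW TxFlags = iota // read-write: replaces Begin(ctx, nil, true)
	RO                // read-only:  replaces Begin(ctx, nil, false)
)

type fakeKV struct{}

func (fakeKV) Begin(_ context.Context, _ interface{}, flags TxFlags) (string, error) {
	if flags == RO {
		return "read-only tx", nil
	}
	return "read-write tx", nil
}

func main() {
	var db fakeKV
	tx, err := db.Begin(context.Background(), nil, RO)
	if err != nil {
		panic(err)
	}
	fmt.Println(tx) // a named flag reads better at call sites than a bare bool
}
```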

View File

@ -37,6 +37,7 @@ import (
"github.com/ledgerwatch/turbo-geth/core/state"
"github.com/ledgerwatch/turbo-geth/core/types"
"github.com/ledgerwatch/turbo-geth/crypto"
"github.com/ledgerwatch/turbo-geth/ethdb"
"github.com/ledgerwatch/turbo-geth/params"
)
@ -133,7 +134,7 @@ func TestNewSimulatedBackend(t *testing.T) {
if sim.blockchain.Config() != params.AllEthashProtocolChanges {
t.Errorf("expected sim blockchain config to equal params.AllEthashProtocolChanges, got %v", sim.config)
}
tx, err1 := sim.KV().Begin(context.Background(), nil, false)
tx, err1 := sim.KV().Begin(context.Background(), nil, ethdb.RO)
if err1 != nil {
t.Errorf("TestNewSimulatedBackend create tx: %v", err1)
}

View File

@ -30,6 +30,7 @@ import (
"github.com/ledgerwatch/turbo-geth/core/state"
"github.com/ledgerwatch/turbo-geth/core/types"
"github.com/ledgerwatch/turbo-geth/core/vm"
"github.com/ledgerwatch/turbo-geth/ethdb"
"github.com/ledgerwatch/turbo-geth/log"
"github.com/ledgerwatch/turbo-geth/params"
"github.com/ledgerwatch/turbo-geth/tests"
@ -211,7 +212,7 @@ func Main(ctx *cli.Context) error {
//postAlloc := state.DumpGenesisFormat(false, false, false)
collector := make(Alloc)
tx, err1 := db.Begin(context.Background(), nil, false)
tx, err1 := db.Begin(context.Background(), nil, ethdb.RO)
if err1 != nil {
return fmt.Errorf("transition cannot open tx: %v", err1)
}

View File

@ -281,7 +281,7 @@ func runCmd(ctx *cli.Context) error {
fmt.Println("Could not commit state: ", err)
os.Exit(1)
}
tx, err1 := db.KV().Begin(context.Background(), nil, false)
tx, err1 := db.KV().Begin(context.Background(), nil, ethdb.RO)
if err1 != nil {
return fmt.Errorf("transition cannot open tx: %v", err1)
}

View File

@ -109,7 +109,7 @@ func stateTestCmd(ctx *cli.Context) error {
// Test failed, mark as so and dump any state to aid debugging
result.Pass, *result.Error = false, err.Error()
if ctx.GlobalBool(DumpFlag.Name) && statedb != nil {
tx, err1 := tds.Database().(ethdb.HasKV).KV().Begin(context.Background(), nil, false)
tx, err1 := tds.Database().(ethdb.HasKV).KV().Begin(context.Background(), nil, ethdb.RO)
if err1 != nil {
return fmt.Errorf("transition cannot open tx: %v", err1)
}

View File

@ -1670,7 +1670,7 @@ func iterateOverCode(chaindata string) error {
func zstd(chaindata string) error {
db := ethdb.MustOpen(chaindata)
defer db.Close()
tx, errBegin := db.Begin(context.Background(), true)
tx, errBegin := db.Begin(context.Background(), ethdb.RW)
check(errBegin)
defer tx.Rollback()
@ -1696,7 +1696,7 @@ func zstd(chaindata string) error {
}
storageReceipts := types.Receipts{}
err = cbor.Unmarshal(&storageReceipts, v)
err = cbor.Unmarshal(&storageReceipts, bytes.NewReader(v))
check(err)
samples1 = append(samples1, v)
@ -1862,7 +1862,7 @@ func zstd(chaindata string) error {
func benchRlp(chaindata string) error {
db := ethdb.MustOpen(chaindata)
defer db.Close()
tx, err := db.Begin(context.Background(), true)
tx, err := db.Begin(context.Background(), ethdb.RW)
check(err)
defer tx.Rollback()
@ -1885,7 +1885,7 @@ func benchRlp(chaindata string) error {
var cbor_decode3 time.Duration
var cbor_compress time.Duration
bufSlice := make([]byte, 0, 100_000)
buf := bytes.NewBuffer(make([]byte, 0, 100_000))
compressBuf := make([]byte, 0, 100_000)
var samplesCbor [][]byte
@ -1902,7 +1902,7 @@ func benchRlp(chaindata string) error {
}
receipts := types.Receipts{}
err = cbor.Unmarshal(&receipts, v)
err = cbor.Unmarshal(&receipts, bytes.NewReader(v))
check(err)
select {
@ -1925,13 +1925,14 @@ func benchRlp(chaindata string) error {
blockNum := binary.BigEndian.Uint64(k)
storageReceipts := types.Receipts{}
err = cbor.Unmarshal(&storageReceipts, v)
err = cbor.Unmarshal(&storageReceipts, bytes.NewReader(v))
check(err)
t := time.Now()
err = cbor.Marshal(&bufSlice, storageReceipts)
buf.Reset()
err = cbor.Marshal(buf, storageReceipts)
cbor_encode += time.Since(t)
total_cbor += len(bufSlice)
total_cbor += buf.Len()
check(err)
t = time.Now()
@ -1941,7 +1942,7 @@ func benchRlp(chaindata string) error {
storageReceipts2 := types.Receipts{}
t = time.Now()
err = cbor.Unmarshal(&storageReceipts2, bufSlice)
err = cbor.Unmarshal(&storageReceipts2, bytes.NewReader(buf.Bytes()))
cbor_decode += time.Since(t)
check(err)
@ -2092,12 +2093,13 @@ func extracHeaders(chaindata string, block uint64) error {
func receiptSizes(chaindata string) error {
db := ethdb.MustOpen(chaindata)
defer db.Close()
tx, err := db.KV().Begin(context.Background(), nil, false)
tx, err := db.KV().Begin(context.Background(), nil, ethdb.RW)
if err != nil {
return err
}
defer tx.Rollback()
c := tx.Cursor(dbutils.BlockReceiptsPrefix)
c := tx.Cursor(dbutils.Log)
defer c.Close()
sizes := make(map[int]int)
for k, v, err := c.First(); k != nil; k, v, err = c.Next() {
@ -2114,6 +2116,9 @@ func receiptSizes(chaindata string) error {
}
sort.Ints(lens)
for _, l := range lens {
if sizes[l] < 100000 {
continue
}
fmt.Printf("%6d - %d\n", l, sizes[l])
}
return nil
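
The hunks above also reflect a change to the internal cbor helpers: Marshal now takes an io.Writer and Unmarshal an io.Reader, so callers can reuse one bytes.Buffer across iterations instead of growing a fresh byte slice per record. A sketch of such a wrapper over ugorji's codec package (which this repo uses elsewhere, e.g. in the etl diff below); the exact bodies are assumptions:

```go
package main

import (
	"bytes"
	"fmt"
	"io"

	"github.com/ugorji/go/codec"
)

var handle codec.CborHandle

// Marshal encodes v into w, so callers can reuse a bytes.Buffer.
func Marshal(w io.Writer, v interface{}) error {
	return codec.NewEncoder(w, &handle).Encode(v)
}

// Unmarshal decodes v from r, e.g. bytes.NewReader(dbValue).
func Unmarshal(v interface{}, r io.Reader) error {
	return codec.NewDecoder(r, &handle).Decode(v)
}

func main() {
	buf := bytes.NewBuffer(make([]byte, 0, 1024))
	for _, blockLogs := range [][]string{{"log-a"}, {"log-b", "log-c"}} {
		buf.Reset() // one buffer reused across blocks, as in benchRlp above
		if err := Marshal(buf, blockLogs); err != nil {
			panic(err)
		}
		var back []string
		if err := Unmarshal(&back, bytes.NewReader(buf.Bytes())); err != nil {
			panic(err)
		}
		fmt.Println(back)
	}
}
```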

View File

@ -154,6 +154,7 @@ func resetExec(db *ethdb.ObjectDatabase) error {
dbutils.PlainStorageChangeSetBucket,
dbutils.PlainContractCodeBucket,
dbutils.BlockReceiptsPrefix,
dbutils.Log,
dbutils.IncarnationMapBucket,
dbutils.CodeBucket,
); err != nil {

View File

@ -94,14 +94,14 @@ func syncBySmallSteps(ctx context.Context, chaindata string) error {
}
}
var tx ethdb.DbWithPendingMutations = ethdb.NewTxDbWithoutTransaction(db, true)
var tx ethdb.DbWithPendingMutations = ethdb.NewTxDbWithoutTransaction(db, ethdb.RW)
defer tx.Rollback()
cc, bc, st, progress := newSync(ch, db, tx, changeSetHook)
defer bc.Stop()
cc.SetDB(tx)
tx, err = tx.Begin(context.Background(), true)
tx, err = tx.Begin(ctx, ethdb.RO)
if err != nil {
return err
}

View File

@ -43,7 +43,7 @@ func NewPrivateDebugAPI(db ethdb.KV, dbReader ethdb.Database) *PrivateDebugAPIIm
// StorageRangeAt implements debug_storageRangeAt. Returns information about a range of storage locations (if any) for the given address.
func (api *PrivateDebugAPIImpl) StorageRangeAt(ctx context.Context, blockHash common.Hash, txIndex uint64, contractAddress common.Address, keyStart hexutil.Bytes, maxResult int) (StorageRangeResult, error) {
tx, err := api.dbReader.Begin(ctx, false)
tx, err := api.dbReader.Begin(ctx, ethdb.RO)
if err != nil {
return StorageRangeResult{}, err
}
@ -69,7 +69,7 @@ func (api *PrivateDebugAPIImpl) StorageRangeAt(ctx context.Context, blockHash co
// AccountRange implements debug_accountRange. Returns a range of accounts involved in the given block range
func (api *PrivateDebugAPIImpl) AccountRange(ctx context.Context, blockNrOrHash rpc.BlockNumberOrHash, startKey []byte, maxResults int, excludeCode, excludeStorage, excludeMissingPreimages bool) (state.IteratorDump, error) {
tx, err := api.dbReader.Begin(ctx, false)
tx, err := api.dbReader.Begin(ctx, ethdb.RO)
if err != nil {
return state.IteratorDump{}, err
}
@ -129,7 +129,7 @@ func (api *PrivateDebugAPIImpl) AccountRange(ctx context.Context, blockNrOrHash
// GetModifiedAccountsByNumber implements debug_getModifiedAccountsByNumber. Returns a list of accounts modified in the given block.
func (api *PrivateDebugAPIImpl) GetModifiedAccountsByNumber(ctx context.Context, startNumber rpc.BlockNumber, endNumber *rpc.BlockNumber) ([]common.Address, error) {
tx, err := api.dbReader.Begin(ctx, false)
tx, err := api.dbReader.Begin(ctx, ethdb.RO)
if err != nil {
return nil, err
}
@ -166,7 +166,7 @@ func (api *PrivateDebugAPIImpl) GetModifiedAccountsByNumber(ctx context.Context,
// GetModifiedAccountsByHash implements debug_getModifiedAccountsByHash. Returns a list of accounts modified in the given block.
func (api *PrivateDebugAPIImpl) GetModifiedAccountsByHash(ctx context.Context, startHash common.Hash, endHash *common.Hash) ([]common.Address, error) {
tx, err := api.dbReader.Begin(ctx, false)
tx, err := api.dbReader.Begin(ctx, ethdb.RO)
if err != nil {
return nil, err
}

View File

@ -5,6 +5,7 @@ import (
"fmt"
"math/big"
"github.com/ledgerwatch/turbo-geth/ethdb"
"github.com/ledgerwatch/turbo-geth/turbo/adapter"
"github.com/ledgerwatch/turbo-geth/turbo/rpchelper"
@ -20,7 +21,7 @@ func (api *APIImpl) GetBalance(ctx context.Context, address common.Address, bloc
return nil, err
}
tx, err1 := api.db.Begin(ctx, nil, false)
tx, err1 := api.db.Begin(ctx, nil, ethdb.RO)
if err1 != nil {
return nil, fmt.Errorf("getBalance cannot open tx: %v", err1)
}
@ -44,7 +45,7 @@ func (api *APIImpl) GetTransactionCount(ctx context.Context, address common.Addr
return nil, err
}
nonce := hexutil.Uint64(0)
tx, err1 := api.db.Begin(ctx, nil, false)
tx, err1 := api.db.Begin(ctx, nil, ethdb.RO)
if err1 != nil {
return nil, fmt.Errorf("getTransactionCount cannot open tx: %v", err1)
}
@ -64,7 +65,7 @@ func (api *APIImpl) GetCode(ctx context.Context, address common.Address, blockNr
return nil, err
}
tx, err1 := api.db.Begin(ctx, nil, false)
tx, err1 := api.db.Begin(ctx, nil, ethdb.RO)
if err1 != nil {
return nil, fmt.Errorf("getCode cannot open tx: %v", err1)
}
@ -90,7 +91,7 @@ func (api *APIImpl) GetStorageAt(ctx context.Context, address common.Address, in
return hexutil.Encode(common.LeftPadBytes(empty[:], 32)), err
}
tx, err1 := api.db.Begin(ctx, nil, false)
tx, err1 := api.db.Begin(ctx, nil, ethdb.RO)
if err1 != nil {
return "", fmt.Errorf("getStorageAt cannot open tx: %v", err1)
}

View File

@ -7,13 +7,14 @@ import (
"github.com/ledgerwatch/turbo-geth/common"
"github.com/ledgerwatch/turbo-geth/common/hexutil"
"github.com/ledgerwatch/turbo-geth/core/rawdb"
"github.com/ledgerwatch/turbo-geth/ethdb"
"github.com/ledgerwatch/turbo-geth/rpc"
"github.com/ledgerwatch/turbo-geth/turbo/adapter/ethapi"
)
// GetBlockByNumber implements eth_getBlockByNumber. Returns information about a block given the block's number.
func (api *APIImpl) GetBlockByNumber(ctx context.Context, number rpc.BlockNumber, fullTx bool) (map[string]interface{}, error) {
tx, err := api.dbReader.Begin(ctx, false)
tx, err := api.dbReader.Begin(ctx, ethdb.RO)
if err != nil {
return nil, err
}
@ -51,7 +52,7 @@ func (api *APIImpl) GetBlockByNumber(ctx context.Context, number rpc.BlockNumber
// GetBlockByHash implements eth_getBlockByHash. Returns information about a block given the block's hash.
func (api *APIImpl) GetBlockByHash(ctx context.Context, hash common.Hash, fullTx bool) (map[string]interface{}, error) {
tx, err := api.dbReader.Begin(ctx, false)
tx, err := api.dbReader.Begin(ctx, ethdb.RO)
if err != nil {
return nil, err
}
@ -86,7 +87,7 @@ func (api *APIImpl) GetBlockByHash(ctx context.Context, hash common.Hash, fullTx
// GetBlockTransactionCountByNumber implements eth_getBlockTransactionCountByNumber. Returns the number of transactions in a block given the block's block number.
func (api *APIImpl) GetBlockTransactionCountByNumber(ctx context.Context, blockNr rpc.BlockNumber) (*hexutil.Uint, error) {
tx, err := api.dbReader.Begin(ctx, false)
tx, err := api.dbReader.Begin(ctx, ethdb.RO)
if err != nil {
return nil, err
}
@ -110,7 +111,7 @@ func (api *APIImpl) GetBlockTransactionCountByNumber(ctx context.Context, blockN
// GetBlockTransactionCountByHash implements eth_getBlockTransactionCountByHash. Returns the number of transactions in a block given the block's block hash.
func (api *APIImpl) GetBlockTransactionCountByHash(ctx context.Context, blockHash common.Hash) (*hexutil.Uint, error) {
tx, err := api.dbReader.Begin(ctx, false)
tx, err := api.dbReader.Begin(ctx, ethdb.RO)
if err != nil {
return nil, err
}

View File

@ -12,6 +12,7 @@ import (
"github.com/ledgerwatch/turbo-geth/core/rawdb"
"github.com/ledgerwatch/turbo-geth/core/state"
"github.com/ledgerwatch/turbo-geth/core/vm"
"github.com/ledgerwatch/turbo-geth/ethdb"
"github.com/ledgerwatch/turbo-geth/internal/ethapi"
"github.com/ledgerwatch/turbo-geth/log"
"github.com/ledgerwatch/turbo-geth/params"
@ -22,7 +23,7 @@ import (
// Call implements eth_call. Executes a new message call immediately without creating a transaction on the block chain.
func (api *APIImpl) Call(ctx context.Context, args ethapi.CallArgs, blockNrOrHash rpc.BlockNumberOrHash, overrides *map[common.Address]ethapi.Account) (hexutil.Bytes, error) {
tx, err1 := api.db.Begin(ctx, nil, false)
tx, err1 := api.db.Begin(ctx, nil, ethdb.RO)
if err1 != nil {
return nil, fmt.Errorf("call cannot open tx: %v", err1)
}
@ -49,7 +50,7 @@ func (api *APIImpl) EstimateGas(ctx context.Context, args ethapi.CallArgs) (hexu
}
func (api *APIImpl) DoEstimateGas(ctx context.Context, args ethapi.CallArgs, blockNrOrHash rpc.BlockNumberOrHash, gasCap *big.Int) (hexutil.Uint64, error) {
tx, err1 := api.db.Begin(ctx, nil, false)
tx, err1 := api.db.Begin(ctx, nil, ethdb.RO)
if err1 != nil {
return 0, fmt.Errorf("estimateGas cannot open tx: %v", err1)
}

View File

@ -20,7 +20,7 @@ import (
"github.com/ledgerwatch/turbo-geth/turbo/transactions"
)
func getReceipts(ctx context.Context, tx rawdb.DatabaseReader, number uint64, hash common.Hash) (types.Receipts, error) {
func getReceipts(ctx context.Context, tx ethdb.Database, number uint64, hash common.Hash) (types.Receipts, error) {
if cached := rawdb.ReadReceipts(tx, hash, number); cached != nil {
return cached, nil
}
@ -58,7 +58,7 @@ func getReceipts(ctx context.Context, tx rawdb.DatabaseReader, number uint64, ha
// GetLogsByHash non-standard RPC that returns all logs in a block
// TODO(tjayrush): Since this is non-standard we could rename it to GetLogsByBlockHash to be more consistent and avoid confusion
func (api *APIImpl) GetLogsByHash(ctx context.Context, hash common.Hash) ([][]*types.Log, error) {
tx, err := api.dbReader.Begin(ctx, false)
tx, err := api.dbReader.Begin(ctx, ethdb.RO)
if err != nil {
return nil, err
}
@ -84,7 +84,7 @@ func (api *APIImpl) GetLogs(ctx context.Context, crit filters.FilterCriteria) ([
var begin, end uint64
var logs []*types.Log //nolint:prealloc
tx, beginErr := api.dbReader.Begin(ctx, false)
tx, beginErr := api.dbReader.Begin(ctx, ethdb.RO)
if beginErr != nil {
return returnLogs(logs), beginErr
}
@ -117,7 +117,7 @@ func (api *APIImpl) GetLogs(ctx context.Context, crit filters.FilterCriteria) ([
blockNumbers := roaring.New()
blockNumbers.AddRange(begin, end+1) // [min,max)
topicsBitmap, err := getTopicsBitmap(tx.(ethdb.HasTx).Tx().Cursor(dbutils.LogTopicIndex), crit.Topics, uint32(begin), uint32(end))
topicsBitmap, err := getTopicsBitmap(tx, crit.Topics, uint32(begin), uint32(end))
if err != nil {
return nil, err
}
@ -129,10 +129,9 @@ func (api *APIImpl) GetLogs(ctx context.Context, crit filters.FilterCriteria) ([
}
}
logAddrIndex := tx.(ethdb.HasTx).Tx().Cursor(dbutils.LogAddressIndex)
var addrBitmap *roaring.Bitmap
for _, addr := range crit.Addresses {
m, err := bitmapdb.Get(logAddrIndex, addr[:], uint32(begin), uint32(end))
m, err := bitmapdb.Get(tx, dbutils.LogAddressIndex, addr[:], uint32(begin), uint32(end))
if err != nil {
return nil, err
}
@ -189,12 +188,12 @@ func (api *APIImpl) GetLogs(ctx context.Context, crit filters.FilterCriteria) ([
// {{}, {B}} matches any topic in first position AND B in second position
// {{A}, {B}} matches topic A in first position AND B in second position
// {{A, B}, {C, D}} matches topic (A OR B) in first position AND (C OR D) in second position
func getTopicsBitmap(c ethdb.Cursor, topics [][]common.Hash, from, to uint32) (*roaring.Bitmap, error) {
func getTopicsBitmap(c ethdb.Getter, topics [][]common.Hash, from, to uint32) (*roaring.Bitmap, error) {
var result *roaring.Bitmap
for _, sub := range topics {
var bitmapForORing *roaring.Bitmap
for _, topic := range sub {
m, err := bitmapdb.Get(c, topic[:], from, to)
m, err := bitmapdb.Get(c, dbutils.LogTopicIndex, topic[:], from, to)
if err != nil {
return nil, err
}
@ -218,7 +217,7 @@ func getTopicsBitmap(c ethdb.Cursor, topics [][]common.Hash, from, to uint32) (*
// GetTransactionReceipt implements eth_getTransactionReceipt. Returns the receipt of a transaction given the transaction's hash.
func (api *APIImpl) GetTransactionReceipt(ctx context.Context, hash common.Hash) (map[string]interface{}, error) {
tx, err := api.dbReader.Begin(ctx, false)
tx, err := api.dbReader.Begin(ctx, ethdb.RO)
if err != nil {
return nil, err
}
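
The comment above getTopicsBitmap spells out the combination rule: OR the topics within one position, AND across positions. A self-contained illustration of that rule with RoaringBitmap, where fetchTopicBitmap is a hypothetical stand-in for bitmapdb.Get over the LogTopicIndex bucket:

```go
package main

import (
	"fmt"

	"github.com/RoaringBitmap/roaring"
)

// fetchTopicBitmap stands in for bitmapdb.Get(db, dbutils.LogTopicIndex, ...):
// the set of block numbers in [from, to] whose logs contain the topic.
func fetchTopicBitmap(topic string, from, to uint32) *roaring.Bitmap {
	m := roaring.New()
	switch topic { // toy data
	case "A":
		m.AddMany([]uint32{1, 2, 5})
	case "B":
		m.AddMany([]uint32{2, 3})
	case "C":
		m.AddMany([]uint32{2, 5})
	}
	return m
}

// topicsToBitmap mirrors getTopicsBitmap: OR within a position, AND across positions.
func topicsToBitmap(topics [][]string, from, to uint32) *roaring.Bitmap {
	var result *roaring.Bitmap
	for _, sub := range topics {
		var orOfSub *roaring.Bitmap
		for _, topic := range sub {
			m := fetchTopicBitmap(topic, from, to)
			if orOfSub == nil {
				orOfSub = m
			} else {
				orOfSub.Or(m)
			}
		}
		if orOfSub == nil { // empty position matches any topic
			continue
		}
		if result == nil {
			result = orOfSub
		} else {
			result.And(orOfSub)
		}
	}
	return result
}

func main() {
	got := topicsToBitmap([][]string{{"A", "B"}, {"C"}}, 0, 10)
	fmt.Println(got.ToArray()) // [2 5]: blocks matching (A OR B) AND C
}
```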

View File

@ -7,6 +7,7 @@ import (
"github.com/ledgerwatch/turbo-geth/common/hexutil"
"github.com/ledgerwatch/turbo-geth/eth"
"github.com/ledgerwatch/turbo-geth/eth/stagedsync/stages"
"github.com/ledgerwatch/turbo-geth/ethdb"
)
// BlockNumber implements eth_blockNumber. Returns the block number of most recent block.
@ -43,7 +44,7 @@ func (api *APIImpl) Syncing(_ context.Context) (interface{}, error) {
// ChainId implements eth_chainId. Returns the current ethereum chainId.
func (api *APIImpl) ChainId(ctx context.Context) (hexutil.Uint64, error) {
tx, err := api.dbReader.Begin(ctx, false)
tx, err := api.dbReader.Begin(ctx, ethdb.RO)
if err != nil {
return 0, err
}

View File

@ -7,12 +7,13 @@ import (
"github.com/ledgerwatch/turbo-geth/common"
"github.com/ledgerwatch/turbo-geth/common/hexutil"
"github.com/ledgerwatch/turbo-geth/core/rawdb"
"github.com/ledgerwatch/turbo-geth/ethdb"
"github.com/ledgerwatch/turbo-geth/rpc"
)
// GetTransactionByHash implements eth_getTransactionByHash. Returns information about a transaction given the transaction's hash.
func (api *APIImpl) GetTransactionByHash(ctx context.Context, hash common.Hash) (*RPCTransaction, error) {
tx, err := api.dbReader.Begin(ctx, false)
tx, err := api.dbReader.Begin(ctx, ethdb.RO)
if err != nil {
return nil, err
}
@ -28,7 +29,7 @@ func (api *APIImpl) GetTransactionByHash(ctx context.Context, hash common.Hash)
// GetTransactionByBlockHashAndIndex implements eth_getTransactionByBlockHashAndIndex. Returns information about a transaction given the block's hash and a transaction index.
func (api *APIImpl) GetTransactionByBlockHashAndIndex(ctx context.Context, blockHash common.Hash, txIndex hexutil.Uint64) (*RPCTransaction, error) {
tx, err := api.dbReader.Begin(ctx, false)
tx, err := api.dbReader.Begin(ctx, ethdb.RO)
if err != nil {
return nil, err
}
@ -53,7 +54,7 @@ func (api *APIImpl) GetTransactionByBlockHashAndIndex(ctx context.Context, block
// GetTransactionByBlockNumberAndIndex implements eth_getTransactionByBlockNumberAndIndex. Returns information about a transaction given a block number and transaction index.
func (api *APIImpl) GetTransactionByBlockNumberAndIndex(ctx context.Context, blockNr rpc.BlockNumber, txIndex hexutil.Uint) (*RPCTransaction, error) {
tx, err := api.dbReader.Begin(ctx, false)
tx, err := api.dbReader.Begin(ctx, ethdb.RO)
if err != nil {
return nil, err
}

View File

@ -8,6 +8,7 @@ import (
"github.com/ledgerwatch/turbo-geth/common/hexutil"
"github.com/ledgerwatch/turbo-geth/core/rawdb"
"github.com/ledgerwatch/turbo-geth/core/types"
"github.com/ledgerwatch/turbo-geth/ethdb"
"github.com/ledgerwatch/turbo-geth/log"
"github.com/ledgerwatch/turbo-geth/rpc"
"github.com/ledgerwatch/turbo-geth/turbo/adapter/ethapi"
@ -15,7 +16,7 @@ import (
// GetUncleByBlockNumberAndIndex implements eth_getUncleByBlockNumberAndIndex. Returns information about an uncle given a block's number and the index of the uncle.
func (api *APIImpl) GetUncleByBlockNumberAndIndex(ctx context.Context, number rpc.BlockNumber, index hexutil.Uint) (map[string]interface{}, error) {
tx, err := api.dbReader.Begin(ctx, false)
tx, err := api.dbReader.Begin(ctx, ethdb.RO)
if err != nil {
return nil, err
}
@ -51,7 +52,7 @@ func (api *APIImpl) GetUncleByBlockNumberAndIndex(ctx context.Context, number rp
// GetUncleByBlockHashAndIndex implements eth_getUncleByBlockHashAndIndex. Returns information about an uncle given a block's hash and the index of the uncle.
func (api *APIImpl) GetUncleByBlockHashAndIndex(ctx context.Context, hash common.Hash, index hexutil.Uint) (map[string]interface{}, error) {
tx, err := api.dbReader.Begin(ctx, false)
tx, err := api.dbReader.Begin(ctx, ethdb.RO)
if err != nil {
return nil, err
}
@ -86,7 +87,7 @@ func (api *APIImpl) GetUncleByBlockHashAndIndex(ctx context.Context, hash common
func (api *APIImpl) GetUncleCountByBlockNumber(ctx context.Context, number rpc.BlockNumber) (*hexutil.Uint, error) {
n := hexutil.Uint(0)
tx, err := api.dbReader.Begin(ctx, false)
tx, err := api.dbReader.Begin(ctx, ethdb.RO)
if err != nil {
return &n, err
}
@ -110,7 +111,7 @@ func (api *APIImpl) GetUncleCountByBlockNumber(ctx context.Context, number rpc.B
// GetUncleCountByBlockHash implements eth_getUncleCountByBlockHash. Returns the number of uncles in the block, if any.
func (api *APIImpl) GetUncleCountByBlockHash(ctx context.Context, hash common.Hash) (*hexutil.Uint, error) {
n := hexutil.Uint(0)
tx, err := api.dbReader.Begin(ctx, false)
tx, err := api.dbReader.Begin(ctx, ethdb.RO)
if err != nil {
return &n, err
}

View File

@ -7,12 +7,13 @@ import (
"github.com/ledgerwatch/turbo-geth/common"
"github.com/ledgerwatch/turbo-geth/core/rawdb"
"github.com/ledgerwatch/turbo-geth/core/types"
"github.com/ledgerwatch/turbo-geth/ethdb"
"github.com/ledgerwatch/turbo-geth/rpc"
)
// GetHeaderByNumber implements tg_getHeaderByNumber. Returns a block's header given a block number ignoring the block's transaction and uncle list (may be faster).
func (api *TgImpl) GetHeaderByNumber(ctx context.Context, blockNumber rpc.BlockNumber) (*types.Header, error) {
tx, err := api.dbReader.Begin(ctx, false)
tx, err := api.dbReader.Begin(ctx, ethdb.RO)
if err != nil {
return nil, err
}
@ -28,7 +29,7 @@ func (api *TgImpl) GetHeaderByNumber(ctx context.Context, blockNumber rpc.BlockN
// GetHeaderByHash implements tg_getHeaderByHash. Returns a block's header given a block's hash.
func (api *TgImpl) GetHeaderByHash(ctx context.Context, hash common.Hash) (*types.Header, error) {
tx, err := api.dbReader.Begin(ctx, false)
tx, err := api.dbReader.Begin(ctx, ethdb.RO)
if err != nil {
return nil, err
}

View File

@ -7,12 +7,13 @@ import (
"github.com/ledgerwatch/turbo-geth/consensus/ethash"
"github.com/ledgerwatch/turbo-geth/core/rawdb"
"github.com/ledgerwatch/turbo-geth/core/types"
"github.com/ledgerwatch/turbo-geth/ethdb"
"github.com/ledgerwatch/turbo-geth/rpc"
)
// BlockReward returns the block reward for this block
// func (api *TgImpl) BlockReward(ctx context.Context, blockNr rpc.BlockNumber) (Issuance, error) {
// tx, err := api.dbReader.Begin(ctx, false)
// tx, err := api.dbReader.Begin(ctx, ethdb.RO)
// if err != nil {
// return Issuance{}, err
// }
@ -23,7 +24,7 @@ import (
// UncleReward returns the uncle reward for this block
// func (api *TgImpl) UncleReward(ctx context.Context, blockNr rpc.BlockNumber) (Issuance, error) {
// tx, err := api.dbReader.Begin(ctx, false)
// tx, err := api.dbReader.Begin(ctx, ethdb.RO)
// if err != nil {
// return Issuance{}, err
// }
@ -34,7 +35,7 @@ import (
// Issuance implements tg_issuance. Returns the total issuance (block reward plus uncle reward) for the given block.
func (api *TgImpl) Issuance(ctx context.Context, blockNr rpc.BlockNumber) (Issuance, error) {
tx, err := api.dbReader.Begin(ctx, false)
tx, err := api.dbReader.Begin(ctx, ethdb.RO)
if err != nil {
return Issuance{}, err
}

View File

@ -7,11 +7,12 @@ import (
"github.com/ledgerwatch/turbo-geth/common"
"github.com/ledgerwatch/turbo-geth/core/rawdb"
"github.com/ledgerwatch/turbo-geth/core/types"
"github.com/ledgerwatch/turbo-geth/ethdb"
)
// GetLogsByHash implements tg_getLogsByHash. Returns an array of arrays of logs generated by the transactions in the block given by the block's hash.
func (api *TgImpl) GetLogsByHash(ctx context.Context, hash common.Hash) ([][]*types.Log, error) {
tx, err := api.dbReader.Begin(ctx, false)
tx, err := api.dbReader.Begin(ctx, ethdb.RO)
if err != nil {
return nil, err
}

View File

@ -5,6 +5,7 @@ import (
"github.com/ledgerwatch/turbo-geth/common"
"github.com/ledgerwatch/turbo-geth/core/forkid"
"github.com/ledgerwatch/turbo-geth/ethdb"
"github.com/ledgerwatch/turbo-geth/rpc"
"github.com/ledgerwatch/turbo-geth/turbo/rpchelper"
)
@ -23,7 +24,7 @@ func (api *TgImpl) Forks(ctx context.Context, blockNrOrHash rpc.BlockNumberOrHas
return Forks{}, err
}
tx, err := api.dbReader.Begin(ctx, false)
tx, err := api.dbReader.Begin(ctx, ethdb.RO)
if err != nil {
return Forks{}, err
}

View File

@ -23,7 +23,7 @@ import (
// Transaction implements trace_transaction
// TODO(tjayrush): I think this should return an []interface{}, so we can return both Parity and Geth traces
func (api *TraceAPIImpl) Transaction(ctx context.Context, txHash common.Hash) (ParityTraces, error) {
tx, err := api.dbReader.Begin(ctx, false)
tx, err := api.dbReader.Begin(ctx, ethdb.RO)
if err != nil {
return nil, err
}
@ -43,7 +43,7 @@ func (api *TraceAPIImpl) Transaction(ctx context.Context, txHash common.Hash) (P
// TODO(tjayrush): only accepts a single one
// TODO(tjayrush): I think this should return an interface{}, so we can return both Parity and Geth traces
func (api *TraceAPIImpl) Get(ctx context.Context, txHash common.Hash, indicies []hexutil.Uint64) (*ParityTrace, error) {
tx, err := api.dbReader.Begin(ctx, false)
tx, err := api.dbReader.Begin(ctx, ethdb.RO)
if err != nil {
return nil, err
}
@ -256,7 +256,7 @@ func (api *TraceAPIImpl) Filter(ctx context.Context, req TraceFilterRequest) (Pa
}
traceType := "callTracer" // nolint: goconst
traces := ParityTraces{}
dbtx, err1 := api.db.Begin(ctx, nil, false)
dbtx, err1 := api.db.Begin(ctx, nil, ethdb.RO)
if err1 != nil {
return nil, fmt.Errorf("traceFilter cannot open tx: %v", err1)
}

View File

@ -15,7 +15,7 @@ import (
// TraceTransaction implements debug_traceTransaction. Returns Geth style transaction traces.
func (api *PrivateDebugAPIImpl) TraceTransaction(ctx context.Context, hash common.Hash, config *eth.TraceConfig) (interface{}, error) {
tx, err := api.dbReader.Begin(ctx, false)
tx, err := api.dbReader.Begin(ctx, ethdb.RO)
if err != nil {
return nil, err
}

View File

@ -108,11 +108,11 @@ func CompareAccountRange(tgURL, gethURL, tmpDataDir, gethDataDir string, blockNu
}
}
tgTx, err := resultsKV.Begin(context.Background(), nil, false)
tgTx, err := resultsKV.Begin(context.Background(), nil, ethdb.RO)
if err != nil {
log.Fatal(err)
}
gethTx, err := gethKV.Begin(context.Background(), nil, false)
gethTx, err := gethKV.Begin(context.Background(), nil, ethdb.RO)
if err != nil {
log.Fatal(err)
}

View File

@ -45,7 +45,7 @@ func CheckChangeSets(genesis *core.Genesis, blockNum uint64, chaindata string, h
if chaindata != historyfile {
historyDb = ethdb.MustOpen(historyfile)
}
historyTx, err1 := historyDb.KV().Begin(context.Background(), nil, false)
historyTx, err1 := historyDb.KV().Begin(context.Background(), nil, ethdb.RO)
if err1 != nil {
return err1
}
@ -92,12 +92,14 @@ func CheckChangeSets(genesis *core.Genesis, blockNum uint64, chaindata string, h
}
}
if writeReceipts {
rawdb.WriteReceipts(batch, block.Hash(), block.NumberU64(), receipts)
if batch.BatchSize() >= batch.IdealBatchSize() {
log.Info("Committing receipts", "up to block", block.NumberU64(), "batch size", common.StorageSize(batch.BatchSize()))
if err := batch.CommitAndBegin(context.Background()); err != nil {
return err
}
if err := rawdb.WriteReceipts(batch, block.NumberU64(), receipts); err != nil {
return err
}
}
if batch.BatchSize() >= batch.IdealBatchSize() {
log.Info("Committing receipts", "up to block", block.NumberU64(), "batch size", common.StorageSize(batch.BatchSize()))
if err := batch.CommitAndBegin(context.Background()); err != nil {
return err
}
}

View File

@ -126,7 +126,7 @@ func dataDependencies(blockNum uint64) {
ethDb := ethdb.MustOpen("/Volumes/tb4/turbo-geth-10/geth/chaindata")
defer ethDb.Close()
ethTx, err1 := ethDb.KV().Begin(context.Background(), nil, false)
ethTx, err1 := ethDb.KV().Begin(context.Background(), nil, ethdb.RO)
check(err1)
defer ethTx.Rollback()

View File

@ -87,7 +87,7 @@ func accountsReadWrites(blockNum uint64) {
ethDb := ethdb.MustOpen("/Volumes/tb41/turbo-geth-10/geth/chaindata")
defer ethDb.Close()
ethTx, err1 := ethDb.KV().Begin(context.Background(), nil, false)
ethTx, err1 := ethDb.KV().Begin(context.Background(), nil, ethdb.RO)
check(err1)
defer ethTx.Rollback()
chainConfig := params.MainnetChainConfig

View File

@ -115,7 +115,7 @@ func storageReadWrites(blockNum uint64) {
ethDb := ethdb.MustOpen("/Volumes/tb41/turbo-geth-10/geth/chaindata")
defer ethDb.Close()
ethTx, err1 := ethDb.KV().Begin(context.Background(), nil, false)
ethTx, err1 := ethDb.KV().Begin(context.Background(), nil, ethdb.RO)
check(err1)
defer ethTx.Rollback()
chainConfig := params.MainnetChainConfig

View File

@ -162,7 +162,7 @@ func speculativeExecution(blockNum uint64) {
ethDb := ethdb.MustOpen("/Volumes/tb41/turbo-geth-10/geth/chaindata")
defer ethDb.Close()
ethTx, err1 := ethDb.KV().Begin(context.Background(), nil, false)
ethTx, err1 := ethDb.KV().Begin(context.Background(), nil, ethdb.RO)
check(err1)
defer ethTx.Rollback()
chainConfig := params.MainnetChainConfig

View File

@ -435,7 +435,7 @@ func NewGasLimitReporter(ctx context.Context, remoteDB ethdb.KV, localDB ethdb.K
var err error
var localTx ethdb.Tx
if localTx, err = localDB.Begin(ctx, nil, true); err != nil {
if localTx, err = localDB.Begin(ctx, nil, ethdb.RW); err != nil {
panic(err)
}
@ -450,7 +450,7 @@ func NewGasLimitReporter(ctx context.Context, remoteDB ethdb.KV, localDB ethdb.K
if err = localTx.Commit(ctx); err != nil {
panic(err)
}
if localTx, err = localDB.Begin(ctx, nil, true); err != nil {
if localTx, err = localDB.Begin(ctx, nil, ethdb.RW); err != nil {
panic(err)
}
@ -1113,7 +1113,7 @@ func makeCreators(blockNum uint64) {
ethDb := ethdb.MustOpen("/Volumes/tb41/turbo-geth/geth/chaindata")
//ethDb := ethdb.MustOpen("/Users/alexeyakhunov/Library/Ethereum/geth/chaindata")
defer ethDb.Close()
ethTx, err1 := ethDb.KV().Begin(context.Background(), nil, false)
ethTx, err1 := ethDb.KV().Begin(context.Background(), nil, ethdb.RO)
check(err1)
defer ethTx.Rollback()
f, err := os.OpenFile("/Volumes/tb41/turbo-geth/creators.csv", os.O_APPEND|os.O_WRONLY|os.O_CREATE, 0600)

View File

@ -132,7 +132,7 @@ func makeTokens(blockNum uint64) {
ethDb := ethdb.MustOpen("/Volumes/tb41/turbo-geth/geth/chaindata")
//ethDb := ethdb.MustOpen("/Users/alexeyakhunov/Library/Ethereum/geth/chaindata")
defer ethDb.Close()
ethTx, err1 := ethDb.KV().Begin(context.Background(), nil, false)
ethTx, err1 := ethDb.KV().Begin(context.Background(), nil, ethdb.RO)
check(err1)
defer ethTx.Rollback()
chainConfig := params.MainnetChainConfig
@ -198,7 +198,7 @@ func makeTokenBalances() {
ethDb := ethdb.MustOpen("/Volumes/tb41/turbo-geth/geth/chaindata")
//ethDb := ethdb.MustOpen("/Users/alexeyakhunov/Library/Ethereum/geth/chaindata")
defer ethDb.Close()
ethTx, err1 := ethDb.KV().Begin(context.Background(), nil, false)
ethTx, err1 := ethDb.KV().Begin(context.Background(), nil, ethdb.RO)
check(err1)
defer ethTx.Rollback()
txCacher := core.NewTxSenderCacher(runtime.NumCPU())
@ -424,7 +424,7 @@ func makeTokenAllowances() {
ethDb := ethdb.MustOpen("/Volumes/tb41/turbo-geth/geth/chaindata")
//ethDb := ethdb.MustOpen("/Users/alexeyakhunov/Library/Ethereum/geth/chaindata")
defer ethDb.Close()
ethTx, err1 := ethDb.KV().Begin(context.Background(), nil, false)
ethTx, err1 := ethDb.KV().Begin(context.Background(), nil, ethdb.RO)
check(err1)
defer ethTx.Rollback()
txCacher := core.NewTxSenderCacher(runtime.NumCPU())

View File

@ -173,7 +173,7 @@ func transactionStats(blockNum uint64) {
//ethDb := ethdb.MustOpen("/Volumes/tb41/turbo-geth/geth/chaindata")
//ethDb := ethdb.MustOpen("/Users/alexeyakhunov/Library/Ethereum/geth/chaindata")
defer ethDb.Close()
ethTx, err1 := ethDb.KV().Begin(context.Background(), nil, false)
ethTx, err1 := ethDb.KV().Begin(context.Background(), nil, ethdb.RO)
check(err1)
defer ethTx.Rollback()
f, err := os.OpenFile("txs.csv", os.O_APPEND|os.O_WRONLY|os.O_CREATE, 0600)

View File

@ -112,8 +112,9 @@ var (
HeaderHashSuffix = []byte("n") // headerPrefix + num (uint64 big endian) + headerHashSuffix -> hash
HeaderNumberPrefix = "H" // headerNumberPrefix + hash -> num (uint64 big endian)
BlockBodyPrefix = "b" // blockBodyPrefix + num (uint64 big endian) + hash -> block body
BlockReceiptsPrefix = "r" // blockReceiptsPrefix + num (uint64 big endian) + hash -> block receipts
BlockBodyPrefix = "b" // blockBodyPrefix + num (uint64 big endian) + hash -> block body
BlockReceiptsPrefix = "r" // blockReceiptsPrefix + num (uint64 big endian) + hash -> block receipts
Log = "log" // blockReceiptsPrefix + num (uint64 big endian) + hash -> block receipts
// Stores bitmap indices - in which block numbers saw logs of given 'address' or 'topic'
// [addr or topic] + [2 bytes inverted shard number] -> bitmap(blockN)
@ -238,6 +239,7 @@ var Buckets = []string{
SnapshotInfoBucket,
CallFromIndex,
CallToIndex,
Log,
}
// DeprecatedBuckets - list of buckets which can be programmatically deleted - for example after migration

View File

@ -57,9 +57,19 @@ func BlockBodyKey(number uint64, hash common.Hash) []byte {
return append(EncodeBlockNumber(number), hash.Bytes()...)
}
// blockReceiptsKey = blockReceiptsPrefix + num (uint64 big endian) + hash
func BlockReceiptsKey(number uint64, hash common.Hash) []byte {
return append(EncodeBlockNumber(number), hash.Bytes()...)
// ReceiptsKey = blockN (uint64 big endian)
func ReceiptsKey(blockNumber uint64) []byte {
newK := make([]byte, 8)
binary.BigEndian.PutUint64(newK, blockNumber)
return newK
}
// LogKey = blockN (uint64 big endian) + txId (uint32 big endian)
func LogKey(blockNumber uint64, txId uint32) []byte {
newK := make([]byte, 8+4)
binary.BigEndian.PutUint64(newK, blockNumber)
binary.BigEndian.PutUint32(newK[8:], txId)
return newK
}
// bloomBitsKey = bloomBitsPrefix + bit (uint16 big endian) + section (uint64 big endian) + hash

View File

@ -30,7 +30,7 @@ type Collector struct {
flushBuffer func([]byte, bool) error
dataProviders []dataProvider
allFlushed bool
cleanOnFailure bool
autoClean bool
}
// NewCollectorFromFiles creates collector from existing files (left over from previous unsuccessful loading)
@ -54,18 +54,18 @@ func NewCollectorFromFiles(tmpdir string) (*Collector, error) {
}
dataProviders[i] = &dataProvider
}
return &Collector{dataProviders: dataProviders, allFlushed: true, cleanOnFailure: false}, nil
return &Collector{dataProviders: dataProviders, allFlushed: true, autoClean: false}, nil
}
// NewCriticalCollector does not clean up temporary files if loading has failed
func NewCriticalCollector(tmpdir string, sortableBuffer Buffer) *Collector {
c := NewCollector(tmpdir, sortableBuffer)
c.cleanOnFailure = false
c.autoClean = false
return c
}
func NewCollector(tmpdir string, sortableBuffer Buffer) *Collector {
c := &Collector{cleanOnFailure: true}
c := &Collector{autoClean: true}
encoder := codec.NewEncoder(nil, &cbor)
c.flushBuffer = func(currentKey []byte, canStoreInRam bool) error {
@ -108,17 +108,9 @@ func (c *Collector) Collect(k, v []byte) error {
func (c *Collector) Load(logPrefix string, db ethdb.Database, toBucket string, loadFunc LoadFunc, args TransformArgs) (err error) {
defer func() {
if !c.cleanOnFailure {
// don't clean if error or panic happened
if err != nil {
return
}
if rec := recover(); rec != nil {
panic(rec)
}
if c.autoClean {
c.Close(logPrefix)
}
disposeProviders(logPrefix, c.dataProviders)
}()
if !c.allFlushed {
if err := c.flushBuffer(nil, true); err != nil {
@ -132,6 +124,10 @@ func (c *Collector) Load(logPrefix string, db ethdb.Database, toBucket string, l
return nil
}
func (c *Collector) Close(logPrefix string) {
disposeProviders(logPrefix, c.dataProviders)
}
func loadFilesIntoBucket(logPrefix string, db ethdb.Database, bucket string, providers []dataProvider, loadFunc LoadFunc, args TransformArgs) error {
decoder := codec.NewDecoder(nil, &cbor)
var m runtime.MemStats
@ -156,7 +152,7 @@ func loadFilesIntoBucket(logPrefix string, db ethdb.Database, bucket string, pro
useExternalTx = true
} else {
var err error
tx, err = db.Begin(context.Background(), true)
tx, err = db.Begin(context.Background(), ethdb.RW)
if err != nil {
return err
}
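
The etl change above replaces error/recover inspection in Load's deferred cleanup with a single autoClean switch plus an exported Close. A toy model of the resulting contract (names mirror the diff; all bodies are assumptions):

```go
package main

import "fmt"

// Toy model of the Collector cleanup contract after this change.
type collector struct {
	autoClean bool
	files     []string
}

func (c *collector) Close(logPrefix string) {
	fmt.Printf("[%s] disposing %d temp files\n", logPrefix, len(c.files))
	c.files = nil
}

// Load no longer inspects err or recover(); it simply closes when autoClean
// is set (NewCollector) and leaves files alone otherwise
// (NewCriticalCollector, NewCollectorFromFiles), so a failed load can be
// resumed from the same files.
func (c *collector) Load(logPrefix string) (err error) {
	defer func() {
		if c.autoClean {
			c.Close(logPrefix)
		}
	}()
	// ... merge sorted temp files into the target bucket ...
	return nil
}

func main() {
	normal := &collector{autoClean: true, files: []string{"a", "b"}}
	_ = normal.Load("etl")

	critical := &collector{autoClean: false, files: []string{"a"}}
	if err := critical.Load("etl-critical"); err == nil {
		critical.Close("etl-critical") // caller cleans up only on success
	}
}
```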

View File

@ -257,7 +257,9 @@ func makeChainForBench(db ethdb.Database, full bool, count uint64) {
if full || n == 0 {
block := types.NewBlockWithHeader(header)
rawdb.WriteBody(ctx, db, hash, n, block.Body())
rawdb.WriteReceipts(db, hash, n, nil)
if err := rawdb.WriteReceipts(db, n, nil); err != nil {
panic(err)
}
}
}
}

View File

@ -396,7 +396,7 @@ func (bc *BlockChain) SetHead(head uint64) error {
bc.chainmu.Lock()
defer bc.chainmu.Unlock()
updateFn := func(db rawdb.DatabaseWriter, header *types.Header) (uint64, bool) {
updateFn := func(db ethdb.Database, header *types.Header) (uint64, bool) {
// Rewind the block chain, ensuring we don't end up with a stateless head block
if currentBlock := bc.CurrentBlock(); currentBlock != nil && header.Number.Uint64() < currentBlock.NumberU64() {
newHeadBlock := bc.GetBlock(header.Hash(), header.Number.Uint64())
@ -436,7 +436,7 @@ func (bc *BlockChain) SetHead(head uint64) error {
}
// Rewind the header chain, deleting all block bodies until then
delFn := func(db rawdb.DatabaseDeleter, hash common.Hash, num uint64) {
delFn := func(db ethdb.Database, hash common.Hash, num uint64) {
// Ignore the error here since light client won't hit this path
frozen, _ := bc.db.Ancients()
if num+1 <= frozen {
@ -453,7 +453,9 @@ func (bc *BlockChain) SetHead(head uint64) error {
// The header, total difficulty and canonical hash will be
// removed in the hc.SetHead function.
rawdb.DeleteBody(db, hash, num)
rawdb.DeleteReceipts(db, hash, num)
if err := rawdb.DeleteReceipts(db, num); err != nil {
panic(err)
}
}
// Todo(rjl493456442) txlookup, bloombits, etc
}
@ -1026,7 +1028,9 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [
}
// Write all the data out into the database
rawdb.WriteBody(context.Background(), batch, block.Hash(), block.NumberU64(), block.Body())
rawdb.WriteReceipts(batch, block.Hash(), block.NumberU64(), receiptChain[i])
if err := rawdb.WriteReceipts(batch, block.NumberU64(), receiptChain[i]); err != nil {
return 0, err
}
if bc.enableTxLookupIndex {
rawdb.WriteTxLookupEntries(batch, block)
}
@ -1179,7 +1183,9 @@ func (bc *BlockChain) writeBlockWithState(ctx context.Context, block *types.Bloc
}
}
if bc.enableReceipts && !bc.cacheConfig.DownloadOnly {
rawdb.WriteReceipts(bc.db, block.Hash(), block.NumberU64(), receipts)
if err := rawdb.WriteReceipts(bc.db, block.NumberU64(), receipts); err != nil {
return NonStatTy, err
}
}
// If the total difficulty is higher than our known, add it to the canonical chain
@ -1399,7 +1405,9 @@ func (bc *BlockChain) insertChain(ctx context.Context, chain types.Blocks, verif
// state, but if it's this special case here(skip reexecution) we will lose
// the empty receipt entry.
if len(block.Transactions()) == 0 {
rawdb.WriteReceipts(bc.db, block.Hash(), block.NumberU64(), nil)
if err1 := rawdb.WriteReceipts(bc.db, block.NumberU64(), nil); err1 != nil {
return i, err1
}
} else {
log.Error("Please file an issue, skip known block execution without receipt",
"hash", block.Hash(), "number", block.NumberU64())

View File

@ -379,7 +379,9 @@ func (g *Genesis) Commit(db ethdb.Database, history bool) (*types.Block, *state.
if err := rawdb.WriteBlock(context.Background(), db, block); err != nil {
return nil, nil, err
}
rawdb.WriteReceipts(db, block.Hash(), block.NumberU64(), nil)
if err := rawdb.WriteReceipts(db, block.NumberU64(), nil); err != nil {
return nil, nil, err
}
if err := rawdb.WriteCanonicalHash(db, block.Hash(), block.NumberU64()); err != nil {
return nil, nil, err
}

View File

@ -491,11 +491,11 @@ type (
// before head header is updated. The method will return the actual block it
// updated the head to (missing state) and a flag if setHead should continue
// rewinding till that forcefully (exceeded ancient limits)
UpdateHeadBlocksCallback func(rawdb.DatabaseWriter, *types.Header) (uint64, bool)
UpdateHeadBlocksCallback func(ethdb.Database, *types.Header) (uint64, bool)
// DeleteBlockContentCallback is a callback function that is called by SetHead
// before each header is deleted.
DeleteBlockContentCallback func(rawdb.DatabaseDeleter, common.Hash, uint64)
DeleteBlockContentCallback func(ethdb.Database, common.Hash, uint64)
)
// SetHead rewinds the local chain to a new head. Everything above the new head

View File

@ -417,46 +417,43 @@ func DeleteTd(db DatabaseDeleter, hash common.Hash, number uint64) error {
// HasReceipts verifies the existence of all the transaction receipts belonging
// to a block.
func HasReceipts(db DatabaseReader, hash common.Hash, number uint64) bool {
if has, err := db.Has(dbutils.BlockReceiptsPrefix, dbutils.BlockReceiptsKey(number, hash)); !has || err != nil {
if has, err := db.Has(dbutils.BlockReceiptsPrefix, dbutils.ReceiptsKey(number)); !has || err != nil {
return false
}
return true
}
// ReadReceiptsRLP retrieves all the transaction receipts belonging to a block in RLP encoding.
func ReadReceiptsRLP(db DatabaseReader, hash common.Hash, number uint64) rlp.RawValue {
data := []byte{}
//data, _ := db.Ancient(freezerReceiptTable, number)
if len(data) == 0 {
data, _ = db.Get(dbutils.BlockReceiptsPrefix, dbutils.BlockReceiptsKey(number, hash))
// In the background freezer is moving data from leveldb to flatten files.
// So during the first check for ancient db, the data is not yet in there,
// but when we reach into leveldb, the data was already moved. That would
// result in a not found error.
if len(data) == 0 {
//data, _ = db.Ancient(freezerReceiptTable, number)
}
}
return nil // Can't find the data anywhere.
}
// ReadRawReceipts retrieves all the transaction receipts belonging to a block.
// The receipt metadata fields are not guaranteed to be populated, so they
// should not be used. Use ReadReceipts instead if the metadata is needed.
func ReadRawReceipts(db DatabaseReader, hash common.Hash, number uint64) types.Receipts {
func ReadRawReceipts(db ethdb.Database, hash common.Hash, number uint64) types.Receipts {
// Retrieve the flattened receipt slice
data, err := db.Get(dbutils.BlockReceiptsPrefix, dbutils.BlockReceiptsKey(number, hash))
data, err := db.Get(dbutils.BlockReceiptsPrefix, dbutils.ReceiptsKey(number))
if err != nil && !errors.Is(err, ethdb.ErrKeyNotFound) {
log.Error("ReadRawReceipts failed", "err", err)
}
if len(data) == 0 {
return nil
}
receipts := types.Receipts{}
if err := cbor.Unmarshal(&receipts, data); err != nil {
var receipts types.Receipts
if err := cbor.Unmarshal(&receipts, bytes.NewReader(data)); err != nil {
log.Error("receipt unmarshal failed", "hash", hash, "err", err)
return nil
}
if err := db.Walk(dbutils.Log, dbutils.LogKey(number, 0), 8*8, func(k, v []byte) (bool, error) {
var logs types.Logs
if err := cbor.Unmarshal(&logs, bytes.NewReader(v)); err != nil {
return false, fmt.Errorf("receipt unmarshal failed: %x, %w", hash, err)
}
receipts[binary.BigEndian.Uint32(k[8:])].Logs = logs
return true, nil
}); err != nil {
log.Error("logs fetching failed", "hash", hash, "err", err)
return nil
}
return receipts
}
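
ReadRawReceipts now performs a two-bucket join: decode the per-block receipts record (whose logs were stripped at write time), then walk the Log bucket under the 8-byte block-number prefix and re-attach each record's logs at the txId encoded in key bytes 8..12. A toy version of that join, with strings standing in for types.Logs:

```go
package main

import "fmt"

// Toy join in the spirit of ReadRawReceipts. In the real code the txId
// comes from binary.BigEndian.Uint32(k[8:]) of each Log-bucket key.
type receipt struct {
	Status uint64
	Logs   []string
}

func main() {
	// Receipts decoded from the per-block record, logs still empty:
	receipts := []receipt{{Status: 1}, {Status: 1}, {Status: 0}}

	// What a Walk over the Log bucket with the 8-byte block prefix yields:
	logRecords := map[uint32][]string{
		0: {"Transfer"},
		2: {"Approval", "Transfer"},
	} // tx 1 emitted no logs, so WriteReceipts wrote no record for it

	for txId, logs := range logRecords {
		receipts[txId].Logs = logs
	}
	for i, r := range receipts {
		fmt.Printf("tx %d: status=%d logs=%v\n", i, r.Status, r.Logs)
	}
}
```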
@ -467,7 +464,7 @@ func ReadRawReceipts(db DatabaseReader, hash common.Hash, number uint64) types.R
// The current implementation populates these metadata fields by reading the receipts'
// corresponding block body, so if the block body is not found it will return nil even
// if the receipt itself is stored.
func ReadReceipts(db DatabaseReader, hash common.Hash, number uint64) types.Receipts {
func ReadReceipts(db ethdb.Database, hash common.Hash, number uint64) types.Receipts {
// We're deriving many fields from the block body, retrieve beside the receipt
receipts := ReadRawReceipts(db, hash, number)
if receipts == nil {
@ -487,24 +484,104 @@ func ReadReceipts(db DatabaseReader, hash common.Hash, number uint64) types.Rece
}
// WriteReceipts stores all the transaction receipts belonging to a block.
func WriteReceipts(db DatabaseWriter, hash common.Hash, number uint64, receipts types.Receipts) {
newV := make([]byte, 0, 1024)
err := cbor.Marshal(&newV, receipts)
if err != nil {
log.Crit("Failed to encode block receipts", "err", err)
func WriteReceipts(tx DatabaseWriter, number uint64, receipts types.Receipts) error {
buf := bytes.NewBuffer(make([]byte, 0, 1024))
for txId, r := range receipts {
if len(r.Logs) == 0 {
continue
}
buf.Reset()
err := cbor.Marshal(buf, r.Logs)
if err != nil {
return fmt.Errorf("encode block logs for block %d: %v", number, err)
}
if err = tx.Put(dbutils.Log, dbutils.LogKey(number, uint32(txId)), buf.Bytes()); err != nil {
return fmt.Errorf("writing logs for block %d: %v", number, err)
}
}
// Store the flattened receipt slice
if err := db.Put(dbutils.BlockReceiptsPrefix, dbutils.BlockReceiptsKey(number, hash), newV); err != nil {
log.Crit("Failed to store block receipts", "err", err)
buf.Reset()
err := cbor.Marshal(buf, receipts)
if err != nil {
return fmt.Errorf("encode block receipts for block %d: %v", number, err)
}
if err = tx.Put(dbutils.BlockReceiptsPrefix, dbutils.ReceiptsKey(number), buf.Bytes()); err != nil {
return fmt.Errorf("writing receipts for block %d: %v", number, err)
}
return nil
}
// AppendReceipts stores all the transaction receipts and logs belonging to a block, using append-only writes.
func AppendReceipts(tx ethdb.DbWithPendingMutations, blockNumber uint64, receipts types.Receipts) error {
buf := bytes.NewBuffer(make([]byte, 0, 1024))
for txId, r := range receipts {
if len(r.Logs) == 0 {
continue
}
buf.Reset()
err := cbor.Marshal(buf, r.Logs)
if err != nil {
return fmt.Errorf("encode block receipts for block %d: %v", blockNumber, err)
}
if err = tx.Append(dbutils.Log, dbutils.LogKey(blockNumber, uint32(txId)), buf.Bytes()); err != nil {
return fmt.Errorf("writing receipts for block %d: %v", blockNumber, err)
}
}
buf.Reset()
err := cbor.Marshal(buf, receipts)
if err != nil {
return fmt.Errorf("encode block receipts for block %d: %v", blockNumber, err)
}
if err = tx.Append(dbutils.BlockReceiptsPrefix, dbutils.ReceiptsKey(blockNumber), buf.Bytes()); err != nil {
return fmt.Errorf("writing receipts for block %d: %v", blockNumber, err)
}
return nil
}
// DeleteReceipts removes all receipt data associated with a block hash.
func DeleteReceipts(db DatabaseDeleter, hash common.Hash, number uint64) {
if err := db.Delete(dbutils.BlockReceiptsPrefix, dbutils.BlockReceiptsKey(number, hash)); err != nil {
log.Crit("Failed to delete block receipts", "err", err)
func DeleteReceipts(db ethdb.Database, number uint64) error {
if err := db.Delete(dbutils.BlockReceiptsPrefix, dbutils.ReceiptsKey(number)); err != nil {
return fmt.Errorf("receipts delete failed: %d, %w", number, err)
}
if err := db.Walk(dbutils.Log, dbutils.LogKey(number, 0), 8*8, func(k, v []byte) (bool, error) {
if err := db.Delete(dbutils.Log, k); err != nil {
return false, err
}
return true, nil
}); err != nil {
return fmt.Errorf("logs delete failed: %d, %w", number, err)
}
return nil
}
// DeleteNewerReceipts removes all receipt for given block number or newer
func DeleteNewerReceipts(db ethdb.Database, number uint64) error {
if err := db.Walk(dbutils.BlockReceiptsPrefix, dbutils.ReceiptsKey(number), 0, func(k, v []byte) (bool, error) {
if err := db.Delete(dbutils.BlockReceiptsPrefix, k); err != nil {
return false, err
}
return true, nil
}); err != nil {
return fmt.Errorf("delete newer receipts failed: %d, %w", number, err)
}
if err := db.Walk(dbutils.Log, dbutils.LogKey(number, 0), 0, func(k, v []byte) (bool, error) {
if err := db.Delete(dbutils.Log, k); err != nil {
return false, err
}
return true, nil
}); err != nil {
return fmt.Errorf("delete newer logs failed: %d, %w", number, err)
}
return nil
}
// ReadBlock retrieves an entire block corresponding to the hash, assembling it
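
Both delete helpers above rely on Walk's fixedbits argument: DeleteReceipts pins the full 8-byte block-number prefix (8*8 bits), touching only one block's log records, while DeleteNewerReceipts passes 0 to sweep every key from the start key onward. A toy model of that semantics (whole-byte prefixes only; the real Walk also handles partial bytes):

```go
package main

import (
	"bytes"
	"encoding/binary"
	"fmt"
	"sort"
)

func logKey(blockNumber uint64, txId uint32) []byte {
	k := make([]byte, 12)
	binary.BigEndian.PutUint64(k, blockNumber)
	binary.BigEndian.PutUint32(k[8:], txId)
	return k
}

// walk is a toy model of ethdb's Walk: visit keys >= startKey whose first
// fixedbits bits equal startKey's.
func walk(keys [][]byte, startKey []byte, fixedbits int, fn func(k []byte)) {
	n := fixedbits / 8
	for _, k := range keys {
		if bytes.Compare(k, startKey) < 0 || !bytes.HasPrefix(k, startKey[:n]) {
			continue
		}
		fn(k)
	}
}

func main() {
	var keys [][]byte
	for _, bn := range []uint64{41, 42, 43} {
		for txId := uint32(0); txId < 2; txId++ {
			keys = append(keys, logKey(bn, txId))
		}
	}
	sort.Slice(keys, func(i, j int) bool { return bytes.Compare(keys[i], keys[j]) < 0 })

	// DeleteReceipts: fixedbits=8*8 pins the block prefix -> block 42 only.
	walk(keys, logKey(42, 0), 8*8, func(k []byte) { fmt.Printf("del  %x\n", k) })

	// DeleteNewerReceipts: fixedbits=0 -> everything from block 42 onward.
	walk(keys, logKey(42, 0), 0, func(k []byte) { fmt.Printf("del+ %x\n", k) })
}
```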
@ -566,8 +643,10 @@ func WriteAncientBlock(db ethdb.AncientWriter, block *types.Block, receipts type
*/
// DeleteBlock removes all block data associated with a hash.
func DeleteBlock(db DatabaseDeleter, hash common.Hash, number uint64) error {
DeleteReceipts(db, hash, number)
func DeleteBlock(db ethdb.Database, hash common.Hash, number uint64) error {
if err := DeleteReceipts(db, number); err != nil {
return err
}
DeleteHeader(db, hash, number)
DeleteBody(db, hash, number)
if err := DeleteTd(db, hash, number); err != nil {
@ -578,8 +657,10 @@ func DeleteBlock(db DatabaseDeleter, hash common.Hash, number uint64) error {
// DeleteBlockWithoutNumber removes all block data associated with a hash, except
// the hash to number mapping.
func DeleteBlockWithoutNumber(db DatabaseDeleter, hash common.Hash, number uint64) error {
DeleteReceipts(db, hash, number)
func DeleteBlockWithoutNumber(db ethdb.Database, hash common.Hash, number uint64) error {
if err := DeleteReceipts(db, number); err != nil {
return err
}
deleteHeaderWithoutNumber(db, hash, number)
DeleteBody(db, hash, number)
if err := DeleteTd(db, hash, number); err != nil {

View File

@ -130,7 +130,7 @@ func TestBlockStorage(t *testing.T) {
// Write and verify the block in the database
err := WriteBlock(context.Background(), db, block)
if err != nil {
panic(err)
t.Fatalf("Could not write block: %v", err)
}
if entry := ReadBlock(db, block.Hash(), block.NumberU64()); entry == nil {
t.Fatalf("Stored block not found")
@ -149,7 +149,7 @@ func TestBlockStorage(t *testing.T) {
}
// Delete the block and verify the execution
if err := DeleteBlock(db, block.Hash(), block.NumberU64()); err != nil {
panic(err)
t.Fatalf("Could not delete block: %v", err)
}
if entry := ReadBlock(db, block.Hash(), block.NumberU64()); entry != nil {
t.Fatalf("Deleted block returned: %v", entry)
@ -208,7 +208,7 @@ func TestTdStorage(t *testing.T) {
hash, td := common.Hash{}, big.NewInt(314)
entry, err := ReadTd(db, hash, 0)
if err != nil {
panic(err)
t.Fatalf("ReadTd failed: %v", err)
}
if entry != nil {
t.Fatalf("Non existent TD returned: %v", entry)
@ -216,11 +216,11 @@ func TestTdStorage(t *testing.T) {
// Write and verify the TD in the database
err = WriteTd(db, hash, 0, td)
if err != nil {
panic(err)
t.Fatalf("WriteTd failed: %v", err)
}
entry, err = ReadTd(db, hash, 0)
if err != nil {
panic(err)
t.Fatalf("ReadTd failed: %v", err)
}
if entry == nil {
t.Fatalf("Stored TD not found")
@ -230,11 +230,11 @@ func TestTdStorage(t *testing.T) {
// Delete the TD and verify the execution
err = DeleteTd(db, hash, 0)
if err != nil {
panic(err)
t.Fatalf("DeleteTd failed: %v", err)
}
entry, err = ReadTd(db, hash, 0)
if err != nil {
panic(err)
t.Fatalf("ReadTd failed: %v", err)
}
if entry != nil {
t.Fatalf("Deleted TD returned: %v", entry)
@ -250,7 +250,7 @@ func TestCanonicalMappingStorage(t *testing.T) {
hash, number := common.Hash{0: 0xff}, uint64(314)
entry, err := ReadCanonicalHash(db, number)
if err != nil {
panic(err)
t.Fatalf("ReadCanonicalHash failed: %v", err)
}
if entry != (common.Hash{}) {
t.Fatalf("Non existent canonical mapping returned: %v", entry)
@ -258,11 +258,11 @@ func TestCanonicalMappingStorage(t *testing.T) {
// Write and verify the TD in the database
err = WriteCanonicalHash(db, hash, number)
if err != nil {
panic(err)
t.Fatalf("WriteCanoncalHash failed: %v", err)
}
entry, err = ReadCanonicalHash(db, number)
if err != nil {
panic(err)
t.Fatalf("ReadCanonicalHash failed: %v", err)
}
if entry == (common.Hash{}) {
t.Fatalf("Stored canonical mapping not found")
@ -272,11 +272,11 @@ func TestCanonicalMappingStorage(t *testing.T) {
// Delete the TD and verify the execution
err = DeleteCanonicalHash(db, number)
if err != nil {
panic(err)
t.Fatalf("DeleteCanonicalHash failed: %v", err)
}
entry, err = ReadCanonicalHash(db, number)
if err != nil {
panic(err)
t.Error(err)
}
if entry != (common.Hash{}) {
t.Fatalf("Deleted canonical mapping returned: %v", entry)
@ -369,7 +369,9 @@ func TestBlockReceiptStorage(t *testing.T) {
WriteBody(ctx, db, hash, 0, body)
// Insert the receipt slice into the database and check presence
WriteReceipts(db, hash, 0, receipts)
if err := WriteReceipts(db, 0, receipts); err != nil {
t.Fatalf("WriteReceipts failed: %v", err)
}
if rs := ReadReceipts(db, hash, 0); len(rs) == 0 {
t.Fatalf("no receipts returned")
} else {
@ -389,7 +391,9 @@ func TestBlockReceiptStorage(t *testing.T) {
// Sanity check that body alone without the receipt is a full purge
WriteBody(ctx, db, hash, 0, body)
DeleteReceipts(db, hash, 0)
if err := DeleteReceipts(db, 0); err != nil {
t.Fatalf("DeleteReceipts failed: %v", err)
}
if rs := ReadReceipts(db, hash, 0); len(rs) != 0 {
t.Fatalf("deleted receipts returned: %v", rs)
}

View File

@ -22,6 +22,7 @@ import (
"github.com/ledgerwatch/turbo-geth/common"
"github.com/ledgerwatch/turbo-geth/common/dbutils"
"github.com/ledgerwatch/turbo-geth/core/types"
"github.com/ledgerwatch/turbo-geth/ethdb"
"github.com/ledgerwatch/turbo-geth/log"
)
@ -91,7 +92,7 @@ func ReadTransaction(db DatabaseReader, hash common.Hash) (*types.Transaction, c
// ReadReceipt retrieves a specific transaction receipt from the database, along with
// its added positional metadata.
func ReadReceipt(db DatabaseReader, hash common.Hash) (*types.Receipt, common.Hash, uint64, uint64) {
func ReadReceipt(db ethdb.Database, hash common.Hash) (*types.Receipt, common.Hash, uint64, uint64) {
// Retrieve the context of the receipt based on the transaction hash
blockNumber := ReadTxLookupEntry(db, hash)
if blockNumber == nil {

View File

@ -116,7 +116,7 @@ func TestMutationCommitThinHistory(t *testing.T) {
t.Fatal(commitErr)
}
tx, err1 := db.KV().Begin(context.Background(), nil, false)
tx, err1 := db.KV().Begin(context.Background(), nil, ethdb.RO)
if err1 != nil {
t.Fatalf("create tx: %v", err1)
}
@ -547,7 +547,7 @@ func TestWalkAsOfStateHashed(t *testing.T) {
//walk and collect walkAsOf result
var err error
var startKey [72]byte
tx, err1 := db.KV().Begin(context.Background(), nil, false)
tx, err1 := db.KV().Begin(context.Background(), nil, ethdb.RO)
if err1 != nil {
t.Fatalf("create tx: %v", err1)
}
@ -710,7 +710,7 @@ func TestWalkAsOfStatePlain(t *testing.T) {
//walk and collect walkAsOf result
var err error
var startKey [60]byte
tx, err1 := db.KV().Begin(context.Background(), nil, false)
tx, err1 := db.KV().Begin(context.Background(), nil, ethdb.RO)
if err1 != nil {
t.Fatalf("create tx: %v", err1)
}
@ -893,7 +893,7 @@ func TestWalkAsOfUsingFixedBytesStatePlain(t *testing.T) {
var err error
startKey := make([]byte, 60)
copy(startKey[:common.AddressLength], addr1.Bytes())
tx, err1 := db.KV().Begin(context.Background(), nil, false)
tx, err1 := db.KV().Begin(context.Background(), nil, ethdb.RO)
if err1 != nil {
t.Fatalf("create tx: %v", err1)
}
@ -1074,7 +1074,7 @@ func TestWalkAsOfAccountHashed(t *testing.T) {
}
var startKey [32]byte
tx, err1 := db.KV().Begin(context.Background(), nil, false)
tx, err1 := db.KV().Begin(context.Background(), nil, ethdb.RO)
if err1 != nil {
t.Fatalf("create tx: %v", err1)
}
@ -1232,7 +1232,7 @@ func TestWalkAsOfAccountPlain(t *testing.T) {
}, true, true)
var startKey [20]byte
tx, err1 := db.KV().Begin(context.Background(), nil, false)
tx, err1 := db.KV().Begin(context.Background(), nil, ethdb.RO)
if err1 != nil {
t.Fatalf("create tx: %v", err1)
}
@ -1419,7 +1419,7 @@ func TestWalkAsOfStateHashed_WithoutIndex(t *testing.T) {
//walk and collect walkAsOf result
var startKey [72]byte
tx, err1 := db.KV().Begin(context.Background(), nil, false)
tx, err1 := db.KV().Begin(context.Background(), nil, ethdb.RO)
if err1 != nil {
t.Fatalf("create tx: %v", err1)
}
@ -1578,7 +1578,7 @@ func TestWalkAsOfStatePlain_WithoutIndex(t *testing.T) {
}
var startKey [60]byte
tx, err1 := db.KV().Begin(context.Background(), nil, false)
tx, err1 := db.KV().Begin(context.Background(), nil, ethdb.RO)
if err1 != nil {
t.Fatalf("create tx: %v", err1)
}
@ -1738,7 +1738,7 @@ func TestWalkAsOfAccountHashed_WithoutIndex(t *testing.T) {
}
var startKey [32]byte
tx, err1 := db.KV().Begin(context.Background(), nil, false)
tx, err1 := db.KV().Begin(context.Background(), nil, ethdb.RO)
if err1 != nil {
t.Fatalf("create tx: %v", err1)
}
@ -1906,7 +1906,7 @@ func TestWalkAsOfAccountPlain_WithoutIndex(t *testing.T) {
}
var startKey [32]byte
tx, err1 := db.KV().Begin(context.Background(), nil, false)
tx, err1 := db.KV().Begin(context.Background(), nil, ethdb.RO)
if err1 != nil {
t.Fatalf("create tx: %v", err1)
}
@ -2101,7 +2101,7 @@ func TestWalkAsOfAccountPlain_WithChunks(t *testing.T) {
},
}, true, true)
tx, err1 := db.KV().Begin(context.Background(), nil, false)
tx, err1 := db.KV().Begin(context.Background(), nil, ethdb.RO)
if err1 != nil {
t.Fatalf("create tx: %v", err1)
}
@ -2243,7 +2243,7 @@ func TestWalkAsOfStoragePlain_WithChunks(t *testing.T) {
},
}, true, true)
tx, err1 := db.KV().Begin(context.Background(), nil, false)
tx, err1 := db.KV().Begin(context.Background(), nil, ethdb.RO)
if err1 != nil {
t.Fatalf("create tx: %v", err1)
}

View File

@ -69,7 +69,7 @@ func (s *StateSuite) TestDump(c *checker.C) {
c.Check(err, checker.IsNil)
// check that dump contains the state objects that are in trie
tx, err1 := s.kv.Begin(context.Background(), nil, false)
tx, err1 := s.kv.Begin(context.Background(), nil, ethdb.RO)
if err1 != nil {
c.Fatalf("create tx: %v", err1)
}
@ -351,7 +351,7 @@ func TestDump(t *testing.T) {
}
// check that dump contains the state objects that are in trie
tx, err1 := db.KV().Begin(context.Background(), nil, false)
tx, err1 := db.KV().Begin(context.Background(), nil, ethdb.RO)
if err1 != nil {
t.Fatalf("create tx: %v", err1)
}

View File

@ -55,6 +55,8 @@ type Log struct {
Removed bool `json:"removed" codec:"-"`
}
type Logs []*Log
type logMarshaling struct {
Data hexutil.Bytes
BlockNumber hexutil.Uint64

View File

@ -31,7 +31,7 @@ import (
)
// go:generate gencodec -type Receipt -field-override receiptMarshaling -out gen_receipt_json.go
//go:generate codecgen -o receipt_codecgen_gen.go -r "Receipts|Receipt|Log" -rt "codec" -nx=true -d 2 receipt.go log.go
//go:generate codecgen -o receipt_codecgen_gen.go -r "^Receipts$|^Receipt$|^Logs$|^Log$" -st "codec" -j=false -nx=true -ta=true -oe=false -d 2 receipt.go log.go
var (
receiptStatusFailedRLP = []byte{}
@ -54,7 +54,7 @@ type Receipt struct {
Status uint64 `json:"status" codec:"2"`
CumulativeGasUsed uint64 `json:"cumulativeGasUsed" gencodec:"required" codec:"3"`
Bloom Bloom `json:"logsBloom" gencodec:"required" codec:"-"`
Logs []*Log `json:"logs" gencodec:"required" codec:"4"`
Logs Logs `json:"logs" gencodec:"required" codec:"-"`
// Implementation fields: These fields are added by geth when processing a transaction.
// They are stored in the chain database.
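With codec:"-" on Bloom and Logs, the CBOR-encoded receipt no longer carries its logs; they move to the separate dbutils.Log bucket, keyed by block number plus transaction index. A plausible reconstruction of that key, matching what the migration later in this commit builds by hand (the real dbutils.LogKey may differ in detail):

func logKeySketch(blockNumber uint64, txId uint32) []byte {
	k := make([]byte, 8+4) // 8-byte big-endian block number + 4-byte big-endian tx index
	binary.BigEndian.PutUint64(k, blockNumber)
	binary.BigEndian.PutUint32(k[8:], txId)
	return k
}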

File diff suppressed because it is too large

View File

@ -159,7 +159,7 @@ func (b *EthAPIBackend) StateAndHeaderByNumber(ctx context.Context, blockNr rpc.
}
//WARNING - this will leak transaction
fmt.Printf("LEAK!\n")
tx, err1 := b.eth.chainKV.Begin(context.Background(), nil, false)
tx, err1 := b.eth.chainKV.Begin(context.Background(), nil, ethdb.RO)
if err1 != nil {
return nil, nil, err1
}
@ -186,7 +186,7 @@ func (b *EthAPIBackend) StateAndHeaderByNumberOrHash(ctx context.Context, blockN
}
//WARNING - this will leak transaction
fmt.Printf("LEAK!\n")
tx, err1 := b.eth.chainKV.Begin(context.Background(), nil, false)
tx, err1 := b.eth.chainKV.Begin(context.Background(), nil, ethdb.RO)
if err1 != nil {
return nil, nil, err1
}
@ -215,7 +215,7 @@ func (b *EthAPIBackend) GetReceipts(ctx context.Context, hash common.Hash) (type
func (b *EthAPIBackend) getReceiptsByReApplyingTransactions(block *types.Block, number uint64) (types.Receipts, error) {
//WARNING - this will leak transaction
fmt.Printf("LEAK!\n")
tx, err1 := b.eth.chainKV.Begin(context.Background(), nil, false)
tx, err1 := b.eth.chainKV.Begin(context.Background(), nil, ethdb.RO)
if err1 != nil {
return nil, err1
}

View File

@ -539,7 +539,7 @@ func (d *Downloader) syncWithPeer(p *peerConnection, hash common.Hash, blockNumb
// but call .Begin() after header/body download stages
var tx ethdb.DbWithPendingMutations
if canRunCycleInOneTransaction {
tx = ethdb.NewTxDbWithoutTransaction(d.stateDB, true)
tx = ethdb.NewTxDbWithoutTransaction(d.stateDB, ethdb.RW)
defer tx.Rollback()
writeDB = tx
} else {
@ -579,7 +579,7 @@ func (d *Downloader) syncWithPeer(p *peerConnection, hash common.Hash, blockNumb
var errTx error
log.Debug("Begin tx")
tx, errTx = tx.Begin(context.Background(), true)
tx, errTx = tx.Begin(context.Background(), ethdb.RW)
return errTx
})
d.stagedSyncState.OnBeforeUnwind(func(id stages.SyncStage) error {
@ -594,7 +594,7 @@ func (d *Downloader) syncWithPeer(p *peerConnection, hash common.Hash, blockNumb
}
var errTx error
log.Debug("Begin tx")
tx, errTx = tx.Begin(context.Background(), true)
tx, errTx = tx.Begin(context.Background(), ethdb.RW)
return errTx
})
d.stagedSyncState.BeforeStageUnwind(stages.Bodies, func() error {
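The pattern above: the downloader constructs a TxDb shell up front and opens the real RW transaction only after the header/body stages have run. A condensed sketch of the flow, with error handling trimmed:

var tx ethdb.DbWithPendingMutations = ethdb.NewTxDbWithoutTransaction(d.stateDB, ethdb.RW) // shell only, no transaction yet
defer tx.Rollback()
// ... header and body download stages run against the plain database ...
tx, errTx := tx.Begin(context.Background(), ethdb.RW) // the real RW transaction starts here
if errTx != nil {
	return errTx
}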

View File

@ -20,6 +20,7 @@ import (
"github.com/ledgerwatch/turbo-geth/common"
"github.com/ledgerwatch/turbo-geth/core/rawdb"
"github.com/ledgerwatch/turbo-geth/core/types"
"github.com/ledgerwatch/turbo-geth/ethdb"
)
type HeaderChain interface {
@ -44,13 +45,13 @@ type HeaderChain interface {
// sync commands from an existing local database.
type FakePeer struct {
id string
db rawdb.DatabaseReader
db ethdb.Database
hc HeaderChain
dl *Downloader
}
// NewFakePeer creates a new mock downloader peer with the given data sources.
func NewFakePeer(id string, db rawdb.DatabaseReader, hc HeaderChain, dl *Downloader) *FakePeer {
func NewFakePeer(id string, db ethdb.Database, hc HeaderChain, dl *Downloader) *FakePeer {
return &FakePeer{id: id, db: db, hc: hc, dl: dl}
}

View File

@ -92,7 +92,9 @@ func BenchmarkFilters(b *testing.B) {
panic(err)
}
rawdb.WriteHeadBlockHash(db, block.Hash())
rawdb.WriteReceipts(db, block.Hash(), block.NumberU64(), receipts[i])
if err := rawdb.WriteReceipts(db, block.NumberU64(), receipts[i]); err != nil {
panic(err)
}
}
b.ResetTimer()
@ -177,7 +179,9 @@ func TestFilters(t *testing.T) {
panic(err)
}
rawdb.WriteHeadBlockHash(db, block.Hash())
rawdb.WriteReceipts(db, block.Hash(), block.NumberU64(), receipts[i])
if err := rawdb.WriteReceipts(db, block.NumberU64(), receipts[i]); err != nil {
panic(err)
}
}
filter := NewRangeFilter(backend, 0, -1, []common.Address{addr}, [][]common.Hash{{hash1, hash2, hash3, hash4}})

View File

@ -41,7 +41,7 @@ func SpawnCallTraces(s *StageState, db ethdb.Database, chainConfig *params.Chain
useExternalTx = true
} else {
var err error
tx, err = db.Begin(context.Background(), true)
tx, err = db.Begin(context.Background(), ethdb.RW)
if err != nil {
return err
}
@ -327,7 +327,7 @@ func UnwindCallTraces(u *UnwindState, s *StageState, db ethdb.Database, chainCon
useExternalTx = true
} else {
var err error
tx, err = db.Begin(context.Background(), true)
tx, err = db.Begin(context.Background(), ethdb.RW)
if err != nil {
return err
}
@ -336,7 +336,7 @@ func UnwindCallTraces(u *UnwindState, s *StageState, db ethdb.Database, chainCon
logPrefix := s.state.LogPrefix()
if err := unwindCallTraces(logPrefix, tx, s.BlockNumber, u.UnwindPoint, chainConfig, chainContext, quitCh); err != nil {
return err
return fmt.Errorf("unwindCallTraces fail: %w", err)
}
if err := u.Done(tx); err != nil {
@ -383,7 +383,7 @@ func unwindCallTraces(logPrefix string, db rawdb.DatabaseReader, from, to uint64
stateWriter = state.NewCacheStateWriter()
if _, err = core.ExecuteBlockEphemerally(chainConfig, vmConfig, chainContext, engine, block, stateReader, stateWriter); err != nil {
return err
return fmt.Errorf("exec block: %w", err)
}
}
for addr := range tracer.froms {

View File

@ -7,14 +7,11 @@ import (
"runtime"
"time"
"github.com/ledgerwatch/turbo-geth/ethdb/cbor"
"github.com/ledgerwatch/turbo-geth/common"
"github.com/ledgerwatch/turbo-geth/common/dbutils"
"github.com/ledgerwatch/turbo-geth/core"
"github.com/ledgerwatch/turbo-geth/core/rawdb"
"github.com/ledgerwatch/turbo-geth/core/state"
"github.com/ledgerwatch/turbo-geth/core/types"
"github.com/ledgerwatch/turbo-geth/core/types/accounts"
"github.com/ledgerwatch/turbo-geth/core/vm"
"github.com/ledgerwatch/turbo-geth/eth/stagedsync/stages"
@ -69,7 +66,7 @@ func SpawnExecuteBlocksStage(s *StageState, stateDB ethdb.Database, chainConfig
useExternalTx = true
} else {
var err error
tx, err = stateDB.Begin(context.Background(), true)
tx, err = stateDB.Begin(context.Background(), ethdb.RW)
if err != nil {
return err
}
@ -128,7 +125,7 @@ func SpawnExecuteBlocksStage(s *StageState, stateDB ethdb.Database, chainConfig
}
if params.WriteReceipts {
if err = appendReceipts(tx, receipts, block.NumberU64(), block.Hash()); err != nil {
if err = rawdb.AppendReceipts(tx, block.NumberU64(), receipts); err != nil {
return err
}
}
@ -193,19 +190,6 @@ func logProgress(logPrefix string, prev, now uint64, batch ethdb.DbWithPendingMu
return now
}
func appendReceipts(tx ethdb.DbWithPendingMutations, receipts types.Receipts, blockNumber uint64, blockHash common.Hash) error {
newV := make([]byte, 0, 1024)
err := cbor.Marshal(&newV, receipts)
if err != nil {
return fmt.Errorf("encode block receipts for block %d: %v", blockNumber, err)
}
// Store the flattened receipt slice
if err = tx.Append(dbutils.BlockReceiptsPrefix, dbutils.BlockReceiptsKey(blockNumber, blockHash), newV); err != nil {
return fmt.Errorf("writing receipts for block %d: %v", blockNumber, err)
}
return nil
}
func UnwindExecutionStage(u *UnwindState, s *StageState, stateDB ethdb.Database, writeReceipts bool) error {
if u.UnwindPoint >= s.BlockNumber {
s.Done()
@ -276,12 +260,7 @@ func UnwindExecutionStage(u *UnwindState, s *StageState, stateDB ethdb.Database,
return fmt.Errorf("%s: walking storage changesets: %v", logPrefix, err)
}
if writeReceipts {
if err := stateDB.Walk(dbutils.BlockReceiptsPrefix, dbutils.EncodeBlockNumber(u.UnwindPoint+1), 0, func(k, v []byte) (bool, error) {
if err := batch.Delete(dbutils.BlockReceiptsPrefix, common.CopyBytes(k)); err != nil {
return false, fmt.Errorf("%s: delete receipts: %v", logPrefix, err)
}
return true, nil
}); err != nil {
if err := rawdb.DeleteNewerReceipts(stateDB, u.UnwindPoint+1); err != nil {
return fmt.Errorf("%s: walking receipts: %v", logPrefix, err)
}
}
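rawdb.DeleteNewerReceipts replaces the inlined walk-and-delete above. A sketch of what it presumably does with the new block-number-only key; the real function may also need to clear the per-transaction dbutils.Log entries:

func deleteNewerReceiptsSketch(db ethdb.Database, from uint64) error {
	// walk from the first affected block number to the end of the bucket, deleting as we go
	return db.Walk(dbutils.BlockReceiptsPrefix, dbutils.EncodeBlockNumber(from), 0, func(k, v []byte) (bool, error) {
		if err := db.Delete(dbutils.BlockReceiptsPrefix, common.CopyBytes(k)); err != nil {
			return false, err
		}
		return true, nil
	})
}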

View File

@ -14,13 +14,13 @@ import (
func TestUnwindExecutionStagePlainStatic(t *testing.T) {
db1 := ethdb.NewMemDatabase()
defer db1.Close()
tx1, err := db1.Begin(context.Background(), true)
tx1, err := db1.Begin(context.Background(), ethdb.RW)
require.NoError(t, err)
defer tx1.Rollback()
db2 := ethdb.NewMemDatabase()
defer db2.Close()
tx2, err := db2.Begin(context.Background(), true)
tx2, err := db2.Begin(context.Background(), ethdb.RW)
require.NoError(t, err)
defer tx2.Rollback()
@ -53,13 +53,13 @@ func TestUnwindExecutionStagePlainStatic(t *testing.T) {
func TestUnwindExecutionStagePlainWithIncarnationChanges(t *testing.T) {
db1 := ethdb.NewMemDatabase()
defer db1.Close()
tx1, err := db1.Begin(context.Background(), true)
tx1, err := db1.Begin(context.Background(), ethdb.RW)
require.NoError(t, err)
defer tx1.Rollback()
db2 := ethdb.NewMemDatabase()
defer db2.Close()
tx2, err := db2.Begin(context.Background(), true)
tx2, err := db2.Begin(context.Background(), ethdb.RW)
require.NoError(t, err)
defer tx2.Rollback()
@ -94,13 +94,13 @@ func TestUnwindExecutionStagePlainWithCodeChanges(t *testing.T) {
t.Skip("not supported yet, to be restored")
db1 := ethdb.NewMemDatabase()
defer db1.Close()
tx1, err := db1.Begin(context.Background(), true)
tx1, err := db1.Begin(context.Background(), ethdb.RW)
require.NoError(t, err)
defer tx1.Rollback()
db2 := ethdb.NewMemDatabase()
defer db2.Close()
tx2, err := db2.Begin(context.Background(), true)
tx2, err := db2.Begin(context.Background(), ethdb.RW)
require.NoError(t, err)
defer tx2.Rollback()

View File

@ -22,13 +22,13 @@ func getTmpDir() string {
func TestPromoteHashedStateClearState(t *testing.T) {
db1 := ethdb.NewMemDatabase()
defer db1.Close()
tx1, err := db1.Begin(context.Background(), true)
tx1, err := db1.Begin(context.Background(), ethdb.RW)
require.NoError(t, err)
defer tx1.Rollback()
db2 := ethdb.NewMemDatabase()
defer db2.Close()
tx2, err := db2.Begin(context.Background(), true)
tx2, err := db2.Begin(context.Background(), ethdb.RW)
require.NoError(t, err)
defer tx2.Rollback()
@ -55,13 +55,13 @@ func TestPromoteHashedStateClearState(t *testing.T) {
func TestPromoteHashedStateIncremental(t *testing.T) {
db1 := ethdb.NewMemDatabase()
defer db1.Close()
tx1, err := db1.Begin(context.Background(), true)
tx1, err := db1.Begin(context.Background(), ethdb.RW)
require.NoError(t, err)
defer tx1.Rollback()
db2 := ethdb.NewMemDatabase()
defer db2.Close()
tx2, err := db2.Begin(context.Background(), true)
tx2, err := db2.Begin(context.Background(), ethdb.RW)
require.NoError(t, err)
defer tx2.Rollback()
@ -104,13 +104,13 @@ func TestPromoteHashedStateIncremental(t *testing.T) {
func TestPromoteHashedStateIncrementalMixed(t *testing.T) {
db1 := ethdb.NewMemDatabase()
defer db1.Close()
tx1, err := db1.Begin(context.Background(), true)
tx1, err := db1.Begin(context.Background(), ethdb.RW)
require.NoError(t, err)
defer tx1.Rollback()
db2 := ethdb.NewMemDatabase()
defer db2.Close()
tx2, err := db2.Begin(context.Background(), true)
tx2, err := db2.Begin(context.Background(), ethdb.RW)
require.NoError(t, err)
defer tx2.Rollback()
@ -137,13 +137,13 @@ func TestPromoteHashedStateIncrementalMixed(t *testing.T) {
func TestUnwindHashed(t *testing.T) {
db1 := ethdb.NewMemDatabase()
defer db1.Close()
tx1, err := db1.Begin(context.Background(), true)
tx1, err := db1.Begin(context.Background(), ethdb.RW)
require.NoError(t, err)
defer tx1.Rollback()
db2 := ethdb.NewMemDatabase()
defer db2.Close()
tx2, err := db2.Begin(context.Background(), true)
tx2, err := db2.Begin(context.Background(), ethdb.RW)
require.NoError(t, err)
defer tx2.Rollback()

View File

@ -39,7 +39,7 @@ func SpawnIntermediateHashesStage(s *StageState, db ethdb.Database, tmpdir strin
useExternalTx = true
} else {
var err error
tx, err = db.Begin(context.Background(), true)
tx, err = db.Begin(context.Background(), ethdb.RW)
if err != nil {
return err
}
@ -308,7 +308,7 @@ func UnwindIntermediateHashesStage(u *UnwindState, s *StageState, db ethdb.Datab
useExternalTx = true
} else {
var err error
tx, err = db.Begin(context.Background(), true)
tx, err = db.Begin(context.Background(), ethdb.RW)
if err != nil {
return err
}

View File

@ -36,7 +36,7 @@ func SpawnLogIndex(s *StageState, db ethdb.Database, tmpdir string, quit <-chan
useExternalTx = true
} else {
var err error
tx, err = db.Begin(context.Background(), true)
tx, err = db.Begin(context.Background(), ethdb.RW)
if err != nil {
return err
}
@ -81,15 +81,17 @@ func promoteLogIndex(logPrefix string, db ethdb.Database, start uint64, tmpdir s
tx := db.(ethdb.HasTx).Tx()
topics := map[string]*roaring.Bitmap{}
addresses := map[string]*roaring.Bitmap{}
receipts := tx.Cursor(dbutils.BlockReceiptsPrefix)
defer receipts.Close()
logs := tx.Cursor(dbutils.Log)
defer logs.Close()
checkFlushEvery := time.NewTicker(logIndicesCheckSizeEvery)
defer checkFlushEvery.Stop()
collectorTopics := etl.NewCollector(tmpdir, etl.NewSortableBuffer(etl.BufferOptimalSize))
collectorAddrs := etl.NewCollector(tmpdir, etl.NewSortableBuffer(etl.BufferOptimalSize))
for k, v, err := receipts.Seek(dbutils.EncodeBlockNumber(start)); k != nil; k, v, err = receipts.Next() {
reader := bytes.NewReader(nil)
for k, v, err := logs.Seek(dbutils.LogKey(start, 0)); k != nil; k, v, err = logs.Next() {
if err != nil {
return err
}
@ -121,31 +123,30 @@ func promoteLogIndex(logPrefix string, db ethdb.Database, start uint64, tmpdir s
}
}
receipts := types.Receipts{}
if err := cbor.Unmarshal(&receipts, v); err != nil {
var ll types.Logs
reader.Reset(v)
if err := cbor.Unmarshal(&ll, reader); err != nil {
return fmt.Errorf("%s: receipt unmarshal failed: %w, blocl=%d", logPrefix, err, blockNum)
}
for _, receipt := range receipts {
for _, log := range receipt.Logs {
for _, topic := range log.Topics {
topicStr := string(topic.Bytes())
m, ok := topics[topicStr]
if !ok {
m = roaring.New()
topics[topicStr] = m
}
m.Add(uint32(blockNum))
}
accStr := string(log.Address.Bytes())
m, ok := addresses[accStr]
for _, l := range ll {
for _, topic := range l.Topics {
topicStr := string(topic.Bytes())
m, ok := topics[topicStr]
if !ok {
m = roaring.New()
addresses[accStr] = m
topics[topicStr] = m
}
m.Add(uint32(blockNum))
}
accStr := string(l.Address.Bytes())
m, ok := addresses[accStr]
if !ok {
m = roaring.New()
addresses[accStr] = m
}
m.Add(uint32(blockNum))
}
}
@ -224,7 +225,7 @@ func UnwindLogIndex(u *UnwindState, s *StageState, db ethdb.Database, quitCh <-c
useExternalTx = true
} else {
var err error
tx, err = db.Begin(context.Background(), true)
tx, err = db.Begin(context.Background(), ethdb.RW)
if err != nil {
return err
}
@ -254,22 +255,20 @@ func unwindLogIndex(logPrefix string, db ethdb.DbWithPendingMutations, from, to
addrs := map[string]struct{}{}
start := dbutils.EncodeBlockNumber(to + 1)
if err := db.Walk(dbutils.BlockReceiptsPrefix, start, 0, func(k, v []byte) (bool, error) {
if err := db.Walk(dbutils.Log, start, 0, func(k, v []byte) (bool, error) {
if err := common.Stopped(quitCh); err != nil {
return false, err
}
receipts := types.Receipts{}
if err := cbor.Unmarshal(&receipts, v); err != nil {
return false, fmt.Errorf("%s: receipt unmarshal failed: %w, k=%x", logPrefix, err, k)
var logs types.Logs
if err := cbor.Unmarshal(&logs, bytes.NewReader(v)); err != nil {
return false, fmt.Errorf("%s: receipt unmarshal failed: %w, block=%d", logPrefix, err, binary.BigEndian.Uint64(k))
}
for _, receipt := range receipts {
for _, log := range receipt.Logs {
for _, topic := range log.Topics {
topics[string(topic.Bytes())] = struct{}{}
}
addrs[string(log.Address.Bytes())] = struct{}{}
for _, l := range logs {
for _, topic := range l.Topics {
topics[string(topic.Bytes())] = struct{}{}
}
addrs[string(l.Address.Bytes())] = struct{}{}
}
return true, nil
}); err != nil {

View File

@ -6,6 +6,7 @@ import (
"github.com/ledgerwatch/turbo-geth/common"
"github.com/ledgerwatch/turbo-geth/common/dbutils"
"github.com/ledgerwatch/turbo-geth/core/rawdb"
"github.com/ledgerwatch/turbo-geth/core/types"
"github.com/ledgerwatch/turbo-geth/ethdb/bitmapdb"
@ -18,7 +19,7 @@ func TestLogIndex(t *testing.T) {
db := ethdb.NewMemDatabase()
defer db.Close()
tx, err := db.Begin(context.Background(), true)
tx, err := db.Begin(context.Background(), ethdb.RW)
require.NoError(err)
defer tx.Rollback()
@ -44,32 +45,29 @@ func TestLogIndex(t *testing.T) {
},
},
}}
err = appendReceipts(tx, receipts1, 1, common.Hash{})
err = rawdb.AppendReceipts(tx, 1, receipts1)
require.NoError(err)
err = appendReceipts(tx, receipts2, 2, common.Hash{})
err = rawdb.AppendReceipts(tx, 2, receipts2)
require.NoError(err)
err = promoteLogIndex("logPrefix", tx, 0, "", nil)
require.NoError(err)
// Check indices GetCardinality (in how many blocks they meet)
logTopicIndex := tx.(ethdb.HasTx).Tx().Cursor(dbutils.LogTopicIndex)
logAddrIndex := tx.(ethdb.HasTx).Tx().Cursor(dbutils.LogAddressIndex)
m, err := bitmapdb.Get(logAddrIndex, addr1[:], 0, 10_000_000)
m, err := bitmapdb.Get(tx, dbutils.LogAddressIndex, addr1[:], 0, 10_000_000)
require.NoError(err)
require.Equal(1, int(m.GetCardinality()))
m, err = bitmapdb.Get(logAddrIndex, addr2[:], 0, 10_000_000)
m, err = bitmapdb.Get(tx, dbutils.LogAddressIndex, addr2[:], 0, 10_000_000)
require.NoError(err)
require.Equal(1, int(m.GetCardinality()))
m, err = bitmapdb.Get(logTopicIndex, topic1[:], 0, 10_000_000)
m, err = bitmapdb.Get(tx, dbutils.LogTopicIndex, topic1[:], 0, 10_000_000)
require.NoError(err)
require.Equal(1, int(m.GetCardinality()))
m, err = bitmapdb.Get(logTopicIndex, topic2[:], 0, 10_000_000)
m, err = bitmapdb.Get(tx, dbutils.LogTopicIndex, topic2[:], 0, 10_000_000)
require.NoError(err)
require.Equal(2, int(m.GetCardinality()))
@ -77,19 +75,19 @@ func TestLogIndex(t *testing.T) {
err = unwindLogIndex("logPrefix", tx, 2, 1, nil)
require.NoError(err)
m, err = bitmapdb.Get(logAddrIndex, addr1[:], 0, 10_000_000)
m, err = bitmapdb.Get(tx, dbutils.LogAddressIndex, addr1[:], 0, 10_000_000)
require.NoError(err)
require.Equal(1, int(m.GetCardinality()))
m, err = bitmapdb.Get(logAddrIndex, addr2[:], 0, 10_000_000)
m, err = bitmapdb.Get(tx, dbutils.LogAddressIndex, addr2[:], 0, 10_000_000)
require.NoError(err)
require.Equal(0, int(m.GetCardinality()))
m, err = bitmapdb.Get(logTopicIndex, topic1[:], 0, 10_000_000)
m, err = bitmapdb.Get(tx, dbutils.LogTopicIndex, topic1[:], 0, 10_000_000)
require.NoError(err)
require.Equal(1, int(m.GetCardinality()))
m, err = bitmapdb.Get(logTopicIndex, topic2[:], 0, 10_000_000)
m, err = bitmapdb.Get(tx, dbutils.LogTopicIndex, topic2[:], 0, 10_000_000)
require.NoError(err)
require.Equal(1, int(m.GetCardinality()))
@ -97,19 +95,19 @@ func TestLogIndex(t *testing.T) {
err = unwindLogIndex("logPrefix", tx, 1, 0, nil)
require.NoError(err)
m, err = bitmapdb.Get(logAddrIndex, addr1[:], 0, 10_000_000)
m, err = bitmapdb.Get(tx, dbutils.LogAddressIndex, addr1[:], 0, 10_000_000)
require.NoError(err)
require.Equal(0, int(m.GetCardinality()))
m, err = bitmapdb.Get(logAddrIndex, addr2[:], 0, 10_000_000)
m, err = bitmapdb.Get(tx, dbutils.LogAddressIndex, addr2[:], 0, 10_000_000)
require.NoError(err)
require.Equal(0, int(m.GetCardinality()))
m, err = bitmapdb.Get(logTopicIndex, topic1[:], 0, 10_000_000)
m, err = bitmapdb.Get(tx, dbutils.LogTopicIndex, topic1[:], 0, 10_000_000)
require.NoError(err)
require.Equal(0, int(m.GetCardinality()))
m, err = bitmapdb.Get(logTopicIndex, topic2[:], 0, 10_000_000)
m, err = bitmapdb.Get(tx, dbutils.LogTopicIndex, topic2[:], 0, 10_000_000)
require.NoError(err)
require.Equal(0, int(m.GetCardinality()))
}

View File

@ -142,31 +142,27 @@ func TruncateRange(tx ethdb.Tx, bucket string, key []byte, from, to uint64) erro
// Get - reading as much chunks as needed to satisfy [from, to] condition
// join all chunks to 1 bitmap by Or operator
func Get(c ethdb.Cursor, key []byte, from, to uint32) (*roaring.Bitmap, error) {
func Get(db ethdb.Getter, bucket string, key []byte, from, to uint32) (*roaring.Bitmap, error) {
var chunks []*roaring.Bitmap
fromKey := make([]byte, len(key)+4)
copy(fromKey, key)
binary.BigEndian.PutUint32(fromKey[len(fromKey)-4:], from)
for k, v, err := c.Seek(fromKey); k != nil; k, v, err = c.Next() {
if err != nil {
return nil, err
}
if !bytes.HasPrefix(k, key) {
break
}
if err := db.Walk(bucket, fromKey, len(key)*8, func(k, v []byte) (bool, error) {
bm := roaring.New()
_, err := bm.FromBuffer(v)
if err != nil {
return nil, err
return false, err
}
chunks = append(chunks, bm)
if binary.BigEndian.Uint32(k[len(k)-4:]) >= to {
break
return false, nil
}
return true, nil
}); err != nil {
return nil, err
}
if len(chunks) == 0 {

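Get now takes any ethdb.Getter plus a bucket name instead of a raw cursor, and relies on db.Walk with fixedbits = len(key)*8 to stay on the chunks of one key. Usage, as in the log-index tests above; the result is the union of all chunks for a key over a block range:

m, err := bitmapdb.Get(tx, dbutils.LogAddressIndex, addr[:], 0, 10_000_000)
if err != nil {
	return err
}
blocksWithLogs := m.GetCardinality() // in how many blocks of the range the address emitted a log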
View File

@ -4,46 +4,28 @@ import (
"io"
)
// Marshal - se
// you don't need to reset the buffer if you are reusing it - the method does it for you
func Marshal(dst *[]byte, v interface{}) error {
e := EncoderBytes(dst)
err := e.Encode(v)
returnEncoderToPool(e)
return err
}
// Unmarshal
// if you unmarshal data from database - and plan to use object outside of transaction - copy data before unmarshal
func Unmarshal(dst interface{}, data []byte) error {
d := DecoderBytes(data)
err := d.Decode(dst)
returnDecoderToPool(d)
return err
}
func MarshalWriter(dst io.Writer, v interface{}) error {
func Marshal(dst io.Writer, v interface{}) error {
e := Encoder(dst)
err := e.Encode(v)
returnEncoderToPool(e)
return err
}
func UnmarshalReader(dst interface{}, data io.Reader) error {
func Unmarshal(dst interface{}, data io.Reader) error {
d := Decoder(data)
err := d.Decode(dst)
returnDecoderToPool(d)
return err
}
func MustMarshal(dst *[]byte, v interface{}) {
func MustMarshal(dst io.Writer, v interface{}) {
err := Marshal(dst, v)
if err != nil {
panic(err)
}
}
func MustUnmarshal(dst interface{}, data []byte) {
func MustUnmarshal(dst interface{}, data io.Reader) {
err := Unmarshal(dst, data)
if err != nil {
panic(err)

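The package now exposes only the streaming io.Writer/io.Reader pair; callers that held byte slices wrap them in bytes.Buffer and bytes.Reader, as the migrations later in this commit do. A round-trip with the new API (receipts assumed to be a types.Receipts value):

buf := bytes.NewBuffer(make([]byte, 0, 1024))
if err := cbor.Marshal(buf, receipts); err != nil { // encode into any io.Writer
	return err
}
var decoded types.Receipts
if err := cbor.Unmarshal(&decoded, bytes.NewReader(buf.Bytes())); err != nil { // decode from any io.Reader
	return err
}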
View File

@ -143,9 +143,9 @@ func testPutGet(db MinDatabase, t *testing.T) {
}
func testNoPanicAfterDbClosed(db Database, t *testing.T) {
tx, err := db.(HasKV).KV().Begin(context.Background(), nil, false)
tx, err := db.(HasKV).KV().Begin(context.Background(), nil, RO)
require.NoError(t, err)
writeTx, err := db.(HasKV).KV().Begin(context.Background(), nil, true)
writeTx, err := db.(HasKV).KV().Begin(context.Background(), nil, RW)
require.NoError(t, err)
closeCh := make(chan struct{}, 1)

View File

@ -91,7 +91,7 @@ type Database interface {
// batch.Commit()
//
NewBatch() DbWithPendingMutations //
Begin(ctx context.Context, writable bool) (DbWithPendingMutations, error) // starts db transaction
Begin(ctx context.Context, flags TxFlags) (DbWithPendingMutations, error) // starts db transaction
Last(bucket string) ([]byte, []byte, error)
// IdealBatchSize defines the size of the data batches should ideally add in one write.
@ -165,13 +165,8 @@ type HasNetInterface interface {
type BucketsMigrator interface {
BucketExists(bucket string) (bool, error)
// freelist-friendly methods
DropBucketsAndCommitEvery(deleteKeysPerTx uint64, buckets ...string) error
ClearBucketsAndCommitEvery(deleteKeysPerTx uint64, buckets ...string) error
// _Deprecated: freelist-unfriendly methods
ClearBuckets(buckets ...string) error // makes them empty
DropBuckets(buckets ...string) error // drops them, use of them after drop will panic
ClearBuckets(buckets ...string) error // makes them empty
DropBuckets(buckets ...string) error // drops them, use of them after drop will panic
}
var errNotSupported = errors.New("not supported")

View File

@ -25,7 +25,7 @@ var (
// }
//
// Common pattern for long-living transactions:
// tx, err := db.Begin(true)
// tx, err := db.Begin(ethdb.RW)
// if err != nil {
// return err
// }
@ -56,10 +56,20 @@ type KV interface {
// as its parent. Transactions may be nested to any level. A parent
// transaction and its cursors may not issue any other operations than
// Commit and Rollback while it has active child transactions.
Begin(ctx context.Context, parent Tx, writable bool) (Tx, error)
Begin(ctx context.Context, parent Tx, flags TxFlags) (Tx, error)
AllBuckets() dbutils.BucketsCfg
}
type TxFlags uint
const (
RW TxFlags = 0x00 // default
RO TxFlags = 0x02
Try TxFlags = 0x04
NoMetaSync TxFlags = 0x08
NoSync TxFlags = 0x10
)
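TxFlags is a bit set (the LMDB adapter below checks flags&RO != 0), so from a caller's perspective flags should be combinable; note that this diff only shows RO being translated into an LMDB flag, so treat the combined form as an assumption:

roTx, err := kv.Begin(ctx, nil, ethdb.RO) // read-only transaction
if err != nil {
	return err
}
defer roTx.Rollback()
// presumably valid since TxFlags is a bit mask; NoSync handling is not shown in this diff
fastTx, err := kv.Begin(ctx, nil, ethdb.RW|ethdb.NoSync)
if err != nil {
	return err
}
defer fastTx.Rollback()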
type Tx interface {
// Cursor - creates cursor object on top of given bucket. Type of cursor - depends on bucket configuration.
// If bucket was created with lmdb.DupSort flag, then cursor with interface CursorDupSort created

View File

@ -21,6 +21,11 @@ const (
NonExistingDBI = 999_999_999
)
const (
TxRO = iota + 1
TxRW
)
var (
LMDBDefaultMapSize = 2 * datasize.TB
LMDBDefaultMaxFreelistReuse = uint(1000) // measured in pages
@ -97,7 +102,7 @@ func (opts LmdbOpts) Open() (KV, error) {
if opts.mapSize == 0 {
if opts.inMem {
opts.mapSize = 64 * datasize.MB
opts.mapSize = 128 * datasize.MB
} else {
opts.mapSize = LMDBDefaultMapSize
}
@ -144,7 +149,7 @@ func (opts LmdbOpts) Open() (KV, error) {
// Open or create buckets
if opts.readOnly {
tx, innerErr := db.Begin(context.Background(), nil, false)
tx, innerErr := db.Begin(context.Background(), nil, RO)
if innerErr != nil {
return nil, innerErr
}
@ -273,7 +278,7 @@ func (db *LmdbKV) DiskSize(_ context.Context) (uint64, error) {
return uint64(stats.PSize) * (stats.LeafPages + stats.BranchPages + stats.OverflowPages), nil
}
func (db *LmdbKV) Begin(_ context.Context, parent Tx, writable bool) (Tx, error) {
func (db *LmdbKV) Begin(_ context.Context, parent Tx, flags TxFlags) (Tx, error) {
if db.env == nil {
return nil, fmt.Errorf("db closed")
}
@ -283,15 +288,15 @@ func (db *LmdbKV) Begin(_ context.Context, parent Tx, writable bool) (Tx, error)
db.wg.Add(1)
}
flags := uint(0)
if !writable {
flags |= lmdb.Readonly
nativeFlags := uint(0)
if flags&RO != 0 {
nativeFlags |= lmdb.Readonly
}
var parentTx *lmdb.Txn
if parent != nil {
parentTx = parent.(*lmdbTx).tx
}
tx, err := db.env.BeginTxn(parentTx, flags)
tx, err := db.env.BeginTxn(parentTx, nativeFlags)
if err != nil {
if !isSubTx {
runtime.UnlockOSThread() // unlock only in case of error. normal flow is "defer .Rollback()"
@ -381,7 +386,7 @@ func (db *LmdbKV) View(ctx context.Context, f func(tx Tx) error) (err error) {
defer db.wg.Done()
// can't use db.env.View method - because it calls commit for read transactions - it conflicts with write transactions.
tx, err := db.Begin(ctx, nil, false)
tx, err := db.Begin(ctx, nil, RO)
if err != nil {
return err
}
@ -397,7 +402,7 @@ func (db *LmdbKV) Update(ctx context.Context, f func(tx Tx) error) (err error) {
db.wg.Add(1)
defer db.wg.Done()
tx, err := db.Begin(ctx, nil, true)
tx, err := db.Begin(ctx, nil, RW)
if err != nil {
return err
}

View File

@ -16,7 +16,7 @@ func TestBucketCRUD(t *testing.T) {
defer kv.Close()
ctx := context.Background()
tx, err := kv.Begin(ctx, nil, true)
tx, err := kv.Begin(ctx, nil, RW)
require.NoError(err)
defer tx.Rollback()
@ -93,7 +93,7 @@ func TestReadOnlyMode(t *testing.T) {
}
}).ReadOnly().MustOpen()
tx, err := db2.Begin(context.Background(), nil, false)
tx, err := db2.Begin(context.Background(), nil, RO)
if err != nil {
t.Fatal(err)
}

View File

@ -224,7 +224,7 @@ func (db *RemoteKV) DiskSize(ctx context.Context) (uint64, error) {
return sizeReply.Size, nil
}
func (db *RemoteKV) Begin(ctx context.Context, parent Tx, writable bool) (Tx, error) {
func (db *RemoteKV) Begin(ctx context.Context, parent Tx, flags TxFlags) (Tx, error) {
streamCtx, streamCancelFn := context.WithCancel(ctx) // We create child context for the stream so we can cancel it to prevent leak
stream, err := db.remoteKV.Tx(streamCtx)
if err != nil {
@ -235,7 +235,7 @@ func (db *RemoteKV) Begin(ctx context.Context, parent Tx, writable bool) (Tx, er
}
func (db *RemoteKV) View(ctx context.Context, f func(tx Tx) error) (err error) {
tx, err := db.Begin(ctx, nil, false)
tx, err := db.Begin(ctx, nil, RO)
if err != nil {
return err
}

View File

@ -169,7 +169,7 @@ func (opts snapshotOpts) MustOpen() KV {
}
func (s *SnapshotKV) View(ctx context.Context, f func(tx Tx) error) error {
dbTx, err := s.db.Begin(ctx, nil, false)
dbTx, err := s.db.Begin(ctx, nil, RO)
if err != nil {
return err
}
@ -177,7 +177,7 @@ func (s *SnapshotKV) View(ctx context.Context, f func(tx Tx) error) error {
t := &snapshotTX{
dbTX: dbTx,
snTX: newVirtualTx(func() (Tx, error) {
return s.snapshotDB.Begin(ctx, nil, false)
return s.snapshotDB.Begin(ctx, nil, RO)
}, s.forBuckets),
forBuckets: s.forBuckets,
}
@ -196,8 +196,8 @@ func (s *SnapshotKV) Close() {
}
}
func (s *SnapshotKV) Begin(ctx context.Context, parentTx Tx, writable bool) (Tx, error) {
dbTx, err := s.db.Begin(ctx, parentTx, writable)
func (s *SnapshotKV) Begin(ctx context.Context, parentTx Tx, flags TxFlags) (Tx, error) {
dbTx, err := s.db.Begin(ctx, parentTx, flags)
if err != nil {
return nil, err
}
@ -205,7 +205,7 @@ func (s *SnapshotKV) Begin(ctx context.Context, parentTx Tx, writable bool) (Tx,
t := &snapshotTX{
dbTX: dbTx,
snTX: newVirtualTx(func() (Tx, error) {
return s.snapshotDB.Begin(ctx, parentTx, false)
return s.snapshotDB.Begin(ctx, parentTx, flags)
}, s.forBuckets),
forBuckets: s.forBuckets,
}

View File

@ -84,7 +84,7 @@ func TestSnapshotGet(t *testing.T) {
kv := NewSnapshotKV().For(dbutils.HeaderPrefix).SnapshotDB(sn1).DB(mainDB).MustOpen()
kv = NewSnapshotKV().For(dbutils.BlockBodyPrefix).SnapshotDB(sn2).DB(kv).MustOpen()
tx, err := kv.Begin(context.Background(), nil, false)
tx, err := kv.Begin(context.Background(), nil, RO)
if err != nil {
t.Fatal(err)
}
@ -231,7 +231,7 @@ func TestSnapshotWritableTxAndGet(t *testing.T) {
kv := NewSnapshotKV().For(dbutils.HeaderPrefix).SnapshotDB(sn1).DB(mainDB).MustOpen()
kv = NewSnapshotKV().For(dbutils.BlockBodyPrefix).SnapshotDB(sn2).DB(kv).MustOpen()
tx, err := kv.Begin(context.Background(), nil, true)
tx, err := kv.Begin(context.Background(), nil, RW)
if err != nil {
t.Fatal(err)
}
@ -264,7 +264,7 @@ func TestSnapshotWritableTxAndGet(t *testing.T) {
if err != nil {
t.Fatal(err)
}
tx, err = kv.Begin(context.Background(), nil, false)
tx, err = kv.Begin(context.Background(), nil, RO)
if err != nil {
t.Fatal(err)
}

View File

@ -224,8 +224,8 @@ func (m *mutation) NewBatch() DbWithPendingMutations {
return mm
}
func (m *mutation) Begin(ctx context.Context, writable bool) (DbWithPendingMutations, error) {
return m.db.Begin(ctx, writable)
func (m *mutation) Begin(ctx context.Context, flags TxFlags) (DbWithPendingMutations, error) {
return m.db.Begin(ctx, flags)
}
func (m *mutation) panicOnEmptyDB() {

View File

@ -238,104 +238,7 @@ func (db *ObjectDatabase) BucketExists(name string) (bool, error) {
return exists, nil
}
func (db *ObjectDatabase) ClearBucketsAndCommitEvery(deleteKeysPerTx uint64, buckets ...string) error {
for i := range buckets {
name := buckets[i]
log.Info("Cleaning bucket", "name", name)
if err := db.removeBucketContentByMultipleTransactions(name, deleteKeysPerTx); err != nil {
return err
}
if err := db.kv.Update(context.Background(), func(tx Tx) error {
migrator, ok := tx.(BucketMigrator)
if !ok {
return fmt.Errorf("%T doesn't implement ethdb.TxMigrator interface", db.kv)
}
if err := migrator.ClearBucket(name); err != nil {
return err
}
return nil
}); err != nil {
return err
}
}
return nil
}
// removeBucketContentByMultipleTransactions - allows to avoid single large freelist record inside database and
// avoid "too big transaction" error
func (db *ObjectDatabase) removeBucketContentByMultipleTransactions(bucket string, deleteKeysPerTx uint64) error {
logEvery := time.NewTicker(30 * time.Second)
defer logEvery.Stop()
var partialDropDone bool
for !partialDropDone {
if err := db.kv.Update(context.Background(), func(tx Tx) error {
c := tx.Cursor(bucket)
cnt, err := c.Count()
if err != nil {
return err
}
if cnt < deleteKeysPerTx {
partialDropDone = true
return nil
}
var deleted uint64
for k, _, err := c.First(); k != nil; k, _, err = c.First() {
if err != nil {
return err
}
deleted++
if deleted > deleteKeysPerTx {
break
}
err = c.DeleteCurrent()
if err != nil {
return err
}
select {
default:
case <-logEvery.C:
log.Info("ClearBuckets", "bucket", bucket, "records_left", cnt-deleted)
}
}
return nil
}); err != nil {
return fmt.Errorf("partial clean failed. bucket=%s, %w", bucket, err)
}
}
return nil
}
func (db *ObjectDatabase) DropBucketsAndCommitEvery(deleteKeysPerTx uint64, buckets ...string) error {
for i := range buckets {
name := buckets[i]
log.Info("Dropping bucket", "name", name)
if err := db.removeBucketContentByMultipleTransactions(name, deleteKeysPerTx); err != nil {
return err
}
if err := db.kv.Update(context.Background(), func(tx Tx) error {
migrator, ok := tx.(BucketMigrator)
if !ok {
return fmt.Errorf("%T doesn't implement ethdb.TxMigrator interface", db.kv)
}
if err := migrator.DropBucket(name); err != nil {
return err
}
return nil
}); err != nil {
return err
}
}
return nil
}
func (db *ObjectDatabase) ClearBuckets(buckets ...string) error {
for i := range buckets {
name := buckets[i]
if err := db.kv.Update(context.Background(), func(tx Tx) error {
@ -448,9 +351,9 @@ func (db *ObjectDatabase) NewBatch() DbWithPendingMutations {
return m
}
func (db *ObjectDatabase) Begin(ctx context.Context, writable bool) (DbWithPendingMutations, error) {
batch := &TxDb{db: db, writable: writable}
if err := batch.begin(ctx, nil, writable); err != nil {
func (db *ObjectDatabase) Begin(ctx context.Context, flags TxFlags) (DbWithPendingMutations, error) {
batch := &TxDb{db: db}
if err := batch.begin(ctx, nil, flags); err != nil {
panic(err)
}
return batch, nil

View File

@ -90,7 +90,7 @@ func NewKvServer(kv ethdb.KV) *KvServer {
}
func (s *KvServer) Tx(stream remote.KV_TxServer) error {
tx, errBegin := s.kv.Begin(stream.Context(), nil, false)
tx, errBegin := s.kv.Begin(stream.Context(), nil, ethdb.RO)
if errBegin != nil {
return fmt.Errorf("server-side error: %w", errBegin)
}
@ -134,7 +134,7 @@ func (s *KvServer) Tx(stream remote.KV_TxServer) error {
}
tx.Rollback()
tx, errBegin = s.kv.Begin(stream.Context(), nil, false)
tx, errBegin = s.kv.Begin(stream.Context(), nil, ethdb.RO)
if errBegin != nil {
return fmt.Errorf("server-side error: %w", errBegin)
}

View File

@ -19,9 +19,9 @@ import (
// Walk and MultiWalk methods currently work outside of the Tx object; moving them inside will be implemented later
type TxDb struct {
db Database
writable bool
tx Tx
ParentTx Tx
txFlags TxFlags
cursors map[string]Cursor
len uint64
}
@ -33,17 +33,17 @@ func (m *TxDb) Close() {
// NewTxDbWithoutTransaction creates a TxDb object without opening a transaction;
// such a TxDb is not usable before .Begin() is called on it.
// It allows injecting a TxDb object into the class hierarchy while opening the write transaction later.
func NewTxDbWithoutTransaction(db Database, writable bool) *TxDb {
return &TxDb{db: db, writable: writable}
func NewTxDbWithoutTransaction(db Database, flags TxFlags) *TxDb {
return &TxDb{db: db, txFlags: flags}
}
func (m *TxDb) Begin(ctx context.Context, writable bool) (DbWithPendingMutations, error) {
func (m *TxDb) Begin(ctx context.Context, flags TxFlags) (DbWithPendingMutations, error) {
batch := m
if m.tx != nil {
batch = &TxDb{db: m.db}
batch = &TxDb{db: m.db, txFlags: flags}
}
if err := batch.begin(ctx, m.tx, writable); err != nil {
if err := batch.begin(ctx, m.tx, flags); err != nil {
return nil, err
}
return batch, nil
@ -86,8 +86,8 @@ func (m *TxDb) NewBatch() DbWithPendingMutations {
}
}
func (m *TxDb) begin(ctx context.Context, parent Tx, writable bool) error {
tx, err := m.db.(HasKV).KV().Begin(ctx, parent, writable)
func (m *TxDb) begin(ctx context.Context, parent Tx, flags TxFlags) error {
tx, err := m.db.(HasKV).KV().Begin(ctx, parent, flags)
if err != nil {
return err
}
@ -229,7 +229,9 @@ func (m *TxDb) IdealBatchSize() int {
func (m *TxDb) Walk(bucket string, startkey []byte, fixedbits int, walker func([]byte, []byte) (bool, error)) error {
m.panicOnEmptyDB()
return Walk(m.cursors[bucket], startkey, fixedbits, walker)
c := m.tx.Cursor(bucket) // create a new cursor, so calls to other TxDb methods inside the walker callback will not affect it
defer c.Close()
return Walk(c, startkey, fixedbits, walker)
}
func Walk(c Cursor, startkey []byte, fixedbits int, walker func(k, v []byte) (bool, error)) error {
@ -272,7 +274,9 @@ func ForEach(c Cursor, walker func(k, v []byte) (bool, error)) error {
func (m *TxDb) MultiWalk(bucket string, startkeys [][]byte, fixedbits []int, walker func(int, []byte, []byte) error) error {
m.panicOnEmptyDB()
return MultiWalk(m.cursors[bucket], startkeys, fixedbits, walker)
c := m.tx.Cursor(bucket) // create a new cursor, so calls to other TxDb methods inside the MultiWalk callback will not affect it
defer c.Close()
return MultiWalk(c, startkeys, fixedbits, walker)
}
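This is the "make TxDb walk and multiwalk safe" part of the commit: each call now opens its own cursor instead of reusing the shared m.cursors[bucket], so re-entrant walks no longer clobber each other's position. A sketch of what is now safe:

// nested walk over the same bucket: the inner Walk gets its own cursor,
// so it cannot invalidate the outer iteration (sketch only)
err := txdb.Walk(bucket, nil, 0, func(k, v []byte) (bool, error) {
	innerErr := txdb.Walk(bucket, k, 0, func(k2, v2 []byte) (bool, error) {
		return false, nil // stop immediately; only demonstrating re-entrancy
	})
	return true, innerErr
})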
func MultiWalk(c Cursor, startkeys [][]byte, fixedbits []int, walker func(int, []byte, []byte) error) error {
@ -335,12 +339,12 @@ func (m *TxDb) CommitAndBegin(ctx context.Context) error {
return err
}
return m.begin(ctx, m.ParentTx, m.writable)
return m.begin(ctx, m.ParentTx, m.txFlags)
}
func (m *TxDb) RollbackAndBegin(ctx context.Context) error {
m.Rollback()
return m.begin(ctx, m.ParentTx, m.writable)
return m.begin(ctx, m.ParentTx, m.txFlags)
}
func (m *TxDb) Commit() (uint64, error) {
@ -423,86 +427,6 @@ func (m *TxDb) ClearBuckets(buckets ...string) error {
return nil
}
func (m *TxDb) ClearBucketsAndCommitEvery(deleteKeysPerTx uint64, buckets ...string) error {
for i := range buckets {
name := buckets[i]
log.Info("Clear bucket", "name", name)
if err := m.removeBucketContentByMultipleTransactions(name, deleteKeysPerTx); err != nil {
return err
}
if err := m.tx.(BucketMigrator).ClearBucket(name); err != nil {
return err
}
if err := m.CommitAndBegin(context.Background()); err != nil {
return err
}
}
return nil
}
func (m *TxDb) DropBucketsAndCommitEvery(deleteKeysPerTx uint64, buckets ...string) error {
for i := range buckets {
name := buckets[i]
log.Info("Dropping bucket", "name", name)
if err := m.removeBucketContentByMultipleTransactions(name, deleteKeysPerTx); err != nil {
return err
}
if err := m.tx.(BucketMigrator).DropBucket(name); err != nil {
return err
}
if err := m.CommitAndBegin(context.Background()); err != nil {
return err
}
}
return nil
}
// removeBucketContentByMultipleTransactions - allows to avoid single large freelist record inside database and
// avoid "too big transaction" error
func (m *TxDb) removeBucketContentByMultipleTransactions(bucket string, deleteKeysPerTx uint64) error {
logEvery := time.NewTicker(30 * time.Second)
defer logEvery.Stop()
var partialDropDone bool
for !partialDropDone {
c := m.tx.Cursor(bucket)
cnt, err := c.Count()
if err != nil {
return err
}
if cnt < deleteKeysPerTx {
return nil
}
var deleted uint64
for k, _, err := c.First(); k != nil; k, _, err = c.First() {
if err != nil {
return err
}
deleted++
if deleted > deleteKeysPerTx {
break
}
err = c.DeleteCurrent()
if err != nil {
return err
}
select {
default:
case <-logEvery.C:
log.Info("ClearBuckets", "bucket", bucket, "records_left", cnt-deleted)
}
}
if err := m.CommitAndBegin(context.Background()); err != nil {
return err
}
}
return nil
}
func (m *TxDb) DropBuckets(buckets ...string) error {
for i := range buckets {
name := buckets[i]

go.mod
View File

@ -64,8 +64,8 @@ require (
github.com/status-im/keycard-go v0.0.0-20200402102358-957c09536969
github.com/stretchr/testify v1.6.1
github.com/tyler-smith/go-bip39 v1.0.2
github.com/ugorji/go/codec v1.1.9
github.com/ugorji/go/codec/codecgen v1.1.9
github.com/ugorji/go/codec v1.1.13
github.com/ugorji/go/codec/codecgen v1.1.13
github.com/urfave/cli v1.22.4
github.com/valyala/gozstd v1.8.3
github.com/wcharczuk/go-chart v2.0.1+incompatible

go.sum
View File

@ -714,13 +714,13 @@ github.com/tyler-smith/go-bip39 v1.0.2 h1:+t3w+KwLXO6154GNJY+qUtIxLTmFjfUmpguQT1
github.com/tyler-smith/go-bip39 v1.0.2/go.mod h1:sJ5fKU0s6JVwZjjcUEX2zFOnvq0ASQ2K9Zr6cf67kNs=
github.com/ugorji/go v1.1.4/go.mod h1:uQMGLiO92mf5W77hV/PUCpI3pbzQx3CRekS0kk+RGrc=
github.com/ugorji/go v1.1.7/go.mod h1:kZn38zHttfInRq0xu/PH0az30d+z6vm202qpg1oXVMw=
github.com/ugorji/go v1.1.9 h1:SObrQTaSuP8WOv2WNCj8gECiNSJIUvk3Q7N26c96Gws=
github.com/ugorji/go v1.1.9/go.mod h1:chLrngdsg43geAaeId+nXO57YsDdl5OZqd/QtBiD19g=
github.com/ugorji/go v1.1.13 h1:nB3O5kBSQGjEQAcfe1aLUYuxmXdFKmYgBZhY32rQb6Q=
github.com/ugorji/go v1.1.13/go.mod h1:jxau1n+/wyTGLQoCkjok9r5zFa/FxT6eI5HiHKQszjc=
github.com/ugorji/go/codec v1.1.7/go.mod h1:Ax+UKWsSmolVDwsd+7N3ZtXu+yMGCf907BLYF3GoBXY=
github.com/ugorji/go/codec v1.1.9 h1:J/7hhpkQwgypRNvaeh/T5gzJ2gEI/l8S3qyRrdEa1fA=
github.com/ugorji/go/codec v1.1.9/go.mod h1:+SWgpdqOgdW5sBaiDfkHilQ1SxQ1hBkq/R+kHfL7Suo=
github.com/ugorji/go/codec/codecgen v1.1.9 h1:kNvsTdvcZRcvQqrQRudW8T1M70KyJTLE28dtgMzjNtM=
github.com/ugorji/go/codec/codecgen v1.1.9/go.mod h1:bXoHoBru9qfIEAADCABNdxmURHgIzu8tMxnd50m9ujM=
github.com/ugorji/go/codec v1.1.13 h1:013LbFhocBoIqgHeIHKlV4JWYhqogATYWZhIcH0WHn4=
github.com/ugorji/go/codec v1.1.13/go.mod h1:oNVt3Dq+FO91WNQ/9JnHKQP2QJxTzoN7wCBFCq1OeuU=
github.com/ugorji/go/codec/codecgen v1.1.13 h1:rGpZ4Q63VcWA3DMBbIHvg+SQweUkfXBBa/f9X0W+tFg=
github.com/ugorji/go/codec/codecgen v1.1.13/go.mod h1:EhCxlc7Crov+HLygD4+hBCitXNrrGKRrRWj+pRsyJGg=
github.com/urfave/cli v1.22.1/go.mod h1:Gos4lmkARVdJ6EkW0WaNv/tZAAMe9V7XWyB60NtXRu0=
github.com/urfave/cli v1.22.4 h1:u7tSpNPPswAFymm8IehJhy4uJMlUuU/GmqSkvJ1InXA=
github.com/urfave/cli v1.22.4/go.mod h1:Gos4lmkARVdJ6EkW0WaNv/tZAAMe9V7XWyB60NtXRu0=

View File

@ -50,7 +50,7 @@ func TestDupSortHashState(t *testing.T) {
require.NoError(err)
require.Equal([]byte{2}, v)
tx, err := db.Begin(context.Background(), true)
tx, err := db.Begin(context.Background(), ethdb.RW)
require.NoError(err)
defer tx.Rollback()
@ -109,7 +109,7 @@ func TestDupSortPlainState(t *testing.T) {
require.NoError(err)
require.Equal([]byte{2}, v)
tx, err := db.Begin(context.Background(), true)
tx, err := db.Begin(context.Background(), ethdb.RW)
require.NoError(err)
defer tx.Rollback()

View File

@ -65,6 +65,7 @@ var migrations = []Migration{
clearIndices,
resetIHBucketToRecoverDB,
receiptsCborEncode,
receiptsOnePerTx,
}
type Migration struct {
@ -124,7 +125,7 @@ func (m *Migrator) Apply(db ethdb.Database, tmpdir string) error {
uniqueNameCheck[m.Migrations[i].Name] = true
}
tx, err1 := db.Begin(context.Background(), true)
tx, err1 := db.Begin(context.Background(), ethdb.RW)
if err1 != nil {
return err1
}
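Registering and applying a single migration in isolation, exactly as the new test below does (the tmpdir argument can be empty for in-memory databases):

migrator := NewMigrator()
migrator.Migrations = []Migration{receiptsOnePerTx}
if err := migrator.Apply(db, tmpdir); err != nil {
	t.Fatal(err)
}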

View File

@ -1,6 +1,7 @@
package migrations
import (
"bytes"
"encoding/binary"
"fmt"
"os"
@ -22,7 +23,7 @@ var receiptsCborEncode = Migration{
Up: func(db ethdb.Database, tmpdir string, progress []byte, CommitProgress etl.LoadCommitHandler) error {
logEvery := time.NewTicker(30 * time.Second)
defer logEvery.Stop()
buf := make([]byte, 0, 100_000)
buf := bytes.NewBuffer(make([]byte, 0, 100_000))
const loadStep = "load"
collector, err1 := etl.NewCollectorFromFiles(tmpdir)
@ -59,11 +60,11 @@ var receiptsCborEncode = Migration{
return false, fmt.Errorf("invalid receipt array RLP: %w, k=%x", err, k)
}
buf = buf[:0]
if err := cbor.Marshal(&buf, storageReceipts); err != nil {
buf.Reset()
if err := cbor.Marshal(buf, storageReceipts); err != nil {
return false, err
}
if err := collector.Collect(k, buf); err != nil {
if err := collector.Collect(k, buf.Bytes()); err != nil {
return false, fmt.Errorf("collecting key %x: %w", k, err)
}
return true, nil
@ -92,3 +93,162 @@ var receiptsCborEncode = Migration{
return nil
},
}
var receiptsOnePerTx = Migration{
Name: "receipts_store_logs_separately",
Up: func(db ethdb.Database, tmpdir string, progress []byte, CommitProgress etl.LoadCommitHandler) (err error) {
logEvery := time.NewTicker(30 * time.Second)
defer logEvery.Stop()
logPrefix := "receipts_store_logs_separately"
// A receipts serialization problem was recently introduced:
// the codec code was not generated correctly for the types.Log type.
// To fix it, deserialize by reflection (LegacyReceipt has no generated code),
// then serialize with the generated code - types.Receipts and types.Log have generated code now
type LegacyReceipt struct {
PostState []byte `codec:"1"`
Status uint64 `codec:"2"`
CumulativeGasUsed uint64 `codec:"3"`
Logs []*types.Log `codec:"4"`
}
buf := bytes.NewBuffer(make([]byte, 0, 100_000))
reader := bytes.NewReader(nil)
const loadStep = "load"
collectorR, err1 := etl.NewCollectorFromFiles(tmpdir + "1")
if err1 != nil {
return err1
}
collectorL, err1 := etl.NewCollectorFromFiles(tmpdir + "2")
if err1 != nil {
return err1
}
switch string(progress) {
case "":
// can't use files if progress field not set, clear them
if collectorR != nil {
collectorR.Close(logPrefix)
collectorR = nil
}
if collectorL != nil {
collectorL.Close(logPrefix)
collectorL = nil
}
case loadStep:
if collectorR == nil || collectorL == nil {
return ErrMigrationETLFilesDeleted
}
defer func() {
// don't clean if error or panic happened
if err != nil {
return
}
if rec := recover(); rec != nil {
panic(rec)
}
collectorR.Close(logPrefix)
collectorL.Close(logPrefix)
}()
goto LoadStep
}
collectorR = etl.NewCriticalCollector(tmpdir+"1", etl.NewSortableBuffer(etl.BufferOptimalSize*2))
collectorL = etl.NewCriticalCollector(tmpdir+"2", etl.NewSortableBuffer(etl.BufferOptimalSize*2))
defer func() {
// don't clean if error or panic happened
if err != nil {
return
}
if rec := recover(); rec != nil {
panic(rec)
}
collectorR.Close(logPrefix)
collectorL.Close(logPrefix)
}()
if err := db.Walk(dbutils.BlockReceiptsPrefix, nil, 0, func(k, v []byte) (bool, error) {
blockNum := binary.BigEndian.Uint64(k[:8])
select {
default:
case <-logEvery.C:
var m runtime.MemStats
runtime.ReadMemStats(&m)
log.Info(fmt.Sprintf("[%s] Progress", logPrefix), "blockNum", blockNum, "alloc", common.StorageSize(m.Alloc), "sys", common.StorageSize(m.Sys))
}
// Decode the receipts from their legacy storage form
var legacyReceipts []*LegacyReceipt
reader.Reset(v)
if err := cbor.Unmarshal(&legacyReceipts, reader); err != nil {
return false, err
}
// Convert the receipts from their storage form to their internal representation
receipts := make(types.Receipts, len(legacyReceipts))
for i := range legacyReceipts {
receipts[i] = &types.Receipt{}
receipts[i].PostState = legacyReceipts[i].PostState
receipts[i].Status = legacyReceipts[i].Status
receipts[i].CumulativeGasUsed = legacyReceipts[i].CumulativeGasUsed
receipts[i].Logs = legacyReceipts[i].Logs
}
for txId, r := range receipts {
if len(r.Logs) == 0 {
continue
}
newK := make([]byte, 8+4)
copy(newK, k[:8])
binary.BigEndian.PutUint32(newK[8:], uint32(txId))
buf.Reset()
if err := cbor.Marshal(buf, r.Logs); err != nil {
return false, err
}
if err := collectorL.Collect(newK, buf.Bytes()); err != nil {
return false, fmt.Errorf("collecting key %x: %w", k, err)
}
}
buf.Reset()
if err := cbor.Marshal(buf, receipts); err != nil {
return false, err
}
if err := collectorR.Collect(common.CopyBytes(k[:8]), buf.Bytes()); err != nil {
return false, fmt.Errorf("collecting key %x: %w", k, err)
}
return true, nil
}); err != nil {
return err
}
if err := db.(ethdb.BucketsMigrator).ClearBuckets(dbutils.BlockReceiptsPrefix); err != nil {
return fmt.Errorf("clearing the receipt bucket: %w", err)
}
// Commit clearing of the bucket - freelist should now be written to the database
if err := CommitProgress(db, []byte(loadStep), false); err != nil {
return fmt.Errorf("committing the removal of receipt table")
}
LoadStep:
// Commit again
if err := CommitProgress(db, []byte(loadStep), false); err != nil {
return fmt.Errorf("committing the removal of receipt table")
}
// Now the transaction has been re-opened, and we should be re-using the freed space
if err := collectorR.Load(logPrefix, db, dbutils.BlockReceiptsPrefix, etl.IdentityLoadFunc, etl.TransformArgs{}); err != nil {
return fmt.Errorf("loading the transformed data back into the receipts table: %w", err)
}
if err := collectorL.Load(logPrefix, db, dbutils.Log, etl.IdentityLoadFunc, etl.TransformArgs{OnLoadCommit: CommitProgress}); err != nil {
return fmt.Errorf("loading the transformed data back into the receipts table: %w", err)
}
return nil
},
}

View File

@ -1,12 +1,17 @@
package migrations
import (
"bytes"
"context"
"encoding/binary"
"errors"
"testing"
"github.com/ledgerwatch/turbo-geth/common"
"github.com/ledgerwatch/turbo-geth/common/dbutils"
"github.com/ledgerwatch/turbo-geth/core/types"
"github.com/ledgerwatch/turbo-geth/ethdb"
"github.com/ledgerwatch/turbo-geth/ethdb/cbor"
"github.com/stretchr/testify/require"
)
@ -33,3 +38,154 @@ func TestReceiptCbor(t *testing.T) {
})
require.True(errors.Is(err, ErrMigrationETLFilesDeleted))
}
func TestReceiptOnePerTx(t *testing.T) {
require, db := require.New(t), ethdb.NewMemDatabase()
err := db.KV().Update(context.Background(), func(tx ethdb.Tx) error {
return tx.(ethdb.BucketMigrator).CreateBucket(dbutils.BlockReceiptsPrefix)
})
require.NoError(err)
type LegacyReceipt struct {
PostState []byte `codec:"1"`
Status uint64 `codec:"2"`
CumulativeGasUsed uint64 `codec:"3"`
Logs []*types.Log `codec:"4"`
}
buf := bytes.NewBuffer(nil)
k := make([]byte, 8+32)
binary.BigEndian.PutUint64(k, 1)
block1 := []*LegacyReceipt{
{
CumulativeGasUsed: 1_000_000_000,
Logs: []*types.Log{
{Address: common.HexToAddress("01"), Data: common.FromHex("02")},
{Address: common.HexToAddress("03"), Data: common.FromHex("04")},
},
},
{
CumulativeGasUsed: 2_000_000_000,
Logs: []*types.Log{
{Address: common.HexToAddress("05"), Data: common.FromHex("06")},
{Address: common.HexToAddress("07"), Data: common.FromHex("08")},
},
},
}
err = cbor.Marshal(buf, block1)
require.NoError(err)
err = db.Put(dbutils.BlockReceiptsPrefix, common.CopyBytes(k), common.CopyBytes(buf.Bytes()))
require.NoError(err)
buf.Reset()
binary.BigEndian.PutUint64(k, 2)
block2 := []*LegacyReceipt{
{
CumulativeGasUsed: 3_000_000_000,
Logs: []*types.Log{
{Address: common.HexToAddress("09"), Data: common.FromHex("10")},
{Address: common.HexToAddress("11"), Data: common.FromHex("12")},
},
},
}
err = cbor.Marshal(buf, block2)
require.NoError(err)
err = db.Put(dbutils.BlockReceiptsPrefix, common.CopyBytes(k), common.CopyBytes(buf.Bytes()))
require.NoError(err)
migrator := NewMigrator()
migrator.Migrations = []Migration{receiptsOnePerTx}
err = migrator.Apply(db, "")
require.NoError(err)
// test high-level data access didn't change
i := 0
err = db.Walk(dbutils.BlockReceiptsPrefix, nil, 0, func(k, v []byte) (bool, error) {
i++
return true, nil
})
require.NoError(err)
require.Equal(2, i)
i = 0
err = db.Walk(dbutils.Log, nil, 0, func(k, v []byte) (bool, error) {
i++
return true, nil
})
require.NoError(err)
require.Equal(3, i)
{
newK := make([]byte, 8)
binary.BigEndian.PutUint64(newK, 1)
v, err := db.Get(dbutils.BlockReceiptsPrefix, newK)
require.NoError(err)
var r types.Receipts
err = cbor.Unmarshal(&r, bytes.NewReader(v))
require.NoError(err)
require.Equal(len(r), len(block1))
require.Equal(r[0].CumulativeGasUsed, block1[0].CumulativeGasUsed)
require.Equal(r[1].CumulativeGasUsed, block1[1].CumulativeGasUsed)
require.Nil(r[0].Logs)
require.Nil(r[1].Logs)
}
{
newK := make([]byte, 8)
binary.BigEndian.PutUint64(newK, 2)
v, err := db.Get(dbutils.BlockReceiptsPrefix, newK)
require.NoError(err)
var r types.Receipts
err = cbor.Unmarshal(&r, bytes.NewReader(v))
require.NoError(err)
require.Equal(len(r), len(block2))
require.Equal(r[0].CumulativeGasUsed, block2[0].CumulativeGasUsed)
require.Nil(r[0].Logs)
}
{
newK := make([]byte, 8+4)
binary.BigEndian.PutUint64(newK, 1)
binary.BigEndian.PutUint32(newK[8:], 0)
v, err := db.Get(dbutils.Log, newK)
require.NoError(err)
var l types.Logs
err = cbor.Unmarshal(&l, bytes.NewReader(v))
require.NoError(err)
require.Equal(len(l), len(block1[0].Logs))
require.Equal(l[0], block1[0].Logs[0])
require.Equal(l[1], block1[0].Logs[1])
}
{
newK := make([]byte, 8+4)
binary.BigEndian.PutUint64(newK, 1)
binary.BigEndian.PutUint32(newK[8:], 1)
v, err := db.Get(dbutils.Log, newK)
require.NoError(err)
var l types.Logs
err = cbor.Unmarshal(&l, bytes.NewReader(v))
require.NoError(err)
require.Equal(len(l), len(block1[1].Logs))
require.Equal(l[0], block1[1].Logs[0])
require.Equal(l[1], block1[1].Logs[1])
}
{
newK := make([]byte, 8+4)
binary.BigEndian.PutUint64(newK, 2)
binary.BigEndian.PutUint32(newK[8:], 0)
v, err := db.Get(dbutils.Log, newK)
require.NoError(err)
var l types.Logs
err = cbor.Unmarshal(&l, bytes.NewReader(v))
require.NoError(err)
require.Equal(len(l), len(block2[0].Logs))
require.Equal(l[0], block2[0].Logs[0])
}
}

View File

@ -136,7 +136,7 @@ func (t *BlockTest) Run(_ bool) error {
if common.Hash(t.json.BestBlock) != cmlast {
return fmt.Errorf("last block hash validation mismatch: want: %x, have: %x", t.json.BestBlock, cmlast)
}
tx, err1 := db.KV().Begin(context.Background(), nil, false)
tx, err1 := db.KV().Begin(context.Background(), nil, ethdb.RO)
if err1 != nil {
return fmt.Errorf("blockTest create tx: %v", err1)
}

View File

@ -593,7 +593,7 @@ func (fstl *FlatDbSubTrieLoader) LoadSubTries() (SubTries, error) {
}
if fstl.tx == nil {
var err error
fstl.tx, err = fstl.kv.Begin(context.Background(), nil, false)
fstl.tx, err = fstl.kv.Begin(context.Background(), nil, ethdb.RO)
if err != nil {
return SubTries{}, err
}

View File

@ -372,7 +372,7 @@ func (l *FlatDBTrieLoader) CalcTrieRoot(db ethdb.Database, quit <-chan struct{})
useExternalTx = true
} else {
var err error
txDB, err = db.Begin(context.Background(), true)
txDB, err = db.Begin(context.Background(), ethdb.RW)
if err != nil {
return EmptyRoot, err
}