mirror of
https://gitlab.com/pulsechaincom/erigon-pulse.git
synced 2025-01-11 13:30:05 +00:00
d697b6fe7c
This PR changes filterLogs to use a precomputed hash set of addresses
instead of iterating over the full list of addresses once per log.
This greatly speeds up filter queries that use many addresses and also
return a large number of logs. In our case, we are querying all the
trades performed in a Uniswap v3 pool over a 250-block range.
My benchmarks were performed with the data and code below.
The address list is in this gist:
[addrs](2c30b0df43/gistfile1.txt)
```
c := NewRpcClient()
addrs := []common.Address{AddressListGist}
logs, err := c.FilterLogs(context.TODO(), ethereum.FilterQuery{
FromBlock:big.NewInt(15640000),
ToBlock: big.NewInt(15640250),
Addresses: addrs,
Topics: [][]common.Hash{
{
common.HexToHash("c42079f94a6350d7e6235f29174924f928cc2ac818eb64fed8004e115fbcca67"),
},
},
```
The query contains 8442 addresses, and the response contains 1277 logs.
Averaged over 10 runs on my machine, current devel takes 15.57 seconds to respond, while the new filterLogs takes 1.05 seconds.
Profile for current devel: https://pprof.aaaaa.news/cd8dkv0tidul37sctmi0/flamegraph
Profile for the filterLogs branch: https://pprof.aaaaa.news/cd8dlmgtidul37sctmig/flamegraph
The tests pass on this branch, but I am not sure why filterLogs was originally written the way it was. Is there an edge case or compatibility concern that I am missing with this change?
Co-authored-by: a <a@a.a>
241 lines
6.5 KiB
Go
241 lines
6.5 KiB
Go
package commands
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"encoding/binary"
|
|
"fmt"
|
|
|
|
"github.com/RoaringBitmap/roaring"
|
|
"github.com/ledgerwatch/erigon-lib/kv"
|
|
"github.com/ledgerwatch/erigon/common"
|
|
"github.com/ledgerwatch/erigon/common/dbutils"
|
|
"github.com/ledgerwatch/erigon/core/rawdb"
|
|
"github.com/ledgerwatch/erigon/core/types"
|
|
"github.com/ledgerwatch/erigon/eth/filters"
|
|
"github.com/ledgerwatch/erigon/ethdb/bitmapdb"
|
|
"github.com/ledgerwatch/erigon/ethdb/cbor"
|
|
"github.com/ledgerwatch/erigon/rpc"
|
|
"github.com/ledgerwatch/erigon/turbo/rpchelper"
|
|
)
|
|
|
|
// GetLogsByHash implements erigon_getLogsByHash. Returns an array of arrays of logs generated by the transactions in the block given by the block's hash.
|
|
func (api *ErigonImpl) GetLogsByHash(ctx context.Context, hash common.Hash) ([][]*types.Log, error) {
|
|
tx, err := api.db.BeginRo(ctx)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer tx.Rollback()
|
|
|
|
chainConfig, err := api.chainConfig(tx)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
block, err := api.blockByHashWithSenders(tx, hash)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if block == nil {
|
|
return nil, nil
|
|
}
|
|
receipts, err := api.getReceipts(ctx, tx, chainConfig, block, block.Body().SendersFromTxs())
|
|
if err != nil {
|
|
return nil, fmt.Errorf("getReceipts error: %w", err)
|
|
}
|
|
|
|
logs := make([][]*types.Log, len(receipts))
|
|
for i, receipt := range receipts {
|
|
logs[i] = receipt.Logs
|
|
}
|
|
return logs, nil
|
|
}
|
|
|
|
// GetLogs implements erigon_getLogs. Returns an array of logs matching a given filter object.
|
|
func (api *ErigonImpl) GetLogs(ctx context.Context, crit filters.FilterCriteria) (types.ErigonLogs, error) {
|
|
var begin, end uint64
|
|
erigonLogs := types.ErigonLogs{}
|
|
|
|
tx, beginErr := api.db.BeginRo(ctx)
|
|
if beginErr != nil {
|
|
return erigonLogs, beginErr
|
|
}
|
|
defer tx.Rollback()
|
|
|
|
if crit.BlockHash != nil {
|
|
number := rawdb.ReadHeaderNumber(tx, *crit.BlockHash)
|
|
if number == nil {
|
|
return nil, fmt.Errorf("block not found: %x", *crit.BlockHash)
|
|
}
|
|
begin = *number
|
|
end = *number
|
|
} else {
|
|
// Convert the RPC block numbers into internal representations
|
|
latest, err := rpchelper.GetLatestBlockNumber(tx)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
begin = latest
|
|
if crit.FromBlock != nil {
|
|
if crit.FromBlock.Sign() >= 0 {
|
|
begin = crit.FromBlock.Uint64()
|
|
} else if !crit.FromBlock.IsInt64() || crit.FromBlock.Int64() != int64(rpc.LatestBlockNumber) {
|
|
return nil, fmt.Errorf("negative value for FromBlock: %v", crit.FromBlock)
|
|
}
|
|
}
|
|
end = latest
|
|
if crit.ToBlock != nil {
|
|
if crit.ToBlock.Sign() >= 0 {
|
|
end = crit.ToBlock.Uint64()
|
|
} else if !crit.ToBlock.IsInt64() || crit.ToBlock.Int64() != int64(rpc.LatestBlockNumber) {
|
|
return nil, fmt.Errorf("negative value for ToBlock: %v", crit.ToBlock)
|
|
}
|
|
}
|
|
}
|
|
if end < begin {
|
|
return nil, fmt.Errorf("end (%d) < begin (%d)", end, begin)
|
|
}
|
|
if end > roaring.MaxUint32 {
|
|
return nil, fmt.Errorf("end (%d) > MaxUint32", end)
|
|
}
|
|
blockNumbers := bitmapdb.NewBitmap()
|
|
defer bitmapdb.ReturnToPool(blockNumbers)
|
|
blockNumbers.AddRange(begin, end+1) // [min,max)
|
|
|
|
topicsBitmap, err := getTopicsBitmap(tx, crit.Topics, uint32(begin), uint32(end))
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if topicsBitmap != nil {
|
|
blockNumbers.And(topicsBitmap)
|
|
}
|
|
|
|
rx := make([]*roaring.Bitmap, len(crit.Addresses))
|
|
for idx, addr := range crit.Addresses {
|
|
m, err := bitmapdb.Get(tx, kv.LogAddressIndex, addr[:], uint32(begin), uint32(end))
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
rx[idx] = m
|
|
}
|
|
addrBitmap := roaring.FastOr(rx...)
|
|
|
|
if len(rx) > 0 {
|
|
blockNumbers.And(addrBitmap)
|
|
}
|
|
|
|
if blockNumbers.GetCardinality() == 0 {
|
|
return erigonLogs, nil
|
|
}
|
|
|
|
addrMap := make(map[common.Address]struct{}, len(crit.Addresses))
|
|
for _, v := range crit.Addresses {
|
|
addrMap[v] = struct{}{}
|
|
}
|
|
iter := blockNumbers.Iterator()
|
|
for iter.HasNext() {
|
|
if err = ctx.Err(); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
blockNumber := uint64(iter.Next())
|
|
var logIndex uint
|
|
var txIndex uint
|
|
var blockLogs []*types.Log
|
|
err := tx.ForPrefix(kv.Log, dbutils.EncodeBlockNumber(blockNumber), func(k, v []byte) error {
|
|
var logs types.Logs
|
|
if err := cbor.Unmarshal(&logs, bytes.NewReader(v)); err != nil {
|
|
return fmt.Errorf("receipt unmarshal failed: %w", err)
|
|
}
|
|
for _, log := range logs {
|
|
log.Index = logIndex
|
|
logIndex++
|
|
}
|
|
filtered := filterLogs(logs, addrMap, crit.Topics)
|
|
if len(filtered) == 0 {
|
|
return nil
|
|
}
|
|
txIndex = uint(binary.BigEndian.Uint32(k[8:]))
|
|
for _, log := range filtered {
|
|
log.TxIndex = txIndex
|
|
}
|
|
blockLogs = append(blockLogs, filtered...)
|
|
|
|
return nil
|
|
})
|
|
if err != nil {
|
|
return erigonLogs, err
|
|
}
|
|
if len(blockLogs) == 0 {
|
|
continue
|
|
}
|
|
|
|
header, err := api._blockReader.HeaderByNumber(ctx, tx, blockNumber)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if header == nil {
|
|
return nil, fmt.Errorf("block header not found: %d", blockNumber)
|
|
}
|
|
timestamp := header.Time
|
|
|
|
blockHash, err := rawdb.ReadCanonicalHash(tx, blockNumber)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
body, err := api._blockReader.BodyWithTransactions(ctx, tx, blockHash, blockNumber)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if body == nil {
|
|
return nil, fmt.Errorf("block not found %d", blockNumber)
|
|
}
|
|
for _, log := range blockLogs {
|
|
erigonLog := &types.ErigonLog{}
|
|
erigonLog.BlockNumber = blockNumber
|
|
erigonLog.BlockHash = blockHash
|
|
if log.TxIndex == uint(len(body.Transactions)) {
|
|
erigonLog.TxHash = types.ComputeBorTxHash(blockNumber, blockHash)
|
|
} else {
|
|
erigonLog.TxHash = body.Transactions[log.TxIndex].Hash()
|
|
}
|
|
erigonLog.Timestamp = timestamp
|
|
erigonLog.Address = log.Address
|
|
erigonLog.Topics = log.Topics
|
|
erigonLog.Data = log.Data
|
|
erigonLog.Index = log.Index
|
|
erigonLog.Removed = log.Removed
|
|
erigonLogs = append(erigonLogs, erigonLog)
|
|
}
|
|
}
|
|
|
|
return erigonLogs, nil
|
|
}
|
|
|
|
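// GetLogsByNumber would implement erigon_getLogsByNumber: return all the logs that appear in the block with the given number.
// NOTE(review): disabled draft. It still references an undefined `hash` and api.db.Begin, so it will not compile if uncommented.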
|
|
// func (api *ErigonImpl) GetLogsByNumber(ctx context.Context, number rpc.BlockNumber) ([][]*types.Log, error) {
|
|
// tx, err := api.db.Begin(ctx, false)
|
|
// if err != nil {
|
|
// return nil, err
|
|
// }
|
|
// defer tx.Rollback()
|
|
|
|
// number := rawdb.ReadHeaderNumber(tx, hash)
|
|
// if number == nil {
|
|
// return nil, fmt.Errorf("block not found: %x", hash)
|
|
// }
|
|
|
|
// receipts, err := getReceipts(ctx, tx, *number, hash)
|
|
// if err != nil {
|
|
// return nil, fmt.Errorf("getReceipts error: %w", err)
|
|
// }
|
|
|
|
// logs := make([][]*types.Log, len(receipts))
|
|
// for i, receipt := range receipts {
|
|
// logs[i] = receipt.Logs
|
|
// }
|
|
// return logs, nil
|
|
// }
|