feat: add erigon_getLatestLogs as a new feature API. (#5875)

feat: add `erigon_getLatestLogs` as a new feature API.
1. `erigon_getLatestLogs` returns latest logs that match the filter with
`logCount` length. Implementation is similar to `erigon_getLogs` but it
uses `ReverseIterator` which makes it more efficient to fetch the latest
logs.
This commit is contained in:
J1ang 2022-11-08 09:35:00 +08:00 committed by GitHub
parent 61559477a6
commit 7f9edd68aa
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 192 additions and 5 deletions

View File

@ -3,6 +3,7 @@ package commands
import (
"context"
"github.com/ledgerwatch/erigon/eth/filters"
ethFilters "github.com/ledgerwatch/erigon/eth/filters"
"github.com/ledgerwatch/erigon-lib/kv"
@ -30,6 +31,7 @@ type ErigonAPI interface {
GetLogsByHash(ctx context.Context, hash common.Hash) ([][]*types.Log, error)
//GetLogsByNumber(ctx context.Context, number rpc.BlockNumber) ([][]*types.Log, error)
GetLogs(ctx context.Context, crit ethFilters.FilterCriteria) (types.ErigonLogs, error)
GetLatestLogs(ctx context.Context, crit filters.FilterCriteria, logCount uint64) (types.ErigonLogs, error)
// WatchTheBurn / reward related (see ./erigon_issuance.go)
WatchTheBurn(ctx context.Context, blockNr rpc.BlockNumber) (Issuance, error)

View File

@ -180,11 +180,7 @@ func (api *ErigonImpl) GetLogs(ctx context.Context, crit filters.FilterCriteria)
}
timestamp := header.Time
blockHash, err := rawdb.ReadCanonicalHash(tx, blockNumber)
if err != nil {
return nil, err
}
blockHash := header.Hash()
body, err := api._blockReader.BodyWithTransactions(ctx, tx, blockHash, blockNumber)
if err != nil {
return nil, err
@ -214,6 +210,145 @@ func (api *ErigonImpl) GetLogs(ctx context.Context, crit filters.FilterCriteria)
return erigonLogs, nil
}
// GetLatestLogs implements erigon_getLatestLogs.
// Return specific number of logs matching a give filter objects by descend.
func (api *ErigonImpl) GetLatestLogs(ctx context.Context, crit filters.FilterCriteria, logCount uint64) (types.ErigonLogs, error) {
if logCount == 0 {
logCount = 1
}
erigonLogs := types.ErigonLogs{}
tx, beginErr := api.db.BeginRo(ctx)
if beginErr != nil {
return erigonLogs, beginErr
}
defer tx.Rollback()
latest, err := rpchelper.GetLatestBlockNumber(tx)
if err != nil {
return nil, err
}
blockNumbers := bitmapdb.NewBitmap()
defer bitmapdb.ReturnToPool(blockNumbers)
blockNumbers.AddRange(0, latest+1)
topicsBitmap, err := getTopicsBitmap(tx, crit.Topics, 0, uint32(latest))
if err != nil {
return nil, err
}
if topicsBitmap != nil {
blockNumbers.And(topicsBitmap)
}
rx := make([]*roaring.Bitmap, len(crit.Addresses))
for idx, addr := range crit.Addresses {
m, err := bitmapdb.Get(tx, kv.LogAddressIndex, addr[:], uint32(0), uint32(latest))
if err != nil {
return nil, err
}
rx[idx] = m
}
addrBitmap := roaring.FastOr(rx...)
if len(rx) > 0 {
blockNumbers.And(addrBitmap)
}
if blockNumbers.GetCardinality() == 0 {
return erigonLogs, nil
}
addrMap := make(map[common.Address]struct{}, len(crit.Addresses))
for _, v := range crit.Addresses {
addrMap[v] = struct{}{}
}
// latest logs that match the filter crit
iter := blockNumbers.ReverseIterator()
var count uint64
for iter.HasNext() {
if err = ctx.Err(); err != nil {
return nil, err
}
blockNumber := uint64(iter.Next())
var logIndex uint
var txIndex uint
var blockLogs []*types.Log
err := tx.ForPrefix(kv.Log, dbutils.EncodeBlockNumber(blockNumber), func(k, v []byte) error {
var logs types.Logs
if err := cbor.Unmarshal(&logs, bytes.NewReader(v)); err != nil {
return fmt.Errorf("receipt unmarshal failed: %w", err)
}
for _, log := range logs {
log.Index = logIndex
logIndex++
}
filtered := logs.Filter(addrMap, crit.Topics)
if len(filtered) == 0 {
return nil
}
txIndex = uint(binary.BigEndian.Uint32(k[8:]))
for i := range filtered {
filtered[i].TxIndex = txIndex
}
for i := len(filtered) - 1; i >= 0; i-- {
blockLogs = append(blockLogs, filtered[i])
count++
if count == logCount {
return nil
}
}
return nil
})
if err != nil {
return erigonLogs, err
}
if len(blockLogs) == 0 {
continue
}
header, err := api._blockReader.HeaderByNumber(ctx, tx, blockNumber)
if err != nil {
return nil, err
}
if header == nil {
return nil, fmt.Errorf("block header not found: %d", blockNumber)
}
timestamp := header.Time
blockHash := header.Hash()
body, err := api._blockReader.BodyWithTransactions(ctx, tx, blockHash, blockNumber)
if err != nil {
return nil, err
}
if body == nil {
return nil, fmt.Errorf("block not found %d", blockNumber)
}
for _, log := range blockLogs {
erigonLog := &types.ErigonLog{}
erigonLog.BlockNumber = blockNumber
erigonLog.BlockHash = blockHash
if log.TxIndex == uint(len(body.Transactions)) {
erigonLog.TxHash = types.ComputeBorTxHash(blockNumber, blockHash)
} else {
erigonLog.TxHash = body.Transactions[log.TxIndex].Hash()
}
erigonLog.Timestamp = timestamp
erigonLog.Address = log.Address
erigonLog.Topics = log.Topics
erigonLog.Data = log.Data
erigonLog.Index = log.Index
erigonLog.Removed = log.Removed
erigonLogs = append(erigonLogs, erigonLog)
}
if count == logCount {
return erigonLogs, nil
}
}
return erigonLogs, nil
}
// GetLogsByNumber implements erigon_getLogsByHash. Returns all the logs that appear in a block given the block's hash.
// func (api *ErigonImpl) GetLogsByNumber(ctx context.Context, number rpc.BlockNumber) ([][]*types.Log, error) {
// tx, err := api.db.Begin(ctx, false)

View File

@ -0,0 +1,50 @@
package commands
import (
"context"
"math/big"
"testing"
"github.com/ledgerwatch/erigon-lib/kv/kvcache"
"github.com/ledgerwatch/erigon/cmd/rpcdaemon/rpcdaemontest"
"github.com/ledgerwatch/erigon/core/types"
"github.com/ledgerwatch/erigon/eth/filters"
"github.com/ledgerwatch/erigon/rpc"
"github.com/ledgerwatch/erigon/rpc/rpccfg"
"github.com/ledgerwatch/erigon/turbo/snapshotsync"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestErigonGetLatestLogs(t *testing.T) {
assert := assert.New(t)
m, _, _ := rpcdaemontest.CreateTestSentry(t)
br := snapshotsync.NewBlockReaderWithSnapshots(m.BlockSnapshots)
stateCache := kvcache.New(kvcache.DefaultCoherentConfig)
db := m.DB
agg := m.HistoryV3Components()
api := NewErigonAPI(NewBaseApi(nil, stateCache, br, agg, false, rpccfg.DefaultEvmCallTimeout), db, nil)
expectedLogs, _ := api.GetLogs(context.Background(), filters.FilterCriteria{FromBlock: big.NewInt(0), ToBlock: big.NewInt(rpc.LatestBlockNumber.Int64())})
expectedErigonLogs := make([]*types.ErigonLog, 0)
for i := len(expectedLogs) - 1; i >= 0; i-- {
expectedErigonLogs = append(expectedErigonLogs, &types.ErigonLog{
Address: expectedLogs[i].Address,
Topics: expectedLogs[i].Topics,
Data: expectedLogs[i].Data,
BlockNumber: expectedLogs[i].BlockNumber,
TxHash: expectedLogs[i].TxHash,
TxIndex: expectedLogs[i].TxIndex,
BlockHash: expectedLogs[i].BlockHash,
Index: expectedLogs[i].Index,
Removed: expectedLogs[i].Removed,
Timestamp: expectedLogs[i].Timestamp,
})
}
actual, err := api.GetLatestLogs(context.Background(), filters.FilterCriteria{}, uint64((len(expectedLogs))))
if err != nil {
t.Errorf("calling erigon_getLatestLogs: %v", err)
}
require.NotNil(t, actual)
assert.EqualValues(expectedErigonLogs, actual)
}