From 7f9edd68aabe7cf9a30f9e072b485a5f18372168 Mon Sep 17 00:00:00 2001 From: J1ang Date: Tue, 8 Nov 2022 09:35:00 +0800 Subject: [PATCH] feat: add `erigon_getLatestLogs` as a new feature API. (#5875) feat: add `erigon_getLatestLogs` as a new feature API. 1. `erigon_getLatestLogs` returns latest logs that match the filter with `logCount` length. Implementation is similar to `erigon_getLogs` but it uses `ReverseIterator` which makes it more efficient to fetch the latest logs. --- cmd/rpcdaemon/commands/erigon_api.go | 2 + cmd/rpcdaemon/commands/erigon_receipts.go | 145 +++++++++++++++++- .../commands/erigon_receipts_test.go | 50 ++++++ 3 files changed, 192 insertions(+), 5 deletions(-) create mode 100644 cmd/rpcdaemon/commands/erigon_receipts_test.go diff --git a/cmd/rpcdaemon/commands/erigon_api.go b/cmd/rpcdaemon/commands/erigon_api.go index ec35f04e8..a95aa078d 100644 --- a/cmd/rpcdaemon/commands/erigon_api.go +++ b/cmd/rpcdaemon/commands/erigon_api.go @@ -3,6 +3,7 @@ package commands import ( "context" + "github.com/ledgerwatch/erigon/eth/filters" ethFilters "github.com/ledgerwatch/erigon/eth/filters" "github.com/ledgerwatch/erigon-lib/kv" @@ -30,6 +31,7 @@ type ErigonAPI interface { GetLogsByHash(ctx context.Context, hash common.Hash) ([][]*types.Log, error) //GetLogsByNumber(ctx context.Context, number rpc.BlockNumber) ([][]*types.Log, error) GetLogs(ctx context.Context, crit ethFilters.FilterCriteria) (types.ErigonLogs, error) + GetLatestLogs(ctx context.Context, crit filters.FilterCriteria, logCount uint64) (types.ErigonLogs, error) // WatchTheBurn / reward related (see ./erigon_issuance.go) WatchTheBurn(ctx context.Context, blockNr rpc.BlockNumber) (Issuance, error) diff --git a/cmd/rpcdaemon/commands/erigon_receipts.go b/cmd/rpcdaemon/commands/erigon_receipts.go index a8fe6fc76..d56c60a7b 100644 --- a/cmd/rpcdaemon/commands/erigon_receipts.go +++ b/cmd/rpcdaemon/commands/erigon_receipts.go @@ -180,11 +180,7 @@ func (api *ErigonImpl) GetLogs(ctx context.Context, crit filters.FilterCriteria) } timestamp := header.Time - blockHash, err := rawdb.ReadCanonicalHash(tx, blockNumber) - if err != nil { - return nil, err - } - + blockHash := header.Hash() body, err := api._blockReader.BodyWithTransactions(ctx, tx, blockHash, blockNumber) if err != nil { return nil, err @@ -214,6 +210,145 @@ func (api *ErigonImpl) GetLogs(ctx context.Context, crit filters.FilterCriteria) return erigonLogs, nil } +// GetLatestLogs implements erigon_getLatestLogs. +// Return specific number of logs matching a give filter objects by descend. +func (api *ErigonImpl) GetLatestLogs(ctx context.Context, crit filters.FilterCriteria, logCount uint64) (types.ErigonLogs, error) { + if logCount == 0 { + logCount = 1 + } + erigonLogs := types.ErigonLogs{} + tx, beginErr := api.db.BeginRo(ctx) + if beginErr != nil { + return erigonLogs, beginErr + } + defer tx.Rollback() + latest, err := rpchelper.GetLatestBlockNumber(tx) + if err != nil { + return nil, err + } + + blockNumbers := bitmapdb.NewBitmap() + defer bitmapdb.ReturnToPool(blockNumbers) + blockNumbers.AddRange(0, latest+1) + topicsBitmap, err := getTopicsBitmap(tx, crit.Topics, 0, uint32(latest)) + if err != nil { + return nil, err + } + if topicsBitmap != nil { + blockNumbers.And(topicsBitmap) + } + + rx := make([]*roaring.Bitmap, len(crit.Addresses)) + for idx, addr := range crit.Addresses { + m, err := bitmapdb.Get(tx, kv.LogAddressIndex, addr[:], uint32(0), uint32(latest)) + if err != nil { + return nil, err + } + rx[idx] = m + } + addrBitmap := roaring.FastOr(rx...) + + if len(rx) > 0 { + blockNumbers.And(addrBitmap) + } + + if blockNumbers.GetCardinality() == 0 { + return erigonLogs, nil + } + + addrMap := make(map[common.Address]struct{}, len(crit.Addresses)) + for _, v := range crit.Addresses { + addrMap[v] = struct{}{} + } + + // latest logs that match the filter crit + iter := blockNumbers.ReverseIterator() + var count uint64 + for iter.HasNext() { + if err = ctx.Err(); err != nil { + return nil, err + } + + blockNumber := uint64(iter.Next()) + var logIndex uint + var txIndex uint + var blockLogs []*types.Log + err := tx.ForPrefix(kv.Log, dbutils.EncodeBlockNumber(blockNumber), func(k, v []byte) error { + var logs types.Logs + if err := cbor.Unmarshal(&logs, bytes.NewReader(v)); err != nil { + return fmt.Errorf("receipt unmarshal failed: %w", err) + } + for _, log := range logs { + log.Index = logIndex + logIndex++ + } + filtered := logs.Filter(addrMap, crit.Topics) + if len(filtered) == 0 { + return nil + } + txIndex = uint(binary.BigEndian.Uint32(k[8:])) + for i := range filtered { + filtered[i].TxIndex = txIndex + } + for i := len(filtered) - 1; i >= 0; i-- { + blockLogs = append(blockLogs, filtered[i]) + count++ + if count == logCount { + return nil + } + } + return nil + }) + if err != nil { + return erigonLogs, err + } + if len(blockLogs) == 0 { + continue + } + + header, err := api._blockReader.HeaderByNumber(ctx, tx, blockNumber) + if err != nil { + return nil, err + } + if header == nil { + return nil, fmt.Errorf("block header not found: %d", blockNumber) + } + timestamp := header.Time + + blockHash := header.Hash() + + body, err := api._blockReader.BodyWithTransactions(ctx, tx, blockHash, blockNumber) + if err != nil { + return nil, err + } + if body == nil { + return nil, fmt.Errorf("block not found %d", blockNumber) + } + for _, log := range blockLogs { + erigonLog := &types.ErigonLog{} + erigonLog.BlockNumber = blockNumber + erigonLog.BlockHash = blockHash + if log.TxIndex == uint(len(body.Transactions)) { + erigonLog.TxHash = types.ComputeBorTxHash(blockNumber, blockHash) + } else { + erigonLog.TxHash = body.Transactions[log.TxIndex].Hash() + } + erigonLog.Timestamp = timestamp + erigonLog.Address = log.Address + erigonLog.Topics = log.Topics + erigonLog.Data = log.Data + erigonLog.Index = log.Index + erigonLog.Removed = log.Removed + erigonLogs = append(erigonLogs, erigonLog) + } + + if count == logCount { + return erigonLogs, nil + } + } + return erigonLogs, nil +} + // GetLogsByNumber implements erigon_getLogsByHash. Returns all the logs that appear in a block given the block's hash. // func (api *ErigonImpl) GetLogsByNumber(ctx context.Context, number rpc.BlockNumber) ([][]*types.Log, error) { // tx, err := api.db.Begin(ctx, false) diff --git a/cmd/rpcdaemon/commands/erigon_receipts_test.go b/cmd/rpcdaemon/commands/erigon_receipts_test.go new file mode 100644 index 000000000..f29f03440 --- /dev/null +++ b/cmd/rpcdaemon/commands/erigon_receipts_test.go @@ -0,0 +1,50 @@ +package commands + +import ( + "context" + "math/big" + "testing" + + "github.com/ledgerwatch/erigon-lib/kv/kvcache" + "github.com/ledgerwatch/erigon/cmd/rpcdaemon/rpcdaemontest" + "github.com/ledgerwatch/erigon/core/types" + "github.com/ledgerwatch/erigon/eth/filters" + "github.com/ledgerwatch/erigon/rpc" + "github.com/ledgerwatch/erigon/rpc/rpccfg" + "github.com/ledgerwatch/erigon/turbo/snapshotsync" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestErigonGetLatestLogs(t *testing.T) { + assert := assert.New(t) + m, _, _ := rpcdaemontest.CreateTestSentry(t) + br := snapshotsync.NewBlockReaderWithSnapshots(m.BlockSnapshots) + stateCache := kvcache.New(kvcache.DefaultCoherentConfig) + db := m.DB + agg := m.HistoryV3Components() + api := NewErigonAPI(NewBaseApi(nil, stateCache, br, agg, false, rpccfg.DefaultEvmCallTimeout), db, nil) + expectedLogs, _ := api.GetLogs(context.Background(), filters.FilterCriteria{FromBlock: big.NewInt(0), ToBlock: big.NewInt(rpc.LatestBlockNumber.Int64())}) + + expectedErigonLogs := make([]*types.ErigonLog, 0) + for i := len(expectedLogs) - 1; i >= 0; i-- { + expectedErigonLogs = append(expectedErigonLogs, &types.ErigonLog{ + Address: expectedLogs[i].Address, + Topics: expectedLogs[i].Topics, + Data: expectedLogs[i].Data, + BlockNumber: expectedLogs[i].BlockNumber, + TxHash: expectedLogs[i].TxHash, + TxIndex: expectedLogs[i].TxIndex, + BlockHash: expectedLogs[i].BlockHash, + Index: expectedLogs[i].Index, + Removed: expectedLogs[i].Removed, + Timestamp: expectedLogs[i].Timestamp, + }) + } + actual, err := api.GetLatestLogs(context.Background(), filters.FilterCriteria{}, uint64((len(expectedLogs)))) + if err != nil { + t.Errorf("calling erigon_getLatestLogs: %v", err) + } + require.NotNil(t, actual) + assert.EqualValues(expectedErigonLogs, actual) +}