Trace Filter 7 (#1941)

* use roaring64

* Fix bitmap finalisation

Co-authored-by: Alex Sharp <alexsharp@Alexs-MacBook-Pro.local>
Co-authored-by: Alexey Sharp <alexeysharp@Alexeys-iMac.local>
This commit is contained in:
ledgerwatch 2021-05-16 20:01:04 +01:00 committed by GitHub
parent f6ac04e53e
commit 896be5fbe3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 17 additions and 8 deletions

View File

@ -4,7 +4,7 @@ import (
"context" "context"
"fmt" "fmt"
"github.com/RoaringBitmap/roaring" "github.com/RoaringBitmap/roaring/roaring64"
jsoniter "github.com/json-iterator/go" jsoniter "github.com/json-iterator/go"
"github.com/ledgerwatch/turbo-geth/common" "github.com/ledgerwatch/turbo-geth/common"
"github.com/ledgerwatch/turbo-geth/common/dbutils" "github.com/ledgerwatch/turbo-geth/common/dbutils"
@ -218,11 +218,12 @@ func (api *TraceAPIImpl) Filter(ctx context.Context, req TraceFilterRequest, str
fromAddresses := make(map[common.Address]struct{}, len(req.FromAddress)) fromAddresses := make(map[common.Address]struct{}, len(req.FromAddress))
toAddresses := make(map[common.Address]struct{}, len(req.ToAddress)) toAddresses := make(map[common.Address]struct{}, len(req.ToAddress))
var allBlocks roaring.Bitmap var allBlocks roaring64.Bitmap
for _, addr := range req.FromAddress { for _, addr := range req.FromAddress {
if addr != nil { if addr != nil {
b, err := bitmapdb.Get(dbtx, dbutils.CallFromIndex, addr.Bytes(), uint32(fromBlock), uint32(toBlock)) b, err := bitmapdb.Get64(dbtx, dbutils.CallFromIndex, addr.Bytes(), fromBlock, toBlock)
if err != nil { if err != nil {
stream.WriteNil()
return err return err
} }
allBlocks.Or(b) allBlocks.Or(b)
@ -231,8 +232,9 @@ func (api *TraceAPIImpl) Filter(ctx context.Context, req TraceFilterRequest, str
} }
for _, addr := range req.ToAddress { for _, addr := range req.ToAddress {
if addr != nil { if addr != nil {
b, err := bitmapdb.Get(dbtx, dbutils.CallToIndex, addr.Bytes(), uint32(fromBlock), uint32(toBlock)) b, err := bitmapdb.Get64(dbtx, dbutils.CallToIndex, addr.Bytes(), fromBlock, toBlock)
if err != nil { if err != nil {
stream.WriteNil()
return err return err
} }
allBlocks.Or(b) allBlocks.Or(b)
@ -244,6 +246,7 @@ func (api *TraceAPIImpl) Filter(ctx context.Context, req TraceFilterRequest, str
chainConfig, err := api.chainConfig(dbtx) chainConfig, err := api.chainConfig(dbtx)
if err != nil { if err != nil {
stream.WriteNil()
return err return err
} }
var json = jsoniter.ConfigCompatibleWithStandardLibrary var json = jsoniter.ConfigCompatibleWithStandardLibrary
@ -262,6 +265,7 @@ func (api *TraceAPIImpl) Filter(ctx context.Context, req TraceFilterRequest, str
block, _, bErr := rawdb.ReadBlockWithSenders(dbtx, hash, b) block, _, bErr := rawdb.ReadBlockWithSenders(dbtx, hash, b)
if bErr != nil { if bErr != nil {
stream.WriteNil()
return bErr return bErr
} }
if block == nil { if block == nil {
@ -287,6 +291,7 @@ func (api *TraceAPIImpl) Filter(ctx context.Context, req TraceFilterRequest, str
pt.TransactionPosition = &txPosition pt.TransactionPosition = &txPosition
b, err := json.Marshal(pt) b, err := json.Marshal(pt)
if err != nil { if err != nil {
stream.WriteNil()
return err return err
} }
if first { if first {
@ -314,6 +319,7 @@ func (api *TraceAPIImpl) Filter(ctx context.Context, req TraceFilterRequest, str
tr.TraceAddress = []int{} tr.TraceAddress = []int{}
b, err := json.Marshal(tr) b, err := json.Marshal(tr)
if err != nil { if err != nil {
stream.WriteNil()
return err return err
} }
if first { if first {
@ -340,6 +346,7 @@ func (api *TraceAPIImpl) Filter(ctx context.Context, req TraceFilterRequest, str
tr.TraceAddress = []int{} tr.TraceAddress = []int{}
b, err := json.Marshal(tr) b, err := json.Marshal(tr)
if err != nil { if err != nil {
stream.WriteNil()
return err return err
} }
if first { if first {

View File

@ -106,6 +106,7 @@ func promoteCallTraces(logPrefix string, tx ethdb.RwTx, startBlock, endBlock uin
if err != nil { if err != nil {
return fmt.Errorf("%s: failed to create cursor for call traces: %w", logPrefix, err) return fmt.Errorf("%s: failed to create cursor for call traces: %w", logPrefix, err)
} }
defer traceCursor.Close()
var k, v []byte var k, v []byte
prev := startBlock prev := startBlock
@ -176,22 +177,23 @@ func promoteCallTraces(logPrefix string, tx ethdb.RwTx, startBlock, endBlock uin
return err return err
} }
if err := finaliseCallTraces(froms, tos, collectorFrom, collectorTo, logPrefix, tx, quit); err != nil { if err := finaliseCallTraces(collectorFrom, collectorTo, logPrefix, tx, quit); err != nil {
return fmt.Errorf("[%s] %w", logPrefix, err) return fmt.Errorf("[%s] %w", logPrefix, err)
} }
return nil return nil
} }
func finaliseCallTraces(froms, tos map[string]*roaring64.Bitmap, collectorFrom, collectorTo *etl.Collector, logPrefix string, tx ethdb.RwTx, quit <-chan struct{}) error { func finaliseCallTraces(collectorFrom, collectorTo *etl.Collector, logPrefix string, tx ethdb.RwTx, quit <-chan struct{}) error {
var currentBitmap = roaring64.New() var currentBitmap = roaring64.New()
var buf = bytes.NewBuffer(nil) var buf = bytes.NewBuffer(nil)
lastChunkKey := make([]byte, 128)
var loaderFunc = func(k []byte, v []byte, table etl.CurrentTableReader, next etl.LoadNextFunc) error { var loaderFunc = func(k []byte, v []byte, table etl.CurrentTableReader, next etl.LoadNextFunc) error {
if _, err := currentBitmap.ReadFrom(bytes.NewReader(v)); err != nil { if _, err := currentBitmap.ReadFrom(bytes.NewReader(v)); err != nil {
return err return err
} }
lastChunkKey := make([]byte, len(k)+4) lastChunkKey = lastChunkKey[:len(k)+8]
copy(lastChunkKey, k) copy(lastChunkKey, k)
binary.BigEndian.PutUint32(lastChunkKey[len(k):], ^uint32(0)) binary.BigEndian.PutUint64(lastChunkKey[len(k):], ^uint64(0))
lastChunkBytes, err := table.Get(lastChunkKey) lastChunkBytes, err := table.Get(lastChunkKey)
if err != nil { if err != nil {
return fmt.Errorf("find last chunk failed: %w", err) return fmt.Errorf("find last chunk failed: %w", err)