erigon-pulse/common/etl/collector.go
Alex Sharov e02d6acc7d
bitmap indices for logs (#1124)
* save progress

* try now

* don't create bloom inside rlpDecode

* don't create bloom inside ApplyTransaction

* clean

* clean

* clean

* clean

* clean

* clean

* clean

* clean

* rename method

* print timings

* print timings

* print timings

* sort before flush

* fix err lint

* clean

* move tests to transactions

* compressed version

* up bound

* up bound

* more tests

* more tests

* more tests

* more tests

* better removal

* clean

* better performance of get/put methods

* clean

* clean

* clean

* clean

* clean

* clean

* clean

* clean

* clean

* clean

* clean

* clean

* clean

* optimize rpcdaemon

* fix test

* fix rpcdaemon

* fix test

* simplify

* simplify

* fix nil pointer

* clean

* revert some changes

* add some logs

* clean

* try without optimize

* clean

* clean

* clean

* clean

* try

* move log_index to own stage

* move log_index to own stage

* integration add log_index stage

* integration add log_index stage

* clean

* clean

* print timing

* remove duplicates at unwind

* extract truncateBitmaps func

* try detect

* clean

* clean

* clean

* clean

* clean

* clean

* clean

* clean

* clean

* clean

* clean

* clean

* clean

* clean

* clean

* clean

* clean

* clean

* clean

* clean

* clean

* clean

* clean

* clean

* clean

* clean

* clean

* clean

* clean

* clean

* clean

* clean

* add blackList of topics

* clean

* clean

* clean

* clean

* clean

* clean

* clean

* clean

* sharding 1

* sharded 2

* sharded 2

* sharded 2

* sharded 2

* sharded 2

* sharded 2

* sharded 2

* sharded 2

* sharded 2

* sharded 2

* sharded 2

* sharded 2

* sharded 2

* sharded 2

* sharded 2

* sharded 2

* sharded 3

* sharded 3

* sharded 3

* speedup things by putCurrent and putReserve

* clean

* optimize trim

* clean

* remove blacklist

* add more info to err

* ?

* clean

* clean

* clean

* clean

* clean

* working version

* switch to cgo version of roaring bitmaps

* clean

* clean

* clean

* clean

* more docs

* clean

* clean

* fix logs bloom field

* Fix debug_getModifiedAccountsByNumber

* Try to fix crash

* fix problem with "absent block"

* fix problem with "absent block"

* remove optimize method call

* remove roaring iterator

* fix problem with rebuild indices

* remove debug prints

* tests for eth_getLogs involving topics

* add tests for new stage, separate topics into 2 buckets

* version up

* remove debug logs

* remove debug logs

* remove bloom filter implementation

* Optimisation

* Optimisation not required, make rpctest lenient to geth errors

* Lenient to geth failures

Co-authored-by: Alexey Akhunov <akhounov@gmail.com>
2020-09-28 18:18:36 +01:00

230 lines
5.9 KiB
Go

package etl
import (
"bytes"
"container/heap"
"context"
"fmt"
"io"
"runtime"
"time"
"github.com/ledgerwatch/turbo-geth/common"
"github.com/ledgerwatch/turbo-geth/ethdb"
"github.com/ledgerwatch/turbo-geth/log"
"github.com/ugorji/go/codec"
)
// LoadNextFunc is the continuation a LoadFunc calls to pass a key/value pair on
// to the destination. originalK is supplied alongside k, the key that is actually
// written; the implementation built in loadFilesIntoBucket does not read originalK.
// That implementation deletes k when v is empty and writes the pair otherwise.
type LoadNextFunc func(originalK, k, v []byte) error
// LoadFunc is called by loadFilesIntoBucket once for every key/value pair popped
// from the merge heap. It receives the pair, the current load State, and the
// LoadNextFunc it should call for each pair it wants stored. A non-nil error
// aborts the load.
type LoadFunc func(k []byte, value []byte, state State, next LoadNextFunc) error
// Collector performs the job of ETL Transform, but can also be used without the "E" (Extract) part,
// i.e. as a Collect-Transform-Load pipeline: pairs are handed in through Collect and
// written out by Load.
type Collector struct {
// extractNextFunc is installed by NewCollector; it stores a copy of the pair in the
// sortable buffer and triggers a flush when the buffer reports it is due.
extractNextFunc ExtractNextFunc
// flushBuffer is installed by NewCollector; it sorts the buffer and turns it into a
// dataProvider (kept in RAM or flushed to disk) that is appended to dataProviders.
flushBuffer func([]byte, bool) error
// dataProviders holds every provider produced by flushBuffer so far; Load merges
// them and disposes of them when it returns.
dataProviders []dataProvider
// allFlushed is set once the buffer has been handed over as an in-RAM provider;
// Load skips its final flush when it is true.
allFlushed bool
}
// NewCollector builds a Collector that accumulates pairs in sortableBuffer and,
// whenever the buffer has to be flushed to disk, writes it under datadir.
//
// Two closures are installed on the returned Collector:
//   - flushBuffer sorts the buffer and converts it into a dataProvider. The data is
//     kept in RAM only when the caller allows it and nothing has been flushed before;
//     otherwise it goes to disk via FlushToDisk.
//   - extractNextFunc stores a copy of each key/value pair and flushes to disk once
//     the buffer reports that its flush size has been reached.
func NewCollector(datadir string, sortableBuffer Buffer) *Collector {
	collector := &Collector{}
	enc := codec.NewEncoder(nil, &cbor)

	collector.flushBuffer = func(currentKey []byte, canStoreInRam bool) error {
		// Nothing buffered: no provider to create.
		if sortableBuffer.Len() == 0 {
			return nil
		}
		sortableBuffer.Sort()

		// Declared with the interface type on purpose so the nil check below is an
		// interface nil check regardless of what the constructors return.
		var (
			p        dataProvider
			flushErr error
		)
		keepInMemory := canStoreInRam && len(collector.dataProviders) == 0
		if keepInMemory {
			p = KeepInRAM(sortableBuffer)
			collector.allFlushed = true
		} else {
			p, flushErr = FlushToDisk(enc, currentKey, sortableBuffer, datadir)
		}
		if flushErr != nil {
			return flushErr
		}
		if p != nil {
			collector.dataProviders = append(collector.dataProviders, p)
		}
		return nil
	}

	collector.extractNextFunc = func(originalK, k []byte, v []byte) error {
		// Copies are stored, so the caller's slices are not retained by the buffer.
		sortableBuffer.Put(common.CopyBytes(k), common.CopyBytes(v))
		if !sortableBuffer.CheckFlushSize() {
			return nil
		}
		// Mid-collection flushes always go to disk.
		return collector.flushBuffer(originalK, false)
	}

	return collector
}
// Collect adds the pair (k, v) to the collector by calling extractNextFunc with k
// as both the original key and the key to store. It returns whatever error that
// call returns.
func (c *Collector) Collect(k, v []byte) error {
return c.extractNextFunc(k, k, v)
}
// Load flushes whatever is still buffered (allowing it to stay in RAM) and then
// merges all collected providers into toBucket of db through loadFunc.
// The providers are disposed of when Load returns, on success and on error alike.
func (c *Collector) Load(db ethdb.Database, toBucket string, loadFunc LoadFunc, args TransformArgs) error {
	// A closure is required here: c.dataProviders must be read at exit, after the
	// final flush below may have appended one more provider.
	defer func() {
		disposeProviders(c.dataProviders)
	}()

	needFinalFlush := !c.allFlushed
	if needFinalFlush {
		err := c.flushBuffer(nil, true)
		if err != nil {
			return err
		}
	}

	return loadFilesIntoBucket(db, toBucket, c.dataProviders, loadFunc, args)
}
// loadFilesIntoBucket merges the entries of all providers in the order defined by
// args.Comparator and feeds each entry to loadFunc, which in turn stores pairs into
// bucket through the LoadNextFunc it is given.
//
// Transaction handling: when db already carries an open transaction (ethdb.HasTx
// with a non-nil Tx) that transaction is reused and NOT committed here; otherwise a
// new one is begun, committed at the end, and rolled back on any early return.
//
// Writing: a pair with an empty value is a deletion. When loadFunc is the identity
// load function and the first key sorts after the last key already in the bucket,
// entries are appended instead of put, and deletions are skipped because there is
// nothing past the end of the bucket to delete. Append is only used when the
// transaction is an *ethdb.TxDb; any other implementation falls back to Put.
//
// An empty bucket name is allowed for runs that are not expected to modify the DB.
//
// Errors from providers, loadFunc, args.OnLoadCommit and the transaction are
// returned to the caller; provider read errors are wrapped so errors.Is/As still
// see the underlying cause.
func loadFilesIntoBucket(db ethdb.Database, bucket string, providers []dataProvider, loadFunc LoadFunc, args TransformArgs) error {
	decoder := codec.NewDecoder(nil, &cbor)
	var m runtime.MemStats

	// Seed the heap with the first entry of every provider.
	h := &Heap{comparator: args.Comparator}
	heap.Init(h)
	for i, provider := range providers {
		key, value, err := provider.Next(decoder)
		if err != nil {
			// We must have at least one entry per file; report it instead of panicking.
			return fmt.Errorf("error reading first readers: n=%d current=%d provider=%s err=%w",
				len(providers), i, provider, err)
		}
		heap.Push(h, HeapElem{key, i, value})
	}

	var tx ethdb.DbWithPendingMutations
	var useExternalTx bool
	if hasTx, ok := db.(ethdb.HasTx); ok && hasTx.Tx() != nil {
		externalTx, isMutable := db.(ethdb.DbWithPendingMutations)
		if !isMutable {
			return fmt.Errorf("etl: db of type %T has an open tx but does not implement ethdb.DbWithPendingMutations", db)
		}
		tx = externalTx
		useExternalTx = true
	} else {
		var err error
		tx, err = db.Begin(context.Background())
		if err != nil {
			return err
		}
		defer tx.Rollback()
	}

	state := &bucketState{tx, bucket, args.Quit}
	haveSortingGuaranties := isIdentityLoadFunc(loadFunc) // user-defined loadFunc may change ordering
	var lastKey []byte
	if bucket != "" { // passing empty bucket name is valid case for etl when DB modification is not expected
		var errLast error
		lastKey, _, errLast = tx.Last(bucket)
		if errLast != nil {
			return errLast
		}
	}

	// Append is specific to *ethdb.TxDb; resolve it once with a checked assertion.
	appender, hasAppend := tx.(*ethdb.TxDb)

	var canUseAppend bool
	logEvery := time.NewTicker(30 * time.Second)
	defer logEvery.Stop()

	i := 0
	loadNextFunc := func(originalK, k, v []byte) error {
		if i == 0 {
			// Decided once, on the very first key that reaches the bucket.
			isEndOfBucket := lastKey == nil || bytes.Compare(lastKey, k) == -1
			canUseAppend = haveSortingGuaranties && isEndOfBucket
		}
		i++

		// Non-blocking progress report, at most once per ticker period.
		select {
		default:
		case <-logEvery.C:
			logArs := []interface{}{"into", bucket}
			if args.LogDetailsLoad != nil {
				logArs = append(logArs, args.LogDetailsLoad(k, v)...)
			} else {
				logArs = append(logArs, "current key", makeCurrentKeyStr(k))
			}

			runtime.ReadMemStats(&m)
			logArs = append(logArs, "alloc", common.StorageSize(m.Alloc), "sys", common.StorageSize(m.Sys), "numGC", int(m.NumGC))
			log.Info("ETL [2/2] Loading", logArs...)
		}

		if canUseAppend && len(v) == 0 {
			return nil // nothing to delete after end of bucket
		}
		if len(v) == 0 {
			return tx.Delete(bucket, k)
		}
		if canUseAppend && hasAppend {
			return appender.Append(bucket, k, v)
		}
		return tx.Put(bucket, k, v)
	}

	// Main loading loop
	for h.Len() > 0 {
		if err := common.Stopped(args.Quit); err != nil {
			return err
		}

		element := (heap.Pop(h)).(HeapElem)
		provider := providers[element.TimeIdx]
		err := loadFunc(element.Key, element.Value, state, loadNextFunc)
		if err != nil {
			return err
		}
		// Refill the heap from the provider the popped element came from.
		if element.Key, element.Value, err = provider.Next(decoder); err == nil {
			heap.Push(h, element)
		} else if err != io.EOF {
			return fmt.Errorf("error while reading next element from disk: %w", err)
		}
	}

	// Final commit
	if args.OnLoadCommit != nil {
		if err := args.OnLoadCommit(tx, []byte{}, true); err != nil {
			return err
		}
	}
	commitTimer := time.Now()
	if !useExternalTx {
		if _, err := tx.Commit(); err != nil {
			return err
		}
	}
	commitTook := time.Since(commitTimer)

	runtime.ReadMemStats(&m)
	log.Debug(
		"Committed batch",
		"bucket", bucket,
		"commit", commitTook,
		"records", i,
		"current key", makeCurrentKeyStr(nil),
		"alloc", common.StorageSize(m.Alloc), "sys", common.StorageSize(m.Sys), "numGC", int(m.NumGC))

	return nil
}
// makeCurrentKeyStr renders a key as a short hex string for log output.
//
//   - a nil key yields "final";
//   - a key shorter than 4 bytes is printed in full;
//   - a key of at least 8 bytes whose first 4 bytes are zero is cut to 8 bytes,
//     so keys with leading zeroes still show some distinguishing information;
//   - any other key is cut to its first 4 bytes.
//
// Truncated keys carry a trailing "...".
func makeCurrentKeyStr(k []byte) string {
	switch {
	case k == nil:
		return "final"
	case len(k) < 4:
		return fmt.Sprintf("%x", k)
	case len(k) >= 8 && k[0] == 0 && k[1] == 0 && k[2] == 0 && k[3] == 0:
		return fmt.Sprintf("%x...", k[:8])
	default:
		return fmt.Sprintf("%x...", k[:4])
	}
}