package etl

import (
	"bytes"
	"container/heap"
	"fmt"
	"io"
	"runtime"
	"time"

	"github.com/ledgerwatch/turbo-geth/common"
	"github.com/ledgerwatch/turbo-geth/ethdb"
	"github.com/ledgerwatch/turbo-geth/log"
	"github.com/ugorji/go/codec"
)

// LoadNextFunc is the callback a LoadFunc calls to hand a key/value pair on to
// the destination bucket. originalK is only used for progress logging by the
// loader in this file; k and v are what gets written (or deleted when v is empty).
type LoadNextFunc func(originalK, k, v []byte) error

// LoadFunc is called once for every entry popped from the merge heap during
// loading. It receives the entry's key and value, the loader state, and the
// LoadNextFunc it should call for every pair it wants stored.
type LoadFunc func(k []byte, value []byte, state State, next LoadNextFunc) error

// Collector performs the job of ETL Transform, but can also be used without "E" (Extract) part
// as a Collect Transform Load
type Collector struct {
	extractNextFunc ExtractNextFunc
	flushBuffer     func([]byte, bool) error
	dataProviders   []dataProvider
	allFlushed      bool
}

// NewCollector creates a Collector that accumulates entries in sortableBuffer
// and spills them via FlushToDisk (which receives datadir) whenever the buffer
// reports, through CheckFlushSize, that it should be flushed.
func NewCollector(datadir string, sortableBuffer Buffer) *Collector {
	c := &Collector{}
	encoder := codec.NewEncoder(nil, &cbor)

	// flushBuffer sorts the buffer and turns it into a data provider. The buffer
	// is kept in RAM only when the caller allows it and nothing has been turned
	// into a provider before; otherwise it is flushed to disk.
	c.flushBuffer = func(currentKey []byte, canStoreInRam bool) error {
		if sortableBuffer.Len() == 0 {
			return nil
		}
		var provider dataProvider
		var err error
		sortableBuffer.Sort()
		if canStoreInRam && len(c.dataProviders) == 0 {
			provider = KeepInRAM(sortableBuffer)
			c.allFlushed = true
		} else {
			provider, err = FlushToDisk(encoder, currentKey, sortableBuffer, datadir)
		}
		if err != nil {
			return err
		}
		if provider != nil {
			c.dataProviders = append(c.dataProviders, provider)
		}
		return nil
	}

	// extractNextFunc stores copies of k and v (the caller may reuse its slices)
	// and flushes once the buffer asks for it.
	c.extractNextFunc = func(originalK, k []byte, v []byte) error {
		sortableBuffer.Put(common.CopyBytes(k), common.CopyBytes(v))
		if sortableBuffer.CheckFlushSize() {
			// Mid-collection flushes are never allowed to stay in RAM.
			return c.flushBuffer(originalK, false)
		}
		return nil
	}
	return c
}

// Collect adds a key/value pair to the collector. The key is passed as both
// the original and the transformed key.
func (c *Collector) Collect(k, v []byte) error {
	return c.extractNextFunc(k, k, v)
}

// Load flushes whatever is still buffered and then feeds all collected entries
// through loadFunc into toBucket of db. The collector's data providers are
// disposed when Load returns, whether it succeeds or not.
func (c *Collector) Load(db ethdb.Database, toBucket string, loadFunc LoadFunc, args TransformArgs) error {
	// The closure is deliberate: c.dataProviders must be read at return time,
	// after the final flush below may have appended another provider.
	defer func() {
		disposeProviders(c.dataProviders)
	}()
	if !c.allFlushed {
		if err := c.flushBuffer(nil, true); err != nil {
			return err
		}
	}
	return loadFilesIntoBucket(db, toBucket, c.dataProviders, loadFunc, args)
}

// loadFilesIntoBucket merges the entries of all providers through a heap, runs
// loadFunc on every entry and writes the pairs it emits into bucket inside a
// single batch obtained from db.Begin(). The batch is committed at the end; on
// any error the deferred Rollback is called instead.
//
// An empty bucket name is accepted and skips the lookup of the bucket's last key.
// An error is returned when any provider fails to yield its first entry, since
// every provider is expected to hold at least one.
func loadFilesIntoBucket(db ethdb.Database, bucket string, providers []dataProvider, loadFunc LoadFunc, args TransformArgs) error {
	decoder := codec.NewDecoder(nil, &cbor)
	var m runtime.MemStats

	// Seed the heap with the first entry of every provider.
	h := &Heap{}
	heap.Init(h)
	for i, provider := range providers {
		key, value, err := provider.Next(decoder)
		if err != nil {
			// we must have at least one entry per file
			return fmt.Errorf("error reading first readers: n=%d current=%d provider=%s err=%w", len(providers), i, provider, err)
		}
		heap.Push(h, HeapElem{key, i, value})
	}

	batch, err := db.Begin()
	if err != nil {
		return err
	}
	defer batch.Rollback()
	state := &bucketState{batch, bucket, args.Quit}

	haveSortingGuarantees := isIdentityLoadFunc(loadFunc) // user-defined loadFunc may change ordering
	var lastKey []byte
	if bucket != "" { // passing empty bucket name is valid case for etl when DB modification is not expected
		var errLast error
		lastKey, _, errLast = batch.Last(bucket)
		if errLast != nil {
			return errLast
		}
	}

	// Append is only reachable through *ethdb.TxDb. Check once, and fall back to
	// Put for any other batch implementation instead of panicking on the assertion.
	txDb, isTxDb := batch.(*ethdb.TxDb)

	var canUseAppend bool
	putTimer := time.Now()
	i := 0
	loadNextFunc := func(originalK, k, v []byte) error {
		if i == 0 {
			// Decided once, on the first emitted key: appending is possible only
			// if that key sorts after everything already in the bucket.
			isEndOfBucket := lastKey == nil || bytes.Compare(lastKey, k) == -1
			canUseAppend = haveSortingGuarantees && isEndOfBucket && isTxDb
		}
		i++
		if i%1_000_000 == 0 && time.Since(putTimer) > 30*time.Second {
			putTimer = time.Now()
			runtime.ReadMemStats(&m)
			log.Info(
				"Loading into bucket",
				"bucket", bucket,
				"size", common.StorageSize(batch.BatchSize()),
				"keys", fmt.Sprintf("%.1fM", float64(i)/1_000_000),
				"use append", canUseAppend,
				"current key", makeCurrentKeyStr(originalK),
				"alloc", common.StorageSize(m.Alloc), "sys", common.StorageSize(m.Sys), "numGC", int(m.NumGC))
		}

		// An empty value means "delete this key".
		if len(v) == 0 {
			if canUseAppend {
				return nil // nothing to delete after end of bucket
			}
			return batch.Delete(bucket, k)
		}
		if canUseAppend {
			return txDb.Append(bucket, k, v)
		}
		return batch.Put(bucket, k, v)
	}

	// Main loading loop
	for h.Len() > 0 {
		if err := common.Stopped(args.Quit); err != nil {
			return err
		}

		element := (heap.Pop(h)).(HeapElem)
		provider := providers[element.TimeIdx]
		if err := loadFunc(element.Key, element.Value, state, loadNextFunc); err != nil {
			return err
		}

		// Refill the heap from the provider the popped element came from.
		var nextErr error
		element.Key, element.Value, nextErr = provider.Next(decoder)
		switch {
		case nextErr == nil:
			heap.Push(h, element)
		case nextErr != io.EOF:
			return fmt.Errorf("error while reading next element from disk: %w", nextErr)
		}
	}

	// Final commit
	if args.OnLoadCommit != nil {
		if err := args.OnLoadCommit(batch, []byte{}, true); err != nil {
			return err
		}
	}
	commitTimer := time.Now()
	if _, err := batch.Commit(); err != nil {
		return err
	}
	commitTook := time.Since(commitTimer)

	runtime.ReadMemStats(&m)
	log.Debug(
		"Committed batch",
		"bucket", bucket,
		"commit", commitTook,
		"size", common.StorageSize(batch.BatchSize()),
		"current key", makeCurrentKeyStr(nil),
		"alloc", common.StorageSize(m.Alloc), "sys", common.StorageSize(m.Sys), "numGC", int(m.NumGC))
	return nil
}

// makeCurrentKeyStr renders a short, log-friendly form of k:
//   - "final" for a nil key,
//   - the full hex for keys shorter than 4 bytes,
//   - the first 8 bytes in hex followed by "..." when the key is at least
//     8 bytes long and starts with four zero bytes,
//   - the first 4 bytes in hex followed by "..." otherwise.
func makeCurrentKeyStr(k []byte) string {
	switch {
	case k == nil:
		return "final"
	case len(k) < 4:
		return fmt.Sprintf("%x", k)
	case len(k) >= 8 && k[0] == 0 && k[1] == 0 && k[2] == 0 && k[3] == 0:
		// if key has leading zeroes, show a bit more info
		return fmt.Sprintf("%x...", k[:8])
	default:
		return fmt.Sprintf("%x...", k[:4])
	}
}