diff --git a/README.md b/README.md index f08ca0abc..5ddc70a79 100644 --- a/README.md +++ b/README.md @@ -163,7 +163,7 @@ is being updated on recurring basis. **Preprocessing**. For some operations, Erigon uses temporary files to preprocess data before inserting it into the main DB. That reduces write amplification and DB inserts are orders of magnitude quicker. - 🔬 See our detailed ETL explanation [here](/common/etl/README.md). + 🔬 See our detailed ETL explanation [here](https://github.com/ledgerwatch/erigon-lib/blob/main/etl/README.md). **Plain state**. diff --git a/cmd/integration/commands/stages.go b/cmd/integration/commands/stages.go index dc41c6314..69d3c281b 100644 --- a/cmd/integration/commands/stages.go +++ b/cmd/integration/commands/stages.go @@ -8,11 +8,11 @@ import ( "strings" "github.com/c2h5oh/datasize" + "github.com/ledgerwatch/erigon-lib/etl" "github.com/ledgerwatch/erigon-lib/kv" "github.com/ledgerwatch/erigon/cmd/sentry/download" "github.com/ledgerwatch/erigon/cmd/utils" "github.com/ledgerwatch/erigon/common" - "github.com/ledgerwatch/erigon/common/etl" "github.com/ledgerwatch/erigon/consensus" "github.com/ledgerwatch/erigon/consensus/ethash" "github.com/ledgerwatch/erigon/core" diff --git a/cmd/integration/commands/state_stages.go b/cmd/integration/commands/state_stages.go index 77990cf5d..d2d2d3a6a 100644 --- a/cmd/integration/commands/state_stages.go +++ b/cmd/integration/commands/state_stages.go @@ -14,12 +14,12 @@ import ( "github.com/ledgerwatch/erigon-lib/kv" "github.com/spf13/cobra" + "github.com/ledgerwatch/erigon-lib/etl" "github.com/ledgerwatch/erigon/cmd/utils" "github.com/ledgerwatch/erigon/common" "github.com/ledgerwatch/erigon/common/changeset" "github.com/ledgerwatch/erigon/common/dbutils" "github.com/ledgerwatch/erigon/common/debugprint" - "github.com/ledgerwatch/erigon/common/etl" "github.com/ledgerwatch/erigon/core" "github.com/ledgerwatch/erigon/core/rawdb" "github.com/ledgerwatch/erigon/core/state" diff --git a/cmd/rpcdaemon/commands/eth_receipts.go b/cmd/rpcdaemon/commands/eth_receipts.go index 8e3991bef..e18e039d3 100644 --- a/cmd/rpcdaemon/commands/eth_receipts.go +++ b/cmd/rpcdaemon/commands/eth_receipts.go @@ -11,6 +11,7 @@ import ( "github.com/ledgerwatch/erigon-lib/kv" "github.com/RoaringBitmap/roaring" + libcommon "github.com/ledgerwatch/erigon-lib/common" "github.com/ledgerwatch/erigon/common" "github.com/ledgerwatch/erigon/common/dbutils" "github.com/ledgerwatch/erigon/common/hexutil" @@ -136,7 +137,7 @@ func (api *APIImpl) GetLogs(ctx context.Context, crit filters.FilterCriteria) ([ iter := blockNumbers.Iterator() for iter.HasNext() { - if err = common.Stopped(ctx.Done()); err != nil { + if err = libcommon.Stopped(ctx.Done()); err != nil { return nil, err } diff --git a/cmd/rpcdaemon/commands/trace_adhoc.go b/cmd/rpcdaemon/commands/trace_adhoc.go index 13181432a..f60e6201c 100644 --- a/cmd/rpcdaemon/commands/trace_adhoc.go +++ b/cmd/rpcdaemon/commands/trace_adhoc.go @@ -11,6 +11,7 @@ import ( "time" "github.com/holiman/uint256" + libcommon "github.com/ledgerwatch/erigon-lib/common" "github.com/ledgerwatch/erigon-lib/kv" "github.com/ledgerwatch/erigon/common" "github.com/ledgerwatch/erigon/common/hexutil" @@ -1095,7 +1096,7 @@ func (api *TraceAPIImpl) doCallMany(ctx context.Context, dbtx kv.Tx, msgs []type useParent = true } for txIndex, msg := range msgs { - if err := common.Stopped(ctx.Done()); err != nil { + if err := libcommon.Stopped(ctx.Done()); err != nil { return nil, err } traceResult := &TraceCallResult{Trace: []*ParityTrace{}} diff --git a/cmd/sentry/download/sentry.go b/cmd/sentry/download/sentry.go index 58a0904fc..b0ec2976a 100644 --- a/cmd/sentry/download/sentry.go +++ b/cmd/sentry/download/sentry.go @@ -23,6 +23,7 @@ import ( //grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus" "github.com/holiman/uint256" + libcommon "github.com/ledgerwatch/erigon-lib/common" "github.com/ledgerwatch/erigon-lib/gointerfaces" proto_sentry "github.com/ledgerwatch/erigon-lib/gointerfaces/sentry" proto_types "github.com/ledgerwatch/erigon-lib/gointerfaces/types" @@ -276,7 +277,7 @@ func runPeer( peerPrinted = true } } - if err := common.Stopped(ctx.Done()); err != nil { + if err := libcommon.Stopped(ctx.Done()); err != nil { return err } if peerInfo.Removed() { diff --git a/cmd/snapshots/generator/commands/generate_body_snapshot.go b/cmd/snapshots/generator/commands/generate_body_snapshot.go index 194acebfc..8a601488c 100644 --- a/cmd/snapshots/generator/commands/generate_body_snapshot.go +++ b/cmd/snapshots/generator/commands/generate_body_snapshot.go @@ -10,6 +10,7 @@ import ( kv2 "github.com/ledgerwatch/erigon-lib/kv/mdbx" "github.com/spf13/cobra" + libcommon "github.com/ledgerwatch/erigon-lib/common" "github.com/ledgerwatch/erigon/common" "github.com/ledgerwatch/erigon/common/dbutils" "github.com/ledgerwatch/erigon/core/rawdb" @@ -55,7 +56,7 @@ func BodySnapshot(ctx context.Context, logger log.Logger, dbPath, snapshotPath s if err := snKV.Update(ctx, func(sntx kv.RwTx) error { for i := uint64(1); i <= toBlock; i++ { if common.IsCanceled(ctx) { - return common.ErrStopped + return libcommon.ErrStopped } hash, err = rawdb.ReadCanonicalHash(tx, i) diff --git a/cmd/snapshots/generator/commands/generate_header_snapshot.go b/cmd/snapshots/generator/commands/generate_header_snapshot.go index fd3b85d67..750e3d66a 100644 --- a/cmd/snapshots/generator/commands/generate_header_snapshot.go +++ b/cmd/snapshots/generator/commands/generate_header_snapshot.go @@ -11,6 +11,7 @@ import ( kv2 "github.com/ledgerwatch/erigon-lib/kv/mdbx" "github.com/spf13/cobra" + libcommon "github.com/ledgerwatch/erigon-lib/common" "github.com/ledgerwatch/erigon/common" "github.com/ledgerwatch/erigon/common/dbutils" "github.com/ledgerwatch/erigon/core/rawdb" @@ -71,7 +72,7 @@ func HeaderSnapshot(ctx context.Context, logger log.Logger, dbPath, snapshotPath defer c.Close() for i := uint64(1); i <= toBlock; i++ { if common.IsCanceled(ctx) { - return common.ErrStopped + return libcommon.ErrStopped } hash, err = rawdb.ReadCanonicalHash(tx, i) diff --git a/cmd/snapshots/seeder/commands/seeder.go b/cmd/snapshots/seeder/commands/seeder.go index add8940a0..c65f5d415 100644 --- a/cmd/snapshots/seeder/commands/seeder.go +++ b/cmd/snapshots/seeder/commands/seeder.go @@ -11,6 +11,7 @@ import ( "github.com/anacrolix/torrent" "github.com/anacrolix/torrent/bencode" "github.com/anacrolix/torrent/metainfo" + libcommon "github.com/ledgerwatch/erigon-lib/common" "github.com/ledgerwatch/erigon/common" trnt "github.com/ledgerwatch/erigon/turbo/snapshotsync" "github.com/ledgerwatch/log/v3" @@ -62,7 +63,7 @@ func Seed(ctx context.Context, datadir string) error { } tt := time.Now() if common.IsCanceled(ctx) { - return common.ErrStopped + return libcommon.ErrStopped } info, err := trnt.BuildInfoBytesForSnapshot(v, trnt.MdbxFilename) if err != nil { @@ -91,7 +92,7 @@ func Seed(ctx context.Context, datadir string) error { } if common.IsCanceled(ctx) { - return common.ErrStopped + return libcommon.ErrStopped } } diff --git a/cmd/state/verify/verify_txlookup.go b/cmd/state/verify/verify_txlookup.go index e3afea6b5..61b039b2c 100644 --- a/cmd/state/verify/verify_txlookup.go +++ b/cmd/state/verify/verify_txlookup.go @@ -9,9 +9,9 @@ import ( "os/signal" "time" + libcommon "github.com/ledgerwatch/erigon-lib/common" "github.com/ledgerwatch/erigon-lib/kv" "github.com/ledgerwatch/erigon-lib/kv/mdbx" - "github.com/ledgerwatch/erigon/common" "github.com/ledgerwatch/erigon/core/rawdb" "github.com/ledgerwatch/log/v3" ) @@ -41,7 +41,7 @@ func ValidateTxLookups(chaindata string) error { // Validation Process blockBytes := big.NewInt(0) for !interrupt { - if err := common.Stopped(quitCh); err != nil { + if err := libcommon.Stopped(quitCh); err != nil { return err } blockHash, err := rawdb.ReadCanonicalHash(tx, blockNum) diff --git a/common/chan.go b/common/chan.go deleted file mode 100644 index 455de9460..000000000 --- a/common/chan.go +++ /dev/null @@ -1,30 +0,0 @@ -package common - -import "errors" - -var ErrStopped = errors.New("stopped") -var ErrUnwind = errors.New("unwound") - -func Stopped(ch <-chan struct{}) error { - if ch == nil { - return nil - } - select { - case <-ch: - return ErrStopped - default: - } - return nil -} - -func SafeClose(ch chan struct{}) { - if ch == nil { - return - } - select { - case <-ch: - // Channel was already closed - default: - close(ch) - } -} diff --git a/common/changeset/storage_changeset.go b/common/changeset/storage_changeset.go index b595f493b..29b42a3fc 100644 --- a/common/changeset/storage_changeset.go +++ b/common/changeset/storage_changeset.go @@ -6,10 +6,11 @@ import ( "errors" "sort" + libcommon "github.com/ledgerwatch/erigon-lib/common" + "github.com/ledgerwatch/erigon-lib/etl" "github.com/ledgerwatch/erigon-lib/kv" "github.com/ledgerwatch/erigon/common" "github.com/ledgerwatch/erigon/common/dbutils" - "github.com/ledgerwatch/erigon/common/etl" "github.com/ledgerwatch/erigon/ethdb" ) @@ -105,14 +106,14 @@ func walkAndCollect(collectorFunc func([]byte, []byte) error, db kv.Tx, bucket s } defer c.Close() return ethdb.Walk(c, dbutils.EncodeBlockNumber(timestampDst), 0, func(dbKey, dbValue []byte) (bool, error) { - if err := common.Stopped(quit); err != nil { + if err := libcommon.Stopped(quit); err != nil { return false, err } timestamp, k, v := Mapper[bucket].Decode(dbKey, dbValue) if timestamp > timestampSrc { return false, nil } - if innerErr := collectorFunc(common.CopyBytes(k), common.CopyBytes(v)); innerErr != nil { + if innerErr := collectorFunc(libcommon.Copy(k), libcommon.Copy(v)); innerErr != nil { return false, innerErr } return true, nil diff --git a/common/etl/ETL-collector.png b/common/etl/ETL-collector.png deleted file mode 100644 index 2e88cb2d4..000000000 Binary files a/common/etl/ETL-collector.png and /dev/null differ diff --git a/common/etl/ETL.png b/common/etl/ETL.png deleted file mode 100644 index 757921eef..000000000 Binary files a/common/etl/ETL.png and /dev/null differ diff --git a/common/etl/README.md b/common/etl/README.md deleted file mode 100644 index 79bc21bce..000000000 --- a/common/etl/README.md +++ /dev/null @@ -1,183 +0,0 @@ -# ETL - -ETL framework is most commonly used in [staged sync](../../eth/stagedsync/README.md). - -It implements a pattern where we extract some data from a database, transform it, -then put it into temp files and insert back to the database in sorted order. - -Inserting entries into our KV storage sorted by keys helps to minimize write -amplification, hence it is much faster, even considering additional I/O that -is generated by storing files. - -It behaves similarly to enterprise [Extract, Tranform, Load](https://en.wikipedia.org/wiki/Extract,_transform,_load) frameworks, hence the name. -We use temporary files because that helps keep RAM usage predictable and allows -using ETL on large amounts of data. - -### Example - -``` -func keyTransformExtractFunc(transformKey func([]byte) ([]byte, error)) etl.ExtractFunc { - return func(k, v []byte, next etl.ExtractNextFunc) error { - newK, err := transformKey(k) - if err != nil { - return err - } - return next(k, newK, v) - } -} - -err := etl.Transform( - db, // database - dbutils.PlainStateBucket, // "from" bucket - dbutils.CurrentStateBucket, // "to" bucket - datadir, // where to store temp files - keyTransformExtractFunc(transformPlainStateKey), // transformFunc on extraction - etl.IdentityLoadFunc, // transform on load - etl.TransformArgs{ // additional arguments - Quit: quit, - }, - ) - if err != nil { - return err - } - -``` - -## Data Transformation - -The whole flow is shown in the image - -![](./ETL.png) - -Data could be transformed in two places along the pipeline: - -* transform on extraction - -* transform on loading - -### Transform On Extraction - -`type ExtractFunc func(k []byte, v []byte, next ExtractNextFunc) error` - -Transform on extraction function receives the currenk key and value from the -source bucket. - -### Transform On Loading - -`type LoadFunc func(k []byte, value []byte, state State, next LoadNextFunc) error` - -As well as the current key and value, the transform on loading function -receives the `State` object that can receive data from the destination bucket. - -That is used in index generation where we want to extend index entries with new -data instead of just adding new ones. - -### `<...>NextFunc` pattern - -Sometimes we need to produce multiple entries from a single entry when -transforming. - -To do that, each of the transform function receives a next function that should -be called to move data further. That means that each transformation can produce -any number of outputs for a single input. - -It can be one output, like in `IdentityLoadFunc`: - -``` -func IdentityLoadFunc(k []byte, value []byte, _ State, next LoadNextFunc) error { - return next(k, k, value) // go to the next step -} -``` - -It can be multiple outputs like when each entry is a `ChangeSet`: - -``` -func(dbKey, dbValue []byte, next etl.ExtractNextFunc) error { - blockNum, _ := dbutils.DecodeTimestamp(dbKey) - return bytes2walker(dbValue).Walk(func(changesetKey, changesetValue []byte) error { - key := common.CopyBytes(changesetKey) - v := make([]byte, 9) - binary.BigEndian.PutUint64(v, blockNum) - if len(changesetValue) == 0 { - v[8] = 1 - } - return next(dbKey, key, v) // go to the next step - }) - } -``` - -### Buffer Types - -Before the data is being flushed into temp files, it is getting collected into -a buffer until if overflows (`etl.ExtractArgs.BufferSize`). - -There are different types of buffers available with different behaviour. - -* `SortableSliceBuffer` -- just append `(k, v1)`, `(k, v2)` onto a slice. Duplicate keys - will lead to duplicate entries: `[(k, v1) (k, v2)]`. - -* `SortableAppendBuffer` -- on duplicate keys: merge. `(k, v1)`, `(k, v2)` - will lead to `k: [v1 v2]` - -* `SortableOldestAppearedBuffer` -- on duplicate keys: keep the oldest. `(k, - v1)`, `(k v2)` will lead to `k: v1` - -### Transforming Structs - -Both transform functions and next functions allow only byte arrays. -If you need to pass a struct, you will need to marshal it. - -### Loading Into Database - -We load data from the temp files into a database in batches, limited by -`IdealBatchSize()` of an `ethdb.Mutation`. - -(for tests we can also override it) - -### Handling Interruptions - -ETL processes are long, so we need to be able to handle interruptions. - -#### Handing `Ctrl+C` - -You can pass your quit channel into `Quit` parameter into `etl.TransformArgs`. - -When this channel is closed, ETL will be interrupted. - -#### Saving & Restoring State - -Interrupting in the middle of loading can lead to inconsistent state in the -database. - -To avoid that, the ETL framework allows storing progress by setting `OnLoadCommit` in `etl.TransformArgs`. - -Then we can use this data to know the progress the ETL transformation made. - -You can also specify `ExtractStartKey` and `ExtractEndKey` to limit the nubmer -of items transformed. - -## Ways to work with ETL framework - -There might be 2 scenarios on how you want to work with the ETL framework. - -![](./ETL-collector.png) - -### `etl.Transform` function - -The vast majority of use-cases is when we extract data from one bucket and in -the end, load it into another bucket. That is the use-case for `etl.Transform` -function. - -### `etl.Collector` struct - -If you want a more modular behaviour instead of just reading from the DB (like -generating intermediate hashes in `../../core/chain_makers.go`, you can use -`etl.Collector` struct directly. - -It has a `.Collect()` method that you can provide your data to. - - -## Optimizations - -* if all data fits into a single file, we don't write anything to disk and just - use in-memory storage. diff --git a/common/etl/buffers.go b/common/etl/buffers.go deleted file mode 100644 index f07876ef7..000000000 --- a/common/etl/buffers.go +++ /dev/null @@ -1,285 +0,0 @@ -package etl - -import ( - "bytes" - "fmt" - "sort" - "strconv" - - "github.com/c2h5oh/datasize" - "github.com/ledgerwatch/erigon-lib/kv" -) - -const ( - //SliceBuffer - just simple slice w - SortableSliceBuffer = iota - //SortableAppendBuffer - map[k] [v1 v2 v3] - SortableAppendBuffer - // SortableOldestAppearedBuffer - buffer that keeps only the oldest entries. - // if first v1 was added under key K, then v2; only v1 will stay - SortableOldestAppearedBuffer - - BufIOSize = 64 * 4096 // 64 pages | default is 1 page | increasing further doesn't show speedup on SSD -) - -var BufferOptimalSize = 256 * datasize.MB /* var because we want to sometimes change it from tests or command-line flags */ - -type Buffer interface { - Put(k, v []byte) - Get(i int) sortableBufferEntry - Len() int - Reset() - GetEntries() []sortableBufferEntry - Sort() - CheckFlushSize() bool - SetComparator(cmp kv.CmpFunc) -} - -type sortableBufferEntry struct { - key []byte - value []byte -} - -var ( - _ Buffer = &sortableBuffer{} - _ Buffer = &appendSortableBuffer{} - _ Buffer = &oldestEntrySortableBuffer{} -) - -func NewSortableBuffer(bufferOptimalSize datasize.ByteSize) *sortableBuffer { - return &sortableBuffer{ - entries: make([]sortableBufferEntry, 0), - size: 0, - optimalSize: int(bufferOptimalSize.Bytes()), - } -} - -type sortableBuffer struct { - entries []sortableBufferEntry - size int - optimalSize int - comparator kv.CmpFunc -} - -func (b *sortableBuffer) Put(k, v []byte) { - b.size += len(k) - b.size += len(v) - b.entries = append(b.entries, sortableBufferEntry{k, v}) -} - -func (b *sortableBuffer) Size() int { - return b.size -} - -func (b *sortableBuffer) Len() int { - return len(b.entries) -} - -func (b *sortableBuffer) SetComparator(cmp kv.CmpFunc) { - b.comparator = cmp -} - -func (b *sortableBuffer) Less(i, j int) bool { - if b.comparator != nil { - return b.comparator(b.entries[i].key, b.entries[j].key, b.entries[i].value, b.entries[j].value) < 0 - } - return bytes.Compare(b.entries[i].key, b.entries[j].key) < 0 -} - -func (b *sortableBuffer) Swap(i, j int) { - b.entries[i], b.entries[j] = b.entries[j], b.entries[i] -} - -func (b *sortableBuffer) Get(i int) sortableBufferEntry { - return b.entries[i] -} - -func (b *sortableBuffer) Reset() { - b.entries = nil - b.size = 0 -} -func (b *sortableBuffer) Sort() { - sort.Stable(b) -} - -func (b *sortableBuffer) GetEntries() []sortableBufferEntry { - return b.entries -} - -func (b *sortableBuffer) CheckFlushSize() bool { - return b.size >= b.optimalSize -} - -func NewAppendBuffer(bufferOptimalSize datasize.ByteSize) *appendSortableBuffer { - return &appendSortableBuffer{ - entries: make(map[string][]byte), - size: 0, - optimalSize: int(bufferOptimalSize.Bytes()), - } -} - -type appendSortableBuffer struct { - entries map[string][]byte - size int - optimalSize int - sortedBuf []sortableBufferEntry - comparator kv.CmpFunc -} - -func (b *appendSortableBuffer) Put(k, v []byte) { - ks := string(k) - stored, ok := b.entries[ks] - if !ok { - b.size += len(k) - } - b.size += len(v) - stored = append(stored, v...) - b.entries[ks] = stored -} - -func (b *appendSortableBuffer) SetComparator(cmp kv.CmpFunc) { - b.comparator = cmp -} - -func (b *appendSortableBuffer) Size() int { - return b.size -} - -func (b *appendSortableBuffer) Len() int { - return len(b.entries) -} -func (b *appendSortableBuffer) Sort() { - for i := range b.entries { - b.sortedBuf = append(b.sortedBuf, sortableBufferEntry{key: []byte(i), value: b.entries[i]}) - } - sort.Stable(b) -} - -func (b *appendSortableBuffer) Less(i, j int) bool { - if b.comparator != nil { - return b.comparator(b.sortedBuf[i].key, b.sortedBuf[j].key, b.sortedBuf[i].value, b.sortedBuf[j].value) < 0 - } - return bytes.Compare(b.sortedBuf[i].key, b.sortedBuf[j].key) < 0 -} - -func (b *appendSortableBuffer) Swap(i, j int) { - b.sortedBuf[i], b.sortedBuf[j] = b.sortedBuf[j], b.sortedBuf[i] -} - -func (b *appendSortableBuffer) Get(i int) sortableBufferEntry { - return b.sortedBuf[i] -} -func (b *appendSortableBuffer) Reset() { - b.sortedBuf = nil - b.entries = make(map[string][]byte) - b.size = 0 -} - -func (b *appendSortableBuffer) GetEntries() []sortableBufferEntry { - return b.sortedBuf -} - -func (b *appendSortableBuffer) CheckFlushSize() bool { - return b.size >= b.optimalSize -} - -func NewOldestEntryBuffer(bufferOptimalSize datasize.ByteSize) *oldestEntrySortableBuffer { - return &oldestEntrySortableBuffer{ - entries: make(map[string][]byte), - size: 0, - optimalSize: int(bufferOptimalSize.Bytes()), - } -} - -type oldestEntrySortableBuffer struct { - entries map[string][]byte - size int - optimalSize int - sortedBuf []sortableBufferEntry - comparator kv.CmpFunc -} - -func (b *oldestEntrySortableBuffer) SetComparator(cmp kv.CmpFunc) { - b.comparator = cmp -} - -func (b *oldestEntrySortableBuffer) Put(k, v []byte) { - ks := string(k) - _, ok := b.entries[ks] - if ok { - // if we already had this entry, we are going to keep it and ignore new value - return - } - - b.size += len(k) + len(v) - b.entries[ks] = v -} - -func (b *oldestEntrySortableBuffer) Size() int { - return b.size -} - -func (b *oldestEntrySortableBuffer) Len() int { - return len(b.entries) -} - -func (b *oldestEntrySortableBuffer) Sort() { - for k, v := range b.entries { - b.sortedBuf = append(b.sortedBuf, sortableBufferEntry{key: []byte(k), value: v}) - } - sort.Stable(b) -} - -func (b *oldestEntrySortableBuffer) Less(i, j int) bool { - if b.comparator != nil { - return b.comparator(b.sortedBuf[i].key, b.sortedBuf[j].key, b.sortedBuf[i].value, b.sortedBuf[j].value) < 0 - } - return bytes.Compare(b.sortedBuf[i].key, b.sortedBuf[j].key) < 0 -} - -func (b *oldestEntrySortableBuffer) Swap(i, j int) { - b.sortedBuf[i], b.sortedBuf[j] = b.sortedBuf[j], b.sortedBuf[i] -} - -func (b *oldestEntrySortableBuffer) Get(i int) sortableBufferEntry { - return b.sortedBuf[i] -} -func (b *oldestEntrySortableBuffer) Reset() { - b.sortedBuf = nil - b.entries = make(map[string][]byte) - b.size = 0 -} - -func (b *oldestEntrySortableBuffer) GetEntries() []sortableBufferEntry { - return b.sortedBuf -} - -func (b *oldestEntrySortableBuffer) CheckFlushSize() bool { - return b.size >= b.optimalSize -} - -func getBufferByType(tp int, size datasize.ByteSize) Buffer { - switch tp { - case SortableSliceBuffer: - return NewSortableBuffer(size) - case SortableAppendBuffer: - return NewAppendBuffer(size) - case SortableOldestAppearedBuffer: - return NewOldestEntryBuffer(size) - default: - panic("unknown buffer type " + strconv.Itoa(tp)) - } -} - -func getTypeByBuffer(b Buffer) int { - switch b.(type) { - case *sortableBuffer: - return SortableSliceBuffer - case *appendSortableBuffer: - return SortableAppendBuffer - case *oldestEntrySortableBuffer: - return SortableOldestAppearedBuffer - default: - panic(fmt.Sprintf("unknown buffer type: %T ", b)) - } -} diff --git a/common/etl/collector.go b/common/etl/collector.go deleted file mode 100644 index d080d7f87..000000000 --- a/common/etl/collector.go +++ /dev/null @@ -1,277 +0,0 @@ -package etl - -import ( - "bytes" - "container/heap" - "fmt" - "io" - "io/ioutil" - "os" - "path/filepath" - "runtime" - "time" - - "github.com/c2h5oh/datasize" - "github.com/ledgerwatch/erigon-lib/kv" - "github.com/ledgerwatch/erigon/common" - "github.com/ledgerwatch/log/v3" - "github.com/ugorji/go/codec" -) - -const TmpDirName = "etl-temp" - -type LoadNextFunc func(originalK, k, v []byte) error -type LoadFunc func(k, v []byte, table CurrentTableReader, next LoadNextFunc) error - -// Collector performs the job of ETL Transform, but can also be used without "E" (Extract) part -// as a Collect Transform Load -type Collector struct { - extractNextFunc ExtractNextFunc - flushBuffer func([]byte, bool) error - dataProviders []dataProvider - allFlushed bool - autoClean bool - bufType int -} - -// NewCollectorFromFiles creates collector from existing files (left over from previous unsuccessful loading) -func NewCollectorFromFiles(tmpdir string) (*Collector, error) { - if _, err := os.Stat(tmpdir); os.IsNotExist(err) { - return nil, nil - } - fileInfos, err := ioutil.ReadDir(tmpdir) - if err != nil { - return nil, fmt.Errorf("collector from files - reading directory %s: %w", tmpdir, err) - } - if len(fileInfos) == 0 { - return nil, nil - } - dataProviders := make([]dataProvider, len(fileInfos)) - for i, fileInfo := range fileInfos { - var dataProvider fileDataProvider - dataProvider.file, err = os.Open(filepath.Join(tmpdir, fileInfo.Name())) - if err != nil { - return nil, fmt.Errorf("collector from files - opening file %s: %w", fileInfo.Name(), err) - } - dataProviders[i] = &dataProvider - } - return &Collector{dataProviders: dataProviders, allFlushed: true, autoClean: false}, nil -} - -// NewCriticalCollector does not clean up temporary files if loading has failed -func NewCriticalCollector(tmpdir string, sortableBuffer Buffer) *Collector { - c := NewCollector(tmpdir, sortableBuffer) - c.autoClean = false - return c -} - -func NewCollector(tmpdir string, sortableBuffer Buffer) *Collector { - c := &Collector{autoClean: true, bufType: getTypeByBuffer(sortableBuffer)} - encoder := codec.NewEncoder(nil, &cbor) - - c.flushBuffer = func(currentKey []byte, canStoreInRam bool) error { - if sortableBuffer.Len() == 0 { - return nil - } - var provider dataProvider - var err error - sortableBuffer.Sort() - if canStoreInRam && len(c.dataProviders) == 0 { - provider = KeepInRAM(sortableBuffer) - c.allFlushed = true - } else { - provider, err = FlushToDisk(encoder, currentKey, sortableBuffer, tmpdir) - } - if err != nil { - return err - } - if provider != nil { - c.dataProviders = append(c.dataProviders, provider) - } - return nil - } - - c.extractNextFunc = func(originalK, k []byte, v []byte) error { - sortableBuffer.Put(common.CopyBytes(k), common.CopyBytes(v)) - if sortableBuffer.CheckFlushSize() { - if err := c.flushBuffer(originalK, false); err != nil { - return err - } - } - return nil - } - return c -} - -func (c *Collector) Collect(k, v []byte) error { - return c.extractNextFunc(k, k, v) -} - -func (c *Collector) Load(logPrefix string, db kv.RwTx, toBucket string, loadFunc LoadFunc, args TransformArgs) error { - defer func() { - if c.autoClean { - c.Close(logPrefix) - } - }() - if !c.allFlushed { - if e := c.flushBuffer(nil, true); e != nil { - return e - } - } - if err := loadFilesIntoBucket(logPrefix, db, toBucket, c.bufType, c.dataProviders, loadFunc, args); err != nil { - return err - } - return nil -} - -func (c *Collector) Close(logPrefix string) { - totalSize := uint64(0) - for _, p := range c.dataProviders { - totalSize += p.Dispose() - } - if totalSize > 0 { - log.Info(fmt.Sprintf("[%s] etl: temp files removed", logPrefix), "total size", datasize.ByteSize(totalSize).HumanReadable()) - } -} - -func loadFilesIntoBucket(logPrefix string, db kv.RwTx, bucket string, bufType int, providers []dataProvider, loadFunc LoadFunc, args TransformArgs) error { - decoder := codec.NewDecoder(nil, &cbor) - var m runtime.MemStats - - h := &Heap{comparator: args.Comparator} - heap.Init(h) - for i, provider := range providers { - if key, value, err := provider.Next(decoder); err == nil { - he := HeapElem{key, i, value} - heap.Push(h, he) - } else /* we must have at least one entry per file */ { - eee := fmt.Errorf("%s: error reading first readers: n=%d current=%d provider=%s err=%v", - logPrefix, len(providers), i, provider, err) - panic(eee) - } - } - var c kv.RwCursor - - currentTable := ¤tTableReader{db, bucket} - haveSortingGuaranties := isIdentityLoadFunc(loadFunc) // user-defined loadFunc may change ordering - var lastKey []byte - if bucket != "" { // passing empty bucket name is valid case for etl when DB modification is not expected - var err error - c, err = db.RwCursor(bucket) - if err != nil { - return err - } - var errLast error - lastKey, _, errLast = c.Last() - if errLast != nil { - return errLast - } - } - var canUseAppend bool - isDupSort := kv.ChaindataTablesCfg[bucket].Flags&kv.DupSort != 0 && !kv.ChaindataTablesCfg[bucket].AutoDupSortKeysConversion - - logEvery := time.NewTicker(30 * time.Second) - defer logEvery.Stop() - - i := 0 - var prevK []byte - loadNextFunc := func(originalK, k, v []byte) error { - if i == 0 { - isEndOfBucket := lastKey == nil || bytes.Compare(lastKey, k) == -1 - canUseAppend = haveSortingGuaranties && isEndOfBucket - } - i++ - - // SortableOldestAppearedBuffer must guarantee that only 1 oldest value of key will appear - // but because size of buffer is limited - each flushed file does guarantee "oldest appeared" - // property, but files may overlap. files are sorted, just skip repeated keys here - if bufType == SortableOldestAppearedBuffer && bytes.Equal(prevK, k) { - return nil - } - prevK = k - - select { - default: - case <-logEvery.C: - logArs := []interface{}{"into", bucket} - if args.LogDetailsLoad != nil { - logArs = append(logArs, args.LogDetailsLoad(k, v)...) - } else { - logArs = append(logArs, "current key", makeCurrentKeyStr(k)) - } - - runtime.ReadMemStats(&m) - logArs = append(logArs, "alloc", common.StorageSize(m.Alloc), "sys", common.StorageSize(m.Sys)) - log.Info(fmt.Sprintf("[%s] ETL [2/2] Loading", logPrefix), logArs...) - } - - if canUseAppend && len(v) == 0 { - return nil // nothing to delete after end of bucket - } - if len(v) == 0 { - if err := c.Delete(k, nil); err != nil { - return err - } - return nil - } - if canUseAppend { - if isDupSort { - if err := c.(kv.RwCursorDupSort).AppendDup(k, v); err != nil { - return fmt.Errorf("%s: bucket: %s, appendDup: k=%x, %w", logPrefix, bucket, k, err) - } - } else { - if err := c.Append(k, v); err != nil { - return fmt.Errorf("%s: bucket: %s, append: k=%x, v=%x, %w", logPrefix, bucket, k, v, err) - } - } - - return nil - } - if err := c.Put(k, v); err != nil { - return fmt.Errorf("%s: put: k=%x, %w", logPrefix, k, err) - } - return nil - } - // Main loading loop - for h.Len() > 0 { - if err := common.Stopped(args.Quit); err != nil { - return err - } - - element := (heap.Pop(h)).(HeapElem) - provider := providers[element.TimeIdx] - err := loadFunc(element.Key, element.Value, currentTable, loadNextFunc) - if err != nil { - return err - } - if element.Key, element.Value, err = provider.Next(decoder); err == nil { - heap.Push(h, element) - } else if err != io.EOF { - return fmt.Errorf("%s: error while reading next element from disk: %v", logPrefix, err) - } - } - - runtime.ReadMemStats(&m) - log.Debug( - fmt.Sprintf("[%s] Committed batch", logPrefix), - "bucket", bucket, - "records", i, - "current key", makeCurrentKeyStr(nil), - "alloc", common.StorageSize(m.Alloc), "sys", common.StorageSize(m.Sys)) - - return nil -} - -func makeCurrentKeyStr(k []byte) string { - var currentKeyStr string - if k == nil { - currentKeyStr = "final" - } else if len(k) < 4 { - currentKeyStr = fmt.Sprintf("%x", k) - } else if k[0] == 0 && k[1] == 0 && k[2] == 0 && k[3] == 0 && len(k) >= 8 { // if key has leading zeroes, show a bit more info - currentKeyStr = fmt.Sprintf("%x", k) - } else { - currentKeyStr = fmt.Sprintf("%x...", k[:4]) - } - return currentKeyStr -} diff --git a/common/etl/dataprovider.go b/common/etl/dataprovider.go deleted file mode 100644 index 04adf7c87..000000000 --- a/common/etl/dataprovider.go +++ /dev/null @@ -1,133 +0,0 @@ -package etl - -import ( - "bufio" - "fmt" - "io" - "io/ioutil" - "os" - "runtime" - - "github.com/ledgerwatch/erigon/common" - "github.com/ledgerwatch/log/v3" -) - -type dataProvider interface { - Next(decoder Decoder) ([]byte, []byte, error) - Dispose() uint64 // Safe for repeated call, doesn't return error - means defer-friendly -} - -type fileDataProvider struct { - file *os.File - reader io.Reader -} - -type Encoder interface { - Encode(toWrite interface{}) error - Reset(writer io.Writer) -} - -func FlushToDisk(encoder Encoder, currentKey []byte, b Buffer, tmpdir string) (dataProvider, error) { - if b.Len() == 0 { - return nil, nil - } - // if we are going to create files in the system temp dir, we don't need any - // subfolders. - if tmpdir != "" { - if err := os.MkdirAll(tmpdir, 0755); err != nil { - return nil, err - } - } - - bufferFile, err := ioutil.TempFile(tmpdir, "tg-sync-sortable-buf") - if err != nil { - return nil, err - } - defer bufferFile.Sync() //nolint:errcheck - - w := bufio.NewWriterSize(bufferFile, BufIOSize) - defer w.Flush() //nolint:errcheck - - defer func() { - b.Reset() // run it after buf.flush and file.sync - var m runtime.MemStats - runtime.ReadMemStats(&m) - log.Info( - "Flushed buffer file", - "name", bufferFile.Name(), - "alloc", common.StorageSize(m.Alloc), "sys", common.StorageSize(m.Sys)) - }() - - encoder.Reset(w) - for _, entry := range b.GetEntries() { - err = writeToDisk(encoder, entry.key, entry.value) - if err != nil { - return nil, fmt.Errorf("error writing entries to disk: %v", err) - } - } - - return &fileDataProvider{bufferFile, nil}, nil -} - -func (p *fileDataProvider) Next(decoder Decoder) ([]byte, []byte, error) { - if p.reader == nil { - _, err := p.file.Seek(0, 0) - if err != nil { - return nil, nil, err - } - p.reader = bufio.NewReaderSize(p.file, BufIOSize) - } - decoder.Reset(p.reader) - return readElementFromDisk(decoder) -} - -func (p *fileDataProvider) Dispose() uint64 { - info, _ := os.Stat(p.file.Name()) - _ = p.file.Close() - _ = os.Remove(p.file.Name()) - if info == nil { - return 0 - } - return uint64(info.Size()) -} - -func (p *fileDataProvider) String() string { - return fmt.Sprintf("%T(file: %s)", p, p.file.Name()) -} - -func writeToDisk(encoder Encoder, key []byte, value []byte) error { - toWrite := [][]byte{key, value} - return encoder.Encode(toWrite) -} - -func readElementFromDisk(decoder Decoder) ([]byte, []byte, error) { - result := make([][]byte, 2) - err := decoder.Decode(&result) - return result[0], result[1], err -} - -type memoryDataProvider struct { - buffer Buffer - currentIndex int -} - -func KeepInRAM(buffer Buffer) dataProvider { - return &memoryDataProvider{buffer, 0} -} - -func (p *memoryDataProvider) Next(decoder Decoder) ([]byte, []byte, error) { - if p.currentIndex >= p.buffer.Len() { - return nil, nil, io.EOF - } - entry := p.buffer.Get(p.currentIndex) - p.currentIndex++ - return entry.key, entry.value, nil -} - -func (p *memoryDataProvider) Dispose() uint64 { - return 0 /* doesn't take space on disk */ -} - -func (p *memoryDataProvider) String() string { - return fmt.Sprintf("%T(buffer.Len: %d)", p, p.buffer.Len()) -} diff --git a/common/etl/etl.go b/common/etl/etl.go deleted file mode 100644 index 2769bdd0a..000000000 --- a/common/etl/etl.go +++ /dev/null @@ -1,175 +0,0 @@ -package etl - -import ( - "bytes" - "fmt" - "io" - "reflect" - "runtime" - "time" - - "github.com/c2h5oh/datasize" - "github.com/ledgerwatch/erigon-lib/kv" - "github.com/ledgerwatch/erigon/common" - "github.com/ledgerwatch/erigon/ethdb" - "github.com/ledgerwatch/log/v3" - "github.com/ugorji/go/codec" -) - -var ( - cbor codec.CborHandle -) - -type Decoder interface { - Reset(reader io.Reader) - Decode(interface{}) error -} - -type CurrentTableReader interface { - Get([]byte) ([]byte, error) -} - -type ExtractNextFunc func(originalK, k []byte, v []byte) error -type ExtractFunc func(k []byte, v []byte, next ExtractNextFunc) error - -// NextKey generates the possible next key w/o changing the key length. -// for [0x01, 0x01, 0x01] it will generate [0x01, 0x01, 0x02], etc -func NextKey(key []byte) ([]byte, error) { - if len(key) == 0 { - return key, fmt.Errorf("could not apply NextKey for the empty key") - } - nextKey := common.CopyBytes(key) - for i := len(key) - 1; i >= 0; i-- { - b := nextKey[i] - if b < 0xFF { - nextKey[i] = b + 1 - return nextKey, nil - } - if b == 0xFF { - nextKey[i] = 0 - } - } - return key, fmt.Errorf("overflow while applying NextKey") -} - -// LoadCommitHandler is a callback called each time a new batch is being -// loaded from files into a DB -// * `key`: last commited key to the database (use etl.NextKey helper to use in LoadStartKey) -// * `isDone`: true, if everything is processed -type LoadCommitHandler func(db kv.Putter, key []byte, isDone bool) error -type AdditionalLogArguments func(k, v []byte) (additionalLogArguments []interface{}) - -type TransformArgs struct { - ExtractStartKey []byte - ExtractEndKey []byte - FixedBits int - BufferType int - BufferSize int - Quit <-chan struct{} - - LogDetailsExtract AdditionalLogArguments - LogDetailsLoad AdditionalLogArguments - - Comparator kv.CmpFunc -} - -func Transform( - logPrefix string, - db kv.RwTx, - fromBucket string, - toBucket string, - tmpdir string, - extractFunc ExtractFunc, - loadFunc LoadFunc, - args TransformArgs, -) error { - bufferSize := BufferOptimalSize - if args.BufferSize > 0 { - bufferSize = datasize.ByteSize(args.BufferSize) - } - buffer := getBufferByType(args.BufferType, bufferSize) - collector := NewCollector(tmpdir, buffer) - defer collector.Close(logPrefix) - - t := time.Now() - if err := extractBucketIntoFiles(logPrefix, db, fromBucket, args.ExtractStartKey, args.ExtractEndKey, args.FixedBits, collector, extractFunc, args.Quit, args.LogDetailsExtract); err != nil { - return err - } - log.Debug(fmt.Sprintf("[%s] Extraction finished", logPrefix), "it took", time.Since(t)) - - defer func(t time.Time) { - log.Debug(fmt.Sprintf("[%s] Collection finished", logPrefix), "it took", time.Since(t)) - }(time.Now()) - return collector.Load(logPrefix, db, toBucket, loadFunc, args) -} - -func extractBucketIntoFiles( - logPrefix string, - db kv.Tx, - bucket string, - startkey []byte, - endkey []byte, - fixedBits int, - collector *Collector, - extractFunc ExtractFunc, - quit <-chan struct{}, - additionalLogArguments AdditionalLogArguments, -) error { - logEvery := time.NewTicker(30 * time.Second) - defer logEvery.Stop() - var m runtime.MemStats - - c, err := db.Cursor(bucket) - if err != nil { - return err - } - defer c.Close() - if err := ethdb.Walk(c, startkey, fixedBits, func(k, v []byte) (bool, error) { - if err := common.Stopped(quit); err != nil { - return false, err - } - - select { - default: - case <-logEvery.C: - logArs := []interface{}{"from", bucket} - if additionalLogArguments != nil { - logArs = append(logArs, additionalLogArguments(k, v)...) - } else { - logArs = append(logArs, "current key", makeCurrentKeyStr(k)) - } - - runtime.ReadMemStats(&m) - logArs = append(logArs, "alloc", common.StorageSize(m.Alloc), "sys", common.StorageSize(m.Sys)) - log.Info(fmt.Sprintf("[%s] ETL [1/2] Extracting", logPrefix), logArs...) - } - if endkey != nil && bytes.Compare(k, endkey) > 0 { - return false, nil - } - if err := extractFunc(k, v, collector.extractNextFunc); err != nil { - return false, err - } - return true, nil - }); err != nil { - return err - } - return collector.flushBuffer(nil, true) -} - -type currentTableReader struct { - getter kv.Tx - bucket string -} - -func (s *currentTableReader) Get(key []byte) ([]byte, error) { - return s.getter.GetOne(s.bucket, key) -} - -// IdentityLoadFunc loads entries as they are, without transformation -var IdentityLoadFunc LoadFunc = func(k []byte, value []byte, _ CurrentTableReader, next LoadNextFunc) error { - return next(k, k, value) -} - -func isIdentityLoadFunc(f LoadFunc) bool { - return f == nil || reflect.ValueOf(IdentityLoadFunc).Pointer() == reflect.ValueOf(f).Pointer() -} diff --git a/common/etl/etl_test.go b/common/etl/etl_test.go deleted file mode 100644 index 8466b4148..000000000 --- a/common/etl/etl_test.go +++ /dev/null @@ -1,359 +0,0 @@ -package etl - -import ( - "bytes" - "fmt" - "io" - "os" - "strings" - "testing" - - "github.com/ledgerwatch/erigon-lib/kv" - "github.com/ledgerwatch/erigon-lib/kv/memdb" - "github.com/ledgerwatch/erigon/common" - "github.com/stretchr/testify/assert" - "github.com/ugorji/go/codec" -) - -func TestWriteAndReadBufferEntry(t *testing.T) { - buffer := bytes.NewBuffer(make([]byte, 0)) - encoder := codec.NewEncoder(buffer, &cbor) - - keys := make([]string, 100) - vals := make([]string, 100) - - for i := range keys { - keys[i] = fmt.Sprintf("key-%d", i) - vals[i] = fmt.Sprintf("value-%d", i) - } - - for i := range keys { - if err := writeToDisk(encoder, []byte(keys[i]), []byte(vals[i])); err != nil { - t.Error(err) - } - } - - bb := buffer.Bytes() - - readBuffer := bytes.NewReader(bb) - - decoder := codec.NewDecoder(readBuffer, &cbor) - - for i := range keys { - k, v, err := readElementFromDisk(decoder) - if err != nil { - t.Error(err) - } - assert.Equal(t, keys[i], string(k)) - assert.Equal(t, vals[i], string(v)) - } - - _, _, err := readElementFromDisk(decoder) - assert.Equal(t, io.EOF, err) -} - -func TestNextKey(t *testing.T) { - for _, tc := range []string{ - "00000001->00000002", - "000000FF->00000100", - "FEFFFFFF->FF000000", - } { - parts := strings.Split(tc, "->") - input := common.Hex2Bytes(parts[0]) - expectedOutput := common.Hex2Bytes(parts[1]) - actualOutput, err := NextKey(input) - assert.NoError(t, err) - assert.Equal(t, expectedOutput, actualOutput) - } -} - -func TestNextKeyErr(t *testing.T) { - for _, tc := range []string{ - "", - "FFFFFF", - } { - input := common.Hex2Bytes(tc) - _, err := NextKey(input) - assert.Error(t, err) - } -} - -func TestFileDataProviders(t *testing.T) { - // test invariant when we go through files (> 1 buffer) - _, tx := memdb.NewTestTx(t) - sourceBucket := kv.ChaindataTables[0] - - generateTestData(t, tx, sourceBucket, 10) - - collector := NewCollector("", NewSortableBuffer(1)) - - err := extractBucketIntoFiles("logPrefix", tx, sourceBucket, nil, nil, 0, collector, testExtractToMapFunc, nil, nil) - assert.NoError(t, err) - - assert.Equal(t, 10, len(collector.dataProviders)) - - for _, p := range collector.dataProviders { - fp, ok := p.(*fileDataProvider) - assert.True(t, ok) - _, err = os.Stat(fp.file.Name()) - assert.NoError(t, err) - } - - collector.Close("logPrefix") - - for _, p := range collector.dataProviders { - fp, ok := p.(*fileDataProvider) - assert.True(t, ok) - _, err = os.Stat(fp.file.Name()) - assert.True(t, os.IsNotExist(err)) - } -} - -func TestRAMDataProviders(t *testing.T) { - // test invariant when we go through memory (1 buffer) - _, tx := memdb.NewTestTx(t) - sourceBucket := kv.ChaindataTables[0] - generateTestData(t, tx, sourceBucket, 10) - - collector := NewCollector("", NewSortableBuffer(BufferOptimalSize)) - err := extractBucketIntoFiles("logPrefix", tx, sourceBucket, nil, nil, 0, collector, testExtractToMapFunc, nil, nil) - assert.NoError(t, err) - - assert.Equal(t, 1, len(collector.dataProviders)) - - for _, p := range collector.dataProviders { - mp, ok := p.(*memoryDataProvider) - assert.True(t, ok) - assert.Equal(t, 10, mp.buffer.Len()) - } -} - -func TestTransformRAMOnly(t *testing.T) { - // test invariant when we only have one buffer and it fits into RAM (exactly 1 buffer) - _, tx := memdb.NewTestTx(t) - - sourceBucket := kv.ChaindataTables[0] - destBucket := kv.ChaindataTables[1] - generateTestData(t, tx, sourceBucket, 20) - err := Transform( - "logPrefix", - tx, - sourceBucket, - destBucket, - "", // temp dir - testExtractToMapFunc, - testLoadFromMapFunc, - TransformArgs{}, - ) - assert.Nil(t, err) - compareBuckets(t, tx, sourceBucket, destBucket, nil) -} - -func TestEmptySourceBucket(t *testing.T) { - _, tx := memdb.NewTestTx(t) - sourceBucket := kv.ChaindataTables[0] - destBucket := kv.ChaindataTables[1] - err := Transform( - "logPrefix", - tx, - sourceBucket, - destBucket, - "", // temp dir - testExtractToMapFunc, - testLoadFromMapFunc, - TransformArgs{}, - ) - assert.Nil(t, err) - compareBuckets(t, tx, sourceBucket, destBucket, nil) -} - -func TestTransformExtractStartKey(t *testing.T) { - // test invariant when we only have one buffer and it fits into RAM (exactly 1 buffer) - _, tx := memdb.NewTestTx(t) - sourceBucket := kv.ChaindataTables[0] - destBucket := kv.ChaindataTables[1] - generateTestData(t, tx, sourceBucket, 10) - err := Transform( - "logPrefix", - tx, - sourceBucket, - destBucket, - "", // temp dir - testExtractToMapFunc, - testLoadFromMapFunc, - TransformArgs{ExtractStartKey: []byte(fmt.Sprintf("%10d-key-%010d", 5, 5))}, - ) - assert.Nil(t, err) - compareBuckets(t, tx, sourceBucket, destBucket, []byte(fmt.Sprintf("%10d-key-%010d", 5, 5))) -} - -func TestTransformThroughFiles(t *testing.T) { - // test invariant when we go through files (> 1 buffer) - _, tx := memdb.NewTestTx(t) - sourceBucket := kv.ChaindataTables[0] - destBucket := kv.ChaindataTables[1] - generateTestData(t, tx, sourceBucket, 10) - err := Transform( - "logPrefix", - tx, - sourceBucket, - destBucket, - "", // temp dir - testExtractToMapFunc, - testLoadFromMapFunc, - TransformArgs{ - BufferSize: 1, - }, - ) - assert.Nil(t, err) - compareBuckets(t, tx, sourceBucket, destBucket, nil) -} - -func TestTransformDoubleOnExtract(t *testing.T) { - // test invariant when extractFunc multiplies the data 2x - _, tx := memdb.NewTestTx(t) - sourceBucket := kv.ChaindataTables[0] - destBucket := kv.ChaindataTables[1] - generateTestData(t, tx, sourceBucket, 10) - err := Transform( - "logPrefix", - tx, - sourceBucket, - destBucket, - "", // temp dir - testExtractDoubleToMapFunc, - testLoadFromMapFunc, - TransformArgs{}, - ) - assert.Nil(t, err) - compareBucketsDouble(t, tx, sourceBucket, destBucket) -} - -func TestTransformDoubleOnLoad(t *testing.T) { - // test invariant when loadFunc multiplies the data 2x - _, tx := memdb.NewTestTx(t) - sourceBucket := kv.ChaindataTables[0] - destBucket := kv.ChaindataTables[1] - generateTestData(t, tx, sourceBucket, 10) - err := Transform( - "logPrefix", - tx, - sourceBucket, - destBucket, - "", // temp dir - testExtractToMapFunc, - testLoadFromMapDoubleFunc, - TransformArgs{}, - ) - assert.Nil(t, err) - compareBucketsDouble(t, tx, sourceBucket, destBucket) -} - -func generateTestData(t *testing.T, db kv.Putter, bucket string, count int) { - for i := 0; i < count; i++ { - k := []byte(fmt.Sprintf("%10d-key-%010d", i, i)) - v := []byte(fmt.Sprintf("val-%099d", i)) - err := db.Put(bucket, k, v) - assert.NoError(t, err) - } -} - -func testExtractToMapFunc(k, v []byte, next ExtractNextFunc) error { - buf := bytes.NewBuffer(nil) - encoder := codec.NewEncoder(nil, &cbor) - - valueMap := make(map[string][]byte) - valueMap["value"] = v - encoder.Reset(buf) - encoder.MustEncode(valueMap) - return next(k, k, buf.Bytes()) -} - -func testExtractDoubleToMapFunc(k, v []byte, next ExtractNextFunc) error { - buf := bytes.NewBuffer(nil) - encoder := codec.NewEncoder(nil, &cbor) - - valueMap := make(map[string][]byte) - valueMap["value"] = append(v, 0xAA) - k1 := append(k, 0xAA) - encoder.Reset(buf) - encoder.MustEncode(valueMap) - - err := next(k, k1, buf.Bytes()) - if err != nil { - return err - } - - valueMap = make(map[string][]byte) - valueMap["value"] = append(v, 0xBB) - k2 := append(k, 0xBB) - buf.Reset() - encoder.Reset(buf) - encoder.MustEncode(valueMap) - return next(k, k2, buf.Bytes()) -} - -func testLoadFromMapFunc(k []byte, v []byte, _ CurrentTableReader, next LoadNextFunc) error { - decoder := codec.NewDecoder(nil, &cbor) - decoder.ResetBytes(v) - valueMap := make(map[string][]byte) - err := decoder.Decode(&valueMap) - if err != nil { - return err - } - realValue := valueMap["value"] - return next(k, k, realValue) -} - -func testLoadFromMapDoubleFunc(k []byte, v []byte, _ CurrentTableReader, next LoadNextFunc) error { - decoder := codec.NewDecoder(nil, &cbor) - decoder.ResetBytes(v) - - valueMap := make(map[string][]byte) - err := decoder.Decode(valueMap) - if err != nil { - return err - } - realValue := valueMap["value"] - - err = next(k, append(k, 0xAA), append(realValue, 0xAA)) - if err != nil { - return err - } - return next(k, append(k, 0xBB), append(realValue, 0xBB)) -} - -func compareBuckets(t *testing.T, db kv.Tx, b1, b2 string, startKey []byte) { - t.Helper() - b1Map := make(map[string]string) - err := db.ForEach(b1, startKey, func(k, v []byte) error { - b1Map[fmt.Sprintf("%x", k)] = fmt.Sprintf("%x", v) - return nil - }) - assert.NoError(t, err) - b2Map := make(map[string]string) - err = db.ForEach(b2, nil, func(k, v []byte) error { - b2Map[fmt.Sprintf("%x", k)] = fmt.Sprintf("%x", v) - return nil - }) - assert.NoError(t, err) - assert.Equal(t, b1Map, b2Map) -} - -func compareBucketsDouble(t *testing.T, db kv.Tx, b1, b2 string) { - t.Helper() - b1Map := make(map[string]string) - err := db.ForEach(b1, nil, func(k, v []byte) error { - b1Map[fmt.Sprintf("%x", append(k, 0xAA))] = fmt.Sprintf("%x", append(v, 0xAA)) - b1Map[fmt.Sprintf("%x", append(k, 0xBB))] = fmt.Sprintf("%x", append(v, 0xBB)) - return nil - }) - assert.NoError(t, err) - b2Map := make(map[string]string) - err = db.ForEach(b2, nil, func(k, v []byte) error { - b2Map[fmt.Sprintf("%x", k)] = fmt.Sprintf("%x", v) - return nil - }) - assert.NoError(t, err) - assert.Equal(t, b1Map, b2Map) -} diff --git a/common/etl/heap.go b/common/etl/heap.go deleted file mode 100644 index 37c8f7426..000000000 --- a/common/etl/heap.go +++ /dev/null @@ -1,54 +0,0 @@ -package etl - -import ( - "bytes" - - "github.com/ledgerwatch/erigon-lib/kv" -) - -type HeapElem struct { - Key []byte - TimeIdx int - Value []byte -} - -type Heap struct { - comparator kv.CmpFunc - elems []HeapElem -} - -func (h Heap) Len() int { - return len(h.elems) -} - -func (h Heap) Less(i, j int) bool { - if h.comparator != nil { - if c := h.comparator(h.elems[i].Key, h.elems[j].Key, h.elems[i].Value, h.elems[j].Value); c != 0 { - return c < 0 - } - return h.elems[i].TimeIdx < h.elems[j].TimeIdx - } - - if c := bytes.Compare(h.elems[i].Key, h.elems[j].Key); c != 0 { - return c < 0 - } - return h.elems[i].TimeIdx < h.elems[j].TimeIdx -} - -func (h Heap) Swap(i, j int) { - h.elems[i], h.elems[j] = h.elems[j], h.elems[i] -} - -func (h *Heap) Push(x interface{}) { - // Push and Pop use pointer receivers because they modify the slice's length, - // not just its contents. - h.elems = append(h.elems, x.(HeapElem)) -} - -func (h *Heap) Pop() interface{} { - old := h.elems - n := len(old) - x := old[n-1] - h.elems = old[0 : n-1] - return x -} diff --git a/common/etl/progress.go b/common/etl/progress.go deleted file mode 100644 index f12ec7f9e..000000000 --- a/common/etl/progress.go +++ /dev/null @@ -1,8 +0,0 @@ -package etl - -func ProgressFromKey(k []byte) int { - if len(k) < 1 { - return 0 - } - return int(float64(k[0]>>4) * 3.3) -} diff --git a/consensus/aura/aura.go b/consensus/aura/aura.go index 5e34525c1..b08f1073a 100644 --- a/consensus/aura/aura.go +++ b/consensus/aura/aura.go @@ -28,6 +28,7 @@ import ( lru "github.com/hashicorp/golang-lru" "github.com/holiman/uint256" + libcommon "github.com/ledgerwatch/erigon-lib/common" "github.com/ledgerwatch/erigon-lib/kv" "github.com/ledgerwatch/erigon/accounts/abi" "github.com/ledgerwatch/erigon/common" @@ -1244,7 +1245,7 @@ func (c *AuRa) SealHash(header *types.Header) common.Hash { // Close implements consensus.Engine. It's a noop for clique as there are no background threads. func (c *AuRa) Close() error { - common.SafeClose(c.exitCh) + libcommon.SafeClose(c.exitCh) return nil } diff --git a/consensus/clique/clique.go b/consensus/clique/clique.go index a35d55acd..37c9abde1 100644 --- a/consensus/clique/clique.go +++ b/consensus/clique/clique.go @@ -32,6 +32,7 @@ import ( "github.com/ledgerwatch/erigon-lib/kv" "golang.org/x/crypto/sha3" + libcommon "github.com/ledgerwatch/erigon-lib/common" "github.com/ledgerwatch/erigon/common" "github.com/ledgerwatch/erigon/common/dbutils" "github.com/ledgerwatch/erigon/common/debug" @@ -500,7 +501,7 @@ func (c *Clique) SealHash(header *types.Header) common.Hash { // Close implements consensus.Engine. It's a noop for clique as there are no background threads. func (c *Clique) Close() error { - common.SafeClose(c.exitCh) + libcommon.SafeClose(c.exitCh) return nil } diff --git a/core/tx_cacher.go b/core/tx_cacher.go index 3ecefa227..e4c8cf057 100644 --- a/core/tx_cacher.go +++ b/core/tx_cacher.go @@ -19,7 +19,7 @@ package core import ( "sync" - "github.com/ledgerwatch/erigon/common" + libcommon "github.com/ledgerwatch/erigon-lib/common" "github.com/ledgerwatch/erigon/common/debug" "github.com/ledgerwatch/erigon/core/types" ) @@ -70,7 +70,7 @@ func NewTxSenderCacher(threads int) *TxSenderCacher { // data structures. func (cacher *TxSenderCacher) cache() { for task := range cacher.tasks { - if err := common.Stopped(cacher.exitCh); err != nil { + if err := libcommon.Stopped(cacher.exitCh); err != nil { return } @@ -81,7 +81,7 @@ func (cacher *TxSenderCacher) cache() { } func (cacher *TxSenderCacher) Close() { - common.SafeClose(cacher.exitCh) + libcommon.SafeClose(cacher.exitCh) close(cacher.tasks) cacher.wg.Wait() } diff --git a/core/tx_pool.go b/core/tx_pool.go index 4d861f4df..561eb3584 100644 --- a/core/tx_pool.go +++ b/core/tx_pool.go @@ -25,6 +25,7 @@ import ( "github.com/VictoriaMetrics/metrics" "github.com/holiman/uint256" + libcommon "github.com/ledgerwatch/erigon-lib/common" "github.com/ledgerwatch/erigon-lib/kv" "github.com/ledgerwatch/erigon/common" "github.com/ledgerwatch/erigon/common/debug" @@ -351,7 +352,7 @@ func (pool *TxPool) loop() { // System shutdown. case <-pool.stopCh: - common.SafeClose(pool.reorgShutdownCh) + libcommon.SafeClose(pool.reorgShutdownCh) return // Handle stats reporting ticks diff --git a/eth/backend.go b/eth/backend.go index cd2f40d86..dfac89ac7 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -30,7 +30,9 @@ import ( "time" "github.com/holiman/uint256" + libcommon "github.com/ledgerwatch/erigon-lib/common" "github.com/ledgerwatch/erigon-lib/direct" + "github.com/ledgerwatch/erigon-lib/etl" "github.com/ledgerwatch/erigon-lib/gointerfaces/grpcutil" "github.com/ledgerwatch/erigon-lib/gointerfaces/sentry" txpool_proto "github.com/ledgerwatch/erigon-lib/gointerfaces/txpool" @@ -40,7 +42,6 @@ import ( "github.com/ledgerwatch/erigon/cmd/sentry/download" "github.com/ledgerwatch/erigon/common" "github.com/ledgerwatch/erigon/common/debug" - "github.com/ledgerwatch/erigon/common/etl" "github.com/ledgerwatch/erigon/consensus" "github.com/ledgerwatch/erigon/consensus/clique" "github.com/ledgerwatch/erigon/consensus/ethash" @@ -677,7 +678,7 @@ func (s *Ethereum) StartMining(ctx context.Context, db kv.RwDB, mining *stagedsy case err := <-errc: works = false hasWork = false - if errors.Is(err, common.ErrStopped) { + if errors.Is(err, libcommon.ErrStopped) { return } if err != nil { diff --git a/eth/fetcher/block_fetcher.go b/eth/fetcher/block_fetcher.go index 4df1f219d..3388009dd 100644 --- a/eth/fetcher/block_fetcher.go +++ b/eth/fetcher/block_fetcher.go @@ -22,6 +22,7 @@ import ( "math/rand" "time" + libcommon "github.com/ledgerwatch/erigon-lib/common" "github.com/ledgerwatch/erigon/common" "github.com/ledgerwatch/erigon/common/debug" "github.com/ledgerwatch/erigon/common/prque" @@ -229,7 +230,7 @@ func (f *BlockFetcher) Start() { // Stop terminates the announcement based synchroniser, canceling all pending // operations. func (f *BlockFetcher) Stop() { - common.SafeClose(f.quit) + libcommon.SafeClose(f.quit) } // Notify announces the fetcher of the potential availability of a new block in diff --git a/eth/fetcher/tx_fetcher.go b/eth/fetcher/tx_fetcher.go index 24095bf3e..a75a099d5 100644 --- a/eth/fetcher/tx_fetcher.go +++ b/eth/fetcher/tx_fetcher.go @@ -24,6 +24,7 @@ import ( "time" mapset "github.com/deckarep/golang-set" + libcommon "github.com/ledgerwatch/erigon-lib/common" "github.com/ledgerwatch/erigon/common" "github.com/ledgerwatch/erigon/common/debug" "github.com/ledgerwatch/erigon/common/mclock" @@ -341,7 +342,7 @@ func (f *TxFetcher) Start() { // Stop terminates the announcement based synchroniser, canceling all pending // operations. func (f *TxFetcher) Stop() { - common.SafeClose(f.quit) + libcommon.SafeClose(f.quit) } func (f *TxFetcher) loop() { diff --git a/eth/gasprice/feehistory.go b/eth/gasprice/feehistory.go index ec3cd3a75..673f53bce 100644 --- a/eth/gasprice/feehistory.go +++ b/eth/gasprice/feehistory.go @@ -25,6 +25,7 @@ import ( "sync/atomic" "github.com/holiman/uint256" + libcommon "github.com/ledgerwatch/erigon-lib/common" "github.com/ledgerwatch/erigon/common" "github.com/ledgerwatch/erigon/consensus/misc" "github.com/ledgerwatch/erigon/core/types" @@ -241,7 +242,7 @@ func (oracle *Oracle) FeeHistory(ctx context.Context, blocks int, unresolvedLast firstMissing = blocks ) for ; blocks > 0; blocks-- { - if err = common.Stopped(ctx.Done()); err != nil { + if err = libcommon.Stopped(ctx.Done()); err != nil { return common.Big0, nil, nil, nil, err } // Retrieve the next block number to fetch with this goroutine diff --git a/eth/stagedsync/stage_blockhashes.go b/eth/stagedsync/stage_blockhashes.go index 886fdbb7a..97cb328e1 100644 --- a/eth/stagedsync/stage_blockhashes.go +++ b/eth/stagedsync/stage_blockhashes.go @@ -5,10 +5,10 @@ import ( "encoding/binary" "fmt" + "github.com/ledgerwatch/erigon-lib/etl" "github.com/ledgerwatch/erigon-lib/kv" "github.com/ledgerwatch/erigon/common" "github.com/ledgerwatch/erigon/common/dbutils" - "github.com/ledgerwatch/erigon/common/etl" "github.com/ledgerwatch/erigon/core/rawdb" "github.com/ledgerwatch/erigon/eth/stagedsync/stages" ) diff --git a/eth/stagedsync/stage_bodies.go b/eth/stagedsync/stage_bodies.go index 24dd9e7a3..da2837698 100644 --- a/eth/stagedsync/stage_bodies.go +++ b/eth/stagedsync/stage_bodies.go @@ -7,6 +7,7 @@ import ( "time" "github.com/c2h5oh/datasize" + libcommon "github.com/ledgerwatch/erigon-lib/common" "github.com/ledgerwatch/erigon-lib/kv" "github.com/ledgerwatch/erigon/common" "github.com/ledgerwatch/erigon/core/rawdb" @@ -212,7 +213,7 @@ Loop: } } if stopped { - return common.ErrStopped + return libcommon.ErrStopped } if bodyProgress > s.BlockNumber+16 { log.Info(fmt.Sprintf("[%s] Processed", logPrefix), "highest", bodyProgress) diff --git a/eth/stagedsync/stage_call_traces.go b/eth/stagedsync/stage_call_traces.go index 4ad3aff06..aeaf0563c 100644 --- a/eth/stagedsync/stage_call_traces.go +++ b/eth/stagedsync/stage_call_traces.go @@ -11,10 +11,11 @@ import ( "github.com/RoaringBitmap/roaring/roaring64" "github.com/c2h5oh/datasize" + libcommon "github.com/ledgerwatch/erigon-lib/common" + "github.com/ledgerwatch/erigon-lib/etl" "github.com/ledgerwatch/erigon-lib/kv" "github.com/ledgerwatch/erigon/common" "github.com/ledgerwatch/erigon/common/dbutils" - "github.com/ledgerwatch/erigon/common/etl" "github.com/ledgerwatch/erigon/core/vm" "github.com/ledgerwatch/erigon/core/vm/stack" "github.com/ledgerwatch/erigon/crypto" @@ -309,7 +310,7 @@ func DoUnwindCallTraces(logPrefix string, db kv.RwTx, from, to uint64, ctx conte "alloc", common.StorageSize(m.Alloc), "sys", common.StorageSize(m.Sys)) case <-ctx.Done(): - return common.ErrStopped + return libcommon.ErrStopped default: } } @@ -456,7 +457,7 @@ func pruneCallTraces(tx kv.RwTx, logPrefix string, pruneTo uint64, ctx context.C runtime.ReadMemStats(&m) log.Info(fmt.Sprintf("[%s] Progress", logPrefix), "number", blockNum, "alloc", common.StorageSize(m.Alloc), "sys", common.StorageSize(m.Sys)) case <-ctx.Done(): - return common.ErrStopped + return libcommon.ErrStopped default: } } @@ -486,7 +487,7 @@ func pruneCallTraces(tx kv.RwTx, logPrefix string, pruneTo uint64, ctx context.C case <-logEvery.C: log.Info(fmt.Sprintf("[%s]", logPrefix), "table", kv.CallFromIndex, "key", from) case <-ctx.Done(): - return common.ErrStopped + return libcommon.ErrStopped default: } return nil @@ -518,7 +519,7 @@ func pruneCallTraces(tx kv.RwTx, logPrefix string, pruneTo uint64, ctx context.C case <-logEvery.C: log.Info(fmt.Sprintf("[%s]", logPrefix), "table", kv.CallToIndex, "key", to) case <-ctx.Done(): - return common.ErrStopped + return libcommon.ErrStopped default: } return nil diff --git a/eth/stagedsync/stage_execute.go b/eth/stagedsync/stage_execute.go index 5344f5721..76aec160c 100644 --- a/eth/stagedsync/stage_execute.go +++ b/eth/stagedsync/stage_execute.go @@ -9,11 +9,12 @@ import ( "time" "github.com/c2h5oh/datasize" + libcommon "github.com/ledgerwatch/erigon-lib/common" + "github.com/ledgerwatch/erigon-lib/etl" "github.com/ledgerwatch/erigon-lib/kv" "github.com/ledgerwatch/erigon/common" "github.com/ledgerwatch/erigon/common/changeset" "github.com/ledgerwatch/erigon/common/dbutils" - "github.com/ledgerwatch/erigon/common/etl" "github.com/ledgerwatch/erigon/consensus" "github.com/ledgerwatch/erigon/core" "github.com/ledgerwatch/erigon/core/rawdb" @@ -268,7 +269,7 @@ func SpawnExecuteBlocksStage(s *StageState, u Unwinder, tx kv.RwTx, toBlock uint var stoppedErr error Loop: for blockNum := stageProgress + 1; blockNum <= to; blockNum++ { - if stoppedErr = common.Stopped(quit); stoppedErr != nil { + if stoppedErr = libcommon.Stopped(quit); stoppedErr != nil { break } var err error @@ -372,7 +373,7 @@ func pruneChangeSets(tx kv.RwTx, logPrefix string, table string, pruneTo uint64, case <-logEvery.C: log.Info(fmt.Sprintf("[%s]", logPrefix), "table", table, "block", blockNum) case <-ctx.Done(): - return common.ErrStopped + return libcommon.ErrStopped default: } if err = c.DeleteCurrentDuplicates(); err != nil { @@ -653,7 +654,7 @@ func pruneReceipts(tx kv.RwTx, logPrefix string, pruneTo uint64, logEvery *time. case <-logEvery.C: log.Info(fmt.Sprintf("[%s]", logPrefix), "table", kv.Receipts, "block", blockNum) case <-ctx.Done(): - return common.ErrStopped + return libcommon.ErrStopped default: } if err = c.DeleteCurrent(); err != nil { @@ -679,7 +680,7 @@ func pruneReceipts(tx kv.RwTx, logPrefix string, pruneTo uint64, logEvery *time. case <-logEvery.C: log.Info(fmt.Sprintf("[%s]", logPrefix), "table", kv.Log, "block", blockNum) case <-ctx.Done(): - return common.ErrStopped + return libcommon.ErrStopped default: } if err = c.DeleteCurrent(); err != nil { @@ -708,7 +709,7 @@ func pruneCallTracesSet(tx kv.RwTx, logPrefix string, pruneTo uint64, logEvery * case <-logEvery.C: log.Info(fmt.Sprintf("[%s]", logPrefix), "table", kv.CallTraceSet, "block", blockNum) case <-ctx.Done(): - return common.ErrStopped + return libcommon.ErrStopped default: } if err = c.DeleteCurrentDuplicates(); err != nil { diff --git a/eth/stagedsync/stage_hashstate.go b/eth/stagedsync/stage_hashstate.go index 2421aee64..5d6d36218 100644 --- a/eth/stagedsync/stage_hashstate.go +++ b/eth/stagedsync/stage_hashstate.go @@ -7,11 +7,11 @@ import ( "fmt" "os" + "github.com/ledgerwatch/erigon-lib/etl" "github.com/ledgerwatch/erigon-lib/kv" "github.com/ledgerwatch/erigon/common" "github.com/ledgerwatch/erigon/common/changeset" "github.com/ledgerwatch/erigon/common/dbutils" - "github.com/ledgerwatch/erigon/common/etl" "github.com/ledgerwatch/erigon/core/types/accounts" "github.com/ledgerwatch/log/v3" ) diff --git a/eth/stagedsync/stage_hashstate_test.go b/eth/stagedsync/stage_hashstate_test.go index 5e18d4f86..8c0534c93 100644 --- a/eth/stagedsync/stage_hashstate_test.go +++ b/eth/stagedsync/stage_hashstate_test.go @@ -9,7 +9,7 @@ import ( "github.com/ledgerwatch/erigon-lib/kv/memdb" "github.com/stretchr/testify/require" - "github.com/ledgerwatch/erigon/common" + libcommon "github.com/ledgerwatch/erigon-lib/common" ) func TestPromoteHashedStateClearState(t *testing.T) { @@ -94,7 +94,7 @@ func TestPromoteIncrementallyShutdown(t *testing.T) { cancelFuncExec bool errExp error }{ - {"cancel", true, common.ErrStopped}, + {"cancel", true, libcommon.ErrStopped}, {"no cancel", false, nil}, } @@ -124,7 +124,7 @@ func TestPromoteHashedStateCleanlyShutdown(t *testing.T) { cancelFuncExec bool errExp error }{ - {"cancel", true, common.ErrStopped}, + {"cancel", true, libcommon.ErrStopped}, {"no cancel", false, nil}, } @@ -157,7 +157,7 @@ func TestUnwindHashStateShutdown(t *testing.T) { cancelFuncExec bool errExp error }{ - {"cancel", true, common.ErrStopped}, + {"cancel", true, libcommon.ErrStopped}, {"no cancel", false, nil}, } diff --git a/eth/stagedsync/stage_headers.go b/eth/stagedsync/stage_headers.go index f5e3fb4c4..fb6069dad 100644 --- a/eth/stagedsync/stage_headers.go +++ b/eth/stagedsync/stage_headers.go @@ -9,6 +9,7 @@ import ( "time" "github.com/c2h5oh/datasize" + libcommon "github.com/ledgerwatch/erigon-lib/common" "github.com/ledgerwatch/erigon-lib/kv" "github.com/ledgerwatch/erigon/common" "github.com/ledgerwatch/erigon/common/dbutils" @@ -213,7 +214,7 @@ Loop: } } if stopped { - return common.ErrStopped + return libcommon.ErrStopped } // We do not print the followin line if the stage was interrupted log.Info(fmt.Sprintf("[%s] Processed", logPrefix), "highest inserted", headerInserter.GetHighest(), "age", common.PrettyAge(time.Unix(int64(headerInserter.GetHighestTimestamp()), 0))) diff --git a/eth/stagedsync/stage_indexes.go b/eth/stagedsync/stage_indexes.go index 2c4b10422..c5084076f 100644 --- a/eth/stagedsync/stage_indexes.go +++ b/eth/stagedsync/stage_indexes.go @@ -12,11 +12,12 @@ import ( "github.com/RoaringBitmap/roaring/roaring64" "github.com/c2h5oh/datasize" + libcommon "github.com/ledgerwatch/erigon-lib/common" + "github.com/ledgerwatch/erigon-lib/etl" "github.com/ledgerwatch/erigon-lib/kv" "github.com/ledgerwatch/erigon/common" "github.com/ledgerwatch/erigon/common/changeset" "github.com/ledgerwatch/erigon/common/dbutils" - "github.com/ledgerwatch/erigon/common/etl" "github.com/ledgerwatch/erigon/ethdb" "github.com/ledgerwatch/erigon/ethdb/bitmapdb" "github.com/ledgerwatch/erigon/ethdb/prune" @@ -146,7 +147,7 @@ func promoteHistory(logPrefix string, tx kv.RwTx, changesetBucket string, start, if blockN >= stop { return false, nil } - if err := common.Stopped(quit); err != nil { + if err := libcommon.Stopped(quit); err != nil { return false, err } @@ -157,7 +158,7 @@ func promoteHistory(logPrefix string, tx kv.RwTx, changesetBucket string, start, case <-logEvery.C: var m runtime.MemStats runtime.ReadMemStats(&m) - log.Info(fmt.Sprintf("[%s] Progress", logPrefix), "number", blockN, "alloc", common.StorageSize(m.Alloc), "sys", common.StorageSize(m.Sys)) + log.Info(fmt.Sprintf("[%s] Progress", logPrefix), "number", blockN, "alloc", libcommon.ByteCount(m.Alloc), "sys", libcommon.ByteCount(m.Sys)) case <-checkFlushEvery.C: if needFlush64(updates, cfg.bufLimit) { if err := flushBitmaps64(collectorUpdates, updates); err != nil { @@ -294,9 +295,9 @@ func unwindHistory(logPrefix string, db kv.RwTx, csBucket string, to uint64, cfg case <-logEvery.C: var m runtime.MemStats runtime.ReadMemStats(&m) - log.Info(fmt.Sprintf("[%s] Progress", logPrefix), "number", blockN, "alloc", common.StorageSize(m.Alloc), "sys", common.StorageSize(m.Sys)) + log.Info(fmt.Sprintf("[%s] Progress", logPrefix), "number", blockN, "alloc", libcommon.ByteCount(m.Alloc), "sys", libcommon.ByteCount(m.Sys)) case <-quitCh: - return false, common.ErrStopped + return false, libcommon.ErrStopped default: } k = dbutils.CompositeKeyWithoutIncarnation(k) @@ -447,7 +448,7 @@ func pruneHistoryIndex(tx kv.RwTx, csTable, logPrefix, tmpDir string, pruneTo ui case <-logEvery.C: log.Info(fmt.Sprintf("[%s]", logPrefix), "table", changeset.Mapper[csTable].IndexBucket, "key", fmt.Sprintf("%x", addr)) case <-ctx.Done(): - return common.ErrStopped + return libcommon.ErrStopped default: } for k, _, err := c.Seek(addr); k != nil; k, _, err = c.Next() { diff --git a/eth/stagedsync/stage_interhashes.go b/eth/stagedsync/stage_interhashes.go index 8f48c3a91..e1200485e 100644 --- a/eth/stagedsync/stage_interhashes.go +++ b/eth/stagedsync/stage_interhashes.go @@ -8,11 +8,11 @@ import ( "os" "sort" + "github.com/ledgerwatch/erigon-lib/etl" "github.com/ledgerwatch/erigon-lib/kv" "github.com/ledgerwatch/erigon/common" "github.com/ledgerwatch/erigon/common/changeset" "github.com/ledgerwatch/erigon/common/dbutils" - "github.com/ledgerwatch/erigon/common/etl" "github.com/ledgerwatch/erigon/core/rawdb" "github.com/ledgerwatch/erigon/core/types/accounts" "github.com/ledgerwatch/erigon/eth/stagedsync/stages" diff --git a/eth/stagedsync/stage_log_index.go b/eth/stagedsync/stage_log_index.go index 19ad67ea6..ed368bf10 100644 --- a/eth/stagedsync/stage_log_index.go +++ b/eth/stagedsync/stage_log_index.go @@ -11,10 +11,10 @@ import ( "github.com/RoaringBitmap/roaring" "github.com/c2h5oh/datasize" + libcommon "github.com/ledgerwatch/erigon-lib/common" + "github.com/ledgerwatch/erigon-lib/etl" "github.com/ledgerwatch/erigon-lib/kv" - "github.com/ledgerwatch/erigon/common" "github.com/ledgerwatch/erigon/common/dbutils" - "github.com/ledgerwatch/erigon/common/etl" "github.com/ledgerwatch/erigon/core/types" "github.com/ledgerwatch/erigon/ethdb/bitmapdb" "github.com/ledgerwatch/erigon/ethdb/cbor" @@ -117,7 +117,7 @@ func promoteLogIndex(logPrefix string, tx kv.RwTx, start uint64, cfg LogIndexCfg return err } - if err := common.Stopped(quit); err != nil { + if err := libcommon.Stopped(quit); err != nil { return err } blockNum := binary.BigEndian.Uint64(k[:8]) @@ -127,7 +127,7 @@ func promoteLogIndex(logPrefix string, tx kv.RwTx, start uint64, cfg LogIndexCfg case <-logEvery.C: var m runtime.MemStats runtime.ReadMemStats(&m) - log.Info(fmt.Sprintf("[%s] Progress", logPrefix), "number", blockNum, "alloc", common.StorageSize(m.Alloc), "sys", common.StorageSize(m.Sys)) + log.Info(fmt.Sprintf("[%s] Progress", logPrefix), "number", blockNum, "alloc", libcommon.ByteCount(m.Alloc), "sys", libcommon.ByteCount(m.Sys)) case <-checkFlushEvery.C: if needFlush(topics, cfg.bufLimit) { if err := flushBitmaps(collectorTopics, topics); err != nil { @@ -265,7 +265,7 @@ func unwindLogIndex(logPrefix string, db kv.RwTx, to uint64, cfg LogIndexCfg, qu return err } - if err := common.Stopped(quitCh); err != nil { + if err := libcommon.Stopped(quitCh); err != nil { return err } var logs types.Logs @@ -359,7 +359,7 @@ func pruneOldLogChunks(tx kv.RwTx, bucket string, inMem map[string]struct{}, pru case <-logEvery.C: log.Info(fmt.Sprintf("[%s]", logPrefix), "table", kv.AccountsHistory, "block", blockNum) case <-ctx.Done(): - return common.ErrStopped + return libcommon.ErrStopped default: } if err = c.DeleteCurrent(); err != nil { @@ -428,7 +428,7 @@ func pruneLogIndex(logPrefix string, tx kv.RwTx, tmpDir string, pruneTo uint64, case <-logEvery.C: log.Info(fmt.Sprintf("[%s]", logPrefix), "table", kv.Log, "block", blockNum) case <-ctx.Done(): - return common.ErrStopped + return libcommon.ErrStopped default: } diff --git a/eth/stagedsync/stage_mining_exec.go b/eth/stagedsync/stage_mining_exec.go index 173a62218..1061932c6 100644 --- a/eth/stagedsync/stage_mining_exec.go +++ b/eth/stagedsync/stage_mining_exec.go @@ -3,6 +3,7 @@ package stagedsync import ( "fmt" + libcommon "github.com/ledgerwatch/erigon-lib/common" "github.com/ledgerwatch/erigon-lib/kv" "github.com/ledgerwatch/erigon/common" "github.com/ledgerwatch/erigon/consensus" @@ -173,7 +174,7 @@ func addTransactionsToMiningBlock(current *MiningBlock, chainConfig params.Chain } for { - if err := common.Stopped(quit); err != nil { + if err := libcommon.Stopped(quit); err != nil { return nil, err } diff --git a/eth/stagedsync/stage_senders.go b/eth/stagedsync/stage_senders.go index 7b993d287..c34c47a1d 100644 --- a/eth/stagedsync/stage_senders.go +++ b/eth/stagedsync/stage_senders.go @@ -9,11 +9,12 @@ import ( "sync" "time" + libcommon "github.com/ledgerwatch/erigon-lib/common" + "github.com/ledgerwatch/erigon-lib/etl" "github.com/ledgerwatch/erigon-lib/kv" "github.com/ledgerwatch/erigon/common" "github.com/ledgerwatch/erigon/common/dbutils" "github.com/ledgerwatch/erigon/common/debug" - "github.com/ledgerwatch/erigon/common/etl" "github.com/ledgerwatch/erigon/core/rawdb" "github.com/ledgerwatch/erigon/core/types" "github.com/ledgerwatch/erigon/eth/stagedsync/stages" @@ -97,7 +98,7 @@ func SpawnRecoverSendersStage(cfg SendersCfg, s *StageState, u Unwinder, tx kv.R if err != nil { return err } - if err := common.Stopped(quitCh); err != nil { + if err := libcommon.Stopped(quitCh); err != nil { return err } @@ -194,7 +195,7 @@ Loop: if err != nil { return err } - if err := common.Stopped(quitCh); err != nil { + if err := libcommon.Stopped(quitCh); err != nil { return err } @@ -312,14 +313,14 @@ func recoverSenders(ctx context.Context, logPrefix string, cryptoContext *secp25 } // prevent sending to close channel - if err := common.Stopped(quit); err != nil { + if err := libcommon.Stopped(quit); err != nil { job.err = err - } else if err = common.Stopped(ctx.Done()); err != nil { + } else if err = libcommon.Stopped(ctx.Done()); err != nil { job.err = err } out <- job - if errors.Is(job.err, common.ErrStopped) { + if errors.Is(job.err, libcommon.ErrStopped) { return } } @@ -378,7 +379,7 @@ func PruneSendersStage(s *PruneState, tx kv.RwTx, cfg SendersCfg, ctx context.Co case <-logEvery.C: log.Info(fmt.Sprintf("[%s]", s.LogPrefix()), "table", kv.Senders, "block", blockNum) case <-ctx.Done(): - return common.ErrStopped + return libcommon.ErrStopped default: } diff --git a/eth/stagedsync/stage_tevm.go b/eth/stagedsync/stage_tevm.go index 1e337e23a..8d033a055 100644 --- a/eth/stagedsync/stage_tevm.go +++ b/eth/stagedsync/stage_tevm.go @@ -8,6 +8,7 @@ import ( "time" "github.com/c2h5oh/datasize" + libcommon "github.com/ledgerwatch/erigon-lib/common" "github.com/ledgerwatch/erigon-lib/kv" "github.com/ledgerwatch/erigon/common" "github.com/ledgerwatch/erigon/common/dbutils" @@ -141,7 +142,7 @@ func transpileBatch(logPrefix string, stageProgress, toBlock uint64, cfg Transpi select { case <-quitCh: - return 0, common.ErrStopped + return 0, libcommon.ErrStopped case <-logEvery.C: prevContract, logTime = logTEVMProgress(logPrefix, prevContract, logTime, stageProgress) tx.CollectMetrics() @@ -259,7 +260,7 @@ func logTEVMProgress(logPrefix string, prevContract uint64, prevTime time.Time, "number", currentContract, "contracts/s", speed, } - logpairs = append(logpairs, "alloc", common.StorageSize(m.Alloc), "sys", common.StorageSize(m.Sys)) + logpairs = append(logpairs, "alloc", libcommon.ByteCount(m.Alloc), "sys", libcommon.ByteCount(m.Sys)) log.Info(fmt.Sprintf("[%s] Translated contracts", logPrefix), logpairs...) return currentContract, currentTime diff --git a/eth/stagedsync/stage_txlookup.go b/eth/stagedsync/stage_txlookup.go index 021d2cc8d..597973b4a 100644 --- a/eth/stagedsync/stage_txlookup.go +++ b/eth/stagedsync/stage_txlookup.go @@ -7,10 +7,10 @@ import ( "fmt" "math/big" + "github.com/ledgerwatch/erigon-lib/etl" "github.com/ledgerwatch/erigon-lib/kv" "github.com/ledgerwatch/erigon/common" "github.com/ledgerwatch/erigon/common/dbutils" - "github.com/ledgerwatch/erigon/common/etl" "github.com/ledgerwatch/erigon/core/rawdb" "github.com/ledgerwatch/erigon/core/types" "github.com/ledgerwatch/erigon/ethdb/prune" diff --git a/eth/stagedsync/stage_txpool.go b/eth/stagedsync/stage_txpool.go index a45cc9bfa..74803f0e8 100644 --- a/eth/stagedsync/stage_txpool.go +++ b/eth/stagedsync/stage_txpool.go @@ -5,6 +5,7 @@ import ( "encoding/binary" "fmt" + libcommon "github.com/ledgerwatch/erigon-lib/common" "github.com/ledgerwatch/erigon-lib/kv" "github.com/ledgerwatch/erigon/common" "github.com/ledgerwatch/erigon/common/dbutils" @@ -105,7 +106,7 @@ func incrementalTxPoolUpdate(logPrefix string, from, to uint64, pool *core.TxPoo if err != nil { return err } - if err := common.Stopped(quitCh); err != nil { + if err := libcommon.Stopped(quitCh); err != nil { return err } @@ -127,7 +128,7 @@ func incrementalTxPoolUpdate(logPrefix string, from, to uint64, pool *core.TxPoo if err != nil { return err } - if err := common.Stopped(quitCh); err != nil { + if err := libcommon.Stopped(quitCh); err != nil { return err } @@ -201,7 +202,7 @@ func unwindTxPoolUpdate(logPrefix string, from, to uint64, pool *core.TxPool, tx if err != nil { return err } - if err := common.Stopped(quitCh); err != nil { + if err := libcommon.Stopped(quitCh); err != nil { return err } blockNumber := binary.BigEndian.Uint64(k[:8]) @@ -223,7 +224,7 @@ func unwindTxPoolUpdate(logPrefix string, from, to uint64, pool *core.TxPool, tx if err != nil { return err } - if err := common.Stopped(quitCh); err != nil { + if err := libcommon.Stopped(quitCh); err != nil { return err } @@ -254,7 +255,7 @@ func unwindTxPoolUpdate(logPrefix string, from, to uint64, pool *core.TxPool, tx if err != nil { return err } - if err := common.Stopped(quitCh); err != nil { + if err := libcommon.Stopped(quitCh); err != nil { return err } diff --git a/ethdb/olddb/mutation.go b/ethdb/olddb/mutation.go index 128277985..179e39216 100644 --- a/ethdb/olddb/mutation.go +++ b/ethdb/olddb/mutation.go @@ -11,8 +11,8 @@ import ( "unsafe" "github.com/google/btree" + "github.com/ledgerwatch/erigon-lib/common" "github.com/ledgerwatch/erigon-lib/kv" - "github.com/ledgerwatch/erigon/common" "github.com/ledgerwatch/erigon/ethdb" "github.com/ledgerwatch/log/v3" ) diff --git a/go.mod b/go.mod index 84f7ad046..e488948f0 100644 --- a/go.mod +++ b/go.mod @@ -21,6 +21,7 @@ require ( github.com/fatih/color v1.7.0 github.com/fjl/gencodec v0.0.0-20191126094850-e283372f291f github.com/go-sourcemap/sourcemap v2.1.3+incompatible // indirect + github.com/go-stack/stack v1.8.1 // indirect github.com/goccy/go-json v0.7.4 github.com/gofrs/flock v0.8.1 github.com/golang/protobuf v1.5.2 @@ -37,10 +38,11 @@ require ( github.com/julienschmidt/httprouter v1.3.0 github.com/kevinburke/go-bindata v3.21.0+incompatible github.com/kylelemons/godebug v1.1.0 // indirect - github.com/ledgerwatch/erigon-lib v0.0.0-20210908121016-58c19d9219f0 + github.com/ledgerwatch/erigon-lib v0.0.0-20210911154831-f79629b98d81 github.com/ledgerwatch/log/v3 v3.3.0 github.com/ledgerwatch/secp256k1 v0.0.0-20210626115225-cd5cd00ed72d github.com/logrusorgru/aurora/v3 v3.0.0 + github.com/mattn/go-isatty v0.0.14 // indirect github.com/pelletier/go-toml v1.9.3 github.com/petar/GoLLRB v0.0.0-20190514000832-33fb24c13b99 github.com/quasilyte/go-ruleguard/dsl v0.3.6 @@ -58,7 +60,7 @@ require ( go.uber.org/atomic v1.9.0 golang.org/x/crypto v0.0.0-20210616213533-5ff15b29337e golang.org/x/sync v0.0.0-20210220032951-036812b2e83c - golang.org/x/sys v0.0.0-20210806184541-e5e7981a1069 + golang.org/x/sys v0.0.0-20210910150752-751e447fb3d0 golang.org/x/time v0.0.0-20201208040808-7e3f01d25324 golang.org/x/tools v0.1.2 google.golang.org/grpc v1.39.1 diff --git a/go.sum b/go.sum index 57a2de973..8b238a11d 100644 --- a/go.sum +++ b/go.sum @@ -299,8 +299,9 @@ github.com/go-playground/locales v0.12.1/go.mod h1:IUMDtCfWo/w/mtMfIE/IG2K+Ey3yg github.com/go-playground/universal-translator v0.16.0/go.mod h1:1AnU7NaIRDWWzGEKwgtJRd2xk99HeFyHw3yid4rvQIY= github.com/go-sourcemap/sourcemap v2.1.3+incompatible h1:W1iEw64niKVGogNgBN3ePyLFfuisuzeidWPMPWmECqU= github.com/go-sourcemap/sourcemap v2.1.3+incompatible/go.mod h1:F8jJfvm2KbVjc5NqelyYJmf/v5J0dwNLS2mL4sNA1Jg= -github.com/go-stack/stack v1.8.0 h1:5SgMzNM5HxrEjV0ww2lTmX6E2Izsfxas4+YHWRs3Lsk= github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY= +github.com/go-stack/stack v1.8.1 h1:ntEHSVwIt7PNXNpgPmVfMrNhLtgjlmnZha2kOpuRiDw= +github.com/go-stack/stack v1.8.1/go.mod h1:dcoOX6HbPZSZptuspn9bctJ+N/CnF5gGygcUP3XYfe4= github.com/goccy/go-json v0.7.4 h1:B44qRUFwz/vxPKPISQ1KhvzRi9kZ28RAf6YtjriBZ5k= github.com/goccy/go-json v0.7.4/go.mod h1:6MelG93GURQebXPDq3khkgXZkazVtN9CRI+MGFi0w8I= github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA= @@ -492,8 +493,8 @@ github.com/kylelemons/godebug v1.1.0 h1:RPNrshWIDI6G2gRW9EHilWtl7Z6Sb1BR0xunSBf0 github.com/kylelemons/godebug v1.1.0/go.mod h1:9/0rRGxNHcop5bhtWyNeEfOS8JIWk580+fNqagV/RAw= github.com/leanovate/gopter v0.2.9 h1:fQjYxZaynp97ozCzfOyOuAGOU4aU/z37zf/tOujFk7c= github.com/leanovate/gopter v0.2.9/go.mod h1:U2L/78B+KVFIx2VmW6onHJQzXtFb+p5y3y2Sh+Jxxv8= -github.com/ledgerwatch/erigon-lib v0.0.0-20210908121016-58c19d9219f0 h1:r0pkqEmLKS2xQEMT2TXQgrh0KSLebao3Erc3CIM2DV0= -github.com/ledgerwatch/erigon-lib v0.0.0-20210908121016-58c19d9219f0/go.mod h1:q846JoG0oCWU9xTunmQAysfywjyoUzxx/5tHPo/F0t0= +github.com/ledgerwatch/erigon-lib v0.0.0-20210911154831-f79629b98d81 h1:BnV+T0n7sSnNvFOOqk/1piYfeaE2KYEEP+UiERLuhBk= +github.com/ledgerwatch/erigon-lib v0.0.0-20210911154831-f79629b98d81/go.mod h1:T3Jsfvp5YkG2anXy53dEEM1FbvQA4dXrfp5WcSypMoE= github.com/ledgerwatch/log/v3 v3.3.0 h1:k8N/3NQLILr8CKCMyza261vLFKU7VA+nMNNb0wVyQSc= github.com/ledgerwatch/log/v3 v3.3.0/go.mod h1:J58eOHHrIYHxl7LKkRsb/0YibKwtLfauUryl5SLRGm0= github.com/ledgerwatch/secp256k1 v0.0.0-20210626115225-cd5cd00ed72d h1:/IKMrJdfRsoYNc36PXqP4xMH3vhW/8IQyBKGQbKZUno= @@ -526,8 +527,9 @@ github.com/mattn/go-isatty v0.0.7/go.mod h1:Iq45c/XA43vh69/j3iqttzPXn0bhXyGjM0Hd github.com/mattn/go-isatty v0.0.9/go.mod h1:YNRxwqDuOph6SZLI9vUUz6OYw3QyUt7WiY2yME+cCiQ= github.com/mattn/go-isatty v0.0.10/go.mod h1:qgIWMr58cqv1PHHyhnkY9lrL7etaEgOFcMEpPG5Rm84= github.com/mattn/go-isatty v0.0.12/go.mod h1:cbi8OIDigv2wuxKPP5vlRcQ1OAZbq2CE4Kysco4FUpU= -github.com/mattn/go-isatty v0.0.13 h1:qdl+GuBjcsKKDco5BsxPJlId98mSWNKqYA+Co0SC1yA= github.com/mattn/go-isatty v0.0.13/go.mod h1:cbi8OIDigv2wuxKPP5vlRcQ1OAZbq2CE4Kysco4FUpU= +github.com/mattn/go-isatty v0.0.14 h1:yVuAays6BHfxijgZPzw+3Zlu5yQgKGP2/hcQbHb7S9Y= +github.com/mattn/go-isatty v0.0.14/go.mod h1:7GGIvUiUoEMVVmxf/4nioHXj79iQHKdU27kJ6hsGG94= github.com/mattn/go-sqlite3 v1.7.0/go.mod h1:FPy6KqzDD04eiIsT53CuJW3U88zkxoIYsOqkbpncsNc= github.com/mattn/go-sqlite3 v1.10.0/go.mod h1:FPy6KqzDD04eiIsT53CuJW3U88zkxoIYsOqkbpncsNc= github.com/mattn/go-sqlite3 v1.13.0/go.mod h1:FPy6KqzDD04eiIsT53CuJW3U88zkxoIYsOqkbpncsNc= @@ -1021,8 +1023,8 @@ golang.org/x/sys v0.0.0-20210403161142-5e06dd20ab57/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.0.0-20210806184541-e5e7981a1069 h1:siQdpVirKtzPhKl3lZWozZraCFObP8S1v6PRp0bLrtU= -golang.org/x/sys v0.0.0-20210806184541-e5e7981a1069/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20210910150752-751e447fb3d0 h1:xrCZDmdtoloIiooiA9q0OQb9r8HejIHYoHGhGCe1pGg= +golang.org/x/sys v0.0.0-20210910150752-751e447fb3d0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= diff --git a/migrations/header_prefix.go b/migrations/header_prefix.go index 240b19a0d..e87fcb2f1 100644 --- a/migrations/header_prefix.go +++ b/migrations/header_prefix.go @@ -5,10 +5,10 @@ import ( "context" "fmt" + "github.com/ledgerwatch/erigon-lib/etl" "github.com/ledgerwatch/erigon-lib/kv" "github.com/ledgerwatch/erigon/common" "github.com/ledgerwatch/erigon/common/dbutils" - "github.com/ledgerwatch/erigon/common/etl" ) var headerPrefixToSeparateBuckets = Migration{ diff --git a/turbo/cli/flags.go b/turbo/cli/flags.go index d9d9de85f..e704145ba 100644 --- a/turbo/cli/flags.go +++ b/turbo/cli/flags.go @@ -6,10 +6,10 @@ import ( "time" "github.com/c2h5oh/datasize" + "github.com/ledgerwatch/erigon-lib/etl" "github.com/ledgerwatch/erigon-lib/kv" "github.com/ledgerwatch/erigon/cmd/utils" "github.com/ledgerwatch/erigon/common" - "github.com/ledgerwatch/erigon/common/etl" "github.com/ledgerwatch/erigon/common/hexutil" "github.com/ledgerwatch/erigon/eth/ethconfig" "github.com/ledgerwatch/erigon/ethdb/prune" diff --git a/turbo/shards/state_change_accumulator.go b/turbo/shards/state_change_accumulator.go index e5b60f57a..ee3c940e9 100644 --- a/turbo/shards/state_change_accumulator.go +++ b/turbo/shards/state_change_accumulator.go @@ -3,6 +3,7 @@ package shards import ( "context" + libcommon "github.com/ledgerwatch/erigon-lib/common" "github.com/ledgerwatch/erigon-lib/gointerfaces" "github.com/ledgerwatch/erigon-lib/gointerfaces/remote" "github.com/ledgerwatch/erigon/common" @@ -31,7 +32,7 @@ func (a *Accumulator) SendAndReset(ctx context.Context, c StateChangeConsumer) { return } for i := range a.changes { - if err := common.Stopped(ctx.Done()); err != nil { + if err := libcommon.Stopped(ctx.Done()); err != nil { return } c.SendStateChanges(&a.changes[i]) @@ -55,7 +56,7 @@ func (a *Accumulator) StartChange(blockHeight uint64, blockHash common.Hash, txs if txs != nil { a.latestChange.Txs = make([][]byte, len(txs)) for i := range txs { - a.latestChange.Txs[i] = common.CopyBytes(txs[i]) + a.latestChange.Txs[i] = libcommon.Copy(txs[i]) } } a.latestChange.ProtocolBaseFee = protocolBaseFee diff --git a/turbo/snapshotsync/bodies_snapshot.go b/turbo/snapshotsync/bodies_snapshot.go index 9000fd08c..7f57c4066 100644 --- a/turbo/snapshotsync/bodies_snapshot.go +++ b/turbo/snapshotsync/bodies_snapshot.go @@ -8,11 +8,11 @@ import ( "fmt" "os" + "github.com/ledgerwatch/erigon-lib/etl" "github.com/ledgerwatch/erigon-lib/kv" "github.com/ledgerwatch/erigon-lib/kv/mdbx" "github.com/ledgerwatch/erigon/common" "github.com/ledgerwatch/erigon/common/dbutils" - "github.com/ledgerwatch/erigon/common/etl" "github.com/ledgerwatch/erigon/core/types" "github.com/ledgerwatch/erigon/ethdb" "github.com/ledgerwatch/erigon/ethdb/snapshotdb" diff --git a/turbo/snapshotsync/headers_snapshot.go b/turbo/snapshotsync/headers_snapshot.go index 18df3b423..a85bc84ec 100644 --- a/turbo/snapshotsync/headers_snapshot.go +++ b/turbo/snapshotsync/headers_snapshot.go @@ -7,6 +7,7 @@ import ( "os" "time" + libcommon "github.com/ledgerwatch/erigon-lib/common" "github.com/ledgerwatch/erigon-lib/kv" "github.com/ledgerwatch/erigon-lib/kv/mdbx" "github.com/ledgerwatch/erigon/common" @@ -63,7 +64,7 @@ func GenerateHeadersSnapshot(ctx context.Context, db kv.Tx, sntx kv.RwTx, toBloc tt := time.Now() for i := uint64(0); i <= toBlock; i++ { if common.IsCanceled(ctx) { - return common.ErrStopped + return libcommon.ErrStopped } select { case <-t.C: diff --git a/turbo/snapshotsync/postprocessing.go b/turbo/snapshotsync/postprocessing.go index d6703f8a0..ee6a92b44 100644 --- a/turbo/snapshotsync/postprocessing.go +++ b/turbo/snapshotsync/postprocessing.go @@ -8,10 +8,10 @@ import ( "math/big" "os" + "github.com/ledgerwatch/erigon-lib/etl" "github.com/ledgerwatch/erigon-lib/kv" "github.com/ledgerwatch/erigon/common" "github.com/ledgerwatch/erigon/common/dbutils" - "github.com/ledgerwatch/erigon/common/etl" "github.com/ledgerwatch/erigon/core/rawdb" "github.com/ledgerwatch/erigon/core/types" "github.com/ledgerwatch/erigon/eth/stagedsync/stages" diff --git a/turbo/stages/stageloop.go b/turbo/stages/stageloop.go index afdc92494..733cf3a9a 100644 --- a/turbo/stages/stageloop.go +++ b/turbo/stages/stageloop.go @@ -8,6 +8,7 @@ import ( "time" "github.com/holiman/uint256" + libcommon "github.com/ledgerwatch/erigon-lib/common" "github.com/ledgerwatch/erigon-lib/kv" "github.com/ledgerwatch/erigon/cmd/sentry/download" "github.com/ledgerwatch/erigon/common" @@ -52,7 +53,7 @@ func StageLoop( // Estimate the current top height seen from the peer height := hd.TopSeenHeight() if err := StageLoopStep(ctx, db, sync, height, notifications, initialCycle, updateHead, nil); err != nil { - if errors.Is(err, common.ErrStopped) || errors.Is(err, context.Canceled) { + if errors.Is(err, libcommon.ErrStopped) || errors.Is(err, context.Canceled) { return } diff --git a/turbo/trie/trie.go b/turbo/trie/trie.go index 77a6efefd..d525ff994 100644 --- a/turbo/trie/trie.go +++ b/turbo/trie/trie.go @@ -22,6 +22,7 @@ import ( "encoding/binary" "fmt" + libcommon "github.com/ledgerwatch/erigon-lib/common" "github.com/ledgerwatch/erigon/common" "github.com/ledgerwatch/erigon/common/debug" "github.com/ledgerwatch/erigon/core/types/accounts" @@ -473,9 +474,9 @@ func findSubTriesToLoad(nd node, nibblePath []byte, hook []byte, rl RetainDecide } return newPrefixes, newFixedBits, newHooks case hashNode: - newPrefixes = append(prefixes, common.CopyBytes(dbPrefix)) + newPrefixes = append(prefixes, libcommon.Copy(dbPrefix)) newFixedBits = append(fixedbits, bits) - newHooks = append(hooks, common.CopyBytes(hook)) + newHooks = append(hooks, libcommon.Copy(hook)) return newPrefixes, newFixedBits, newHooks } return prefixes, fixedbits, hooks @@ -528,7 +529,7 @@ func (t *Trie) insertRecursive(origNode node, key []byte, pos int, value node) ( var nn node switch n := origNode.(type) { case nil: - return true, NewShortNode(common.CopyBytes(key[pos:]), value) + return true, NewShortNode(libcommon.Copy(key[pos:]), value) case *accountNode: updated, nn = t.insertRecursive(n.storage, key, pos, value) if updated { @@ -555,13 +556,13 @@ func (t *Trie) insertRecursive(origNode node, key []byte, pos int, value node) ( if len(n.Key) == matchlen+1 { c1 = n.Val } else { - c1 = NewShortNode(common.CopyBytes(n.Key[matchlen+1:]), n.Val) + c1 = NewShortNode(libcommon.Copy(n.Key[matchlen+1:]), n.Val) } var c2 node if len(key) == pos+matchlen+1 { c2 = value } else { - c2 = NewShortNode(common.CopyBytes(key[pos+matchlen+1:]), value) + c2 = NewShortNode(libcommon.Copy(key[pos+matchlen+1:]), value) } branch := &duoNode{} if n.Key[matchlen] < key[pos+matchlen] { @@ -578,7 +579,7 @@ func (t *Trie) insertRecursive(origNode node, key []byte, pos int, value node) ( newNode = branch // current node leaves the generation, but new node branch joins it } else { // Otherwise, replace it with a short node leading up to the branch. - n.Key = common.CopyBytes(key[pos : pos+matchlen]) + n.Key = libcommon.Copy(key[pos : pos+matchlen]) n.Val = branch n.ref.len = 0 newNode = n @@ -612,7 +613,7 @@ func (t *Trie) insertRecursive(origNode node, key []byte, pos int, value node) ( if len(key) == pos+1 { child = value } else { - child = NewShortNode(common.CopyBytes(key[pos+1:]), value) + child = NewShortNode(libcommon.Copy(key[pos+1:]), value) } newnode := &fullNode{} newnode.Children[i1] = n.child1 @@ -631,7 +632,7 @@ func (t *Trie) insertRecursive(origNode node, key []byte, pos int, value node) ( if len(key) == pos+1 { n.Children[key[pos]] = value } else { - n.Children[key[pos]] = NewShortNode(common.CopyBytes(key[pos+1:]), value) + n.Children[key[pos]] = NewShortNode(libcommon.Copy(key[pos+1:]), value) } updated = true n.ref.len = 0 @@ -1236,7 +1237,7 @@ func (t *Trie) GetNodeByHash(hash common.Hash) []byte { return nil } - return common.CopyBytes(rlp) + return libcommon.Copy(rlp) } func (t *Trie) evictNodeFromHashMap(nd node) { diff --git a/turbo/trie/trie_root.go b/turbo/trie/trie_root.go index e7ec72fd3..273a49eff 100644 --- a/turbo/trie/trie_root.go +++ b/turbo/trie/trie_root.go @@ -7,6 +7,7 @@ import ( "math/bits" "time" + libcommon "github.com/ledgerwatch/erigon-lib/common" "github.com/ledgerwatch/erigon-lib/kv" "github.com/ledgerwatch/erigon/common" "github.com/ledgerwatch/erigon/common/dbutils" @@ -946,7 +947,7 @@ func (c *AccTrieCursor) _consume() (bool, error) { func (c *AccTrieCursor) _next() (k, v []byte, hasTree bool, err error) { var ok bool - if err = common.Stopped(c.quit); err != nil { + if err = libcommon.Stopped(c.quit); err != nil { return []byte{}, nil, false, err } c.SkipState = c.SkipState && c._hasTree() @@ -1102,7 +1103,7 @@ func (c *StorageTrieCursor) _consume() (bool, error) { if ok { c.skipState = c.skipState && keyIsBefore(c.kBuf, c.nextCreated) c.nextCreated = nextCreated - c.cur = common.CopyBytes(c.kBuf[80:]) + c.cur = libcommon.Copy(c.kBuf[80:]) return true, nil } } @@ -1254,7 +1255,7 @@ func (c *StorageTrieCursor) _nextSiblingInDB() error { func (c *StorageTrieCursor) _next() (k, v []byte, hasTree bool, err error) { var ok bool - if err = common.Stopped(c.quit); err != nil { + if err = libcommon.Stopped(c.quit); err != nil { return []byte{}, nil, false, err } c.skipState = c.skipState && c._hasTree() @@ -1389,7 +1390,7 @@ func (c *StateCursor) Seek(seek []byte) ([]byte, []byte, []byte, error) { } func (c *StateCursor) Next() ([]byte, []byte, []byte, error) { - if err := common.Stopped(c.quit); err != nil { + if err := libcommon.Stopped(c.quit); err != nil { return []byte{}, nil, nil, err } k, v, err := c.c.Next()