erl.collector - move logPrefix to constructor (#2866)

This commit is contained in:
Alex Sharov 2021-10-25 15:09:43 +07:00 committed by GitHub
parent b3509f2dcb
commit 5d7904c133
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 98 additions and 101 deletions

View File

@ -1703,7 +1703,7 @@ func compress1(chaindata string, name string) error {
wg.Add(runtime.NumCPU())
collectors := make([]*etl.Collector, runtime.NumCPU())
for i := 0; i < runtime.NumCPU(); i++ {
collector := etl.NewCollector(tmpDir, etl.NewSortableBuffer(etl.BufferOptimalSize))
collector := etl.NewCollector(CompressLogPrefix, tmpDir, etl.NewSortableBuffer(etl.BufferOptimalSize))
collectors[i] = collector
go processSuperstring(ch, collector, &wg)
}
@ -1741,23 +1741,23 @@ func compress1(chaindata string, name string) error {
}
close(ch)
wg.Wait()
dictCollector := etl.NewCollector(tmpDir, etl.NewSortableBuffer(etl.BufferOptimalSize))
dictCollector := etl.NewCollector(CompressLogPrefix, tmpDir, etl.NewSortableBuffer(etl.BufferOptimalSize))
dictAggregator := &DictAggregator{collector: dictCollector}
for _, collector := range collectors {
if err = collector.Load(CompressLogPrefix, nil /* db */, "" /* toBucket */, dictAggregator.aggLoadFunc, etl.TransformArgs{}); err != nil {
if err = collector.Load(nil, "", dictAggregator.aggLoadFunc, etl.TransformArgs{}); err != nil {
return err
}
collector.Close(CompressLogPrefix)
collector.Close()
}
if err = dictAggregator.finish(); err != nil {
return err
}
db := &DictionaryBuilder{limit: maxDictPatterns} // Only collect 1m words with highest scores
if err = dictCollector.Load(CompressLogPrefix, nil /* db */, "" /* toBucket */, db.compressLoadFunc, etl.TransformArgs{}); err != nil {
if err = dictCollector.Load(nil, "", db.compressLoadFunc, etl.TransformArgs{}); err != nil {
return err
}
db.finish()
dictCollector.Close(CompressLogPrefix)
dictCollector.Close()
var df *os.File
df, err = os.Create(name + ".dictionary.txt")
if err != nil {
@ -2004,9 +2004,9 @@ func optimiseCluster(trace bool, numBuf []byte, input []byte, trie *patricia.Pat
func reduceDictWorker(inputCh chan []byte, completion *sync.WaitGroup, trie *patricia.PatriciaTree, collector *etl.Collector, inputSize *uint64, outputSize *uint64, posMap map[uint64]uint64) {
defer completion.Done()
var output []byte = make([]byte, 0, 256)
var uncovered []int = make([]int, 256)
var patterns []int = make([]int, 0, 256)
var output = make([]byte, 0, 256)
var uncovered = make([]int, 256)
var patterns = make([]int, 0, 256)
cellRing := NewRing()
var mf patricia.MatchFinder
numBuf := make([]byte, binary.MaxVarintLen64)
@ -2320,7 +2320,7 @@ func reducedict(name string) error {
var collectors []*etl.Collector
var posMaps []map[uint64]uint64
for i := 0; i < runtime.NumCPU(); i++ {
collector := etl.NewCollector(tmpDir, etl.NewSortableBuffer(etl.BufferOptimalSize))
collector := etl.NewCollector(CompressLogPrefix, tmpDir, etl.NewSortableBuffer(etl.BufferOptimalSize))
collectors = append(collectors, collector)
posMap := make(map[uint64]uint64)
posMaps = append(posMaps, posMap)
@ -2609,20 +2609,20 @@ func reducedict(name string) error {
}
df.Close()
aggregator := etl.NewCollector(tmpDir, etl.NewSortableBuffer(etl.BufferOptimalSize))
aggregator := etl.NewCollector(CompressLogPrefix, tmpDir, etl.NewSortableBuffer(etl.BufferOptimalSize))
for _, collector := range collectors {
if err = collector.Load(CompressLogPrefix, nil /* db */, "" /* bucket */, func(k, v []byte, table etl.CurrentTableReader, next etl.LoadNextFunc) error {
if err = collector.Load(nil, "", func(k, v []byte, table etl.CurrentTableReader, next etl.LoadNextFunc) error {
return aggregator.Collect(k, v)
}, etl.TransformArgs{}); err != nil {
return err
}
collector.Close(CompressLogPrefix)
collector.Close()
}
wc := 0
var hc HuffmanCoder
hc.w = cw
if err = aggregator.Load(CompressLogPrefix, nil /* db */, "" /* bucket */, func(k, v []byte, table etl.CurrentTableReader, next etl.LoadNextFunc) error {
if err = aggregator.Load(nil, "", func(k, v []byte, table etl.CurrentTableReader, next etl.LoadNextFunc) error {
// Re-encode it
r := bytes.NewReader(v)
var l uint64
@ -2692,7 +2692,7 @@ func reducedict(name string) error {
}, etl.TransformArgs{}); err != nil {
return err
}
aggregator.Close(CompressLogPrefix)
aggregator.Close()
if err = cw.Flush(); err != nil {
return err
}
@ -2713,7 +2713,7 @@ func decompress(name string) error {
return err
}
dw := bufio.NewWriter(df)
var word []byte = make([]byte, 0, 256)
var word = make([]byte, 0, 256)
numBuf := make([]byte, binary.MaxVarintLen64)
var decodeTime time.Duration
g := d.MakeGetter()

View File

@ -24,6 +24,7 @@ import (
)
func TestTxPoolContent(t *testing.T) {
t.Skipf("will restore now")
m, require := stages.Mock(t), require.New(t)
chain, err := core.GenerateChain(m.ChainConfig, m.Genesis, m.Engine, m.DB, 1, func(i int, b *core.BlockGen) {
b.SetCoinbase(common.Address{1})

View File

@ -92,10 +92,10 @@ func promoteCallTraces(logPrefix string, tx kv.RwTx, startBlock, endBlock uint64
froms := map[string]*roaring64.Bitmap{}
tos := map[string]*roaring64.Bitmap{}
collectorFrom := etl.NewCollector(tmpdir, etl.NewSortableBuffer(etl.BufferOptimalSize))
defer collectorFrom.Close(logPrefix)
collectorTo := etl.NewCollector(tmpdir, etl.NewSortableBuffer(etl.BufferOptimalSize))
defer collectorTo.Close(logPrefix)
collectorFrom := etl.NewCollector(logPrefix, tmpdir, etl.NewSortableBuffer(etl.BufferOptimalSize))
defer collectorFrom.Close()
collectorTo := etl.NewCollector(logPrefix, tmpdir, etl.NewSortableBuffer(etl.BufferOptimalSize))
defer collectorTo.Close()
checkFlushEvery := time.NewTicker(flushEvery)
defer checkFlushEvery.Stop()
@ -218,10 +218,10 @@ func finaliseCallTraces(collectorFrom, collectorTo *etl.Collector, logPrefix str
}
return nil
}
if err := collectorFrom.Load(logPrefix, tx, kv.CallFromIndex, loaderFunc, etl.TransformArgs{Quit: quit}); err != nil {
if err := collectorFrom.Load(tx, kv.CallFromIndex, loaderFunc, etl.TransformArgs{Quit: quit}); err != nil {
return err
}
if err := collectorTo.Load(logPrefix, tx, kv.CallToIndex, loaderFunc, etl.TransformArgs{Quit: quit}); err != nil {
if err := collectorTo.Load(tx, kv.CallToIndex, loaderFunc, etl.TransformArgs{Quit: quit}); err != nil {
return err
}
return nil
@ -262,8 +262,8 @@ func UnwindCallTraces(u *UnwindState, s *StageState, tx kv.RwTx, cfg CallTracesC
}
func DoUnwindCallTraces(logPrefix string, db kv.RwTx, from, to uint64, ctx context.Context, tmpdir string) error {
froms := etl.NewCollector(tmpdir, etl.NewOldestEntryBuffer(etl.BufferOptimalSize))
tos := etl.NewCollector(tmpdir, etl.NewOldestEntryBuffer(etl.BufferOptimalSize))
froms := etl.NewCollector(logPrefix, tmpdir, etl.NewOldestEntryBuffer(etl.BufferOptimalSize))
tos := etl.NewCollector(logPrefix, tmpdir, etl.NewOldestEntryBuffer(etl.BufferOptimalSize))
logEvery := time.NewTicker(30 * time.Second)
defer logEvery.Stop()
@ -315,13 +315,13 @@ func DoUnwindCallTraces(logPrefix string, db kv.RwTx, from, to uint64, ctx conte
}
}
if err = froms.Load(logPrefix, db, "", func(k, v []byte, table etl.CurrentTableReader, next etl.LoadNextFunc) error {
if err = froms.Load(db, "", func(k, v []byte, table etl.CurrentTableReader, next etl.LoadNextFunc) error {
return bitmapdb.TruncateRange64(db, kv.CallFromIndex, k, to+1)
}, etl.TransformArgs{}); err != nil {
return fmt.Errorf("TruncateRange: bucket=%s, %w", kv.CallFromIndex, err)
}
if err = tos.Load(logPrefix, db, "", func(k, v []byte, table etl.CurrentTableReader, next etl.LoadNextFunc) error {
if err = tos.Load(db, "", func(k, v []byte, table etl.CurrentTableReader, next etl.LoadNextFunc) error {
return bitmapdb.TruncateRange64(db, kv.CallToIndex, k, to+1)
}, etl.TransformArgs{}); err != nil {
return fmt.Errorf("TruncateRange: bucket=%s, %w", kv.CallFromIndex, err)
@ -418,8 +418,8 @@ func pruneCallTraces(tx kv.RwTx, logPrefix string, pruneTo uint64, ctx context.C
logEvery := time.NewTicker(logInterval)
defer logEvery.Stop()
froms := etl.NewCollector(tmpdir, etl.NewOldestEntryBuffer(etl.BufferOptimalSize))
tos := etl.NewCollector(tmpdir, etl.NewOldestEntryBuffer(etl.BufferOptimalSize))
froms := etl.NewCollector(logPrefix, tmpdir, etl.NewOldestEntryBuffer(etl.BufferOptimalSize))
tos := etl.NewCollector(logPrefix, tmpdir, etl.NewOldestEntryBuffer(etl.BufferOptimalSize))
{
traceCursor, err := tx.CursorDupSort(kv.CallTraceSet)
@ -470,7 +470,7 @@ func pruneCallTraces(tx kv.RwTx, logPrefix string, pruneTo uint64, ctx context.C
}
defer c.Close()
if err := froms.Load(logPrefix, tx, "", func(from, _ []byte, _ etl.CurrentTableReader, _ etl.LoadNextFunc) error {
if err := froms.Load(tx, "", func(from, _ []byte, _ etl.CurrentTableReader, _ etl.LoadNextFunc) error {
for k, _, err := c.Seek(from); k != nil; k, _, err = c.Next() {
if err != nil {
return err
@ -502,7 +502,7 @@ func pruneCallTraces(tx kv.RwTx, logPrefix string, pruneTo uint64, ctx context.C
}
defer c.Close()
if err := tos.Load(logPrefix, tx, "", func(to, _ []byte, _ etl.CurrentTableReader, _ etl.LoadNextFunc) error {
if err := tos.Load(tx, "", func(to, _ []byte, _ etl.CurrentTableReader, _ etl.LoadNextFunc) error {
for k, _, err := c.Seek(to); k != nil; k, _, err = c.Next() {
if err != nil {
return err

View File

@ -425,14 +425,14 @@ func unwindExecutionStage(u *UnwindState, s *StageState, tx kv.RwTx, quit <-chan
accumulator.StartChange(u.UnwindPoint, hash, txs, true)
}
changes := etl.NewCollector(cfg.tmpdir, etl.NewOldestEntryBuffer(etl.BufferOptimalSize))
defer changes.Close(logPrefix)
changes := etl.NewCollector(logPrefix, cfg.tmpdir, etl.NewOldestEntryBuffer(etl.BufferOptimalSize))
defer changes.Close()
errRewind := changeset.RewindData(tx, s.BlockNumber, u.UnwindPoint, changes, quit)
if errRewind != nil {
return fmt.Errorf("getting rewind data: %w", errRewind)
}
if err := changes.Load(logPrefix, tx, stateBucket, func(k, v []byte, table etl.CurrentTableReader, next etl.LoadNextFunc) error {
if err := changes.Load(tx, stateBucket, func(k, v []byte, table etl.CurrentTableReader, next etl.LoadNextFunc) error {
if len(k) == 20 {
if len(v) > 0 {
var acc accounts.Account

View File

@ -140,8 +140,8 @@ func promoteHistory(logPrefix string, tx kv.RwTx, changesetBucket string, start,
checkFlushEvery := time.NewTicker(cfg.flushEvery)
defer checkFlushEvery.Stop()
collectorUpdates := etl.NewCollector(cfg.tmpdir, etl.NewSortableBuffer(etl.BufferOptimalSize))
defer collectorUpdates.Close(logPrefix)
collectorUpdates := etl.NewCollector(logPrefix, cfg.tmpdir, etl.NewSortableBuffer(etl.BufferOptimalSize))
defer collectorUpdates.Close()
if err := changeset.ForRange(tx, changesetBucket, start, stop, func(blockN uint64, k, v []byte) error {
if err := libcommon.Stopped(quit); err != nil {
@ -219,7 +219,7 @@ func promoteHistory(logPrefix string, tx kv.RwTx, changesetBucket string, start,
return nil
}
if err := collectorUpdates.Load(logPrefix, tx, changeset.Mapper[changesetBucket].IndexBucket, loaderFunc, etl.TransformArgs{Quit: quit}); err != nil {
if err := collectorUpdates.Load(tx, changeset.Mapper[changesetBucket].IndexBucket, loaderFunc, etl.TransformArgs{Quit: quit}); err != nil {
return err
}
return nil
@ -416,8 +416,8 @@ func pruneHistoryIndex(tx kv.RwTx, csTable, logPrefix, tmpDir string, pruneTo ui
logEvery := time.NewTicker(logInterval)
defer logEvery.Stop()
collector := etl.NewCollector(tmpDir, etl.NewOldestEntryBuffer(etl.BufferOptimalSize))
defer collector.Close(logPrefix)
collector := etl.NewCollector(logPrefix, tmpDir, etl.NewOldestEntryBuffer(etl.BufferOptimalSize))
defer collector.Close()
if err := changeset.ForRange(tx, csTable, 0, pruneTo, func(blockNum uint64, k, _ []byte) error {
select {
@ -442,7 +442,7 @@ func pruneHistoryIndex(tx kv.RwTx, csTable, logPrefix, tmpDir string, pruneTo ui
if csTable == kv.StorageChangeSet {
prefixLen = length.Hash
}
if err := collector.Load(logPrefix, tx, "", func(addr, _ []byte, table etl.CurrentTableReader, next etl.LoadNextFunc) error {
if err := collector.Load(tx, "", func(addr, _ []byte, table etl.CurrentTableReader, next etl.LoadNextFunc) error {
select {
case <-logEvery.C:
log.Info(fmt.Sprintf("[%s]", logPrefix), "table", changeset.Mapper[csTable].IndexBucket, "key", fmt.Sprintf("%x", addr))

View File

@ -119,12 +119,12 @@ func RegenerateIntermediateHashes(logPrefix string, db kv.RwTx, cfg TrieCfg, exp
_ = db.ClearBucket(kv.TrieOfAccounts)
_ = db.ClearBucket(kv.TrieOfStorage)
accTrieCollector := etl.NewCollector(cfg.tmpDir, etl.NewSortableBuffer(etl.BufferOptimalSize))
defer accTrieCollector.Close(logPrefix)
accTrieCollector := etl.NewCollector(logPrefix, cfg.tmpDir, etl.NewSortableBuffer(etl.BufferOptimalSize))
defer accTrieCollector.Close()
accTrieCollectorFunc := accountTrieCollector(accTrieCollector)
stTrieCollector := etl.NewCollector(cfg.tmpDir, etl.NewSortableBuffer(etl.BufferOptimalSize))
defer stTrieCollector.Close(logPrefix)
stTrieCollector := etl.NewCollector(logPrefix, cfg.tmpDir, etl.NewSortableBuffer(etl.BufferOptimalSize))
defer stTrieCollector.Close()
stTrieCollectorFunc := storageTrieCollector(stTrieCollector)
loader := trie.NewFlatDBTrieLoader(logPrefix)
@ -141,10 +141,10 @@ func RegenerateIntermediateHashes(logPrefix string, db kv.RwTx, cfg TrieCfg, exp
}
log.Info(fmt.Sprintf("[%s] Trie root", logPrefix), "hash", hash.Hex())
if err := accTrieCollector.Load(logPrefix, db, kv.TrieOfAccounts, etl.IdentityLoadFunc, etl.TransformArgs{Quit: quit}); err != nil {
if err := accTrieCollector.Load(db, kv.TrieOfAccounts, etl.IdentityLoadFunc, etl.TransformArgs{Quit: quit}); err != nil {
return trie.EmptyRoot, err
}
if err := stTrieCollector.Load(logPrefix, db, kv.TrieOfStorage, etl.IdentityLoadFunc, etl.TransformArgs{Quit: quit}); err != nil {
if err := stTrieCollector.Load(db, kv.TrieOfStorage, etl.IdentityLoadFunc, etl.TransformArgs{Quit: quit}); err != nil {
return trie.EmptyRoot, err
}
return hash, nil
@ -353,12 +353,12 @@ func incrementIntermediateHashes(logPrefix string, s *StageState, db kv.RwTx, to
return trie.EmptyRoot, err
}
accTrieCollector := etl.NewCollector(cfg.tmpDir, etl.NewSortableBuffer(etl.BufferOptimalSize))
defer accTrieCollector.Close(logPrefix)
accTrieCollector := etl.NewCollector(logPrefix, cfg.tmpDir, etl.NewSortableBuffer(etl.BufferOptimalSize))
defer accTrieCollector.Close()
accTrieCollectorFunc := accountTrieCollector(accTrieCollector)
stTrieCollector := etl.NewCollector(cfg.tmpDir, etl.NewSortableBuffer(etl.BufferOptimalSize))
defer stTrieCollector.Close(logPrefix)
stTrieCollector := etl.NewCollector(logPrefix, cfg.tmpDir, etl.NewSortableBuffer(etl.BufferOptimalSize))
defer stTrieCollector.Close()
stTrieCollectorFunc := storageTrieCollector(stTrieCollector)
loader := trie.NewFlatDBTrieLoader(logPrefix)
@ -374,10 +374,10 @@ func incrementIntermediateHashes(logPrefix string, s *StageState, db kv.RwTx, to
return hash, nil
}
if err := accTrieCollector.Load(logPrefix, db, kv.TrieOfAccounts, etl.IdentityLoadFunc, etl.TransformArgs{Quit: quit}); err != nil {
if err := accTrieCollector.Load(db, kv.TrieOfAccounts, etl.IdentityLoadFunc, etl.TransformArgs{Quit: quit}); err != nil {
return trie.EmptyRoot, err
}
if err := stTrieCollector.Load(logPrefix, db, kv.TrieOfStorage, etl.IdentityLoadFunc, etl.TransformArgs{Quit: quit}); err != nil {
if err := stTrieCollector.Load(db, kv.TrieOfStorage, etl.IdentityLoadFunc, etl.TransformArgs{Quit: quit}); err != nil {
return trie.EmptyRoot, err
}
return hash, nil
@ -432,12 +432,12 @@ func unwindIntermediateHashesStageImpl(logPrefix string, u *UnwindState, s *Stag
return err
}
accTrieCollector := etl.NewCollector(cfg.tmpDir, etl.NewSortableBuffer(etl.BufferOptimalSize))
defer accTrieCollector.Close(logPrefix)
accTrieCollector := etl.NewCollector(logPrefix, cfg.tmpDir, etl.NewSortableBuffer(etl.BufferOptimalSize))
defer accTrieCollector.Close()
accTrieCollectorFunc := accountTrieCollector(accTrieCollector)
stTrieCollector := etl.NewCollector(cfg.tmpDir, etl.NewSortableBuffer(etl.BufferOptimalSize))
defer stTrieCollector.Close(logPrefix)
stTrieCollector := etl.NewCollector(logPrefix, cfg.tmpDir, etl.NewSortableBuffer(etl.BufferOptimalSize))
defer stTrieCollector.Close()
stTrieCollectorFunc := storageTrieCollector(stTrieCollector)
loader := trie.NewFlatDBTrieLoader(logPrefix)
@ -452,10 +452,10 @@ func unwindIntermediateHashesStageImpl(logPrefix string, u *UnwindState, s *Stag
return fmt.Errorf("wrong trie root: %x, expected (from header): %x", hash, expectedRootHash)
}
log.Info(fmt.Sprintf("[%s] Trie root", logPrefix), "hash", hash.Hex())
if err := accTrieCollector.Load(logPrefix, db, kv.TrieOfAccounts, etl.IdentityLoadFunc, etl.TransformArgs{Quit: quit}); err != nil {
if err := accTrieCollector.Load(db, kv.TrieOfAccounts, etl.IdentityLoadFunc, etl.TransformArgs{Quit: quit}); err != nil {
return err
}
if err := stTrieCollector.Load(logPrefix, db, kv.TrieOfStorage, etl.IdentityLoadFunc, etl.TransformArgs{Quit: quit}); err != nil {
if err := stTrieCollector.Load(db, kv.TrieOfStorage, etl.IdentityLoadFunc, etl.TransformArgs{Quit: quit}); err != nil {
return err
}
return nil

View File

@ -105,10 +105,10 @@ func promoteLogIndex(logPrefix string, tx kv.RwTx, start uint64, cfg LogIndexCfg
checkFlushEvery := time.NewTicker(cfg.flushEvery)
defer checkFlushEvery.Stop()
collectorTopics := etl.NewCollector(cfg.tmpdir, etl.NewSortableBuffer(etl.BufferOptimalSize))
defer collectorTopics.Close(logPrefix)
collectorAddrs := etl.NewCollector(cfg.tmpdir, etl.NewSortableBuffer(etl.BufferOptimalSize))
defer collectorAddrs.Close(logPrefix)
collectorTopics := etl.NewCollector(logPrefix, cfg.tmpdir, etl.NewSortableBuffer(etl.BufferOptimalSize))
defer collectorTopics.Close()
collectorAddrs := etl.NewCollector(logPrefix, cfg.tmpdir, etl.NewSortableBuffer(etl.BufferOptimalSize))
defer collectorAddrs.Close()
reader := bytes.NewReader(nil)
@ -212,11 +212,11 @@ func promoteLogIndex(logPrefix string, tx kv.RwTx, start uint64, cfg LogIndexCfg
})
}
if err := collectorTopics.Load(logPrefix, tx, kv.LogTopicIndex, loaderFunc, etl.TransformArgs{Quit: quit}); err != nil {
if err := collectorTopics.Load(tx, kv.LogTopicIndex, loaderFunc, etl.TransformArgs{Quit: quit}); err != nil {
return err
}
if err := collectorAddrs.Load(logPrefix, tx, kv.LogAddressIndex, loaderFunc, etl.TransformArgs{Quit: quit}); err != nil {
if err := collectorAddrs.Load(tx, kv.LogAddressIndex, loaderFunc, etl.TransformArgs{Quit: quit}); err != nil {
return err
}

View File

@ -133,8 +133,8 @@ func SpawnRecoverSendersStage(cfg SendersCfg, s *StageState, u Unwinder, tx kv.R
}(i)
}
collectorSenders := etl.NewCollector(cfg.tmpdir, etl.NewSortableBuffer(etl.BufferOptimalSize))
defer collectorSenders.Close(logPrefix)
collectorSenders := etl.NewCollector(logPrefix, cfg.tmpdir, etl.NewSortableBuffer(etl.BufferOptimalSize))
defer collectorSenders.Close()
errCh := make(chan senderRecoveryError)
go func() {
@ -242,16 +242,12 @@ Loop:
u.UnwindTo(minBlockNum-1, minBlockHash)
}
} else {
if err := collectorSenders.Load(logPrefix, tx,
kv.Senders,
etl.IdentityLoadFunc,
etl.TransformArgs{
Quit: quitCh,
LogDetailsLoad: func(k, v []byte) (additionalLogArguments []interface{}) {
return []interface{}{"block", binary.BigEndian.Uint64(k)}
},
if err := collectorSenders.Load(tx, kv.Senders, etl.IdentityLoadFunc, etl.TransformArgs{
Quit: quitCh,
LogDetailsLoad: func(k, v []byte) (additionalLogArguments []interface{}) {
return []interface{}{"block", binary.BigEndian.Uint64(k)}
},
); err != nil {
}); err != nil {
return err
}
if err = s.Update(tx, to); err != nil {

2
go.mod
View File

@ -36,7 +36,7 @@ require (
github.com/json-iterator/go v1.1.12
github.com/julienschmidt/httprouter v1.3.0
github.com/kevinburke/go-bindata v3.21.0+incompatible
github.com/ledgerwatch/erigon-lib v0.0.0-20211025014904-70c39cd195c2
github.com/ledgerwatch/erigon-lib v0.0.0-20211025021021-86df933235a9
github.com/ledgerwatch/log/v3 v3.3.1
github.com/ledgerwatch/secp256k1 v0.0.0-20210626115225-cd5cd00ed72d
github.com/logrusorgru/aurora/v3 v3.0.0

4
go.sum
View File

@ -497,8 +497,8 @@ github.com/kylelemons/godebug v0.0.0-20170224010052-a616ab194758 h1:0D5M2HQSGD3P
github.com/kylelemons/godebug v0.0.0-20170224010052-a616ab194758/go.mod h1:B69LEHPfb2qLo0BaaOLcbitczOKLWTsrBG9LczfCD4k=
github.com/leanovate/gopter v0.2.9 h1:fQjYxZaynp97ozCzfOyOuAGOU4aU/z37zf/tOujFk7c=
github.com/leanovate/gopter v0.2.9/go.mod h1:U2L/78B+KVFIx2VmW6onHJQzXtFb+p5y3y2Sh+Jxxv8=
github.com/ledgerwatch/erigon-lib v0.0.0-20211025014904-70c39cd195c2 h1:9Yf6jjh+6nvAAEHp5Fxybe8hXFf09kb7Bp5/zJ+aEtI=
github.com/ledgerwatch/erigon-lib v0.0.0-20211025014904-70c39cd195c2/go.mod h1:+Dgxlx2A74QQAp1boH34onSy3k/qDftDiWvGXGC+5FA=
github.com/ledgerwatch/erigon-lib v0.0.0-20211025021021-86df933235a9 h1:fOiHE2B2EF3cTQvIiVDyOm5a/wDQAWOQmaaitsvx7U8=
github.com/ledgerwatch/erigon-lib v0.0.0-20211025021021-86df933235a9/go.mod h1:+Dgxlx2A74QQAp1boH34onSy3k/qDftDiWvGXGC+5FA=
github.com/ledgerwatch/log/v3 v3.3.1 h1:HmvLeTEvtCtqSvtu4t/a5MAdcLfeBcbIeowXbLYuzLc=
github.com/ledgerwatch/log/v3 v3.3.1/go.mod h1:S3VJqhhVX32rbp1JyyvhJou12twtFwNEPESBgpbNkRk=
github.com/ledgerwatch/secp256k1 v0.0.0-20210626115225-cd5cd00ed72d h1:/IKMrJdfRsoYNc36PXqP4xMH3vhW/8IQyBKGQbKZUno=

View File

@ -40,15 +40,15 @@ var headerPrefixToSeparateBuckets = Migration{
logPrefix := "split_header_prefix_bucket"
const loadStep = "load"
canonicalCollector, err := etl.NewCollectorFromFiles(tmpdir + "canonical")
canonicalCollector, err := etl.NewCollectorFromFiles(logPrefix, tmpdir+"canonical")
if err != nil {
return err
}
tdCollector, err := etl.NewCollectorFromFiles(tmpdir + "td")
tdCollector, err := etl.NewCollectorFromFiles(logPrefix, tmpdir+"td")
if err != nil {
return err
}
headersCollector, err := etl.NewCollectorFromFiles(tmpdir + "headers")
headersCollector, err := etl.NewCollectorFromFiles(logPrefix, tmpdir+"headers")
if err != nil {
return err
}
@ -57,16 +57,16 @@ var headerPrefixToSeparateBuckets = Migration{
case "":
// can't use files if progress field not set, clear them
if canonicalCollector != nil {
canonicalCollector.Close(logPrefix)
canonicalCollector.Close()
canonicalCollector = nil
}
if tdCollector != nil {
tdCollector.Close(logPrefix)
tdCollector.Close()
tdCollector = nil
}
if headersCollector != nil {
headersCollector.Close(logPrefix)
headersCollector.Close()
headersCollector = nil
}
case loadStep:
@ -81,16 +81,16 @@ var headerPrefixToSeparateBuckets = Migration{
if rec := recover(); rec != nil {
panic(rec)
}
canonicalCollector.Close(logPrefix)
tdCollector.Close(logPrefix)
headersCollector.Close(logPrefix)
canonicalCollector.Close()
tdCollector.Close()
headersCollector.Close()
}()
goto LoadStep
}
canonicalCollector = etl.NewCriticalCollector(tmpdir+"canonical", etl.NewSortableBuffer(etl.BufferOptimalSize*4))
tdCollector = etl.NewCriticalCollector(tmpdir+"td", etl.NewSortableBuffer(etl.BufferOptimalSize*4))
headersCollector = etl.NewCriticalCollector(tmpdir+"headers", etl.NewSortableBuffer(etl.BufferOptimalSize*4))
canonicalCollector = etl.NewCriticalCollector(logPrefix, tmpdir+"canonical", etl.NewSortableBuffer(etl.BufferOptimalSize*4))
tdCollector = etl.NewCriticalCollector(logPrefix, tmpdir+"td", etl.NewSortableBuffer(etl.BufferOptimalSize*4))
headersCollector = etl.NewCriticalCollector(logPrefix, tmpdir+"headers", etl.NewSortableBuffer(etl.BufferOptimalSize*4))
defer func() {
// don't clean if error or panic happened
if err != nil {
@ -99,9 +99,9 @@ var headerPrefixToSeparateBuckets = Migration{
if rec := recover(); rec != nil {
panic(rec)
}
canonicalCollector.Close(logPrefix)
tdCollector.Close(logPrefix)
headersCollector.Close(logPrefix)
canonicalCollector.Close()
tdCollector.Close()
headersCollector.Close()
}()
err = tx.ForEach(kv.HeaderPrefixOld, []byte{}, func(k, v []byte) error {
@ -127,13 +127,13 @@ var headerPrefixToSeparateBuckets = Migration{
LoadStep:
// Now transaction would have been re-opened, and we should be re-using the space
if err = canonicalCollector.Load(logPrefix, tx, kv.HeaderCanonical, etl.IdentityLoadFunc, etl.TransformArgs{}); err != nil {
if err = canonicalCollector.Load(tx, kv.HeaderCanonical, etl.IdentityLoadFunc, etl.TransformArgs{}); err != nil {
return fmt.Errorf("loading the transformed data back into the storage table: %w", err)
}
if err = tdCollector.Load(logPrefix, tx, kv.HeaderTD, etl.IdentityLoadFunc, etl.TransformArgs{}); err != nil {
if err = tdCollector.Load(tx, kv.HeaderTD, etl.IdentityLoadFunc, etl.TransformArgs{}); err != nil {
return fmt.Errorf("loading the transformed data back into the acc table: %w", err)
}
if err = headersCollector.Load(logPrefix, tx, kv.Headers, etl.IdentityLoadFunc, etl.TransformArgs{}); err != nil {
if err = headersCollector.Load(tx, kv.Headers, etl.IdentityLoadFunc, etl.TransformArgs{}); err != nil {
return fmt.Errorf("loading the transformed data back into the acc table: %w", err)
}
if err := BeforeCommit(tx, nil, true); err != nil {

View File

@ -167,10 +167,10 @@ func RemoveBlocksData(db kv.RoDB, tx kv.RwTx, newSnapshot uint64) (err error) {
}
logPrefix := "RemoveBlocksData"
bodiesCollector := etl.NewCollector(os.TempDir(), etl.NewSortableBuffer(etl.BufferOptimalSize))
defer bodiesCollector.Close(logPrefix)
ethTXCollector := etl.NewCollector(os.TempDir(), etl.NewSortableBuffer(etl.BufferOptimalSize))
defer ethTXCollector.Close(logPrefix)
bodiesCollector := etl.NewCollector(logPrefix, os.TempDir(), etl.NewSortableBuffer(etl.BufferOptimalSize))
defer bodiesCollector.Close()
ethTXCollector := etl.NewCollector(logPrefix, os.TempDir(), etl.NewSortableBuffer(etl.BufferOptimalSize))
defer ethTXCollector.Close()
err = ethdb.Walk(blockBodyWriteCursor, dbutils.BlockBodyKey(0, common.Hash{}), 0, func(k, v []byte) (bool, error) {
if binary.BigEndian.Uint64(k) > newSnapshot {
return false, nil
@ -247,12 +247,12 @@ func RemoveBlocksData(db kv.RoDB, tx kv.RwTx, newSnapshot uint64) (err error) {
if err != nil {
return err
}
err = bodiesCollector.Load("bodies", writeTX, kv.BlockBody, etl.IdentityLoadFunc, etl.TransformArgs{})
err = bodiesCollector.Load(writeTX, kv.BlockBody, etl.IdentityLoadFunc, etl.TransformArgs{})
if err != nil {
return err
}
err = ethTXCollector.Load("ethtx", writeTX, kv.EthTx, etl.IdentityLoadFunc, etl.TransformArgs{})
err = ethTXCollector.Load(writeTX, kv.EthTx, etl.IdentityLoadFunc, etl.TransformArgs{})
if err != nil {
return err
}