diff --git a/go.mod b/go.mod index d71a9376f..c3aaafc5c 100644 --- a/go.mod +++ b/go.mod @@ -3,7 +3,7 @@ module github.com/ledgerwatch/erigon go 1.18 require ( - github.com/ledgerwatch/erigon-lib v0.0.0-20220803080736-0c9ada1ab0e8 + github.com/ledgerwatch/erigon-lib v0.0.0-20220804052017-e00fde454008 github.com/ledgerwatch/erigon-snapshot v1.0.0 github.com/ledgerwatch/log/v3 v3.4.1 github.com/ledgerwatch/secp256k1 v1.0.0 diff --git a/go.sum b/go.sum index 2e5c753c4..81ca94927 100644 --- a/go.sum +++ b/go.sum @@ -390,8 +390,8 @@ github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= github.com/kylelemons/godebug v0.0.0-20170224010052-a616ab194758 h1:0D5M2HQSGD3PYPwICLl+/9oulQauOuETfgFvhBDffs0= github.com/leanovate/gopter v0.2.9 h1:fQjYxZaynp97ozCzfOyOuAGOU4aU/z37zf/tOujFk7c= github.com/leanovate/gopter v0.2.9/go.mod h1:U2L/78B+KVFIx2VmW6onHJQzXtFb+p5y3y2Sh+Jxxv8= -github.com/ledgerwatch/erigon-lib v0.0.0-20220803080736-0c9ada1ab0e8 h1:1VEKTAZH1NSsTo1RQntMKB6OhYB6y8wdJ3tDVlMygLY= -github.com/ledgerwatch/erigon-lib v0.0.0-20220803080736-0c9ada1ab0e8/go.mod h1:fErfZoeVWbOKpClW3RL8jsmoHzSxbr4RRCHhDamzbE4= +github.com/ledgerwatch/erigon-lib v0.0.0-20220804052017-e00fde454008 h1:tYXcq502Lrvu2ALN5fuA9jkmeg4rnOcv57vsdE8jcYE= +github.com/ledgerwatch/erigon-lib v0.0.0-20220804052017-e00fde454008/go.mod h1:fErfZoeVWbOKpClW3RL8jsmoHzSxbr4RRCHhDamzbE4= github.com/ledgerwatch/erigon-snapshot v1.0.0 h1:bp/7xoPdM5lK7LFdqEMH008RZmqxMZV0RUVEQiWs7v4= github.com/ledgerwatch/erigon-snapshot v1.0.0/go.mod h1:3AuPxZc85jkehh/HA9h8gabv5MSi3kb/ddtzBsTVJFo= github.com/ledgerwatch/log/v3 v3.4.1 h1:/xGwlVulXnsO9Uq+tzaExc8OWmXXHU0dnLalpbnY5Bc= diff --git a/turbo/snapshotsync/block_snapshots.go b/turbo/snapshotsync/block_snapshots.go index 714f6ae49..950e317ef 100644 --- a/turbo/snapshotsync/block_snapshots.go +++ b/turbo/snapshotsync/block_snapshots.go @@ -16,6 +16,7 @@ import ( "github.com/holiman/uint256" common2 "github.com/ledgerwatch/erigon-lib/common" + "github.com/ledgerwatch/erigon-lib/common/background" "github.com/ledgerwatch/erigon-lib/common/cmp" "github.com/ledgerwatch/erigon-lib/common/dbg" "github.com/ledgerwatch/erigon-lib/compress" @@ -728,19 +729,19 @@ func (s *RoSnapshots) ViewTxs(blockNum uint64, f func(sn *TxnSegment) error) (fo return s.Txs.ViewSegment(blockNum, f) } -func buildIdx(ctx context.Context, sn snap.FileInfo, chainID uint256.Int, tmpDir string, progress *atomic.Uint64, lvl log.Lvl) error { +func buildIdx(ctx context.Context, sn snap.FileInfo, chainID uint256.Int, tmpDir string, p *background.Progress, lvl log.Lvl) error { switch sn.T { case snap.Headers: - if err := HeadersIdx(ctx, sn.Path, sn.From, tmpDir, lvl); err != nil { + if err := HeadersIdx(ctx, sn.Path, sn.From, tmpDir, p, lvl); err != nil { return err } case snap.Bodies: - if err := BodiesIdx(ctx, sn.Path, sn.From, tmpDir, lvl); err != nil { + if err := BodiesIdx(ctx, sn.Path, sn.From, tmpDir, p, lvl); err != nil { return err } case snap.Transactions: dir, _ := filepath.Split(sn.Path) - if err := TransactionsIdx(ctx, chainID, sn.From, sn.To, dir, tmpDir, progress, lvl); err != nil { + if err := TransactionsIdx(ctx, chainID, sn.From, sn.To, dir, tmpDir, p, lvl); err != nil { return err } } @@ -749,7 +750,7 @@ func buildIdx(ctx context.Context, sn snap.FileInfo, chainID uint256.Int, tmpDir func BuildMissedIndices(ctx context.Context, dir string, chainID uint256.Int, tmpDir string, workers int, lvl log.Lvl) error { //log.Log(lvl, "[snapshots] Build indices", "from", min) - logEvery := time.NewTicker(20 * time.Second) + logEvery := time.NewTicker(60 * time.Second) defer logEvery.Stop() segments, _, err := Segments(dir) if err != nil { @@ -757,9 +758,8 @@ func BuildMissedIndices(ctx context.Context, dir string, chainID uint256.Int, tm } errs := make(chan error, 1024) wg := &sync.WaitGroup{} - currentFile := atomic.NewString("") + ps := background.NewProgressSet() sem := semaphore.NewWeighted(int64(workers)) - progress := atomic.NewUint64(0) go func() { for _, t := range snap.AllSnapshotTypes { for _, sn := range segments { @@ -769,28 +769,28 @@ func BuildMissedIndices(ctx context.Context, dir string, chainID uint256.Int, tm if hasIdxFile(&sn) { continue } - wg.Add(1) if err := sem.Acquire(ctx, 1); err != nil { errs <- err return } + wg.Add(1) go func(sn snap.FileInfo) { defer sem.Release(1) defer wg.Done() - currentFile.Store(snap.SegmentFileName(sn.From, sn.To, sn.T)) - if err := buildIdx(ctx, sn, chainID, tmpDir, progress, lvl); err != nil { + p := &background.Progress{} + ps.Add(p) + defer ps.Delete(p) + if err := buildIdx(ctx, sn, chainID, tmpDir, p, lvl); err != nil { errs <- err } }(sn) } } - }() - - go func() { wg.Wait() close(errs) }() + for { select { case err := <-errs: @@ -804,7 +804,7 @@ func BuildMissedIndices(ctx context.Context, dir string, chainID uint256.Int, tm if lvl >= log.LvlInfo { common2.ReadMemStats(&m) } - log.Log(lvl, "[snapshots] Indexing", "file", currentFile.Load(), "alloc", common2.ByteCount(m.Alloc), "sys", common2.ByteCount(m.Sys)) + log.Log(lvl, "[snapshots] Indexing", "progress", ps.String(), "alloc", common2.ByteCount(m.Alloc), "sys", common2.ByteCount(m.Sys)) } } } @@ -1136,28 +1136,33 @@ func DumpBlocks(ctx context.Context, blockFrom, blockTo, blocksPerFile uint64, t } func dumpBlocksRange(ctx context.Context, blockFrom, blockTo uint64, tmpDir, snapDir string, chainDB kv.RoDB, chainID uint256.Int, workers int, lvl log.Lvl) error { - progress := atomic.NewUint64(0) - f, _ := snap.ParseFileName(snapDir, snap.SegmentFileName(blockFrom, blockTo, snap.Headers)) + segName := snap.SegmentFileName(blockFrom, blockTo, snap.Headers) + f, _ := snap.ParseFileName(snapDir, segName) if err := DumpHeaders(ctx, chainDB, f.Path, tmpDir, blockFrom, blockTo, workers, lvl); err != nil { return fmt.Errorf("DumpHeaders: %w", err) } - if err := buildIdx(ctx, f, chainID, tmpDir, progress, lvl); err != nil { + p := &background.Progress{} + if err := buildIdx(ctx, f, chainID, tmpDir, p, lvl); err != nil { return err } - f, _ = snap.ParseFileName(snapDir, snap.SegmentFileName(blockFrom, blockTo, snap.Bodies)) + segName = snap.SegmentFileName(blockFrom, blockTo, snap.Bodies) + f, _ = snap.ParseFileName(snapDir, segName) if err := DumpBodies(ctx, chainDB, f.Path, tmpDir, blockFrom, blockTo, workers, lvl); err != nil { return fmt.Errorf("DumpBodies: %w", err) } - if err := buildIdx(ctx, f, chainID, tmpDir, progress, lvl); err != nil { + p = &background.Progress{} + if err := buildIdx(ctx, f, chainID, tmpDir, p, lvl); err != nil { return err } - f, _ = snap.ParseFileName(snapDir, snap.SegmentFileName(blockFrom, blockTo, snap.Transactions)) + segName = snap.SegmentFileName(blockFrom, blockTo, snap.Transactions) + f, _ = snap.ParseFileName(snapDir, segName) if _, err := DumpTxs(ctx, chainDB, f.Path, tmpDir, blockFrom, blockTo, workers, lvl); err != nil { return fmt.Errorf("DumpTxs: %w", err) } - if err := buildIdx(ctx, f, chainID, tmpDir, progress, lvl); err != nil { + p = &background.Progress{} + if err := buildIdx(ctx, f, chainID, tmpDir, p, lvl); err != nil { return err } @@ -1535,7 +1540,7 @@ func expectedTxsAmount(snapDir string, blockFrom, blockTo uint64) (firstTxID, ex return } -func TransactionsIdx(ctx context.Context, chainID uint256.Int, blockFrom, blockTo uint64, snapDir string, tmpDir string, progress *atomic.Uint64, lvl log.Lvl) (err error) { +func TransactionsIdx(ctx context.Context, chainID uint256.Int, blockFrom, blockTo uint64, snapDir string, tmpDir string, p *background.Progress, lvl log.Lvl) (err error) { defer func() { if rec := recover(); rec != nil { err = fmt.Errorf("TransactionsIdx: at=%d-%d, %v, %s", blockFrom, blockTo, rec, dbg.Stack()) @@ -1553,7 +1558,8 @@ func TransactionsIdx(ctx context.Context, chainID uint256.Int, blockFrom, blockT } defer bodiesSegment.Close() - segmentFilePath := filepath.Join(snapDir, snap.SegmentFileName(blockFrom, blockTo, snap.Transactions)) + segFileName := snap.SegmentFileName(blockFrom, blockTo, snap.Transactions) + segmentFilePath := filepath.Join(snapDir, segFileName) d, err := compress.NewDecompressor(segmentFilePath) if err != nil { return err @@ -1562,6 +1568,8 @@ func TransactionsIdx(ctx context.Context, chainID uint256.Int, blockFrom, blockT if uint64(d.Count()) != expectedCount { panic(fmt.Errorf("expect: %d, got %d\n", expectedCount, d.Count())) } + p.Name.Store(segFileName) + p.Total.Store(uint64(d.Count() * 2)) txnHashIdx, err := recsplit.NewRecSplit(recsplit.RecSplitArgs{ KeyCount: d.Count(), @@ -1589,7 +1597,11 @@ func TransactionsIdx(ctx context.Context, chainID uint256.Int, blockFrom, blockT if err != nil { return err } - txnHashIdx.LogLvl(log.LvlDebug) + idxLogLvl := log.LvlDebug + if d.Count() > 1_000_000 { + idxLogLvl = log.LvlInfo + } + txnHashIdx.LogLvl(idxLogLvl) txnHash2BlockNumIdx.LogLvl(log.LvlDebug) parseCtx := types2.NewTxParseContext(chainID) @@ -1617,13 +1629,13 @@ RETRY: } for g.HasNext() { + p.Processed.Inc() word, nextPos = g.Next(word[:0]) select { case <-ctx.Done(): return ctx.Err() default: } - progress.Store(blockNum) for body.BaseTxId+uint64(body.TxAmount) <= firstTxID+i { // skip empty blocks if !bodyGetter.HasNext() { @@ -1654,6 +1666,7 @@ RETRY: i++ offset = nextPos + } if i != expectedCount { @@ -1677,12 +1690,13 @@ RETRY: } return err } + p.Processed.Store(p.Total.Load()) return nil } // HeadersIdx - headerHash -> offset (analog of kv.HeaderNumber) -func HeadersIdx(ctx context.Context, segmentFilePath string, firstBlockNumInSegment uint64, tmpDir string, lvl log.Lvl) (err error) { +func HeadersIdx(ctx context.Context, segmentFilePath string, firstBlockNumInSegment uint64, tmpDir string, p *background.Progress, lvl log.Lvl) (err error) { defer func() { if rec := recover(); rec != nil { _, fName := filepath.Split(segmentFilePath) @@ -1696,9 +1710,14 @@ func HeadersIdx(ctx context.Context, segmentFilePath string, firstBlockNumInSegm } defer d.Close() + _, fname := filepath.Split(segmentFilePath) + p.Name.Store(fname) + p.Total.Store(uint64(d.Count())) + hasher := crypto.NewKeccakState() var h common.Hash if err := Idx(ctx, d, firstBlockNumInSegment, tmpDir, log.LvlDebug, func(idx *recsplit.RecSplit, i, offset uint64, word []byte) error { + p.Processed.Inc() headerRlp := word[1:] hasher.Reset() hasher.Write(headerRlp) @@ -1713,7 +1732,7 @@ func HeadersIdx(ctx context.Context, segmentFilePath string, firstBlockNumInSegm return nil } -func BodiesIdx(ctx context.Context, segmentFilePath string, firstBlockNumInSegment uint64, tmpDir string, lvl log.Lvl) (err error) { +func BodiesIdx(ctx context.Context, segmentFilePath string, firstBlockNumInSegment uint64, tmpDir string, p *background.Progress, lvl log.Lvl) (err error) { defer func() { if rec := recover(); rec != nil { _, fName := filepath.Split(segmentFilePath) @@ -1729,7 +1748,12 @@ func BodiesIdx(ctx context.Context, segmentFilePath string, firstBlockNumInSegme } defer d.Close() + _, fname := filepath.Split(segmentFilePath) + p.Name.Store(fname) + p.Total.Store(uint64(d.Count())) + if err := Idx(ctx, d, firstBlockNumInSegment, tmpDir, log.LvlDebug, func(idx *recsplit.RecSplit, i, offset uint64, word []byte) error { + p.Processed.Inc() n := binary.PutUvarint(num, i) if err := idx.AddKey(num[:n], offset); err != nil { return err @@ -1939,13 +1963,14 @@ func (m *Merger) Merge(ctx context.Context, snapshots *RoSnapshots, mergeRanges return err } for _, t := range snap.AllSnapshotTypes { - f, _ := snap.ParseFileName(snapDir, snap.SegmentFileName(r.from, r.to, t)) + segName := snap.SegmentFileName(r.from, r.to, t) + f, _ := snap.ParseFileName(snapDir, segName) if err := m.merge(ctx, toMerge[t], f.Path, logEvery); err != nil { return fmt.Errorf("mergeByAppendSegments: %w", err) } if doIndex { - progress := atomic.NewUint64(0) - if err := buildIdx(ctx, f, m.chainID, m.tmpDir, progress, m.lvl); err != nil { + p := &background.Progress{} + if err := buildIdx(ctx, f, m.chainID, m.tmpDir, p, m.lvl); err != nil { return err } }