Sn: parallel files indexing progress logs (#4925)

This commit is contained in:
Alex Sharov 2022-08-04 12:31:25 +07:00 committed by GitHub
parent eb07869065
commit 1413b1c37c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 58 additions and 33 deletions

2
go.mod
View File

@ -3,7 +3,7 @@ module github.com/ledgerwatch/erigon
go 1.18
require (
github.com/ledgerwatch/erigon-lib v0.0.0-20220803080736-0c9ada1ab0e8
github.com/ledgerwatch/erigon-lib v0.0.0-20220804052017-e00fde454008
github.com/ledgerwatch/erigon-snapshot v1.0.0
github.com/ledgerwatch/log/v3 v3.4.1
github.com/ledgerwatch/secp256k1 v1.0.0

4
go.sum
View File

@ -390,8 +390,8 @@ github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/kylelemons/godebug v0.0.0-20170224010052-a616ab194758 h1:0D5M2HQSGD3PYPwICLl+/9oulQauOuETfgFvhBDffs0=
github.com/leanovate/gopter v0.2.9 h1:fQjYxZaynp97ozCzfOyOuAGOU4aU/z37zf/tOujFk7c=
github.com/leanovate/gopter v0.2.9/go.mod h1:U2L/78B+KVFIx2VmW6onHJQzXtFb+p5y3y2Sh+Jxxv8=
github.com/ledgerwatch/erigon-lib v0.0.0-20220803080736-0c9ada1ab0e8 h1:1VEKTAZH1NSsTo1RQntMKB6OhYB6y8wdJ3tDVlMygLY=
github.com/ledgerwatch/erigon-lib v0.0.0-20220803080736-0c9ada1ab0e8/go.mod h1:fErfZoeVWbOKpClW3RL8jsmoHzSxbr4RRCHhDamzbE4=
github.com/ledgerwatch/erigon-lib v0.0.0-20220804052017-e00fde454008 h1:tYXcq502Lrvu2ALN5fuA9jkmeg4rnOcv57vsdE8jcYE=
github.com/ledgerwatch/erigon-lib v0.0.0-20220804052017-e00fde454008/go.mod h1:fErfZoeVWbOKpClW3RL8jsmoHzSxbr4RRCHhDamzbE4=
github.com/ledgerwatch/erigon-snapshot v1.0.0 h1:bp/7xoPdM5lK7LFdqEMH008RZmqxMZV0RUVEQiWs7v4=
github.com/ledgerwatch/erigon-snapshot v1.0.0/go.mod h1:3AuPxZc85jkehh/HA9h8gabv5MSi3kb/ddtzBsTVJFo=
github.com/ledgerwatch/log/v3 v3.4.1 h1:/xGwlVulXnsO9Uq+tzaExc8OWmXXHU0dnLalpbnY5Bc=

View File

@ -16,6 +16,7 @@ import (
"github.com/holiman/uint256"
common2 "github.com/ledgerwatch/erigon-lib/common"
"github.com/ledgerwatch/erigon-lib/common/background"
"github.com/ledgerwatch/erigon-lib/common/cmp"
"github.com/ledgerwatch/erigon-lib/common/dbg"
"github.com/ledgerwatch/erigon-lib/compress"
@ -728,19 +729,19 @@ func (s *RoSnapshots) ViewTxs(blockNum uint64, f func(sn *TxnSegment) error) (fo
return s.Txs.ViewSegment(blockNum, f)
}
func buildIdx(ctx context.Context, sn snap.FileInfo, chainID uint256.Int, tmpDir string, progress *atomic.Uint64, lvl log.Lvl) error {
func buildIdx(ctx context.Context, sn snap.FileInfo, chainID uint256.Int, tmpDir string, p *background.Progress, lvl log.Lvl) error {
switch sn.T {
case snap.Headers:
if err := HeadersIdx(ctx, sn.Path, sn.From, tmpDir, lvl); err != nil {
if err := HeadersIdx(ctx, sn.Path, sn.From, tmpDir, p, lvl); err != nil {
return err
}
case snap.Bodies:
if err := BodiesIdx(ctx, sn.Path, sn.From, tmpDir, lvl); err != nil {
if err := BodiesIdx(ctx, sn.Path, sn.From, tmpDir, p, lvl); err != nil {
return err
}
case snap.Transactions:
dir, _ := filepath.Split(sn.Path)
if err := TransactionsIdx(ctx, chainID, sn.From, sn.To, dir, tmpDir, progress, lvl); err != nil {
if err := TransactionsIdx(ctx, chainID, sn.From, sn.To, dir, tmpDir, p, lvl); err != nil {
return err
}
}
@ -749,7 +750,7 @@ func buildIdx(ctx context.Context, sn snap.FileInfo, chainID uint256.Int, tmpDir
func BuildMissedIndices(ctx context.Context, dir string, chainID uint256.Int, tmpDir string, workers int, lvl log.Lvl) error {
//log.Log(lvl, "[snapshots] Build indices", "from", min)
logEvery := time.NewTicker(20 * time.Second)
logEvery := time.NewTicker(60 * time.Second)
defer logEvery.Stop()
segments, _, err := Segments(dir)
if err != nil {
@ -757,9 +758,8 @@ func BuildMissedIndices(ctx context.Context, dir string, chainID uint256.Int, tm
}
errs := make(chan error, 1024)
wg := &sync.WaitGroup{}
currentFile := atomic.NewString("")
ps := background.NewProgressSet()
sem := semaphore.NewWeighted(int64(workers))
progress := atomic.NewUint64(0)
go func() {
for _, t := range snap.AllSnapshotTypes {
for _, sn := range segments {
@ -769,28 +769,28 @@ func BuildMissedIndices(ctx context.Context, dir string, chainID uint256.Int, tm
if hasIdxFile(&sn) {
continue
}
wg.Add(1)
if err := sem.Acquire(ctx, 1); err != nil {
errs <- err
return
}
wg.Add(1)
go func(sn snap.FileInfo) {
defer sem.Release(1)
defer wg.Done()
currentFile.Store(snap.SegmentFileName(sn.From, sn.To, sn.T))
if err := buildIdx(ctx, sn, chainID, tmpDir, progress, lvl); err != nil {
p := &background.Progress{}
ps.Add(p)
defer ps.Delete(p)
if err := buildIdx(ctx, sn, chainID, tmpDir, p, lvl); err != nil {
errs <- err
}
}(sn)
}
}
}()
go func() {
wg.Wait()
close(errs)
}()
for {
select {
case err := <-errs:
@ -804,7 +804,7 @@ func BuildMissedIndices(ctx context.Context, dir string, chainID uint256.Int, tm
if lvl >= log.LvlInfo {
common2.ReadMemStats(&m)
}
log.Log(lvl, "[snapshots] Indexing", "file", currentFile.Load(), "alloc", common2.ByteCount(m.Alloc), "sys", common2.ByteCount(m.Sys))
log.Log(lvl, "[snapshots] Indexing", "progress", ps.String(), "alloc", common2.ByteCount(m.Alloc), "sys", common2.ByteCount(m.Sys))
}
}
}
@ -1136,28 +1136,33 @@ func DumpBlocks(ctx context.Context, blockFrom, blockTo, blocksPerFile uint64, t
}
func dumpBlocksRange(ctx context.Context, blockFrom, blockTo uint64, tmpDir, snapDir string, chainDB kv.RoDB, chainID uint256.Int, workers int, lvl log.Lvl) error {
progress := atomic.NewUint64(0)
f, _ := snap.ParseFileName(snapDir, snap.SegmentFileName(blockFrom, blockTo, snap.Headers))
segName := snap.SegmentFileName(blockFrom, blockTo, snap.Headers)
f, _ := snap.ParseFileName(snapDir, segName)
if err := DumpHeaders(ctx, chainDB, f.Path, tmpDir, blockFrom, blockTo, workers, lvl); err != nil {
return fmt.Errorf("DumpHeaders: %w", err)
}
if err := buildIdx(ctx, f, chainID, tmpDir, progress, lvl); err != nil {
p := &background.Progress{}
if err := buildIdx(ctx, f, chainID, tmpDir, p, lvl); err != nil {
return err
}
f, _ = snap.ParseFileName(snapDir, snap.SegmentFileName(blockFrom, blockTo, snap.Bodies))
segName = snap.SegmentFileName(blockFrom, blockTo, snap.Bodies)
f, _ = snap.ParseFileName(snapDir, segName)
if err := DumpBodies(ctx, chainDB, f.Path, tmpDir, blockFrom, blockTo, workers, lvl); err != nil {
return fmt.Errorf("DumpBodies: %w", err)
}
if err := buildIdx(ctx, f, chainID, tmpDir, progress, lvl); err != nil {
p = &background.Progress{}
if err := buildIdx(ctx, f, chainID, tmpDir, p, lvl); err != nil {
return err
}
f, _ = snap.ParseFileName(snapDir, snap.SegmentFileName(blockFrom, blockTo, snap.Transactions))
segName = snap.SegmentFileName(blockFrom, blockTo, snap.Transactions)
f, _ = snap.ParseFileName(snapDir, segName)
if _, err := DumpTxs(ctx, chainDB, f.Path, tmpDir, blockFrom, blockTo, workers, lvl); err != nil {
return fmt.Errorf("DumpTxs: %w", err)
}
if err := buildIdx(ctx, f, chainID, tmpDir, progress, lvl); err != nil {
p = &background.Progress{}
if err := buildIdx(ctx, f, chainID, tmpDir, p, lvl); err != nil {
return err
}
@ -1535,7 +1540,7 @@ func expectedTxsAmount(snapDir string, blockFrom, blockTo uint64) (firstTxID, ex
return
}
func TransactionsIdx(ctx context.Context, chainID uint256.Int, blockFrom, blockTo uint64, snapDir string, tmpDir string, progress *atomic.Uint64, lvl log.Lvl) (err error) {
func TransactionsIdx(ctx context.Context, chainID uint256.Int, blockFrom, blockTo uint64, snapDir string, tmpDir string, p *background.Progress, lvl log.Lvl) (err error) {
defer func() {
if rec := recover(); rec != nil {
err = fmt.Errorf("TransactionsIdx: at=%d-%d, %v, %s", blockFrom, blockTo, rec, dbg.Stack())
@ -1553,7 +1558,8 @@ func TransactionsIdx(ctx context.Context, chainID uint256.Int, blockFrom, blockT
}
defer bodiesSegment.Close()
segmentFilePath := filepath.Join(snapDir, snap.SegmentFileName(blockFrom, blockTo, snap.Transactions))
segFileName := snap.SegmentFileName(blockFrom, blockTo, snap.Transactions)
segmentFilePath := filepath.Join(snapDir, segFileName)
d, err := compress.NewDecompressor(segmentFilePath)
if err != nil {
return err
@ -1562,6 +1568,8 @@ func TransactionsIdx(ctx context.Context, chainID uint256.Int, blockFrom, blockT
if uint64(d.Count()) != expectedCount {
panic(fmt.Errorf("expect: %d, got %d\n", expectedCount, d.Count()))
}
p.Name.Store(segFileName)
p.Total.Store(uint64(d.Count() * 2))
txnHashIdx, err := recsplit.NewRecSplit(recsplit.RecSplitArgs{
KeyCount: d.Count(),
@ -1589,7 +1597,11 @@ func TransactionsIdx(ctx context.Context, chainID uint256.Int, blockFrom, blockT
if err != nil {
return err
}
txnHashIdx.LogLvl(log.LvlDebug)
idxLogLvl := log.LvlDebug
if d.Count() > 1_000_000 {
idxLogLvl = log.LvlInfo
}
txnHashIdx.LogLvl(idxLogLvl)
txnHash2BlockNumIdx.LogLvl(log.LvlDebug)
parseCtx := types2.NewTxParseContext(chainID)
@ -1617,13 +1629,13 @@ RETRY:
}
for g.HasNext() {
p.Processed.Inc()
word, nextPos = g.Next(word[:0])
select {
case <-ctx.Done():
return ctx.Err()
default:
}
progress.Store(blockNum)
for body.BaseTxId+uint64(body.TxAmount) <= firstTxID+i { // skip empty blocks
if !bodyGetter.HasNext() {
@ -1654,6 +1666,7 @@ RETRY:
i++
offset = nextPos
}
if i != expectedCount {
@ -1677,12 +1690,13 @@ RETRY:
}
return err
}
p.Processed.Store(p.Total.Load())
return nil
}
// HeadersIdx builds the index headerHash -> offset for a headers segment file (analog of kv.HeaderNumber).
func HeadersIdx(ctx context.Context, segmentFilePath string, firstBlockNumInSegment uint64, tmpDir string, lvl log.Lvl) (err error) {
func HeadersIdx(ctx context.Context, segmentFilePath string, firstBlockNumInSegment uint64, tmpDir string, p *background.Progress, lvl log.Lvl) (err error) {
defer func() {
if rec := recover(); rec != nil {
_, fName := filepath.Split(segmentFilePath)
@ -1696,9 +1710,14 @@ func HeadersIdx(ctx context.Context, segmentFilePath string, firstBlockNumInSegm
}
defer d.Close()
_, fname := filepath.Split(segmentFilePath)
p.Name.Store(fname)
p.Total.Store(uint64(d.Count()))
hasher := crypto.NewKeccakState()
var h common.Hash
if err := Idx(ctx, d, firstBlockNumInSegment, tmpDir, log.LvlDebug, func(idx *recsplit.RecSplit, i, offset uint64, word []byte) error {
p.Processed.Inc()
headerRlp := word[1:]
hasher.Reset()
hasher.Write(headerRlp)
@ -1713,7 +1732,7 @@ func HeadersIdx(ctx context.Context, segmentFilePath string, firstBlockNumInSegm
return nil
}
func BodiesIdx(ctx context.Context, segmentFilePath string, firstBlockNumInSegment uint64, tmpDir string, lvl log.Lvl) (err error) {
func BodiesIdx(ctx context.Context, segmentFilePath string, firstBlockNumInSegment uint64, tmpDir string, p *background.Progress, lvl log.Lvl) (err error) {
defer func() {
if rec := recover(); rec != nil {
_, fName := filepath.Split(segmentFilePath)
@ -1729,7 +1748,12 @@ func BodiesIdx(ctx context.Context, segmentFilePath string, firstBlockNumInSegme
}
defer d.Close()
_, fname := filepath.Split(segmentFilePath)
p.Name.Store(fname)
p.Total.Store(uint64(d.Count()))
if err := Idx(ctx, d, firstBlockNumInSegment, tmpDir, log.LvlDebug, func(idx *recsplit.RecSplit, i, offset uint64, word []byte) error {
p.Processed.Inc()
n := binary.PutUvarint(num, i)
if err := idx.AddKey(num[:n], offset); err != nil {
return err
@ -1939,13 +1963,14 @@ func (m *Merger) Merge(ctx context.Context, snapshots *RoSnapshots, mergeRanges
return err
}
for _, t := range snap.AllSnapshotTypes {
f, _ := snap.ParseFileName(snapDir, snap.SegmentFileName(r.from, r.to, t))
segName := snap.SegmentFileName(r.from, r.to, t)
f, _ := snap.ParseFileName(snapDir, segName)
if err := m.merge(ctx, toMerge[t], f.Path, logEvery); err != nil {
return fmt.Errorf("mergeByAppendSegments: %w", err)
}
if doIndex {
progress := atomic.NewUint64(0)
if err := buildIdx(ctx, f, m.chainID, m.tmpDir, progress, m.lvl); err != nil {
p := &background.Progress{}
if err := buildIdx(ctx, f, m.chainID, m.tmpDir, p, m.lvl); err != nil {
return err
}
}