snap: better log indexing (#4921)

This commit is contained in:
Alex Sharov 2022-08-04 09:39:29 +07:00 committed by GitHub
parent 3c7a2c4376
commit b1db36eb09
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -728,7 +728,7 @@ func (s *RoSnapshots) ViewTxs(blockNum uint64, f func(sn *TxnSegment) error) (fo
return s.Txs.ViewSegment(blockNum, f)
}
func buildIdx(ctx context.Context, sn snap.FileInfo, chainID uint256.Int, tmpDir string, lvl log.Lvl) error {
func buildIdx(ctx context.Context, sn snap.FileInfo, chainID uint256.Int, tmpDir string, progress *atomic.Uint64, lvl log.Lvl) error {
switch sn.T {
case snap.Headers:
if err := HeadersIdx(ctx, sn.Path, sn.From, tmpDir, lvl); err != nil {
@ -740,7 +740,7 @@ func buildIdx(ctx context.Context, sn snap.FileInfo, chainID uint256.Int, tmpDir
}
case snap.Transactions:
dir, _ := filepath.Split(sn.Path)
if err := TransactionsIdx(ctx, chainID, sn.From, sn.To, dir, tmpDir, lvl); err != nil {
if err := TransactionsIdx(ctx, chainID, sn.From, sn.To, dir, tmpDir, progress, lvl); err != nil {
return err
}
}
@ -758,6 +758,7 @@ func BuildMissedIndices(ctx context.Context, dir string, chainID uint256.Int, tm
errs := make(chan error, 1024)
wg := &sync.WaitGroup{}
sem := semaphore.NewWeighted(int64(workers))
progress := atomic.NewUint64(0)
for _, t := range snap.AllSnapshotTypes {
for _, sn := range segments {
if sn.T != t {
@ -774,22 +775,9 @@ func BuildMissedIndices(ctx context.Context, dir string, chainID uint256.Int, tm
defer sem.Release(1)
defer wg.Done()
if err := buildIdx(ctx, sn, chainID, tmpDir, lvl); err != nil {
if err := buildIdx(ctx, sn, chainID, tmpDir, progress, lvl); err != nil {
errs <- err
}
select {
case <-ctx.Done():
errs <- ctx.Err()
return
case <-logEvery.C:
var m runtime.MemStats
if lvl >= log.LvlInfo {
common2.ReadMemStats(&m)
}
log.Log(lvl, "[snapshots] Indexing", "type", t.String(), "blockNum", sn.To, "alloc", common2.ByteCount(m.Alloc), "sys", common2.ByteCount(m.Sys))
default:
}
}(sn)
}
}
@ -797,13 +785,22 @@ func BuildMissedIndices(ctx context.Context, dir string, chainID uint256.Int, tm
wg.Wait()
close(errs)
}()
for err := range errs {
if err != nil {
return err
for {
select {
case err := <-errs:
if err != nil {
return err
}
case <-ctx.Done():
return ctx.Err()
case <-logEvery.C:
var m runtime.MemStats
if lvl >= log.LvlInfo {
common2.ReadMemStats(&m)
}
log.Log(lvl, "[snapshots] Indexing", "blockNum", progress.Load(), "alloc", common2.ByteCount(m.Alloc), "sys", common2.ByteCount(m.Sys))
}
}
return nil
}
func noGaps(in []snap.FileInfo) (out []snap.FileInfo, missingSnapshots []Range) {
@ -1133,11 +1130,12 @@ func DumpBlocks(ctx context.Context, blockFrom, blockTo, blocksPerFile uint64, t
}
func dumpBlocksRange(ctx context.Context, blockFrom, blockTo uint64, tmpDir, snapDir string, chainDB kv.RoDB, chainID uint256.Int, workers int, lvl log.Lvl) error {
progress := atomic.NewUint64(0)
f, _ := snap.ParseFileName(snapDir, snap.SegmentFileName(blockFrom, blockTo, snap.Headers))
if err := DumpHeaders(ctx, chainDB, f.Path, tmpDir, blockFrom, blockTo, workers, lvl); err != nil {
return fmt.Errorf("DumpHeaders: %w", err)
}
if err := buildIdx(ctx, f, chainID, tmpDir, lvl); err != nil {
if err := buildIdx(ctx, f, chainID, tmpDir, progress, lvl); err != nil {
return err
}
@ -1145,7 +1143,7 @@ func dumpBlocksRange(ctx context.Context, blockFrom, blockTo uint64, tmpDir, sna
if err := DumpBodies(ctx, chainDB, f.Path, tmpDir, blockFrom, blockTo, workers, lvl); err != nil {
return fmt.Errorf("DumpBodies: %w", err)
}
if err := buildIdx(ctx, f, chainID, tmpDir, lvl); err != nil {
if err := buildIdx(ctx, f, chainID, tmpDir, progress, lvl); err != nil {
return err
}
@ -1153,7 +1151,7 @@ func dumpBlocksRange(ctx context.Context, blockFrom, blockTo uint64, tmpDir, sna
if _, err := DumpTxs(ctx, chainDB, f.Path, tmpDir, blockFrom, blockTo, workers, lvl); err != nil {
return fmt.Errorf("DumpTxs: %w", err)
}
if err := buildIdx(ctx, f, chainID, tmpDir, lvl); err != nil {
if err := buildIdx(ctx, f, chainID, tmpDir, progress, lvl); err != nil {
return err
}
@ -1531,7 +1529,7 @@ func expectedTxsAmount(snapDir string, blockFrom, blockTo uint64) (firstTxID, ex
return
}
func TransactionsIdx(ctx context.Context, chainID uint256.Int, blockFrom, blockTo uint64, snapDir string, tmpDir string, lvl log.Lvl) (err error) {
func TransactionsIdx(ctx context.Context, chainID uint256.Int, blockFrom, blockTo uint64, snapDir string, tmpDir string, progress *atomic.Uint64, lvl log.Lvl) (err error) {
defer func() {
if rec := recover(); rec != nil {
err = fmt.Errorf("TransactionsIdx: at=%d-%d, %v, %s", blockFrom, blockTo, rec, dbg.Stack())
@ -1619,6 +1617,7 @@ RETRY:
return ctx.Err()
default:
}
progress.Store(blockNum)
for body.BaseTxId+uint64(body.TxAmount) <= firstTxID+i { // skip empty blocks
if !bodyGetter.HasNext() {
@ -1939,7 +1938,8 @@ func (m *Merger) Merge(ctx context.Context, snapshots *RoSnapshots, mergeRanges
return fmt.Errorf("mergeByAppendSegments: %w", err)
}
if doIndex {
if err := buildIdx(ctx, f, m.chainID, m.tmpDir, m.lvl); err != nil {
progress := atomic.NewUint64(0)
if err := buildIdx(ctx, f, m.chainID, m.tmpDir, progress, m.lvl); err != nil {
return err
}
}