Snapshots: nil indices on p2p fix (#3772)

This commit is contained in:
Alex Sharov 2022-03-27 10:07:58 +07:00 committed by GitHub
parent 9ea6398524
commit 89d4477df3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 188 additions and 57 deletions

View File

@ -229,7 +229,7 @@ func resetFinish(tx kv.RwTx) error {
return nil
}
func printStages(db kv.Getter) error {
func printStages(db kv.Tx) error {
var err error
var progress uint64
w := new(tabwriter.Writer)
@ -253,5 +253,16 @@ func printStages(db kv.Getter) error {
}
fmt.Fprintf(w, "--\n")
fmt.Fprintf(w, "prune distance: %s\n\n", pm.String())
s1, err := db.ReadSequence(kv.EthTx)
if err != nil {
return err
}
s2, err := db.ReadSequence(kv.NonCanonicalTxs)
if err != nil {
return err
}
fmt.Fprintf(w, "sequence: EthTx=%d, NonCanonicalTx=%d\n\n", s1, s2)
return nil
}

View File

@ -1063,10 +1063,7 @@ func allSnapshots(cc *params.ChainConfig) *snapshotsync.RoSnapshots {
snapshotCfg := ethconfig.NewSnapshotCfg(enableSnapshot, true)
dir.MustExist(filepath.Join(datadir, "snapshots"))
_allSnapshotsSingleton = snapshotsync.NewRoSnapshots(snapshotCfg, filepath.Join(datadir, "snapshots"))
if err := _allSnapshotsSingleton.ReopenSegments(); err != nil {
panic(err)
}
if err := _allSnapshotsSingleton.ReopenSomeIndices(snapshotsync.AllSnapshotTypes...); err != nil {
if err := _allSnapshotsSingleton.Reopen(); err != nil {
panic(err)
}
}

View File

@ -345,8 +345,7 @@ func RemoteServices(ctx context.Context, cfg httpcfg.HttpCfg, logger log.Logger,
allSnapshots := snapshotsync.NewRoSnapshots(cfg.Snapshot, filepath.Join(cfg.DataDir, "snapshots"))
allSnapshots.AsyncOpenAll(ctx)
onNewSnapshot = func() {
allSnapshots.ReopenSegments()
allSnapshots.ReopenIndices()
allSnapshots.Reopen()
}
blockReader = snapshotsync.NewBlockReaderWithSnapshots(allSnapshots)
} else {

View File

@ -358,13 +358,19 @@ func WriteTransactions(db kv.RwTx, txs []types.Transaction, baseTxId uint64) err
return nil
}
func WriteRawTransactions(db kv.StatelessWriteTx, txs [][]byte, baseTxId uint64) error {
func WriteRawTransactions(db kv.RwTx, txs [][]byte, baseTxId uint64) error {
txId := baseTxId
for _, tx := range txs {
txIdKey := make([]byte, 8)
binary.BigEndian.PutUint64(txIdKey, txId)
// If next Append returns KeyExists error - it means you need to open transaction in App code before calling this func. Batch is also fine.
if err := db.Append(kv.EthTx, txIdKey, tx); err != nil {
c, err := db.Cursor(kv.EthTx)
if err != nil {
kk, _, _ := c.Last()
c.Close()
return fmt.Errorf("txId=%d, baseTxId=%d, lastInDb=%d, %w", txId, baseTxId, binary.BigEndian.Uint64(kk), err)
}
return err
}
txId++
@ -473,6 +479,18 @@ func RawTransactionsRange(db kv.Getter, from, to uint64) (res [][]byte, err erro
// ResetSequence - allow set arbitrary value to sequence (for example to decrement it to exact value)
func ResetSequence(tx kv.RwTx, bucket string, newValue uint64) error {
c, err := tx.Cursor(bucket)
if err != nil {
return err
}
k, _, err := c.Last()
if err != nil {
return err
}
if k != nil && binary.BigEndian.Uint64(k) >= newValue {
panic(fmt.Sprintf("must not happen. ResetSequence: %s, %d <= lastInDB: %d\n", bucket, newValue, binary.BigEndian.Uint64(k)))
}
newVBytes := make([]byte, 8)
binary.BigEndian.PutUint64(newVBytes, newValue)
if err := tx.Put(kv.Sequence, []byte(bucket), newVBytes); err != nil {
@ -529,7 +547,7 @@ func ReadSenders(db kv.Getter, hash common.Hash, number uint64) ([]common.Addres
return senders, nil
}
func WriteRawBodyIfNotExists(db kv.StatelessRwTx, hash common.Hash, number uint64, body *types.RawBody) error {
func WriteRawBodyIfNotExists(db kv.RwTx, hash common.Hash, number uint64, body *types.RawBody) error {
exists, err := db.Has(kv.BlockBody, dbutils.BlockBodyKey(number, hash))
if err != nil {
return err
@ -540,7 +558,7 @@ func WriteRawBodyIfNotExists(db kv.StatelessRwTx, hash common.Hash, number uint6
return WriteRawBody(db, hash, number, body)
}
func WriteRawBody(db kv.StatelessRwTx, hash common.Hash, number uint64, body *types.RawBody) error {
func WriteRawBody(db kv.RwTx, hash common.Hash, number uint64, body *types.RawBody) error {
baseTxId, err := db.IncrementSequence(kv.EthTx, uint64(len(body.Transactions))+2)
if err != nil {
return err
@ -551,10 +569,10 @@ func WriteRawBody(db kv.StatelessRwTx, hash common.Hash, number uint64, body *ty
Uncles: body.Uncles,
}
if err = WriteBodyForStorage(db, hash, number, &data); err != nil {
return fmt.Errorf("failed to write body: %w", err)
return fmt.Errorf("WriteBodyForStorage: %w", err)
}
if err = WriteRawTransactions(db, body.Transactions, baseTxId+1); err != nil {
return fmt.Errorf("failed to WriteRawTransactions: %w", err)
return fmt.Errorf("WriteRawTransactions: %w", err)
}
return nil
}
@ -600,7 +618,7 @@ func DeleteBody(db kv.Deleter, hash common.Hash, number uint64) {
}
// MakeBodiesCanonical - move all txs of non-canonical blocks from NonCanonicalTxs table to EthTx table
func MakeBodiesCanonical(tx kv.StatelessRwTx, from uint64, ctx context.Context, logPrefix string, logEvery *time.Ticker) error {
func MakeBodiesCanonical(tx kv.RwTx, from uint64, ctx context.Context, logPrefix string, logEvery *time.Ticker) error {
for blockNum := from; ; blockNum++ {
h, err := ReadCanonicalHash(tx, blockNum)
if err != nil {
@ -717,6 +735,18 @@ func MakeBodiesNonCanonical(tx kv.RwTx, from uint64, ctx context.Context, logPre
// EthTx must have canonical id's - means need decrement it's sequence on unwind
if firstMovedTxnIDIsSet {
c, err := tx.Cursor(kv.EthTx)
if err != nil {
return err
}
k, _, err := c.Last()
if err != nil {
return err
}
if k != nil && binary.BigEndian.Uint64(k) >= firstMovedTxnID {
panic(fmt.Sprintf("must not happen, ResetSequence: %d, lastInDB: %d\n", firstMovedTxnID, binary.BigEndian.Uint64(k)))
}
if err := ResetSequence(tx, kv.EthTx, firstMovedTxnID); err != nil {
return err
}

View File

@ -189,7 +189,7 @@ Loop:
// Check existence before write - because WriteRawBody isn't idempotent (it allocates new sequence range for transactions on every call)
if err = rawdb.WriteRawBodyIfNotExists(tx, header.Hash(), blockHeight, rawBody); err != nil {
return fmt.Errorf("writing block body: %w", err)
return fmt.Errorf("WriteRawBodyIfNotExists: %w", err)
}
if blockHeight > bodyProgress {

View File

@ -952,14 +952,14 @@ func DownloadAndIndexSnapshotsIfNeed(s *StageState, ctx context.Context, tx kv.R
if err := WaitForDownloader(ctx, tx, cfg); err != nil {
return err
}
if err := cfg.snapshots.ReopenSegments(); err != nil {
if err := cfg.snapshots.Reopen(); err != nil {
return fmt.Errorf("Reopen: %w", err)
}
expect := cfg.snapshotHashesCfg.ExpectBlocks
if cfg.snapshots.SegmentsAvailable() < expect {
return fmt.Errorf("not enough snapshots available: %d > %d", expect, cfg.snapshots.SegmentsAvailable())
}
if err := cfg.snapshots.ReopenIndices(); err != nil {
if err := cfg.snapshots.Reopen(); err != nil {
return fmt.Errorf("Reopen: %w", err)
}
@ -977,7 +977,7 @@ func DownloadAndIndexSnapshotsIfNeed(s *StageState, ctx context.Context, tx kv.R
}
}
if err := cfg.snapshots.ReopenIndices(); err != nil {
if err := cfg.snapshots.Reopen(); err != nil {
return fmt.Errorf("Reopen: %w", err)
}
}

View File

@ -224,7 +224,7 @@ var txsBeginEnd = Migration{
},
}
func writeRawBodyDeprecated(db kv.StatelessRwTx, hash common.Hash, number uint64, body *types.RawBody) error {
func writeRawBodyDeprecated(db kv.RwTx, hash common.Hash, number uint64, body *types.RawBody) error {
baseTxId, err := db.IncrementSequence(kv.EthTx, uint64(len(body.Transactions)))
if err != nil {
return err

View File

@ -155,7 +155,7 @@ func doRetireCommand(cliCtx *cli.Context) error {
chainConfig := tool.ChainConfigFromDB(chainDB)
chainID, _ := uint256.FromBig(chainConfig.ChainID)
snapshots := snapshotsync.NewRoSnapshots(cfg, snapshotDir)
snapshots.ReopenSegments()
snapshots.Reopen()
br := snapshotsync.NewBlockRetire(runtime.NumCPU()/2, tmpDir, snapshots, chainDB, nil, nil)
@ -215,7 +215,7 @@ func rebuildIndices(ctx context.Context, chainDB kv.RoDB, cfg ethconfig.Snapshot
chainID, _ := uint256.FromBig(chainConfig.ChainID)
allSnapshots := snapshotsync.NewRoSnapshots(cfg, snapshotDir.Path)
if err := allSnapshots.ReopenSegments(); err != nil {
if err := allSnapshots.Reopen(); err != nil {
return err
}
if err := snapshotsync.BuildIndices(ctx, allSnapshots, snapshotDir, *chainID, tmpDir, from, log.LvlInfo); err != nil {
@ -281,8 +281,7 @@ func checkBlockSnapshot(chaindata string) error {
cfg := ethconfig.NewSnapshotCfg(true, true)
snapshots := snapshotsync.NewRoSnapshots(cfg, filepath.Join(datadir, "snapshots"))
snapshots.ReopenSegments()
snapshots.ReopenIndices()
snapshots.Reopen()
//if err := snapshots.BuildIndices(context.Background(), *chainID); err != nil {
// panic(err)
//}

View File

@ -192,6 +192,13 @@ func (back *BlockReaderWithSnapshots) HeaderByNumber(ctx context.Context, tx kv.
ok, err := back.sn.ViewHeaders(blockHeight, func(segment *HeaderSegment) error {
if segment.idxHeaderHash == nil {
fmt.Printf("why? %d, %d, %d, %d, %d\n", blockHeight, segment.From, segment.To, back.sn.segmentsAvailable.Load(), back.sn.idxAvailable.Load())
back.sn.PrintDebug()
for _, sn := range back.sn.Headers.segments {
if sn.idxHeaderHash == nil {
fmt.Printf("seg with nil idx: %d,%d\n", segment.From, segment.To)
}
}
fmt.Printf("==== end debug print ====\n")
}
h, err = back.headerFromSnapshot(blockHeight, segment, nil)
if err != nil {

View File

@ -447,10 +447,7 @@ func (s *RoSnapshots) AsyncOpenAll(ctx context.Context) {
return
default:
}
if err := s.ReopenSegments(); err != nil && !errors.Is(err, os.ErrNotExist) && !errors.Is(err, ErrSnapshotMissed) {
log.Error("AsyncOpenAll", "err", err)
}
if err := s.ReopenIndices(); err != nil && !errors.Is(err, os.ErrNotExist) && !errors.Is(err, ErrSnapshotMissed) {
if err := s.Reopen(); err != nil && !errors.Is(err, os.ErrNotExist) && !errors.Is(err, ErrSnapshotMissed) {
log.Error("AsyncOpenAll", "err", err)
}
time.Sleep(15 * time.Second)
@ -458,6 +455,96 @@ func (s *RoSnapshots) AsyncOpenAll(ctx context.Context) {
}()
}
// Reopen reloads all snapshot segments AND their indices from s.dir in one
// atomic step, under the write locks of all three collections (Headers,
// Bodies, Txs). It replaces the previous two-call ReopenSegments +
// ReopenIndices flow, so readers can never observe a segment whose index
// field is still nil (the "nil indices on p2p" bug this commit fixes).
func (s *RoSnapshots) Reopen() error {
	// Take every collection's write lock for the whole reload so concurrent
	// readers see either the old state or the fully-reopened state.
	s.Headers.lock.Lock()
	defer s.Headers.lock.Unlock()
	s.Bodies.lock.Lock()
	defer s.Bodies.lock.Unlock()
	s.Txs.lock.Lock()
	defer s.Txs.lock.Unlock()
	// Drop previously opened segment handles before re-scanning the dir.
	s.closeSegmentsLocked()
	files, err := segments2(s.dir)
	if err != nil {
		return err
	}
	// Phase 1: open the .seg decompressors for every discovered block range.
	for _, f := range files {
		{
			seg := &BodySegment{From: f.From, To: f.To}
			fileName := SegmentFileName(f.From, f.To, Bodies)
			seg.seg, err = compress.NewDecompressor(path.Join(s.dir, fileName))
			if err != nil {
				if errors.Is(err, os.ErrNotExist) {
					// Missing file => stop at the first gap; only a contiguous
					// prefix of ranges is usable.
					break
				}
				return err
			}
			s.Bodies.segments = append(s.Bodies.segments, seg)
		}
		{
			seg := &HeaderSegment{From: f.From, To: f.To}
			fileName := SegmentFileName(f.From, f.To, Headers)
			seg.seg, err = compress.NewDecompressor(path.Join(s.dir, fileName))
			if err != nil {
				if errors.Is(err, os.ErrNotExist) {
					// NOTE(review): breaking here leaves Bodies one segment
					// longer than Headers for this range — presumably harmless
					// since segmentsAvailable is not advanced; confirm.
					break
				}
				return err
			}
			s.Headers.segments = append(s.Headers.segments, seg)
		}
		{
			seg := &TxnSegment{From: f.From, To: f.To}
			fileName := SegmentFileName(f.From, f.To, Transactions)
			seg.Seg, err = compress.NewDecompressor(path.Join(s.dir, fileName))
			if err != nil {
				if errors.Is(err, os.ErrNotExist) {
					break
				}
				return err
			}
			s.Txs.segments = append(s.Txs.segments, seg)
		}
		// All three segment kinds opened for this range — publish the new
		// highest available block (To is exclusive, hence the -1).
		if f.To > 0 {
			s.segmentsAvailable.Store(f.To - 1)
		} else {
			s.segmentsAvailable.Store(0)
		}
	}
	s.segmentsReady.Store(true)
	// Phase 2: open the recsplit indices for every opened segment. A missing
	// index file is tolerated (os.ErrNotExist ignored) — indices may not be
	// built yet; idxAvailability() below accounts for the gap.
	for _, sn := range s.Headers.segments {
		sn.idxHeaderHash, err = recsplit.OpenIndex(path.Join(s.dir, IdxFileName(sn.From, sn.To, Headers.String())))
		if err != nil && !errors.Is(err, os.ErrNotExist) {
			return err
		}
	}
	for _, sn := range s.Bodies.segments {
		sn.idxBodyNumber, err = recsplit.OpenIndex(path.Join(s.dir, IdxFileName(sn.From, sn.To, Bodies.String())))
		if err != nil && !errors.Is(err, os.ErrNotExist) {
			return err
		}
	}
	for _, sn := range s.Txs.segments {
		sn.IdxTxnHash, err = recsplit.OpenIndex(path.Join(s.dir, IdxFileName(sn.From, sn.To, Transactions.String())))
		if err != nil && !errors.Is(err, os.ErrNotExist) {
			return err
		}
		sn.IdxTxnId, err = recsplit.OpenIndex(path.Join(s.dir, IdxFileName(sn.From, sn.To, TransactionsId.String())))
		if err != nil && !errors.Is(err, os.ErrNotExist) {
			return err
		}
		sn.IdxTxnHash2BlockNum, err = recsplit.OpenIndex(path.Join(s.dir, IdxFileName(sn.From, sn.To, Transactions2Block.String())))
		if err != nil && !errors.Is(err, os.ErrNotExist) {
			return err
		}
	}
	// Publish index availability last, so indicesReady implies idxAvailable
	// is already set.
	s.idxAvailable.Store(s.idxAvailability())
	s.indicesReady.Store(true)
	return nil
}
func (s *RoSnapshots) ReopenSegments() error {
s.Headers.lock.Lock()
defer s.Headers.lock.Unlock()
@ -484,6 +571,7 @@ func (s *RoSnapshots) ReopenSegments() error {
s.Bodies.segments = append(s.Bodies.segments, seg)
}
{
fmt.Printf("reopen segment: %d-%d\n", f.From, f.To)
seg := &HeaderSegment{From: f.From, To: f.To}
fileName := SegmentFileName(f.From, f.To, Headers)
seg.seg, err = compress.NewDecompressor(path.Join(s.dir, fileName))
@ -542,12 +630,12 @@ func (s *RoSnapshots) closeSegmentsLocked() {
}
}
func (s *RoSnapshots) PrintDebug() {
s.Headers.lock.Lock()
defer s.Headers.lock.Unlock()
s.Bodies.lock.Lock()
defer s.Bodies.lock.Unlock()
s.Txs.lock.Lock()
defer s.Txs.lock.Unlock()
s.Headers.lock.RLock()
defer s.Headers.lock.RUnlock()
s.Bodies.lock.RLock()
defer s.Bodies.lock.RUnlock()
s.Txs.lock.RLock()
defer s.Txs.lock.RUnlock()
fmt.Printf("sn: %d, %d\n", s.segmentsAvailable.Load(), s.idxAvailable.Load())
fmt.Println(" == Snapshots, Header")
for _, sn := range s.Headers.segments {
@ -956,7 +1044,7 @@ func retireBlocks(ctx context.Context, blockFrom, blockTo uint64, chainID uint25
if err := DumpBlocks(ctx, blockFrom, blockTo, DEFAULT_SEGMENT_SIZE, tmpDir, snapshots.Dir(), db, workers, lvl); err != nil {
return fmt.Errorf("DumpBlocks: %w", err)
}
if err := snapshots.ReopenSegments(); err != nil {
if err := snapshots.Reopen(); err != nil {
return fmt.Errorf("Reopen: %w", err)
}
merger := NewMerger(tmpDir, workers, lvl, chainID)
@ -1799,7 +1887,7 @@ func (m *Merger) Merge(ctx context.Context, snapshots *RoSnapshots, mergeRanges
if len(mergeRanges) == 0 {
return nil
}
logEvery := time.NewTicker(20 * time.Second)
logEvery := time.NewTicker(30 * time.Second)
defer logEvery.Stop()
log.Log(m.lvl, "[snapshots] Merge segments", "ranges", fmt.Sprintf("%v", mergeRanges))
for _, r := range mergeRanges {
@ -1809,7 +1897,7 @@ func (m *Merger) Merge(ctx context.Context, snapshots *RoSnapshots, mergeRanges
}
{
segFilePath := filepath.Join(snapshotDir.Path, SegmentFileName(r.from, r.to, Bodies))
if err := m.merge(ctx, toMergeBodies, segFilePath); err != nil {
if err := m.merge(ctx, toMergeBodies, segFilePath, logEvery); err != nil {
return fmt.Errorf("mergeByAppendSegments: %w", err)
}
if doIndex {
@ -1821,7 +1909,7 @@ func (m *Merger) Merge(ctx context.Context, snapshots *RoSnapshots, mergeRanges
{
segFilePath := filepath.Join(snapshotDir.Path, SegmentFileName(r.from, r.to, Headers))
if err := m.merge(ctx, toMergeHeaders, segFilePath); err != nil {
if err := m.merge(ctx, toMergeHeaders, segFilePath, logEvery); err != nil {
return fmt.Errorf("mergeByAppendSegments: %w", err)
}
if doIndex {
@ -1833,7 +1921,7 @@ func (m *Merger) Merge(ctx context.Context, snapshots *RoSnapshots, mergeRanges
{
segFilePath := filepath.Join(snapshotDir.Path, SegmentFileName(r.from, r.to, Transactions))
if err := m.merge(ctx, toMergeTxs, segFilePath); err != nil {
if err := m.merge(ctx, toMergeTxs, segFilePath, logEvery); err != nil {
return fmt.Errorf("mergeByAppendSegments: %w", err)
}
if doIndex {
@ -1843,10 +1931,7 @@ func (m *Merger) Merge(ctx context.Context, snapshots *RoSnapshots, mergeRanges
}
}
if err := snapshots.ReopenSegments(); err != nil {
return fmt.Errorf("ReopenSegments: %w", err)
}
if err := snapshots.ReopenIndices(); err != nil {
if err := snapshots.Reopen(); err != nil {
return fmt.Errorf("Reopen: %w", err)
}
if err := m.removeOldFiles(toMergeHeaders, snapshotDir); err != nil {
@ -1865,27 +1950,30 @@ func (m *Merger) Merge(ctx context.Context, snapshots *RoSnapshots, mergeRanges
return nil
}
func (m *Merger) merge(ctx context.Context, toMerge []string, targetFile string) error {
fileNames := make([]string, len(toMerge))
for i, f := range toMerge {
_, fName := filepath.Split(f)
fileNames[i] = fName
}
func (m *Merger) merge(ctx context.Context, toMerge []string, targetFile string, logEvery *time.Ticker) error {
f, err := compress.NewCompressor(ctx, "merge", targetFile, m.tmpDir, compress.MinPatternScore, m.workers)
if err != nil {
return err
}
defer f.Close()
var word = make([]byte, 0, 4096)
for _, cFile := range toMerge {
var cnt, total int
cList := make([]*compress.Decompressor, len(toMerge))
for i, cFile := range toMerge {
d, err := compress.NewDecompressor(cFile)
if err != nil {
return err
}
defer d.Close()
cList[i] = d
total += d.Count()
}
for _, d := range cList {
if err := d.WithReadAhead(func() error {
g := d.MakeGetter()
for g.HasNext() {
cnt++
word, _ = g.Next(word[:0])
if err := f.AddWord(word); err != nil {
return err
@ -1893,6 +1981,9 @@ func (m *Merger) merge(ctx context.Context, toMerge []string, targetFile string)
select {
case <-ctx.Done():
return ctx.Err()
case <-logEvery.C:
_, fName := filepath.Split(targetFile)
log.Info("[snapshots] Merge", "progress", fmt.Sprintf("%.2f%%", 100*float64(cnt)/float64(total)), "to", fName)
default:
}
}

View File

@ -87,7 +87,7 @@ func TestMergeSnapshots(t *testing.T) {
cfg := ethconfig.Snapshot{Enabled: true}
s := NewRoSnapshots(cfg, dir)
defer s.Close()
require.NoError(s.ReopenSegments())
require.NoError(s.Reopen())
{
merger := NewMerger(dir, 1, log.LvlInfo, uint256.Int{})
@ -161,7 +161,7 @@ func TestOpenAllSnapshot(t *testing.T) {
createFile := func(from, to uint64, name Type) { createTestSegmentFile(t, from, to, name, dir) }
s := NewRoSnapshots(cfg, dir)
defer s.Close()
err := s.ReopenSegments()
err := s.Reopen()
require.NoError(err)
require.Equal(0, len(s.Headers.segments))
s.Close()
@ -175,7 +175,7 @@ func TestOpenAllSnapshot(t *testing.T) {
createFile(500_000, 1_000_000, Headers)
createFile(500_000, 1_000_000, Transactions)
s = NewRoSnapshots(cfg, dir)
err = s.ReopenSegments()
err = s.Reopen()
require.Error(err)
require.Equal(0, len(s.Headers.segments)) //because, no gaps are allowed (expect snapshots from block 0)
s.Close()
@ -186,11 +186,8 @@ func TestOpenAllSnapshot(t *testing.T) {
s = NewRoSnapshots(cfg, dir)
defer s.Close()
err = s.ReopenSegments()
err = s.Reopen()
require.NoError(err)
err = s.ReopenIndices()
require.NoError(err)
s.indicesReady.Store(true)
require.Equal(2, len(s.Headers.segments))
ok, err := s.ViewTxs(10, func(sn *TxnSegment) error {
@ -217,7 +214,7 @@ func TestOpenAllSnapshot(t *testing.T) {
// ExpectedBlocks - says only how much block must come from Torrent
chainSnapshotCfg.ExpectBlocks = 500_000 - 1
s = NewRoSnapshots(cfg, dir)
err = s.ReopenSegments()
err = s.Reopen()
require.NoError(err)
defer s.Close()
require.Equal(2, len(s.Headers.segments))
@ -228,7 +225,7 @@ func TestOpenAllSnapshot(t *testing.T) {
chainSnapshotCfg.ExpectBlocks = math.MaxUint64
s = NewRoSnapshots(cfg, dir)
defer s.Close()
err = s.ReopenSegments()
err = s.Reopen()
require.NoError(err)
}