package main import ( "context" "encoding/binary" "sync" "time" "github.com/ledgerwatch/erigon-lib/kv" "github.com/ledgerwatch/erigon/common" "github.com/ledgerwatch/erigon/common/changeset" "github.com/ledgerwatch/erigon/common/dbutils" "github.com/ledgerwatch/erigon/common/debug" "github.com/ledgerwatch/erigon/core/types/accounts" "github.com/ledgerwatch/log/v3" ) func incrementAccount(vTx kv.RwTx, tx kv.Tx, cfg optionsCfg, verkleWriter *VerkleTreeWriter, from, to uint64) error { logInterval := time.NewTicker(30 * time.Second) logPrefix := "IncrementVerkleAccount" jobs := make(chan *regenerateIncrementalPedersenAccountsJob, batchSize) out := make(chan *regenerateIncrementalPedersenAccountsOut, batchSize) wg := new(sync.WaitGroup) wg.Add(int(cfg.workersCount)) ctx, cancelWorkers := context.WithCancel(context.Background()) for i := 0; i < int(cfg.workersCount); i++ { go func(threadNo int) { defer debug.LogPanic() defer wg.Done() incrementalAccountWorker(ctx, logPrefix, jobs, out) }(i) } defer cancelWorkers() accountCursor, err := tx.CursorDupSort(kv.AccountChangeSet) if err != nil { return err } defer accountCursor.Close() // Start Goroutine for collection go func() { defer debug.LogPanic() defer cancelWorkers() for o := range out { if o.absentInState { if err := verkleWriter.DeleteAccount(o.versionHash, o.isContract); err != nil { panic(err) } continue } if err := verkleWriter.UpdateAccount(o.versionHash, o.codeSize, o.isContract, o.account); err != nil { panic(err) } if err := verkleWriter.WriteContractCodeChunks(o.codeKeys, o.codeChunks); err != nil { panic(err) } } }() marker := NewVerkleMarker() defer marker.Rollback() for k, v, err := accountCursor.Seek(dbutils.EncodeBlockNumber(from)); k != nil; k, v, err = accountCursor.Next() { if err != nil { return err } blockNumber, addressBytes, _, err := changeset.DecodeAccounts(k, v) if err != nil { return err } if blockNumber > to { break } address := common.BytesToAddress(addressBytes) marked, err := marker.IsMarked(addressBytes) if err != nil { return err } if marked { continue } encodedAccount, err := tx.GetOne(kv.PlainState, addressBytes) if err != nil { return err } incarnationBytes, err := tx.GetOne(kv.IncarnationMap, addressBytes) if err != nil { return err } isContract := len(incarnationBytes) > 0 && binary.BigEndian.Uint64(incarnationBytes) != 0 // Start if len(encodedAccount) == 0 { jobs <- ®enerateIncrementalPedersenAccountsJob{ address: address, isContract: isContract, absentInState: true, } } else { var acc accounts.Account if err := acc.DecodeForStorage(encodedAccount); err != nil { return err } // We need to update code. code, err := tx.GetOne(kv.Code, acc.CodeHash[:]) if err != nil { return err } jobs <- ®enerateIncrementalPedersenAccountsJob{ address: address, account: acc, code: code, absentInState: false, isContract: isContract, } } if err := marker.MarkAsDone(addressBytes); err != nil { return err } select { case <-logInterval.C: log.Info("Creating Verkle Trie Incrementally", "phase", "account", "blockNum", blockNumber) default: } } close(jobs) wg.Wait() close(out) return nil }