E3: par logs (#5839)

This commit is contained in:
Alex Sharov 2022-10-23 18:51:14 +07:00 committed by GitHub
parent cd4ce6291f
commit 993fde4d92
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 92 additions and 48 deletions

View File

@ -381,16 +381,14 @@ func (api *APIImpl) getLogsV3(ctx context.Context, tx kv.Tx, begin, end uint64,
if txn == nil {
continue
}
stateReader.SetTxNum(txNum)
txHash := txn.Hash()
msg, err := txn.AsMessage(*lastSigner, lastHeader.BaseFee, lastRules)
if err != nil {
return nil, err
}
blockCtx, txCtx := transactions.GetEvmContext(msg, lastHeader, true /* requireCanonical */, tx, api._blockReader)
//stateReader.SetTxNum(txNum - 1)
stateReader.SetTxNum(txNum)
vmConfig := vm.Config{}
vmConfig.SkipAnalysis = core.SkipAnalysis(chainConfig, blockNum)
vmConfig := vm.Config{SkipAnalysis: core.SkipAnalysis(chainConfig, blockNum)}
ibs := state.New(stateReader)
evm := vm.NewEVM(blockCtx, txCtx, ibs, chainConfig, vmConfig)

View File

@ -118,6 +118,7 @@ func (rw *Worker22) RunTxTask(txTask *state.TxTask) {
rules := txTask.Rules
daoForkTx := rw.chainConfig.DAOForkSupport && rw.chainConfig.DAOForkBlock != nil && rw.chainConfig.DAOForkBlock.Uint64() == txTask.BlockNum && txTask.TxIndex == -1
var err error
header := txTask.Block.HeaderNoCopy()
if txTask.BlockNum == 0 && txTask.TxIndex == -1 {
//fmt.Printf("txNum=%d, blockNum=%d, Genesis\n", txTask.TxNum, txTask.BlockNum)
// Genesis block
@ -135,20 +136,20 @@ func (rw *Worker22) RunTxTask(txTask *state.TxTask) {
// Block initialisation
//fmt.Printf("txNum=%d, blockNum=%d, initialisation of the block\n", txTask.TxNum, txTask.BlockNum)
if rw.isPoSA {
systemcontracts.UpgradeBuildInSystemContract(rw.chainConfig, txTask.Block.Number(), ibs)
systemcontracts.UpgradeBuildInSystemContract(rw.chainConfig, header.Number, ibs)
}
syscall := func(contract common.Address, data []byte) ([]byte, error) {
return core.SysCallContract(contract, data, *rw.chainConfig, ibs, txTask.Block.HeaderNoCopy(), rw.engine)
return core.SysCallContract(contract, data, *rw.chainConfig, ibs, header, rw.engine)
}
rw.engine.Initialize(rw.chainConfig, rw.chain, rw.epoch, txTask.Block.HeaderNoCopy(), txTask.Block.Transactions(), txTask.Block.Uncles(), syscall)
rw.engine.Initialize(rw.chainConfig, rw.chain, rw.epoch, header, txTask.Block.Transactions(), txTask.Block.Uncles(), syscall)
} else if txTask.Final {
if txTask.BlockNum > 0 {
//fmt.Printf("txNum=%d, blockNum=%d, finalisation of the block\n", txTask.TxNum, txTask.BlockNum)
// End of block transaction in a block
syscall := func(contract common.Address, data []byte) ([]byte, error) {
return core.SysCallContract(contract, data, *rw.chainConfig, ibs, txTask.Block.HeaderNoCopy(), rw.engine)
return core.SysCallContract(contract, data, *rw.chainConfig, ibs, header, rw.engine)
}
if _, _, err := rw.engine.Finalize(rw.chainConfig, txTask.Block.HeaderNoCopy(), ibs, txTask.Block.Transactions(), txTask.Block.Uncles(), nil /* receipts */, rw.epoch, rw.chain, syscall); err != nil {
if _, _, err := rw.engine.Finalize(rw.chainConfig, header, ibs, txTask.Block.Transactions(), txTask.Block.Uncles(), nil /* receipts */, rw.epoch, rw.chain, syscall); err != nil {
//fmt.Printf("error=%v\n", err)
txTask.Error = err
} else {
@ -160,7 +161,6 @@ func (rw *Worker22) RunTxTask(txTask *state.TxTask) {
}
}
} else {
header := txTask.Block.HeaderNoCopy()
//fmt.Printf("txNum=%d, blockNum=%d, txIndex=%d\n", txTask.TxNum, txTask.BlockNum, txTask.TxIndex)
if rw.isPoSA {
if isSystemTx, err := rw.posa.IsSystemTransaction(txTask.Tx, header); err != nil {

View File

@ -296,17 +296,19 @@ func (rw *ReconWorker) Run() {
}
}
var noop = state2.NewNoopWriter()
func (rw *ReconWorker) runTxTask(txTask *state2.TxTask) {
rw.lock.Lock()
defer rw.lock.Unlock()
rw.stateReader.SetTxNum(txTask.TxNum)
rw.stateReader.ResetError()
rw.stateWriter.SetTxNum(txTask.TxNum)
noop := state2.NewNoopWriter()
rules := txTask.Rules
ibs := state2.New(rw.stateReader)
daoForkTx := rw.chainConfig.DAOForkSupport && rw.chainConfig.DAOForkBlock != nil && rw.chainConfig.DAOForkBlock.Uint64() == txTask.BlockNum && txTask.TxIndex == -1
var err error
header := txTask.Block.HeaderNoCopy()
if txTask.BlockNum == 0 && txTask.TxIndex == -1 {
//fmt.Printf("txNum=%d, blockNum=%d, Genesis\n", txTask.TxNum, txTask.BlockNum)
// Genesis block
@ -325,24 +327,24 @@ func (rw *ReconWorker) runTxTask(txTask *state2.TxTask) {
//fmt.Printf("txNum=%d, blockNum=%d, finalisation of the block\n", txNum, blockNum)
// End of block transaction in a block
syscall := func(contract common.Address, data []byte) ([]byte, error) {
return core.SysCallContract(contract, data, *rw.chainConfig, ibs, txTask.Block.Header(), rw.engine)
return core.SysCallContract(contract, data, *rw.chainConfig, ibs, header, rw.engine)
}
if _, _, err := rw.engine.Finalize(rw.chainConfig, txTask.Block.Header(), ibs, txTask.Block.Transactions(), txTask.Block.Uncles(), nil /* receipts */, rw.epoch, rw.chain, syscall); err != nil {
if _, _, err := rw.engine.Finalize(rw.chainConfig, header, ibs, txTask.Block.Transactions(), txTask.Block.Uncles(), nil /* receipts */, rw.epoch, rw.chain, syscall); err != nil {
panic(fmt.Errorf("finalize of block %d failed: %w", txTask.BlockNum, err))
}
}
} else if txTask.TxIndex == -1 {
// Block initialisation
if rw.isPoSA {
systemcontracts.UpgradeBuildInSystemContract(rw.chainConfig, txTask.Block.Number(), ibs)
systemcontracts.UpgradeBuildInSystemContract(rw.chainConfig, header.Number, ibs)
}
syscall := func(contract common.Address, data []byte) ([]byte, error) {
return core.SysCallContract(contract, data, *rw.chainConfig, ibs, txTask.Block.Header(), rw.engine)
return core.SysCallContract(contract, data, *rw.chainConfig, ibs, header, rw.engine)
}
rw.engine.Initialize(rw.chainConfig, rw.chain, rw.epoch, txTask.Block.Header(), txTask.Block.Transactions(), txTask.Block.Uncles(), syscall)
rw.engine.Initialize(rw.chainConfig, rw.chain, rw.epoch, header, txTask.Block.Transactions(), txTask.Block.Uncles(), syscall)
} else {
if rw.isPoSA {
if isSystemTx, err := rw.posa.IsSystemTransaction(txTask.Tx, txTask.Block.Header()); err != nil {
if isSystemTx, err := rw.posa.IsSystemTransaction(txTask.Tx, header); err != nil {
panic(err)
} else if isSystemTx {
return
@ -350,9 +352,9 @@ func (rw *ReconWorker) runTxTask(txTask *state2.TxTask) {
}
txHash := txTask.Tx.Hash()
gp := new(core.GasPool).AddGas(txTask.Tx.GetGas())
vmConfig := vm.Config{NoReceipts: true, SkipAnalysis: core.SkipAnalysis(rw.chainConfig, txTask.BlockNum)}
getHashFn := core.GetHashFn(txTask.Block.Header(), rw.getHeader)
blockContext := core.NewEVMBlockContext(txTask.Block.Header(), getHashFn, rw.engine, nil /* author */)
vmConfig := vm.Config{NoReceipts: true, SkipAnalysis: txTask.SkipAnalysis}
getHashFn := core.GetHashFn(header, rw.getHeader)
blockContext := core.NewEVMBlockContext(header, getHashFn, rw.engine, nil /* author */)
ibs.Prepare(txHash, txTask.BlockHash, txTask.TxIndex)
msg := txTask.TxAsMessage
txContext := core.NewEVMTxContext(msg)

View File

@ -22,6 +22,21 @@ func WriteMap() bool {
return writeMap
}
var (
mdbxReaadahead bool
mdbxReaadaheadOnce sync.Once
)
func MdbxReadAhead() bool {
mdbxReaadaheadOnce.Do(func() {
v, _ := os.LookupEnv("MDBX_READAHEAD")
if v == "true" {
mdbxReaadahead = true
}
})
return mdbxReaadahead
}
var (
bigRoTx uint
getBigRoTx sync.Once

View File

@ -21,6 +21,7 @@ import (
"github.com/ledgerwatch/erigon/core/types/accounts"
"github.com/ledgerwatch/erigon/params"
"github.com/ledgerwatch/erigon/turbo/shards"
"golang.org/x/exp/slices"
)
// ReadWriteSet contains ReadSet, WriteSet and BalanceIncrease of a transaction,
@ -563,6 +564,16 @@ type KvList struct {
Keys, Vals [][]byte
}
func (l *KvList) Reset() {
for i := range l.Keys {
l.Keys[i], l.Vals[i] = nil, nil
}
l.Keys, l.Vals = l.Keys[:0], l.Vals[:0]
}
func (l *KvList) Clone() *KvList {
return &KvList{Keys: slices.Clone(l.Keys), Vals: slices.Clone(l.Vals)}
}
func (l KvList) Len() int {
return len(l.Keys)
}
@ -607,12 +618,12 @@ func (w *StateWriter22) SetTxNum(txNum uint64) {
}
func (w *StateWriter22) ResetWriteSet() {
w.writeLists = map[string]*KvList{
kv.PlainState: {},
kv.Code: {},
kv.PlainContractCode: {},
kv.IncarnationMap: {},
}
w.writeLists[kv.PlainState].Reset()
w.writeLists[kv.Code].Reset()
w.writeLists[kv.PlainContractCode].Reset()
w.writeLists[kv.IncarnationMap].Reset()
//}
w.accountPrevs = map[string][]byte{}
w.accountDels = map[string]*accounts.Account{}
w.storagePrevs = map[string][]byte{}
@ -620,7 +631,12 @@ func (w *StateWriter22) ResetWriteSet() {
}
func (w *StateWriter22) WriteSet() map[string]*KvList {
return w.writeLists
return map[string]*KvList{
kv.PlainState: w.writeLists[kv.PlainState].Clone(),
kv.Code: w.writeLists[kv.Code].Clone(),
kv.PlainContractCode: w.writeLists[kv.PlainContractCode].Clone(),
kv.IncarnationMap: w.writeLists[kv.IncarnationMap].Clone(),
}
}
func (w *StateWriter22) PrevAndDels() (map[string][]byte, map[string]*accounts.Account, map[string][]byte, map[string]uint64) {
@ -714,16 +730,19 @@ func (r *StateReader22) SetTx(tx kv.Tx) {
}
func (r *StateReader22) ResetReadSet() {
r.readLists = map[string]*KvList{
kv.PlainState: {},
kv.Code: {},
CodeSizeTable: {},
kv.IncarnationMap: {},
}
r.readLists[kv.PlainState].Reset()
r.readLists[kv.Code].Reset()
r.readLists[CodeSizeTable].Reset()
r.readLists[kv.IncarnationMap].Reset()
}
func (r *StateReader22) ReadSet() map[string]*KvList {
return r.readLists
return map[string]*KvList{
kv.PlainState: r.readLists[kv.PlainState].Clone(),
kv.Code: r.readLists[kv.Code].Clone(),
CodeSizeTable: r.readLists[CodeSizeTable].Clone(),
kv.IncarnationMap: r.readLists[kv.IncarnationMap].Clone(),
}
}
func (r *StateReader22) SetTrace(trace bool) {

View File

@ -221,7 +221,8 @@ func (w *StateReconWriter) SetTx(tx kv.Tx) {
}
func (w *StateReconWriter) UpdateAccountData(address common.Address, original, account *accounts.Account) error {
txKey, err := w.tx.GetOne(kv.XAccount, address.Bytes())
addr := address.Bytes()
txKey, err := w.tx.GetOne(kv.XAccount, addr)
if err != nil {
return err
}
@ -237,12 +238,13 @@ func (w *StateReconWriter) UpdateAccountData(address common.Address, original, a
}
account.EncodeForStorage(value)
//fmt.Printf("account [%x]=>{Balance: %d, Nonce: %d, Root: %x, CodeHash: %x} txNum: %d\n", address, &account.Balance, account.Nonce, account.Root, account.CodeHash, w.txNum)
w.rs.Put(kv.PlainStateR, address[:], nil, value, w.txNum)
w.rs.Put(kv.PlainStateR, addr, nil, value, w.txNum)
return nil
}
func (w *StateReconWriter) UpdateAccountCode(address common.Address, incarnation uint64, codeHash common.Hash, code []byte) error {
txKey, err := w.tx.GetOne(kv.XCode, address.Bytes())
addr, codeHashBytes := address.Bytes(), codeHash.Bytes()
txKey, err := w.tx.GetOne(kv.XCode, addr)
if err != nil {
return err
}
@ -252,10 +254,10 @@ func (w *StateReconWriter) UpdateAccountCode(address common.Address, incarnation
if stateTxNum := binary.BigEndian.Uint64(txKey); stateTxNum != w.txNum {
return nil
}
w.rs.Put(kv.CodeR, codeHash[:], nil, common.CopyBytes(code), w.txNum)
w.rs.Put(kv.CodeR, codeHashBytes, nil, common.CopyBytes(code), w.txNum)
if len(code) > 0 {
//fmt.Printf("code [%x] => %d CodeHash: %x, txNum: %d\n", address, len(code), codeHash, w.txNum)
w.rs.Put(kv.PlainContractR, dbutils.PlainGenerateStoragePrefix(address[:], FirstContractIncarnation), nil, codeHash[:], w.txNum)
w.rs.Put(kv.PlainContractR, dbutils.PlainGenerateStoragePrefix(addr, FirstContractIncarnation), nil, codeHashBytes, w.txNum)
}
return nil
}
@ -270,8 +272,10 @@ func (w *StateReconWriter) WriteAccountStorage(address common.Address, incarnati
} else {
w.composite = w.composite[:20+32]
}
copy(w.composite, address.Bytes())
copy(w.composite[20:], key.Bytes())
addr, k := address.Bytes(), key.Bytes()
copy(w.composite, addr)
copy(w.composite[20:], k)
txKey, err := w.tx.GetOne(kv.XStorage, w.composite)
if err != nil {
return err
@ -284,7 +288,7 @@ func (w *StateReconWriter) WriteAccountStorage(address common.Address, incarnati
}
if !value.IsZero() {
//fmt.Printf("storage [%x] [%x] => [%x], txNum: %d\n", address, *key, value.Bytes(), w.txNum)
w.rs.Put(kv.PlainStateR, address.Bytes(), key.Bytes(), value.Bytes(), w.txNum)
w.rs.Put(kv.PlainStateR, addr, k, value.Bytes(), w.txNum)
}
return nil
}

4
go.mod
View File

@ -4,8 +4,8 @@ go 1.18
require (
github.com/gballet/go-verkle v0.0.0-20220829125900-a702d458d33c
github.com/ledgerwatch/erigon-lib v0.0.0-20221022051842-1e9417905a20
github.com/ledgerwatch/erigon-snapshot v1.1.0
github.com/ledgerwatch/erigon-lib v0.0.0-20221023074755-556ff6a55357
github.com/ledgerwatch/erigon-snapshot v1.1.1-0.20221023043405-d157dec75e9a
github.com/ledgerwatch/log/v3 v3.6.0
github.com/ledgerwatch/secp256k1 v1.0.0
github.com/ledgerwatch/trackerslist v1.0.0

8
go.sum
View File

@ -556,10 +556,10 @@ github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/kylelemons/godebug v0.0.0-20170224010052-a616ab194758 h1:0D5M2HQSGD3PYPwICLl+/9oulQauOuETfgFvhBDffs0=
github.com/leanovate/gopter v0.2.9 h1:fQjYxZaynp97ozCzfOyOuAGOU4aU/z37zf/tOujFk7c=
github.com/leanovate/gopter v0.2.9/go.mod h1:U2L/78B+KVFIx2VmW6onHJQzXtFb+p5y3y2Sh+Jxxv8=
github.com/ledgerwatch/erigon-lib v0.0.0-20221022051842-1e9417905a20 h1:zVzBZ4zbgXJ8ZtsHjPwtbZRQ9ZBcZsJRRpAD1H5PQLQ=
github.com/ledgerwatch/erigon-lib v0.0.0-20221022051842-1e9417905a20/go.mod h1:9GQV2rtD63oUSu5X0/0tB+fXxYwN33NkKdb5T8rLSio=
github.com/ledgerwatch/erigon-snapshot v1.1.0 h1:86gC++am90swu5MtcUHYbnly/cCoR1ceAmYaIFxmM1k=
github.com/ledgerwatch/erigon-snapshot v1.1.0/go.mod h1:3AuPxZc85jkehh/HA9h8gabv5MSi3kb/ddtzBsTVJFo=
github.com/ledgerwatch/erigon-lib v0.0.0-20221023074755-556ff6a55357 h1:L1QVgFHbqwaZQGit5Qm0PTEb6E9ijYcpgBnY19H8jdA=
github.com/ledgerwatch/erigon-lib v0.0.0-20221023074755-556ff6a55357/go.mod h1:jv4jmffliy8Us1wXdT6Q1f/nrrmC6Xrr6V70JxZRp48=
github.com/ledgerwatch/erigon-snapshot v1.1.1-0.20221023043405-d157dec75e9a h1:5Lw7NR/KWxtAokY11DKaMnrI6Zb04OzX8BQMbF8Lu/w=
github.com/ledgerwatch/erigon-snapshot v1.1.1-0.20221023043405-d157dec75e9a/go.mod h1:3AuPxZc85jkehh/HA9h8gabv5MSi3kb/ddtzBsTVJFo=
github.com/ledgerwatch/log/v3 v3.6.0 h1:JBUSK1epPyutUrz7KYDTcJtQLEHnehECRpKbM1ugy5M=
github.com/ledgerwatch/log/v3 v3.6.0/go.mod h1:L+Sp+ma/h205EdCjviZECjGEvYUYEyXSdiuHNZzg+xQ=
github.com/ledgerwatch/secp256k1 v1.0.0 h1:Usvz87YoTG0uePIV8woOof5cQnLXGYa162rFf3YnwaQ=

View File

@ -31,6 +31,7 @@ import (
"github.com/ledgerwatch/erigon/common/debug"
"github.com/ledgerwatch/erigon/node/nodecfg"
"github.com/ledgerwatch/erigon/params"
mdbx2 "github.com/torquem-ch/mdbx-go/mdbx"
"golang.org/x/sync/semaphore"
"github.com/gofrs/flock"
@ -341,6 +342,11 @@ func OpenDatabase(config *nodecfg.Config, logger log.Logger, label kv.Label) (kv
if debug.WriteMap() {
opts = opts.WriteMap()
}
if debug.MdbxReadAhead() {
opts = opts.Flags(func(u uint) uint {
return u &^ mdbx2.NoReadahead
})
}
return opts.Open()
}
var err error