Mining data races (#361)

* data races

* log cencelation
This commit is contained in:
Evgeny Danilenko 2020-02-10 17:28:30 +03:00 committed by GitHub
parent aafacd04d7
commit c7a10934d8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 136 additions and 38 deletions

View File

@ -3,7 +3,9 @@ package consensus
import (
"context"
"github.com/ledgerwatch/turbo-geth/common/debug"
"github.com/ledgerwatch/turbo-geth/core/types"
"github.com/ledgerwatch/turbo-geth/log"
)
type ResultWithContext struct {
@ -13,7 +15,12 @@ type ResultWithContext struct {
type Cancel struct {
context.Context
context.CancelFunc
cancel context.CancelFunc
}
func (c *Cancel) CancelFunc() {
log.Debug("cancel mining task", "callers", debug.Callers(10))
c.cancel()
}
func NewCancel(ctxs ...context.Context) Cancel {

82
miner/environment.go Normal file
View File

@ -0,0 +1,82 @@
package miner
import (
"math/big"
"sync"
mapset "github.com/deckarep/golang-set"
"github.com/ledgerwatch/turbo-geth/common"
"github.com/ledgerwatch/turbo-geth/consensus"
"github.com/ledgerwatch/turbo-geth/core"
"github.com/ledgerwatch/turbo-geth/core/state"
"github.com/ledgerwatch/turbo-geth/core/types"
)
// environment is the worker's current environment and holds all of the current state information.
type environment struct {
signer types.Signer
state *state.IntraBlockState // apply state changes here
tds *state.TrieDbState
ancestors mapset.Set // ancestor set (used for checking uncle parent validity)
family mapset.Set // family set (used for checking uncle invalidity)
uncles mapset.Set // uncle set
tcount int // tx count in cycle
gasPool *core.GasPool // available gas used to pack transactions
*sync.RWMutex
header *types.Header
txs []*types.Transaction
receipts []*types.Receipt
ctx consensus.Cancel
}
func (e *environment) Number() *big.Int {
e.RLock()
defer e.RUnlock()
return big.NewInt(0).Set(e.header.Number)
}
func (e *environment) Hash() common.Hash {
e.RLock()
defer e.RUnlock()
return e.header.Hash()
}
func (e *environment) ParentHash() common.Hash {
e.RLock()
defer e.RUnlock()
return e.header.ParentHash
}
func (e *environment) SetHeader(h *types.Header) {
e.Lock()
defer e.Unlock()
e.header = h
}
func (e *environment) GetHeader() *types.Header {
e.RLock()
defer e.RUnlock()
return types.CopyHeader(e.header)
}
func (e *environment) Set(env *environment) {
em := e.RWMutex
envm := env.RWMutex
em.Lock()
envm.Lock()
defer func() {
em.Unlock()
envm.Unlock()
}()
*e = *env
e.RWMutex = envm
}

View File

@ -81,23 +81,6 @@ const (
staleThreshold = 7
)
// environment is the worker's current environment and holds all of the current state information.
type environment struct {
signer types.Signer
state *state.IntraBlockState // apply state changes here
tds *state.TrieDbState
ancestors mapset.Set // ancestor set (used for checking uncle parent validity)
family mapset.Set // family set (used for checking uncle invalidity)
uncles mapset.Set // uncle set
tcount int // tx count in cycle
gasPool *core.GasPool // available gas used to pack transactions
header *types.Header
txs []*types.Transaction
receipts []*types.Receipt
}
// task contains all information for consensus engine sealing and result submitting.
type task struct {
receipts []*types.Receipt
@ -380,16 +363,20 @@ func (w *worker) newWorkLoop(recommit time.Duration) {
}
func (w *worker) getCommit() (func(ctx consensus.Cancel, noempty bool, s int32), *int64) {
interrupt := new(*int32)
var interrupt atomic.Value
timestamp := new(int64) // timestamp for each round of mining.
return func(ctx consensus.Cancel, noempty bool, s int32) {
if *interrupt != nil {
atomic.StoreInt32(*interrupt, s)
if v := interrupt.Load(); v != nil {
stored := v.(*int32)
atomic.StoreInt32(stored, s)
} else {
interrupt.Store(new(int32))
}
*interrupt = new(int32)
w.newWorkCh <- &newWorkReq{interrupt: *interrupt, noempty: noempty, timestamp: atomic.LoadInt64(timestamp), cancel: consensus.NewCancel()}
v := interrupt.Load().(*int32)
w.newWorkCh <- &newWorkReq{interrupt: v, noempty: noempty, timestamp: atomic.LoadInt64(timestamp), cancel: consensus.NewCancel()}
atomic.StoreInt32(&w.newTxs, 0)
}, timestamp
}
@ -475,14 +462,15 @@ func (w *worker) chainEvents(timestamp *int64, commit func(ctx consensus.Cancel,
"parentHash", head.Block.ParentHash().String(),
)
if head.Block.Number().Cmp(w.current.header.Number) < 0 {
currentNumber := w.current.Number()
if head.Block.Number().Cmp(currentNumber) < 0 {
log.Warn("mining event for an ancestor block",
"eventBlockNumber", head.Block.Number().Uint64(),
"eventBlockHash", head.Block.Hash().String(),
"chainBlockNumber", w.chain.CurrentBlock().Number().Uint64(),
"chainBlockHash", w.chain.CurrentBlock().Hash().String(),
"minerBlockNumber", w.current.header.Number.Uint64(),
"minerBlockHash", w.current.header.Hash().String(),
"minerBlockNumber", currentNumber.Uint64(),
"minerBlockHash", w.current.Hash().String(),
)
continue
}
@ -653,7 +641,13 @@ func (w *worker) insertToChain(result consensus.ResultWithContext, createdAt tim
}
// makeCurrent creates a new environment for the current cycle.
func (w *worker) makeCurrent(parent *types.Block, header *types.Header) error {
func (w *worker) makeCurrent(ctx consensus.Cancel, parent *types.Block, header *types.Header) error {
select {
case <-ctx.Done():
return errors.New("context is done")
default:
}
stateV, tds, err := GetState(w.chain, parent)
if err != nil {
return err
@ -667,6 +661,8 @@ func (w *worker) makeCurrent(parent *types.Block, header *types.Header) error {
family: mapset.NewSet(),
uncles: mapset.NewSet(),
header: header,
RWMutex: new(sync.RWMutex),
ctx: ctx,
}
// when 08 is processed ancestors contain 07 (quick block)
@ -680,7 +676,12 @@ func (w *worker) makeCurrent(parent *types.Block, header *types.Header) error {
// Keep track of transactions which return errors so they can be removed
env.tcount = 0
w.current = env
if w.current == nil {
w.current = env
} else {
w.current.Set(env)
}
return nil
}
@ -690,7 +691,7 @@ func (w *worker) commitUncle(env *environment, uncle *types.Header) error {
if env.uncles.Contains(hash) {
return errors.New("uncle not unique")
}
if env.header.ParentHash == uncle.ParentHash {
if env.ParentHash() == uncle.ParentHash {
return errors.New("uncle is sibling")
}
if !env.ancestors.Contains(uncle.ParentHash) {
@ -724,7 +725,7 @@ func (w *worker) updateSnapshot() {
})
w.snapshotBlock = types.NewBlock(
w.current.header,
w.current.GetHeader(),
w.current.txs,
uncles,
w.current.receipts,
@ -737,13 +738,14 @@ func (w *worker) updateSnapshot() {
func (w *worker) commitTransaction(tx *types.Transaction, coinbase common.Address) ([]*types.Log, error) {
snap := w.current.state.Snapshot()
receipt, err := core.ApplyTransaction(w.chainConfig, w.chain, &coinbase, w.current.gasPool, w.current.state, w.current.tds.TrieStateWriter(), w.current.header, tx, &w.current.header.GasUsed, *w.chain.GetVMConfig())
header := w.current.GetHeader()
receipt, err := core.ApplyTransaction(w.chainConfig, w.chain, &coinbase, w.current.gasPool, w.current.state, w.current.tds.TrieStateWriter(), header, tx, &header.GasUsed, *w.chain.GetVMConfig())
if err != nil {
w.current.state.RevertToSnapshot(snap)
return nil, err
}
if !w.chainConfig.IsByzantium(w.current.header.Number) {
if !w.chainConfig.IsByzantium(w.current.Number()) {
w.current.tds.StartNewBuffer()
}
w.current.txs = append(w.current.txs, tx)
@ -758,8 +760,9 @@ func (w *worker) commitTransactions(txs *types.TransactionsByPriceAndNonce, coin
return true
}
header := w.current.GetHeader()
if w.current.gasPool == nil {
w.current.gasPool = new(core.GasPool).AddGas(w.current.header.GasLimit)
w.current.gasPool = new(core.GasPool).AddGas(header.GasLimit)
}
w.current.tds.StartNewBuffer()
@ -775,7 +778,7 @@ func (w *worker) commitTransactions(txs *types.TransactionsByPriceAndNonce, coin
if interrupt != nil && atomic.LoadInt32(interrupt) != commitInterruptNone {
// Notify resubmit loop to increase resubmitting interval due to too frequent commits.
if atomic.LoadInt32(interrupt) == commitInterruptResubmit {
ratio := float64(w.current.header.GasLimit-w.current.gasPool.Gas()) / float64(w.current.header.GasLimit)
ratio := float64(header.GasLimit-w.current.gasPool.Gas()) / float64(header.GasLimit)
if ratio < 0.1 {
ratio = 0.1
}
@ -803,7 +806,7 @@ func (w *worker) commitTransactions(txs *types.TransactionsByPriceAndNonce, coin
from, _ := types.Sender(w.current.signer, tx)
// Check whether the tx is replay protected. If we're not in the EIP155 hf
// phase, start ignoring the sender until we do.
if tx.Protected() && !w.chainConfig.IsEIP155(w.current.header.Number) {
if tx.Protected() && !w.chainConfig.IsEIP155(header.Number) {
log.Trace("Ignoring reply protected transaction", "hash", tx.Hash(), "eip155", w.chainConfig.EIP155Block)
txs.Pop()
@ -868,6 +871,12 @@ func (w *worker) commitTransactions(txs *types.TransactionsByPriceAndNonce, coin
// commitNewWork generates several new sealing tasks based on the parent block.
func (w *worker) commitNewWork(ctx consensus.Cancel, interrupt *int32, noempty bool, timestamp int64) {
select {
case <-ctx.Done():
return
default:
}
w.mu.RLock()
defer w.mu.RUnlock()
@ -924,7 +933,7 @@ func (w *worker) commitNewWork(ctx consensus.Cancel, interrupt *int32, noempty b
}
}
// Could potentially happen if starting to mine in an odd state.
err := w.makeCurrent(parent, header)
err := w.makeCurrent(ctx, parent, header)
if err != nil {
log.Error("Failed to create mining context", "err", err)
ctx.CancelFunc()
@ -1031,12 +1040,12 @@ func (w *worker) commit(ctx consensus.Cancel, uncles []*types.Header, interval f
s := &(*w.current.state)
block, err := NewBlock(w.engine, s, w.current.tds, w.chain.Config(), w.current.header, w.current.txs, uncles, w.current.receipts)
block, err := NewBlock(w.engine, s, w.current.tds, w.chain.Config(), w.current.GetHeader(), w.current.txs, uncles, w.current.receipts)
if err != nil {
return err
}
w.current.header = block.Header()
w.current.SetHeader(block.Header())
if w.isRunning() {
if interval != nil {