mirror of
https://gitlab.com/pulsechaincom/erigon-pulse.git
synced 2024-12-22 03:30:37 +00:00
Nuke old miner and mux (#1666)
This commit is contained in:
parent
e0a2d47139
commit
dbc0db0856
@ -815,8 +815,7 @@ type filterBackend struct {
|
||||
b *SimulatedBackend
|
||||
}
|
||||
|
||||
func (fb *filterBackend) ChainDb() ethdb.Database { return fb.db }
|
||||
func (fb *filterBackend) EventMux() *event.TypeMux { panic("not supported") }
|
||||
func (fb *filterBackend) ChainDb() ethdb.Database { return fb.db }
|
||||
|
||||
func (fb *filterBackend) HeaderByNumber(ctx context.Context, block rpc.BlockNumber) (*types.Header, error) {
|
||||
if block == rpc.LatestBlockNumber {
|
||||
|
@ -24,9 +24,6 @@ import (
|
||||
// NewTxsEvent is posted when a batch of transactions enter the transaction pool.
|
||||
type NewTxsEvent struct{ Txs []*types.Transaction }
|
||||
|
||||
// NewMinedBlockEvent is posted when a block has been imported.
|
||||
type NewMinedBlockEvent struct{ Block *types.Block }
|
||||
|
||||
// RemovedLogsEvent is posted when a reorg happens
|
||||
type RemovedLogsEvent struct{ Logs []*types.Log }
|
||||
|
||||
|
@ -34,7 +34,6 @@ import (
|
||||
"github.com/ledgerwatch/turbo-geth/eth/gasprice"
|
||||
"github.com/ledgerwatch/turbo-geth/ethdb"
|
||||
"github.com/ledgerwatch/turbo-geth/event"
|
||||
"github.com/ledgerwatch/turbo-geth/miner"
|
||||
"github.com/ledgerwatch/turbo-geth/params"
|
||||
"github.com/ledgerwatch/turbo-geth/rpc"
|
||||
)
|
||||
@ -62,12 +61,6 @@ func (b *EthAPIBackend) SetHead(number uint64) {
|
||||
}
|
||||
|
||||
func (b *EthAPIBackend) resolveBlockNumber(blockNr rpc.BlockNumber) uint64 {
|
||||
// Pending block is only known by the miner
|
||||
if blockNr == rpc.PendingBlockNumber {
|
||||
block := b.eth.miner.PendingBlock()
|
||||
return block.NumberU64()
|
||||
}
|
||||
|
||||
if blockNr == rpc.LatestBlockNumber {
|
||||
return b.eth.blockchain.CurrentBlock().NumberU64()
|
||||
}
|
||||
@ -75,12 +68,6 @@ func (b *EthAPIBackend) resolveBlockNumber(blockNr rpc.BlockNumber) uint64 {
|
||||
}
|
||||
|
||||
func (b *EthAPIBackend) HeaderByNumber(ctx context.Context, blockNr rpc.BlockNumber) (*types.Header, error) {
|
||||
// Pending block is only known by the miner
|
||||
if blockNr == rpc.PendingBlockNumber {
|
||||
block := b.eth.miner.PendingBlock()
|
||||
return block.Header(), nil
|
||||
}
|
||||
// Otherwise resolve and return the block
|
||||
bn := b.resolveBlockNumber(blockNr)
|
||||
return b.eth.blockchain.GetHeaderByNumber(bn), nil
|
||||
}
|
||||
@ -107,12 +94,6 @@ func (b *EthAPIBackend) HeaderByHash(ctx context.Context, hash common.Hash) (*ty
|
||||
}
|
||||
|
||||
func (b *EthAPIBackend) BlockByNumber(ctx context.Context, blockNr rpc.BlockNumber) (*types.Block, error) {
|
||||
// Pending block is only known by the miner
|
||||
if blockNr == rpc.PendingBlockNumber {
|
||||
block := b.eth.miner.PendingBlock()
|
||||
return block, nil
|
||||
}
|
||||
// Otherwise resolve and return the block
|
||||
bn := b.resolveBlockNumber(blockNr)
|
||||
return b.eth.blockchain.GetBlockByNumber(bn), nil
|
||||
}
|
||||
@ -143,12 +124,6 @@ func (b *EthAPIBackend) BlockByNumberOrHash(ctx context.Context, blockNrOrHash r
|
||||
}
|
||||
|
||||
func (b *EthAPIBackend) StateAndHeaderByNumber(ctx context.Context, blockNr rpc.BlockNumber) (*state.IntraBlockState, *types.Header, error) {
|
||||
// Pending state is only known by the miner
|
||||
if blockNr == rpc.PendingBlockNumber {
|
||||
block, state, _ := b.eth.miner.Pending()
|
||||
return state, block.Header(), nil
|
||||
}
|
||||
// Otherwise resolve the block number and return its state
|
||||
bn := b.resolveBlockNumber(blockNr)
|
||||
header, err := b.HeaderByNumber(ctx, blockNr)
|
||||
if err != nil {
|
||||
@ -265,10 +240,6 @@ func (b *EthAPIBackend) SubscribeRemovedLogsEvent(ch chan<- core.RemovedLogsEven
|
||||
return b.eth.BlockChain().SubscribeRemovedLogsEvent(ch)
|
||||
}
|
||||
|
||||
func (b *EthAPIBackend) SubscribePendingLogsEvent(ch chan<- []*types.Log) event.Subscription {
|
||||
return b.eth.miner.SubscribePendingLogs(ch)
|
||||
}
|
||||
|
||||
func (b *EthAPIBackend) SubscribeChainEvent(ch chan<- core.ChainEvent) event.Subscription {
|
||||
return b.eth.BlockChain().SubscribeChainEvent(ch)
|
||||
}
|
||||
@ -342,10 +313,6 @@ func (b *EthAPIBackend) ChainDb() ethdb.Database {
|
||||
return b.eth.ChainDb()
|
||||
}
|
||||
|
||||
func (b *EthAPIBackend) EventMux() *event.TypeMux {
|
||||
return b.eth.EventMux()
|
||||
}
|
||||
|
||||
func (b *EthAPIBackend) ExtRPCEnabled() bool {
|
||||
return b.extRPCEnabled
|
||||
}
|
||||
@ -380,10 +347,6 @@ func (b *EthAPIBackend) CurrentHeader() *types.Header {
|
||||
return b.eth.blockchain.CurrentHeader()
|
||||
}
|
||||
|
||||
func (b *EthAPIBackend) Miner() *miner.Miner {
|
||||
return b.eth.Miner()
|
||||
}
|
||||
|
||||
func (b *EthAPIBackend) StartMining(threads int) error {
|
||||
return b.eth.StartMining(threads)
|
||||
}
|
||||
|
@ -56,10 +56,8 @@ import (
|
||||
"github.com/ledgerwatch/turbo-geth/eth/stagedsync/stages"
|
||||
"github.com/ledgerwatch/turbo-geth/ethdb"
|
||||
"github.com/ledgerwatch/turbo-geth/ethdb/remote/remotedbserver"
|
||||
"github.com/ledgerwatch/turbo-geth/event"
|
||||
"github.com/ledgerwatch/turbo-geth/internal/ethapi"
|
||||
"github.com/ledgerwatch/turbo-geth/log"
|
||||
"github.com/ledgerwatch/turbo-geth/miner"
|
||||
"github.com/ledgerwatch/turbo-geth/node"
|
||||
"github.com/ledgerwatch/turbo-geth/p2p"
|
||||
"github.com/ledgerwatch/turbo-geth/p2p/enode"
|
||||
@ -91,14 +89,12 @@ type Ethereum struct {
|
||||
chainKV ethdb.RwKV // Same as chainDb, but different interface
|
||||
privateAPI *grpc.Server
|
||||
|
||||
eventMux *event.TypeMux
|
||||
engine consensus.Engine
|
||||
engine consensus.Engine
|
||||
|
||||
bloomRequests chan chan *bloombits.Retrieval // Channel receiving bloom data retrieval requests
|
||||
|
||||
APIBackend *EthAPIBackend
|
||||
|
||||
miner *miner.Miner
|
||||
gasPrice *uint256.Int
|
||||
etherbase common.Address
|
||||
signer *ecdsa.PrivateKey
|
||||
@ -288,7 +284,6 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) {
|
||||
config: config,
|
||||
chainDb: chainDb,
|
||||
chainKV: chainDb.(ethdb.HasRwKV).RwKV(),
|
||||
eventMux: stack.EventMux(),
|
||||
engine: ethconfig.CreateConsensusEngine(chainConfig, &config.Ethash, config.Miner.Notify, config.Miner.Noverify, chainDb),
|
||||
networkID: config.NetworkID,
|
||||
etherbase: config.Miner.Etherbase,
|
||||
@ -436,7 +431,6 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) {
|
||||
Chain: eth.blockchain,
|
||||
TxPool: eth.txPool,
|
||||
Network: config.NetworkID,
|
||||
EventMux: eth.eventMux,
|
||||
Checkpoint: checkpoint,
|
||||
|
||||
Whitelist: config.Whitelist,
|
||||
@ -638,8 +632,6 @@ func (s *Ethereum) SetEtherbase(etherbase common.Address) {
|
||||
s.lock.Lock()
|
||||
s.etherbase = etherbase
|
||||
s.lock.Unlock()
|
||||
|
||||
s.miner.SetEtherbase(etherbase)
|
||||
}
|
||||
|
||||
// StartMining starts the miner with the given number of CPU threads. If mining
|
||||
@ -684,7 +676,6 @@ func (s *Ethereum) StartMining(threads int) error {
|
||||
// If mining is started, we can disable the transaction rejection mechanism
|
||||
// introduced to speed sync times.
|
||||
atomic.StoreUint32(&s.handler.acceptTxs, 1)
|
||||
//go s.miner.Start(eb)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
@ -701,12 +692,10 @@ func (s *Ethereum) StopMining() {
|
||||
}
|
||||
}
|
||||
|
||||
func (s *Ethereum) IsMining() bool { return s.config.Miner.Enabled }
|
||||
func (s *Ethereum) Miner() *miner.Miner { return s.miner }
|
||||
func (s *Ethereum) IsMining() bool { return s.config.Miner.Enabled }
|
||||
|
||||
func (s *Ethereum) BlockChain() *core.BlockChain { return s.blockchain }
|
||||
func (s *Ethereum) TxPool() *core.TxPool { return s.txPool }
|
||||
func (s *Ethereum) EventMux() *event.TypeMux { return s.eventMux }
|
||||
func (s *Ethereum) Engine() consensus.Engine { return s.engine }
|
||||
func (s *Ethereum) ChainDb() ethdb.Database { return s.chainDb }
|
||||
func (s *Ethereum) ChainKV() ethdb.RwKV { return s.chainKV }
|
||||
@ -760,7 +749,6 @@ func (s *Ethereum) Stop() error {
|
||||
//s.miner.Stop()
|
||||
s.blockchain.Stop()
|
||||
s.engine.Close()
|
||||
s.eventMux.Stop()
|
||||
if s.txPool != nil {
|
||||
s.txPool.Stop()
|
||||
}
|
||||
|
@ -1,166 +0,0 @@
|
||||
// Copyright 2015 The go-ethereum Authors
|
||||
// This file is part of the go-ethereum library.
|
||||
//
|
||||
// The go-ethereum library is free software: you can redistribute it and/or modify
|
||||
// it under the terms of the GNU Lesser General Public License as published by
|
||||
// the Free Software Foundation, either version 3 of the License, or
|
||||
// (at your option) any later version.
|
||||
//
|
||||
// The go-ethereum library is distributed in the hope that it will be useful,
|
||||
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
// GNU Lesser General Public License for more details.
|
||||
//
|
||||
// You should have received a copy of the GNU Lesser General Public License
|
||||
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
package downloader
|
||||
|
||||
import (
|
||||
"context"
|
||||
"sync"
|
||||
|
||||
ethereum "github.com/ledgerwatch/turbo-geth"
|
||||
"github.com/ledgerwatch/turbo-geth/event"
|
||||
"github.com/ledgerwatch/turbo-geth/rpc"
|
||||
)
|
||||
|
||||
// PublicDownloaderAPI provides an API which gives information about the current synchronisation status.
|
||||
// It offers only methods that operates on data that can be available to anyone without security risks.
|
||||
type PublicDownloaderAPI struct {
|
||||
d *Downloader
|
||||
mux *event.TypeMux
|
||||
installSyncSubscription chan chan interface{}
|
||||
uninstallSyncSubscription chan *uninstallSyncSubscriptionRequest
|
||||
}
|
||||
|
||||
// NewPublicDownloaderAPI create a new PublicDownloaderAPI. The API has an internal event loop that
|
||||
// listens for events from the downloader through the global event mux. In case it receives one of
|
||||
// these events it broadcasts it to all syncing subscriptions that are installed through the
|
||||
// installSyncSubscription channel.
|
||||
func NewPublicDownloaderAPI(d *Downloader, m *event.TypeMux) *PublicDownloaderAPI {
|
||||
api := &PublicDownloaderAPI{
|
||||
d: d,
|
||||
mux: m,
|
||||
installSyncSubscription: make(chan chan interface{}),
|
||||
uninstallSyncSubscription: make(chan *uninstallSyncSubscriptionRequest),
|
||||
}
|
||||
|
||||
go api.eventLoop()
|
||||
|
||||
return api
|
||||
}
|
||||
|
||||
// eventLoop runs a loop until the event mux closes. It will install and uninstall new
|
||||
// sync subscriptions and broadcasts sync status updates to the installed sync subscriptions.
|
||||
func (api *PublicDownloaderAPI) eventLoop() {
|
||||
var (
|
||||
sub = api.mux.Subscribe(StartEvent{}, DoneEvent{}, FailedEvent{})
|
||||
syncSubscriptions = make(map[chan interface{}]struct{})
|
||||
)
|
||||
|
||||
for {
|
||||
select {
|
||||
case i := <-api.installSyncSubscription:
|
||||
syncSubscriptions[i] = struct{}{}
|
||||
case u := <-api.uninstallSyncSubscription:
|
||||
delete(syncSubscriptions, u.c)
|
||||
close(u.uninstalled)
|
||||
case event := <-sub.Chan():
|
||||
if event == nil {
|
||||
return
|
||||
}
|
||||
|
||||
var notification interface{}
|
||||
switch event.Data.(type) {
|
||||
case StartEvent:
|
||||
notification = &SyncingResult{
|
||||
Syncing: true,
|
||||
Status: api.d.Progress(),
|
||||
}
|
||||
case DoneEvent, FailedEvent:
|
||||
notification = false
|
||||
}
|
||||
// broadcast
|
||||
for c := range syncSubscriptions {
|
||||
c <- notification
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Syncing provides information when this nodes starts synchronising with the Ethereum network and when it's finished.
|
||||
func (api *PublicDownloaderAPI) Syncing(ctx context.Context) (*rpc.Subscription, error) {
|
||||
notifier, supported := rpc.NotifierFromContext(ctx)
|
||||
if !supported {
|
||||
return &rpc.Subscription{}, rpc.ErrNotificationsUnsupported
|
||||
}
|
||||
|
||||
rpcSub := notifier.CreateSubscription()
|
||||
|
||||
go func() {
|
||||
statuses := make(chan interface{})
|
||||
sub := api.SubscribeSyncStatus(statuses)
|
||||
|
||||
for {
|
||||
select {
|
||||
case status := <-statuses:
|
||||
_ = notifier.Notify(rpcSub.ID, status)
|
||||
case <-rpcSub.Err():
|
||||
sub.Unsubscribe()
|
||||
return
|
||||
case <-notifier.Closed():
|
||||
sub.Unsubscribe()
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
return rpcSub, nil
|
||||
}
|
||||
|
||||
// SyncingResult provides information about the current synchronisation status for this node.
|
||||
type SyncingResult struct {
|
||||
Syncing bool `json:"syncing"`
|
||||
Status ethereum.SyncProgress `json:"status"`
|
||||
}
|
||||
|
||||
// uninstallSyncSubscriptionRequest uninstalles a syncing subscription in the API event loop.
|
||||
type uninstallSyncSubscriptionRequest struct {
|
||||
c chan interface{}
|
||||
uninstalled chan interface{}
|
||||
}
|
||||
|
||||
// SyncStatusSubscription represents a syncing subscription.
|
||||
type SyncStatusSubscription struct {
|
||||
api *PublicDownloaderAPI // register subscription in event loop of this api instance
|
||||
c chan interface{} // channel where events are broadcasted to
|
||||
unsubOnce sync.Once // make sure unsubscribe logic is executed once
|
||||
}
|
||||
|
||||
// Unsubscribe uninstalls the subscription from the DownloadAPI event loop.
|
||||
// The status channel that was passed to subscribeSyncStatus isn't used anymore
|
||||
// after this method returns.
|
||||
func (s *SyncStatusSubscription) Unsubscribe() {
|
||||
s.unsubOnce.Do(func() {
|
||||
req := uninstallSyncSubscriptionRequest{s.c, make(chan interface{})}
|
||||
s.api.uninstallSyncSubscription <- &req
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-s.c:
|
||||
// drop new status events until uninstall confirmation
|
||||
continue
|
||||
case <-req.uninstalled:
|
||||
return
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
// SubscribeSyncStatus creates a subscription that will broadcast new synchronisation updates.
|
||||
// The given channel must receive interface values, the result can either
|
||||
func (api *PublicDownloaderAPI) SubscribeSyncStatus(status chan interface{}) *SyncStatusSubscription {
|
||||
api.installSyncSubscription <- status
|
||||
return &SyncStatusSubscription{api: api, c: status}
|
||||
}
|
@ -37,7 +37,6 @@ import (
|
||||
"github.com/ledgerwatch/turbo-geth/eth/stagedsync"
|
||||
"github.com/ledgerwatch/turbo-geth/eth/stagedsync/stages"
|
||||
"github.com/ledgerwatch/turbo-geth/ethdb"
|
||||
"github.com/ledgerwatch/turbo-geth/event"
|
||||
"github.com/ledgerwatch/turbo-geth/log"
|
||||
"github.com/ledgerwatch/turbo-geth/params"
|
||||
"github.com/ledgerwatch/turbo-geth/turbo/shards"
|
||||
@ -105,8 +104,6 @@ type Downloader struct {
|
||||
rttEstimate uint64 // Round trip time to target for download requests
|
||||
rttConfidence uint64 // Confidence in the estimated RTT (unit: millionths to allow atomic ops)
|
||||
|
||||
mux *event.TypeMux // Event multiplexer to announce sync operation events
|
||||
|
||||
queue *queue // Scheduler for selecting the hashes to download
|
||||
peers *peerSet // Set of active peers from which download can proceed
|
||||
|
||||
@ -234,10 +231,9 @@ type BlockChain interface {
|
||||
}
|
||||
|
||||
// New creates a new downloader to fetch hashes and blocks from remote peers.
|
||||
func New(stateDB ethdb.Database, mux *event.TypeMux, chainConfig *params.ChainConfig, miningConfig *params.MiningConfig, chain BlockChain, dropPeer peerDropFn, sm ethdb.StorageMode) *Downloader {
|
||||
func New(stateDB ethdb.Database, chainConfig *params.ChainConfig, miningConfig *params.MiningConfig, chain BlockChain, dropPeer peerDropFn, sm ethdb.StorageMode) *Downloader {
|
||||
dl := &Downloader{
|
||||
stateDB: stateDB,
|
||||
mux: mux,
|
||||
queue: newQueue(blockCacheMaxItems, blockCacheInitialItems),
|
||||
peers: newPeerSet(),
|
||||
rttEstimate: uint64(rttMaxEstimate),
|
||||
@ -444,16 +440,6 @@ func (d *Downloader) synchronise(id string, hash common.Hash, blockNumber uint64
|
||||
// syncWithPeer starts a block synchronization based on the hash chain from the
|
||||
// specified peer and head hash.s
|
||||
func (d *Downloader) syncWithPeer(p *peerConnection, hash common.Hash, blockNumber uint64, txPool *core.TxPool, poolStart func() error) (err error) {
|
||||
_ = d.mux.Post(StartEvent{})
|
||||
defer func() {
|
||||
// reset on error
|
||||
if err != nil {
|
||||
_ = d.mux.Post(FailedEvent{err})
|
||||
} else {
|
||||
latest := d.blockchain.CurrentHeader()
|
||||
_ = d.mux.Post(DoneEvent{latest})
|
||||
}
|
||||
}()
|
||||
if p.version < 64 {
|
||||
return fmt.Errorf("%w: advertized %d < required %d", errTooOld, p.version, 64)
|
||||
}
|
||||
|
@ -18,7 +18,6 @@ import (
|
||||
"github.com/ledgerwatch/turbo-geth/core/vm"
|
||||
"github.com/ledgerwatch/turbo-geth/eth/stagedsync"
|
||||
"github.com/ledgerwatch/turbo-geth/ethdb"
|
||||
"github.com/ledgerwatch/turbo-geth/event"
|
||||
"github.com/ledgerwatch/turbo-geth/log"
|
||||
"github.com/ledgerwatch/turbo-geth/params"
|
||||
)
|
||||
@ -45,7 +44,7 @@ func newStagedSyncTester() (*stagedSyncTester, func()) {
|
||||
if err := rawdb.WriteBlock(context.Background(), tester.db, testGenesis); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
tester.downloader = New(tester.db, new(event.TypeMux), params.TestChainConfig, nil, tester, tester.dropPeer, ethdb.DefaultStorageMode)
|
||||
tester.downloader = New(tester.db, params.TestChainConfig, nil, tester, tester.dropPeer, ethdb.DefaultStorageMode)
|
||||
//tester.downloader.SetBatchSize(32*1024 /* cacheSize */, 16*1024 /* batchSize */)
|
||||
tester.downloader.SetBatchSize(0 /* cacheSize */, 16*1024 /* batchSize */)
|
||||
tester.downloader.SetStagedSync(
|
||||
|
@ -80,7 +80,6 @@ type handlerConfig struct {
|
||||
TxPool txPool // Transaction pool to propagate from
|
||||
Network uint64 // Network identifier to adfvertise
|
||||
BloomCache uint64 // Megabytes to alloc for fast sync bloom
|
||||
EventMux *event.TypeMux // Legacy event mux, deprecate for `feed`
|
||||
Checkpoint *params.TrustedCheckpoint // Hard coded checkpoint for sync challenges
|
||||
Whitelist map[uint64]common.Hash // Hard coded whitelist for sync challenged
|
||||
Mining *params.MiningConfig
|
||||
@ -102,10 +101,8 @@ type handler struct {
|
||||
txFetcher *fetcher.TxFetcher
|
||||
peers *peerSet
|
||||
|
||||
eventMux *event.TypeMux
|
||||
txsCh chan core.NewTxsEvent
|
||||
txsSub event.Subscription
|
||||
minedBlockSub *event.TypeMuxSubscription
|
||||
txsCh chan core.NewTxsEvent
|
||||
txsSub event.Subscription
|
||||
|
||||
whitelist map[uint64]common.Hash
|
||||
|
||||
@ -127,13 +124,8 @@ type handler struct {
|
||||
|
||||
// newHandler returns a handler for all Ethereum chain management protocol.
|
||||
func newHandler(config *handlerConfig) (*handler, error) { //nolint:unparam
|
||||
// Create the protocol manager with the base fields
|
||||
if config.EventMux == nil {
|
||||
config.EventMux = new(event.TypeMux) // Nicety initialization for tests
|
||||
}
|
||||
h := &handler{
|
||||
networkID: config.Network,
|
||||
eventMux: config.EventMux,
|
||||
database: config.Database,
|
||||
txpool: config.TxPool,
|
||||
chain: config.Chain,
|
||||
@ -158,7 +150,7 @@ func newHandler(config *handlerConfig) (*handler, error) { //nolint:unparam
|
||||
if err != nil {
|
||||
log.Error("Get storage mode", "err", err)
|
||||
}
|
||||
h.downloader = downloader.New(config.Database, h.eventMux, config.Chain.Config(), config.Mining, config.Chain, h.removePeer, sm)
|
||||
h.downloader = downloader.New(config.Database, config.Chain.Config(), config.Mining, config.Chain, h.removePeer, sm)
|
||||
h.downloader.SetTmpDir(h.tmpdir)
|
||||
h.downloader.SetBatchSize(h.cacheSize, h.batchSize)
|
||||
|
||||
@ -325,11 +317,6 @@ func (h *handler) Start(maxPeers int) {
|
||||
h.txsSub = h.txpool.SubscribeNewTxsEvent(h.txsCh)
|
||||
go h.txBroadcastLoop()
|
||||
|
||||
// broadcast mined blocks
|
||||
h.wg.Add(1)
|
||||
h.minedBlockSub = h.eventMux.Subscribe(core.NewMinedBlockEvent{})
|
||||
go h.minedBroadcastLoop()
|
||||
|
||||
// start sync handlers
|
||||
h.wg.Add(2)
|
||||
go h.chainSync.loop()
|
||||
@ -337,8 +324,7 @@ func (h *handler) Start(maxPeers int) {
|
||||
}
|
||||
|
||||
func (h *handler) Stop() {
|
||||
h.txsSub.Unsubscribe() // quits txBroadcastLoop
|
||||
h.minedBlockSub.Unsubscribe() // quits blockBroadcastLoop
|
||||
h.txsSub.Unsubscribe() // quits txBroadcastLoop
|
||||
|
||||
// Quit chainSync and txsync64.
|
||||
// After this is done, no new peers will be accepted.
|
||||
@ -435,18 +421,6 @@ func (h *handler) BroadcastTransactions(txs types.Transactions) {
|
||||
"tx packs", directPeers, "broadcast txs", directCount)
|
||||
}
|
||||
|
||||
// minedBroadcastLoop sends mined blocks to connected peers.
|
||||
func (h *handler) minedBroadcastLoop() {
|
||||
defer h.wg.Done()
|
||||
|
||||
for obj := range h.minedBlockSub.Chan() {
|
||||
if ev, ok := obj.Data.(core.NewMinedBlockEvent); ok {
|
||||
h.BroadcastBlock(ev.Block, true) // First propagate block to peers
|
||||
h.BroadcastBlock(ev.Block, false) // Only then announce to the rest
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// txBroadcastLoop announces new transactions to connected peers.
|
||||
func (h *handler) txBroadcastLoop() {
|
||||
defer h.wg.Done()
|
||||
|
215
event/event.go
215
event/event.go
@ -1,215 +0,0 @@
|
||||
// Copyright 2014 The go-ethereum Authors
|
||||
// This file is part of the go-ethereum library.
|
||||
//
|
||||
// The go-ethereum library is free software: you can redistribute it and/or modify
|
||||
// it under the terms of the GNU Lesser General Public License as published by
|
||||
// the Free Software Foundation, either version 3 of the License, or
|
||||
// (at your option) any later version.
|
||||
//
|
||||
// The go-ethereum library is distributed in the hope that it will be useful,
|
||||
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
// GNU Lesser General Public License for more details.
|
||||
//
|
||||
// You should have received a copy of the GNU Lesser General Public License
|
||||
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
// Package event deals with subscriptions to real-time events.
|
||||
package event
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"reflect"
|
||||
"sync"
|
||||
"time"
|
||||
)
|
||||
|
||||
// TypeMuxEvent is a time-tagged notification pushed to subscribers.
|
||||
type TypeMuxEvent struct {
|
||||
Time time.Time
|
||||
Data interface{}
|
||||
}
|
||||
|
||||
// A TypeMux dispatches events to registered receivers. Receivers can be
|
||||
// registered to handle events of certain type. Any operation
|
||||
// called after mux is stopped will return ErrMuxClosed.
|
||||
//
|
||||
// The zero value is ready to use.
|
||||
type TypeMux struct {
|
||||
mutex sync.RWMutex
|
||||
subm map[reflect.Type][]*TypeMuxSubscription
|
||||
stopped bool
|
||||
}
|
||||
|
||||
// ErrMuxClosed is returned when Posting on a closed TypeMux.
|
||||
var ErrMuxClosed = errors.New("event: mux closed")
|
||||
|
||||
// Subscribe creates a subscription for events of the given types. The
|
||||
// subscription's channel is closed when it is unsubscribed
|
||||
// or the mux is closed.
|
||||
func (mux *TypeMux) Subscribe(types ...interface{}) *TypeMuxSubscription {
|
||||
sub := newsub(mux)
|
||||
mux.mutex.Lock()
|
||||
defer mux.mutex.Unlock()
|
||||
if mux.stopped {
|
||||
// set the status to closed so that calling Unsubscribe after this
|
||||
// call will short circuit.
|
||||
sub.closed = true
|
||||
close(sub.postC)
|
||||
} else {
|
||||
if mux.subm == nil {
|
||||
mux.subm = make(map[reflect.Type][]*TypeMuxSubscription)
|
||||
}
|
||||
for _, t := range types {
|
||||
rtyp := reflect.TypeOf(t)
|
||||
oldsubs := mux.subm[rtyp]
|
||||
if find(oldsubs, sub) != -1 {
|
||||
panic(fmt.Sprintf("event: duplicate type %s in Subscribe", rtyp))
|
||||
}
|
||||
subs := make([]*TypeMuxSubscription, len(oldsubs)+1)
|
||||
copy(subs, oldsubs)
|
||||
subs[len(oldsubs)] = sub
|
||||
mux.subm[rtyp] = subs
|
||||
}
|
||||
}
|
||||
return sub
|
||||
}
|
||||
|
||||
// Post sends an event to all receivers registered for the given type.
|
||||
// It returns ErrMuxClosed if the mux has been stopped.
|
||||
func (mux *TypeMux) Post(ev interface{}) error {
|
||||
event := &TypeMuxEvent{
|
||||
Time: time.Now(),
|
||||
Data: ev,
|
||||
}
|
||||
rtyp := reflect.TypeOf(ev)
|
||||
mux.mutex.RLock()
|
||||
if mux.stopped {
|
||||
mux.mutex.RUnlock()
|
||||
return ErrMuxClosed
|
||||
}
|
||||
subs := mux.subm[rtyp]
|
||||
mux.mutex.RUnlock()
|
||||
for _, sub := range subs {
|
||||
sub.deliver(event)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Stop closes a mux. The mux can no longer be used.
|
||||
// Future Post calls will fail with ErrMuxClosed.
|
||||
// Stop blocks until all current deliveries have finished.
|
||||
func (mux *TypeMux) Stop() {
|
||||
mux.mutex.Lock()
|
||||
defer mux.mutex.Unlock()
|
||||
for _, subs := range mux.subm {
|
||||
for _, sub := range subs {
|
||||
sub.closewait()
|
||||
}
|
||||
}
|
||||
mux.subm = nil
|
||||
mux.stopped = true
|
||||
}
|
||||
|
||||
func (mux *TypeMux) del(s *TypeMuxSubscription) {
|
||||
mux.mutex.Lock()
|
||||
defer mux.mutex.Unlock()
|
||||
for typ, subs := range mux.subm {
|
||||
if pos := find(subs, s); pos >= 0 {
|
||||
if len(subs) == 1 {
|
||||
delete(mux.subm, typ)
|
||||
} else {
|
||||
mux.subm[typ] = posdelete(subs, pos)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func find(slice []*TypeMuxSubscription, item *TypeMuxSubscription) int {
|
||||
for i, v := range slice {
|
||||
if v == item {
|
||||
return i
|
||||
}
|
||||
}
|
||||
return -1
|
||||
}
|
||||
|
||||
func posdelete(slice []*TypeMuxSubscription, pos int) []*TypeMuxSubscription {
|
||||
news := make([]*TypeMuxSubscription, len(slice)-1)
|
||||
copy(news[:pos], slice[:pos])
|
||||
copy(news[pos:], slice[pos+1:])
|
||||
return news
|
||||
}
|
||||
|
||||
// TypeMuxSubscription is a subscription established through TypeMux.
|
||||
type TypeMuxSubscription struct {
|
||||
mux *TypeMux
|
||||
created time.Time
|
||||
closeMu sync.Mutex
|
||||
closing chan struct{}
|
||||
closed bool
|
||||
|
||||
// these two are the same channel. they are stored separately so
|
||||
// postC can be set to nil without affecting the return value of
|
||||
// Chan.
|
||||
postMu sync.RWMutex
|
||||
readC <-chan *TypeMuxEvent
|
||||
postC chan<- *TypeMuxEvent
|
||||
}
|
||||
|
||||
func newsub(mux *TypeMux) *TypeMuxSubscription {
|
||||
c := make(chan *TypeMuxEvent)
|
||||
return &TypeMuxSubscription{
|
||||
mux: mux,
|
||||
created: time.Now(),
|
||||
readC: c,
|
||||
postC: c,
|
||||
closing: make(chan struct{}),
|
||||
}
|
||||
}
|
||||
|
||||
func (s *TypeMuxSubscription) Chan() <-chan *TypeMuxEvent {
|
||||
return s.readC
|
||||
}
|
||||
|
||||
func (s *TypeMuxSubscription) Unsubscribe() {
|
||||
s.mux.del(s)
|
||||
s.closewait()
|
||||
}
|
||||
|
||||
func (s *TypeMuxSubscription) Closed() bool {
|
||||
s.closeMu.Lock()
|
||||
defer s.closeMu.Unlock()
|
||||
return s.closed
|
||||
}
|
||||
|
||||
func (s *TypeMuxSubscription) closewait() {
|
||||
s.closeMu.Lock()
|
||||
defer s.closeMu.Unlock()
|
||||
if s.closed {
|
||||
return
|
||||
}
|
||||
close(s.closing)
|
||||
s.closed = true
|
||||
|
||||
s.postMu.Lock()
|
||||
defer s.postMu.Unlock()
|
||||
close(s.postC)
|
||||
s.postC = nil
|
||||
}
|
||||
|
||||
func (s *TypeMuxSubscription) deliver(event *TypeMuxEvent) {
|
||||
// Short circuit delivery if stale event
|
||||
if s.created.After(event.Time) {
|
||||
return
|
||||
}
|
||||
// Otherwise deliver the event
|
||||
s.postMu.RLock()
|
||||
defer s.postMu.RUnlock()
|
||||
|
||||
select {
|
||||
case s.postC <- event:
|
||||
case <-s.closing:
|
||||
}
|
||||
}
|
@ -1,219 +0,0 @@
|
||||
// Copyright 2014 The go-ethereum Authors
|
||||
// This file is part of the go-ethereum library.
|
||||
//
|
||||
// The go-ethereum library is free software: you can redistribute it and/or modify
|
||||
// it under the terms of the GNU Lesser General Public License as published by
|
||||
// the Free Software Foundation, either version 3 of the License, or
|
||||
// (at your option) any later version.
|
||||
//
|
||||
// The go-ethereum library is distributed in the hope that it will be useful,
|
||||
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
// GNU Lesser General Public License for more details.
|
||||
//
|
||||
// You should have received a copy of the GNU Lesser General Public License
|
||||
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
package event
|
||||
|
||||
import (
|
||||
"math/rand"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
type testEvent int
|
||||
|
||||
func TestSubCloseUnsub(t *testing.T) {
|
||||
// the point of this test is **not** to panic
|
||||
var mux TypeMux
|
||||
mux.Stop()
|
||||
sub := mux.Subscribe(0)
|
||||
sub.Unsubscribe()
|
||||
}
|
||||
|
||||
func TestSub(t *testing.T) {
|
||||
mux := new(TypeMux)
|
||||
defer mux.Stop()
|
||||
|
||||
sub := mux.Subscribe(testEvent(0))
|
||||
go func() {
|
||||
if err := mux.Post(testEvent(5)); err != nil {
|
||||
t.Errorf("Post returned unexpected error: %v", err)
|
||||
}
|
||||
}()
|
||||
ev := <-sub.Chan()
|
||||
|
||||
if ev.Data.(testEvent) != testEvent(5) {
|
||||
t.Errorf("Got %v (%T), expected event %v (%T)",
|
||||
ev, ev, testEvent(5), testEvent(5))
|
||||
}
|
||||
}
|
||||
|
||||
func TestMuxErrorAfterStop(t *testing.T) {
|
||||
mux := new(TypeMux)
|
||||
mux.Stop()
|
||||
|
||||
sub := mux.Subscribe(testEvent(0))
|
||||
if _, isopen := <-sub.Chan(); isopen {
|
||||
t.Errorf("subscription channel was not closed")
|
||||
}
|
||||
if err := mux.Post(testEvent(0)); err != ErrMuxClosed {
|
||||
t.Errorf("Post error mismatch, got: %s, expected: %s", err, ErrMuxClosed)
|
||||
}
|
||||
}
|
||||
|
||||
func TestUnsubscribeUnblockPost(t *testing.T) {
|
||||
mux := new(TypeMux)
|
||||
defer mux.Stop()
|
||||
|
||||
sub := mux.Subscribe(testEvent(0))
|
||||
unblocked := make(chan bool)
|
||||
go func() {
|
||||
mux.Post(testEvent(5))
|
||||
unblocked <- true
|
||||
}()
|
||||
|
||||
select {
|
||||
case <-unblocked:
|
||||
t.Errorf("Post returned before Unsubscribe")
|
||||
default:
|
||||
sub.Unsubscribe()
|
||||
<-unblocked
|
||||
}
|
||||
}
|
||||
|
||||
func TestSubscribeDuplicateType(t *testing.T) {
|
||||
mux := new(TypeMux)
|
||||
expected := "event: duplicate type event.testEvent in Subscribe"
|
||||
|
||||
defer func() {
|
||||
err := recover()
|
||||
if err == nil {
|
||||
t.Errorf("Subscribe didn't panic for duplicate type")
|
||||
} else if err != expected {
|
||||
t.Errorf("panic mismatch: got %#v, expected %#v", err, expected)
|
||||
}
|
||||
}()
|
||||
mux.Subscribe(testEvent(1), testEvent(2))
|
||||
}
|
||||
|
||||
func TestMuxConcurrent(t *testing.T) {
|
||||
rand.Seed(time.Now().Unix())
|
||||
mux := new(TypeMux)
|
||||
defer mux.Stop()
|
||||
|
||||
recv := make(chan int)
|
||||
poster := func() {
|
||||
for {
|
||||
err := mux.Post(testEvent(0))
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
sub := func(i int) {
|
||||
time.Sleep(time.Duration(rand.Intn(99)) * time.Millisecond)
|
||||
sub := mux.Subscribe(testEvent(0))
|
||||
<-sub.Chan()
|
||||
sub.Unsubscribe()
|
||||
recv <- i
|
||||
}
|
||||
|
||||
go poster()
|
||||
go poster()
|
||||
go poster()
|
||||
nsubs := 1000
|
||||
for i := 0; i < nsubs; i++ {
|
||||
go sub(i)
|
||||
}
|
||||
|
||||
// wait until everyone has been served
|
||||
counts := make(map[int]int, nsubs)
|
||||
for i := 0; i < nsubs; i++ {
|
||||
counts[<-recv]++
|
||||
}
|
||||
for i, count := range counts {
|
||||
if count != 1 {
|
||||
t.Errorf("receiver %d called %d times, expected only 1 call", i, count)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func emptySubscriber(mux *TypeMux) {
|
||||
s := mux.Subscribe(testEvent(0))
|
||||
go func() {
|
||||
for range s.Chan() {
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
func BenchmarkPost1000(b *testing.B) {
|
||||
var (
|
||||
mux = new(TypeMux)
|
||||
subscribed, done sync.WaitGroup
|
||||
nsubs = 1000
|
||||
)
|
||||
subscribed.Add(nsubs)
|
||||
done.Add(nsubs)
|
||||
for i := 0; i < nsubs; i++ {
|
||||
go func() {
|
||||
s := mux.Subscribe(testEvent(0))
|
||||
subscribed.Done()
|
||||
for range s.Chan() {
|
||||
}
|
||||
done.Done()
|
||||
}()
|
||||
}
|
||||
subscribed.Wait()
|
||||
|
||||
// The actual benchmark.
|
||||
b.ResetTimer()
|
||||
for i := 0; i < b.N; i++ {
|
||||
mux.Post(testEvent(0))
|
||||
}
|
||||
|
||||
b.StopTimer()
|
||||
mux.Stop()
|
||||
done.Wait()
|
||||
}
|
||||
|
||||
func BenchmarkPostConcurrent(b *testing.B) {
|
||||
var mux = new(TypeMux)
|
||||
defer mux.Stop()
|
||||
emptySubscriber(mux)
|
||||
emptySubscriber(mux)
|
||||
emptySubscriber(mux)
|
||||
|
||||
var wg sync.WaitGroup
|
||||
poster := func() {
|
||||
for i := 0; i < b.N; i++ {
|
||||
mux.Post(testEvent(0))
|
||||
}
|
||||
wg.Done()
|
||||
}
|
||||
wg.Add(5)
|
||||
for i := 0; i < 5; i++ {
|
||||
go poster()
|
||||
}
|
||||
wg.Wait()
|
||||
}
|
||||
|
||||
// for comparison
|
||||
func BenchmarkChanSend(b *testing.B) {
|
||||
c := make(chan interface{})
|
||||
defer close(c)
|
||||
closed := make(chan struct{})
|
||||
go func() {
|
||||
for range c {
|
||||
}
|
||||
}()
|
||||
|
||||
for i := 0; i < b.N; i++ {
|
||||
select {
|
||||
case c <- i:
|
||||
case <-closed:
|
||||
}
|
||||
}
|
||||
}
|
@ -1,58 +0,0 @@
|
||||
// Copyright 2014 The go-ethereum Authors
|
||||
// This file is part of the go-ethereum library.
|
||||
//
|
||||
// The go-ethereum library is free software: you can redistribute it and/or modify
|
||||
// it under the terms of the GNU Lesser General Public License as published by
|
||||
// the Free Software Foundation, either version 3 of the License, or
|
||||
// (at your option) any later version.
|
||||
//
|
||||
// The go-ethereum library is distributed in the hope that it will be useful,
|
||||
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
// GNU Lesser General Public License for more details.
|
||||
//
|
||||
// You should have received a copy of the GNU Lesser General Public License
|
||||
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
package event
|
||||
|
||||
import "fmt"
|
||||
|
||||
func ExampleTypeMux() {
|
||||
type someEvent struct{ I int }
|
||||
type otherEvent struct{ S string }
|
||||
type yetAnotherEvent struct{ X, Y int }
|
||||
|
||||
var mux TypeMux
|
||||
|
||||
// Start a subscriber.
|
||||
done := make(chan struct{})
|
||||
sub := mux.Subscribe(someEvent{}, otherEvent{})
|
||||
go func() {
|
||||
for event := range sub.Chan() {
|
||||
fmt.Printf("Received: %#v\n", event.Data)
|
||||
}
|
||||
fmt.Println("done")
|
||||
close(done)
|
||||
}()
|
||||
|
||||
// Post some events.
|
||||
mux.Post(someEvent{5})
|
||||
mux.Post(yetAnotherEvent{X: 3, Y: 4})
|
||||
mux.Post(someEvent{6})
|
||||
mux.Post(otherEvent{"whoa"})
|
||||
|
||||
// Stop closes all subscription channels.
|
||||
// The subscriber goroutine will print "done"
|
||||
// and exit.
|
||||
mux.Stop()
|
||||
|
||||
// Wait for subscriber to return.
|
||||
<-done
|
||||
|
||||
// Output:
|
||||
// Received: event.someEvent{I:5}
|
||||
// Received: event.someEvent{I:6}
|
||||
// Received: event.otherEvent{S:"whoa"}
|
||||
// done
|
||||
}
|
@ -82,7 +82,6 @@ type Backend interface {
|
||||
GetLogs(ctx context.Context, blockHash common.Hash) ([][]*types.Log, error)
|
||||
ServiceFilter(ctx context.Context, session *bloombits.MatcherSession)
|
||||
SubscribeLogsEvent(ch chan<- []*types.Log) event.Subscription
|
||||
SubscribePendingLogsEvent(ch chan<- []*types.Log) event.Subscription
|
||||
SubscribeRemovedLogsEvent(ch chan<- core.RemovedLogsEvent) event.Subscription
|
||||
|
||||
ChainConfig() *params.ChainConfig
|
||||
|
@ -1,82 +0,0 @@
|
||||
package miner
|
||||
|
||||
import (
|
||||
"math/big"
|
||||
"sync"
|
||||
|
||||
mapset "github.com/deckarep/golang-set"
|
||||
|
||||
"github.com/ledgerwatch/turbo-geth/common"
|
||||
"github.com/ledgerwatch/turbo-geth/consensus"
|
||||
"github.com/ledgerwatch/turbo-geth/core"
|
||||
"github.com/ledgerwatch/turbo-geth/core/state"
|
||||
"github.com/ledgerwatch/turbo-geth/core/types"
|
||||
)
|
||||
|
||||
// environment is the worker's current environment and holds all of the current state information.
|
||||
type environment struct {
|
||||
signer types.Signer
|
||||
|
||||
state *state.IntraBlockState // apply state changes here
|
||||
tds *state.TrieDbState
|
||||
ancestors mapset.Set // ancestor set (used for checking uncle parent validity)
|
||||
family mapset.Set // family set (used for checking uncle invalidity)
|
||||
uncles mapset.Set // uncle set
|
||||
tcount int // tx count in cycle
|
||||
gasPool *core.GasPool // available gas used to pack transactions
|
||||
|
||||
*sync.RWMutex
|
||||
header *types.Header
|
||||
txs []*types.Transaction
|
||||
receipts []*types.Receipt
|
||||
ctx consensus.Cancel
|
||||
}
|
||||
|
||||
func (e *environment) Number() *big.Int {
|
||||
e.RLock()
|
||||
defer e.RUnlock()
|
||||
|
||||
return big.NewInt(0).Set(e.header.Number)
|
||||
}
|
||||
|
||||
func (e *environment) Hash() common.Hash {
|
||||
e.RLock()
|
||||
defer e.RUnlock()
|
||||
|
||||
return e.header.Hash()
|
||||
}
|
||||
|
||||
func (e *environment) ParentHash() common.Hash {
|
||||
e.RLock()
|
||||
defer e.RUnlock()
|
||||
|
||||
return e.header.ParentHash
|
||||
}
|
||||
|
||||
func (e *environment) SetHeader(h *types.Header) {
|
||||
e.Lock()
|
||||
defer e.Unlock()
|
||||
|
||||
e.header = h
|
||||
}
|
||||
|
||||
func (e *environment) GetHeader() *types.Header {
|
||||
e.RLock()
|
||||
defer e.RUnlock()
|
||||
|
||||
return types.CopyHeader(e.header)
|
||||
}
|
||||
|
||||
func (e *environment) Set(env *environment) {
|
||||
em := e.RWMutex
|
||||
envm := env.RWMutex
|
||||
em.Lock()
|
||||
envm.Lock()
|
||||
defer func() {
|
||||
em.Unlock()
|
||||
envm.Unlock()
|
||||
}()
|
||||
|
||||
*e = *env
|
||||
e.RWMutex = envm
|
||||
}
|
209
miner/miner.go
209
miner/miner.go
@ -1,209 +0,0 @@
|
||||
// Copyright 2014 The go-ethereum Authors
|
||||
// This file is part of the go-ethereum library.
|
||||
//
|
||||
// The go-ethereum library is free software: you can redistribute it and/or modify
|
||||
// it under the terms of the GNU Lesser General Public License as published by
|
||||
// the Free Software Foundation, either version 3 of the License, or
|
||||
// (at your option) any later version.
|
||||
//
|
||||
// The go-ethereum library is distributed in the hope that it will be useful,
|
||||
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
// GNU Lesser General Public License for more details.
|
||||
//
|
||||
// You should have received a copy of the GNU Lesser General Public License
|
||||
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
// Package miner implements Ethereum block creation and mining.
|
||||
package miner
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/ledgerwatch/turbo-geth/common"
|
||||
"github.com/ledgerwatch/turbo-geth/consensus"
|
||||
"github.com/ledgerwatch/turbo-geth/core"
|
||||
"github.com/ledgerwatch/turbo-geth/core/state"
|
||||
"github.com/ledgerwatch/turbo-geth/core/types"
|
||||
"github.com/ledgerwatch/turbo-geth/eth/downloader"
|
||||
"github.com/ledgerwatch/turbo-geth/event"
|
||||
"github.com/ledgerwatch/turbo-geth/log"
|
||||
"github.com/ledgerwatch/turbo-geth/params"
|
||||
)
|
||||
|
||||
// Backend wraps all methods required for mining.
|
||||
type Backend interface {
|
||||
BlockChain() *core.BlockChain
|
||||
TxPool() *core.TxPool
|
||||
}
|
||||
|
||||
// Miner creates blocks and searches for proof-of-work values.
|
||||
type Miner struct {
|
||||
mux *event.TypeMux
|
||||
worker *worker
|
||||
coinbase common.Address
|
||||
eth Backend
|
||||
engine consensus.Engine
|
||||
exitCh chan struct{}
|
||||
startCh chan common.Address
|
||||
stopCh chan struct{}
|
||||
}
|
||||
|
||||
func New(eth Backend, config *params.MiningConfig, chainConfig *params.ChainConfig, mux *event.TypeMux, engine consensus.Engine, isLocalBlock func(block *types.Block) bool) *Miner {
|
||||
miner := &Miner{
|
||||
eth: eth,
|
||||
mux: mux,
|
||||
engine: engine,
|
||||
exitCh: make(chan struct{}),
|
||||
startCh: make(chan common.Address),
|
||||
stopCh: make(chan struct{}),
|
||||
worker: newWorker(config, chainConfig, engine, eth, mux, hooks{isLocalBlock: isLocalBlock}, false),
|
||||
}
|
||||
go miner.update()
|
||||
|
||||
return miner
|
||||
}
|
||||
|
||||
// update keeps track of the downloader events. Please be aware that this is a one shot type of update loop.
|
||||
// It's entered once and as soon as `Done` or `Failed` has been broadcasted the events are unregistered and
|
||||
// the loop is exited. This to prevent a major security vuln where external parties can DOS you with blocks
|
||||
// and halt your mining operation for as long as the DOS continues.
|
||||
func (miner *Miner) update() {
|
||||
events := miner.mux.Subscribe(downloader.StartEvent{}, downloader.DoneEvent{}, downloader.FailedEvent{})
|
||||
defer func() {
|
||||
if !events.Closed() {
|
||||
events.Unsubscribe()
|
||||
}
|
||||
}()
|
||||
|
||||
shouldStart := false
|
||||
canStart := true
|
||||
dlEventCh := events.Chan()
|
||||
for {
|
||||
select {
|
||||
case ev := <-dlEventCh:
|
||||
if ev == nil {
|
||||
// Unsubscription done, stop listening
|
||||
dlEventCh = nil
|
||||
continue
|
||||
}
|
||||
switch ev.Data.(type) {
|
||||
case downloader.StartEvent:
|
||||
wasMining := miner.Mining()
|
||||
miner.worker.stop()
|
||||
canStart = false
|
||||
if wasMining {
|
||||
// Resume mining after sync was finished
|
||||
shouldStart = true
|
||||
log.Info("Mining aborted due to sync")
|
||||
}
|
||||
case downloader.FailedEvent:
|
||||
canStart = true
|
||||
if shouldStart {
|
||||
miner.SetEtherbase(miner.coinbase)
|
||||
miner.worker.start()
|
||||
}
|
||||
case downloader.DoneEvent:
|
||||
canStart = true
|
||||
if shouldStart {
|
||||
miner.SetEtherbase(miner.coinbase)
|
||||
miner.worker.start()
|
||||
}
|
||||
// Stop reacting to downloader events
|
||||
events.Unsubscribe()
|
||||
}
|
||||
case addr := <-miner.startCh:
|
||||
miner.SetEtherbase(addr)
|
||||
if canStart {
|
||||
miner.worker.start()
|
||||
}
|
||||
shouldStart = true
|
||||
case <-miner.stopCh:
|
||||
shouldStart = false
|
||||
miner.worker.stop()
|
||||
case <-miner.exitCh:
|
||||
miner.worker.close()
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (miner *Miner) Start(coinbase common.Address) {
|
||||
miner.startCh <- coinbase
|
||||
}
|
||||
|
||||
func (miner *Miner) Stop() {
|
||||
miner.stopCh <- struct{}{}
|
||||
}
|
||||
|
||||
func (miner *Miner) Close() {
|
||||
close(miner.exitCh)
|
||||
}
|
||||
|
||||
func (miner *Miner) Mining() bool {
|
||||
return miner.worker.isRunning()
|
||||
}
|
||||
|
||||
func (miner *Miner) HashRate() uint64 {
|
||||
if pow, ok := miner.engine.(consensus.PoW); ok {
|
||||
return uint64(pow.Hashrate())
|
||||
}
|
||||
return 0
|
||||
}
|
||||
|
||||
func (miner *Miner) SetExtra(extra []byte) error {
|
||||
if uint64(len(extra)) > params.MaximumExtraDataSize {
|
||||
return fmt.Errorf("extra exceeds max length. %d > %v", len(extra), params.MaximumExtraDataSize)
|
||||
}
|
||||
miner.worker.setExtra(extra)
|
||||
return nil
|
||||
}
|
||||
|
||||
// SetRecommitInterval sets the interval for sealing work resubmitting.
|
||||
func (miner *Miner) SetRecommitInterval(interval time.Duration) {
|
||||
miner.worker.setRecommitInterval(interval)
|
||||
}
|
||||
|
||||
// Pending returns the currently pending block and associated state.
|
||||
func (miner *Miner) Pending() (*types.Block, *state.IntraBlockState, *state.TrieDbState) {
|
||||
return miner.worker.pending()
|
||||
}
|
||||
|
||||
// PendingBlock returns the currently pending block.
|
||||
//
|
||||
// Note, to access both the pending block and the pending state
|
||||
// simultaneously, please use Pending(), as the pending state can
|
||||
// change between multiple method calls
|
||||
|
||||
func (miner *Miner) PendingBlock() *types.Block {
|
||||
return miner.worker.pendingBlock()
|
||||
}
|
||||
|
||||
func (miner *Miner) SetEtherbase(addr common.Address) {
|
||||
miner.coinbase = addr
|
||||
miner.worker.setEtherbase(addr)
|
||||
}
|
||||
|
||||
// EnablePreseal turns on the preseal mining feature. It's enabled by default.
|
||||
// Note this function shouldn't be exposed to API, it's unnecessary for users
|
||||
// (miners) to actually know the underlying detail. It's only for outside project
|
||||
// which uses this library.
|
||||
func (miner *Miner) EnablePreseal() {
|
||||
miner.worker.enablePreseal()
|
||||
}
|
||||
|
||||
// DisablePreseal turns off the preseal mining feature. It's necessary for some
|
||||
// fake consensus engine which can seal blocks instantaneously.
|
||||
// Note this function shouldn't be exposed to API, it's unnecessary for users
|
||||
// (miners) to actually know the underlying detail. It's only for outside project
|
||||
// which uses this library.
|
||||
func (miner *Miner) DisablePreseal() {
|
||||
miner.worker.disablePreseal()
|
||||
}
|
||||
|
||||
// SubscribePendingLogs starts delivering logs from pending transactions
|
||||
// to the given channel.
|
||||
func (miner *Miner) SubscribePendingLogs(ch chan<- []*types.Log) event.Subscription {
|
||||
return miner.worker.pendingLogsFeed.Subscribe(ch)
|
||||
}
|
@ -1,177 +0,0 @@
|
||||
// Copyright 2020 The go-ethereum Authors
|
||||
// This file is part of the go-ethereum library.
|
||||
//
|
||||
// The go-ethereum library is free software: you can redistribute it and/or modify
|
||||
// it under the terms of the GNU Lesser General Public License as published by
|
||||
// the Free Software Foundation, either version 3 of the License, or
|
||||
// (at your option) any later version.
|
||||
//
|
||||
// The go-ethereum library is distributed in the hope that it will be useful,
|
||||
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
// GNU Lesser General Public License for more details.
|
||||
//
|
||||
// You should have received a copy of the GNU Lesser General Public License
|
||||
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
// Package miner implements Ethereum block creation and mining.
|
||||
package miner
|
||||
|
||||
import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/ledgerwatch/turbo-geth/common"
|
||||
"github.com/ledgerwatch/turbo-geth/consensus/clique"
|
||||
"github.com/ledgerwatch/turbo-geth/core"
|
||||
"github.com/ledgerwatch/turbo-geth/core/vm"
|
||||
"github.com/ledgerwatch/turbo-geth/eth/downloader"
|
||||
"github.com/ledgerwatch/turbo-geth/ethdb"
|
||||
"github.com/ledgerwatch/turbo-geth/event"
|
||||
"github.com/ledgerwatch/turbo-geth/params"
|
||||
)
|
||||
|
||||
type mockBackend struct {
|
||||
bc *core.BlockChain
|
||||
txPool *core.TxPool
|
||||
}
|
||||
|
||||
func NewMockBackend(bc *core.BlockChain, txPool *core.TxPool) *mockBackend {
|
||||
return &mockBackend{
|
||||
bc: bc,
|
||||
txPool: txPool,
|
||||
}
|
||||
}
|
||||
|
||||
func (m *mockBackend) BlockChain() *core.BlockChain {
|
||||
return m.bc
|
||||
}
|
||||
|
||||
func (m *mockBackend) TxPool() *core.TxPool {
|
||||
return m.txPool
|
||||
}
|
||||
|
||||
// TestMinerDownloaderFirstFails tests that mining is only
|
||||
// permitted to run indefinitely once the downloader sees a DoneEvent (success).
|
||||
// An initial FailedEvent should allow mining to stop on a subsequent
|
||||
// downloader StartEvent.
|
||||
func TestMinerDownloaderFirstFails(t *testing.T) {
|
||||
miner, mux := createMiner(t)
|
||||
miner.Start(common.HexToAddress("0x12345"))
|
||||
waitForMiningState(t, miner, true)
|
||||
// Start the downloader
|
||||
mux.Post(downloader.StartEvent{}) //nolint:errcheck
|
||||
waitForMiningState(t, miner, false)
|
||||
|
||||
// Stop the downloader and wait for the update loop to run
|
||||
mux.Post(downloader.FailedEvent{}) //nolint:errcheck
|
||||
waitForMiningState(t, miner, true)
|
||||
|
||||
// Since the downloader hasn't yet emitted a successful DoneEvent,
|
||||
// we expect the miner to stop on next StartEvent.
|
||||
mux.Post(downloader.StartEvent{}) //nolint:errcheck
|
||||
waitForMiningState(t, miner, false)
|
||||
|
||||
// Downloader finally succeeds.
|
||||
mux.Post(downloader.DoneEvent{}) //nolint:errcheck
|
||||
waitForMiningState(t, miner, true)
|
||||
|
||||
// Downloader starts again.
|
||||
// Since it has achieved a DoneEvent once, we expect miner
|
||||
// state to be unchanged.
|
||||
mux.Post(downloader.StartEvent{}) //nolint:errcheck
|
||||
waitForMiningState(t, miner, true)
|
||||
|
||||
mux.Post(downloader.FailedEvent{}) //nolint:errcheck
|
||||
waitForMiningState(t, miner, true)
|
||||
}
|
||||
|
||||
func TestMinerStartStopAfterDownloaderEvents(t *testing.T) {
|
||||
miner, mux := createMiner(t)
|
||||
|
||||
miner.Start(common.HexToAddress("0x12345"))
|
||||
waitForMiningState(t, miner, true)
|
||||
// Start the downloader
|
||||
mux.Post(downloader.StartEvent{}) //nolint:errcheck
|
||||
waitForMiningState(t, miner, false)
|
||||
|
||||
// Downloader finally succeeds.
|
||||
mux.Post(downloader.DoneEvent{}) //nolint:errcheck
|
||||
waitForMiningState(t, miner, true)
|
||||
|
||||
miner.Stop()
|
||||
waitForMiningState(t, miner, false)
|
||||
|
||||
miner.Start(common.HexToAddress("0x678910"))
|
||||
waitForMiningState(t, miner, true)
|
||||
|
||||
miner.Stop()
|
||||
waitForMiningState(t, miner, false)
|
||||
}
|
||||
|
||||
// TestMinerSetEtherbase checks that etherbase becomes set even if mining isn't
|
||||
// possible at the moment
|
||||
func TestMinerSetEtherbase(t *testing.T) {
|
||||
miner, mux := createMiner(t)
|
||||
// Start with a 'bad' mining address
|
||||
miner.Start(common.HexToAddress("0xdead"))
|
||||
waitForMiningState(t, miner, true)
|
||||
// Start the downloader
|
||||
mux.Post(downloader.StartEvent{}) //nolint:errcheck
|
||||
waitForMiningState(t, miner, false)
|
||||
// Now user tries to configure proper mining address
|
||||
miner.Start(common.HexToAddress("0x1337"))
|
||||
// Stop the downloader and wait for the update loop to run
|
||||
mux.Post(downloader.DoneEvent{}) //nolint:errcheck
|
||||
|
||||
waitForMiningState(t, miner, true)
|
||||
// The miner should now be using the good address
|
||||
if got, exp := miner.coinbase, common.HexToAddress("0x1337"); got != exp {
|
||||
t.Fatalf("Wrong coinbase, got %x expected %x", got, exp)
|
||||
}
|
||||
}
|
||||
|
||||
// waitForMiningState waits until either
|
||||
// * the desired mining state was reached
|
||||
// * a timeout was reached which fails the test
|
||||
func waitForMiningState(t *testing.T, m *Miner, mining bool) {
|
||||
t.Helper()
|
||||
|
||||
var state bool
|
||||
for i := 0; i < 100; i++ {
|
||||
time.Sleep(10 * time.Millisecond)
|
||||
if state = m.Mining(); state == mining {
|
||||
return
|
||||
}
|
||||
}
|
||||
t.Fatalf("Mining() == %t, want %t", state, mining)
|
||||
}
|
||||
|
||||
func createMiner(t *testing.T) (*Miner, *event.TypeMux) {
|
||||
// Create Ethash config
|
||||
config := params.MiningConfig{
|
||||
Etherbase: common.HexToAddress("123456789"),
|
||||
}
|
||||
// Create chainConfig
|
||||
chainDB := ethdb.NewMemDatabase()
|
||||
genesis := core.DeveloperGenesisBlock(15, common.HexToAddress("12345"))
|
||||
chainConfig, _, err := core.SetupGenesisBlock(chainDB, genesis, false, false)
|
||||
if err != nil {
|
||||
t.Fatalf("can't create new chain config: %v", err)
|
||||
}
|
||||
// Create consensus engine
|
||||
engine := clique.New(chainConfig.Clique, chainDB)
|
||||
// Create Ethereum backend
|
||||
bc, err := core.NewBlockChain(chainDB, new(core.CacheConfig), chainConfig, engine, vm.Config{}, nil /*isLocalBlock*/, nil)
|
||||
if err != nil {
|
||||
t.Fatalf("can't create new chain %v", err)
|
||||
}
|
||||
cfg := core.DefaultTxPoolConfig
|
||||
cfg.Journal = ""
|
||||
pool := core.NewTxPool(cfg, params.TestChainConfig, ethdb.NewMemDatabase(), nil)
|
||||
backend := NewMockBackend(bc, pool)
|
||||
// Create event Mux
|
||||
mux := new(event.TypeMux)
|
||||
// Create Miner
|
||||
return New(backend, &config, chainConfig, mux, engine, nil), mux
|
||||
}
|
@ -1,196 +0,0 @@
|
||||
// Copyright 2018 The go-ethereum Authors
|
||||
// This file is part of the go-ethereum library.
|
||||
//
|
||||
// The go-ethereum library is free software: you can redistribute it and/or modify
|
||||
// it under the terms of the GNU Lesser General Public License as published by
|
||||
// the Free Software Foundation, either version 3 of the License, or
|
||||
// (at your option) any later version.
|
||||
//
|
||||
// The go-ethereum library is distributed in the hope that it will be useful,
|
||||
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
// GNU Lesser General Public License for more details.
|
||||
//
|
||||
// You should have received a copy of the GNU Lesser General Public License
|
||||
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
// +build none
|
||||
|
||||
// This file contains a miner stress test based on the Clique consensus engine.
|
||||
package main
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"crypto/ecdsa"
|
||||
"io/ioutil"
|
||||
"math/big"
|
||||
"math/rand"
|
||||
"os"
|
||||
"time"
|
||||
|
||||
"github.com/ledgerwatch/turbo-geth/common"
|
||||
"github.com/ledgerwatch/turbo-geth/common/fdlimit"
|
||||
"github.com/ledgerwatch/turbo-geth/core"
|
||||
"github.com/ledgerwatch/turbo-geth/core/types"
|
||||
"github.com/ledgerwatch/turbo-geth/crypto"
|
||||
"github.com/ledgerwatch/turbo-geth/eth"
|
||||
"github.com/ledgerwatch/turbo-geth/eth/ethconfig"
|
||||
"github.com/ledgerwatch/turbo-geth/log"
|
||||
"github.com/ledgerwatch/turbo-geth/node"
|
||||
"github.com/ledgerwatch/turbo-geth/p2p"
|
||||
"github.com/ledgerwatch/turbo-geth/p2p/enode"
|
||||
"github.com/ledgerwatch/turbo-geth/params"
|
||||
)
|
||||
|
||||
func main() {
|
||||
log.Root().SetHandler(log.LvlFilterHandler(log.LvlInfo, log.StreamHandler(os.Stderr, log.TerminalFormat(true))))
|
||||
fdlimit.Raise(2048)
|
||||
|
||||
// Generate a batch of accounts to seal and fund with
|
||||
faucets := make([]*ecdsa.PrivateKey, 128)
|
||||
for i := 0; i < len(faucets); i++ {
|
||||
faucets[i], _ = crypto.GenerateKey()
|
||||
}
|
||||
sealers := make([]*ecdsa.PrivateKey, 4)
|
||||
for i := 0; i < len(sealers); i++ {
|
||||
sealers[i], _ = crypto.GenerateKey()
|
||||
}
|
||||
// Create a Clique network based off of the Rinkeby config
|
||||
genesis := makeGenesis(faucets, sealers)
|
||||
|
||||
var (
|
||||
nodes []*eth.Ethereum
|
||||
enodes []*enode.Node
|
||||
)
|
||||
|
||||
for _, sealer := range sealers {
|
||||
// Start the node and wait until it's up
|
||||
stack, ethBackend, err := makeSealer(genesis)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
defer stack.Close()
|
||||
|
||||
for stack.Server().NodeInfo().Ports.Listener == 0 {
|
||||
time.Sleep(250 * time.Millisecond)
|
||||
}
|
||||
// Connect the node to all the previous ones
|
||||
for _, n := range enodes {
|
||||
stack.Server().AddPeer(n)
|
||||
}
|
||||
// Start tracking the node and its enode
|
||||
nodes = append(nodes, ethBackend)
|
||||
enodes = append(enodes, stack.Server().Self())
|
||||
}
|
||||
|
||||
// Iterate over all the nodes and start signing on them
|
||||
time.Sleep(3 * time.Second)
|
||||
for _, node := range nodes {
|
||||
if err := node.StartMining(1); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
}
|
||||
time.Sleep(3 * time.Second)
|
||||
|
||||
// Start injecting transactions from the faucet like crazy
|
||||
nonces := make([]uint64, len(faucets))
|
||||
for {
|
||||
// Pick a random signer node
|
||||
index := rand.Intn(len(faucets))
|
||||
backend := nodes[index%len(nodes)]
|
||||
|
||||
// Create a self transaction and inject into the pool
|
||||
tx, err := types.SignTx(types.NewTransaction(nonces[index], crypto.PubkeyToAddress(faucets[index].PublicKey), new(big.Int), 21000, big.NewInt(100000000000), nil), types.HomesteadSigner{}, faucets[index])
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
if err := backend.TxPool().AddLocal(tx); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
nonces[index]++
|
||||
|
||||
// Wait if we're too saturated
|
||||
if pend, _ := backend.TxPool().Stats(); pend > 2048 {
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// makeGenesis creates a custom Clique genesis block based on some pre-defined
|
||||
// signer and faucet accounts.
|
||||
func makeGenesis(faucets []*ecdsa.PrivateKey, sealers []*ecdsa.PrivateKey) *core.Genesis {
|
||||
// Create a Clique network based off of the Rinkeby config
|
||||
genesis := core.DefaultRinkebyGenesisBlock()
|
||||
genesis.GasLimit = 25000000
|
||||
|
||||
genesis.Config.ChainID = big.NewInt(18)
|
||||
genesis.Config.Clique.Period = 1
|
||||
genesis.Config.EIP150Hash = common.Hash{}
|
||||
|
||||
genesis.Alloc = core.GenesisAlloc{}
|
||||
for _, faucet := range faucets {
|
||||
genesis.Alloc[crypto.PubkeyToAddress(faucet.PublicKey)] = core.GenesisAccount{
|
||||
Balance: new(big.Int).Exp(big.NewInt(2), big.NewInt(128), nil),
|
||||
}
|
||||
}
|
||||
// Sort the signers and embed into the extra-data section
|
||||
signers := make([]common.Address, len(sealers))
|
||||
for i, sealer := range sealers {
|
||||
signers[i] = crypto.PubkeyToAddress(sealer.PublicKey)
|
||||
}
|
||||
for i := 0; i < len(signers); i++ {
|
||||
for j := i + 1; j < len(signers); j++ {
|
||||
if bytes.Compare(signers[i][:], signers[j][:]) > 0 {
|
||||
signers[i], signers[j] = signers[j], signers[i]
|
||||
}
|
||||
}
|
||||
}
|
||||
genesis.ExtraData = make([]byte, 32+len(signers)*common.AddressLength+65)
|
||||
for i, signer := range signers {
|
||||
copy(genesis.ExtraData[32+i*common.AddressLength:], signer[:])
|
||||
}
|
||||
// Return the genesis block for initialization
|
||||
return genesis
|
||||
}
|
||||
|
||||
func makeSealer(genesis *core.Genesis) (*node.Node, *eth.Ethereum, error) {
|
||||
// Define the basic configurations for the Ethereum node
|
||||
datadir, _ := ioutil.TempDir("", "")
|
||||
|
||||
config := &node.Config{
|
||||
Name: "turbo-geth",
|
||||
Version: params.Version,
|
||||
DataDir: datadir,
|
||||
P2P: p2p.Config{
|
||||
ListenAddr: "0.0.0.0:0",
|
||||
NoDiscovery: true,
|
||||
MaxPeers: 25,
|
||||
},
|
||||
}
|
||||
// Start the node and configure a full Ethereum node on it
|
||||
stack, err := node.New(config)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
// Create and register the backend
|
||||
ethBackend, err := eth.New(stack, ðconfig.Config{
|
||||
Genesis: genesis,
|
||||
NetworkId: genesis.Config.ChainID.Uint64(),
|
||||
DatabaseCache: 256,
|
||||
DatabaseHandles: 256,
|
||||
TxPool: core.DefaultTxPoolConfig,
|
||||
GPO: eth.DefaultConfig.GPO,
|
||||
Miner: params.MiningConfig{
|
||||
GasFloor: genesis.GasLimit * 9 / 10,
|
||||
GasCeil: genesis.GasLimit * 11 / 10,
|
||||
GasPrice: big.NewInt(1),
|
||||
Recommit: time.Second,
|
||||
},
|
||||
})
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
err = stack.Start()
|
||||
return stack, ethBackend, err
|
||||
}
|
@ -1,193 +0,0 @@
|
||||
// Copyright 2018 The go-ethereum Authors
|
||||
// This file is part of the go-ethereum library.
|
||||
//
|
||||
// The go-ethereum library is free software: you can redistribute it and/or modify
|
||||
// it under the terms of the GNU Lesser General Public License as published by
|
||||
// the Free Software Foundation, either version 3 of the License, or
|
||||
// (at your option) any later version.
|
||||
//
|
||||
// The go-ethereum library is distributed in the hope that it will be useful,
|
||||
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
// GNU Lesser General Public License for more details.
|
||||
//
|
||||
// You should have received a copy of the GNU Lesser General Public License
|
||||
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
// +build none
|
||||
|
||||
// This file contains a miner stress test based on the Ethash consensus engine.
|
||||
package main
|
||||
|
||||
import (
|
||||
"crypto/ecdsa"
|
||||
"io/ioutil"
|
||||
"math/big"
|
||||
"math/rand"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"time"
|
||||
|
||||
"github.com/ledgerwatch/turbo-geth/common"
|
||||
"github.com/ledgerwatch/turbo-geth/common/fdlimit"
|
||||
"github.com/ledgerwatch/turbo-geth/consensus/ethash"
|
||||
"github.com/ledgerwatch/turbo-geth/core"
|
||||
"github.com/ledgerwatch/turbo-geth/core/types"
|
||||
"github.com/ledgerwatch/turbo-geth/crypto"
|
||||
"github.com/ledgerwatch/turbo-geth/eth"
|
||||
"github.com/ledgerwatch/turbo-geth/eth/downloader"
|
||||
"github.com/ledgerwatch/turbo-geth/log"
|
||||
"github.com/ledgerwatch/turbo-geth/node"
|
||||
"github.com/ledgerwatch/turbo-geth/p2p"
|
||||
"github.com/ledgerwatch/turbo-geth/p2p/enode"
|
||||
"github.com/ledgerwatch/turbo-geth/params"
|
||||
)
|
||||
|
||||
func main() {
|
||||
const n = 5
|
||||
log.Root().SetHandler(log.LvlFilterHandler(log.LvlInfo, log.StreamHandler(os.Stderr, log.TerminalFormat(true))))
|
||||
fdlimit.Raise(512 * n)
|
||||
|
||||
// Generate a batch of accounts to seal and fund with
|
||||
faucets := make([]*ecdsa.PrivateKey, 128)
|
||||
for i := 0; i < len(faucets); i++ {
|
||||
faucets[i], _ = crypto.GenerateKey()
|
||||
}
|
||||
// Pre-generate the ethash mining DAG so we don't race
|
||||
ethash.MakeDataset(1, filepath.Join(os.Getenv("HOME"), ".ethash"))
|
||||
|
||||
// Create an Ethash network based off of the Ropsten config
|
||||
genesis := makeGenesis(faucets)
|
||||
|
||||
var (
|
||||
nodes []*eth.Ethereum
|
||||
enodes []*enode.Node
|
||||
)
|
||||
for i := 0; i < n; i++ {
|
||||
// Start the node and wait until it's up
|
||||
stack, ethBackend, err := makeMiner(genesis)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
defer stack.Close()
|
||||
|
||||
for stack.Server().NodeInfo().Ports.Listener == 0 {
|
||||
time.Sleep(250 * time.Millisecond)
|
||||
}
|
||||
// Connect the node to all the previous ones
|
||||
for _, n := range enodes {
|
||||
stack.Server().AddPeer(n)
|
||||
}
|
||||
// Start tracking the node and its enode
|
||||
nodes = append(nodes, ethBackend)
|
||||
enodes = append(enodes, stack.Server().Self())
|
||||
|
||||
// Inject the signer key and start sealing with it
|
||||
key, keyErr := crypto.GenerateKey()
|
||||
if keyErr != nil {
|
||||
panic(keyErr)
|
||||
}
|
||||
ethBackend.SetSigner(key)
|
||||
}
|
||||
|
||||
// Iterate over all the nodes and start mining
|
||||
time.Sleep(3 * time.Second)
|
||||
for _, node := range nodes {
|
||||
if err := node.StartMining(1); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
}
|
||||
time.Sleep(3 * time.Second)
|
||||
|
||||
// Start injecting transactions from the faucets like crazy
|
||||
nonces := make([]uint64, len(faucets))
|
||||
for {
|
||||
// Pick a random mining node
|
||||
index := rand.Intn(len(faucets))
|
||||
backend := nodes[index%len(nodes)]
|
||||
|
||||
// Create a self transaction and inject into the pool
|
||||
tx, err := types.SignTx(types.NewTransaction(nonces[index], crypto.PubkeyToAddress(faucets[index].PublicKey), new(big.Int), 21000, big.NewInt(100000000000+rand.Int63n(65536)), nil), types.HomesteadSigner{}, faucets[index])
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
if err := backend.TxPool().AddLocal(tx); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
nonces[index]++
|
||||
|
||||
// Wait if we're too saturated
|
||||
if pend, _ := ethereum.TxPool().Stats(); pend > n*512 {
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// makeGenesis creates a custom Ethash genesis block based on some pre-defined
|
||||
// faucet accounts.
|
||||
func makeGenesis(faucets []*ecdsa.PrivateKey) *core.Genesis {
|
||||
genesis := core.DefaultRopstenGenesisBlock()
|
||||
genesis.Difficulty = params.MinimumDifficulty
|
||||
genesis.GasLimit = 25000000
|
||||
|
||||
genesis.Config.ChainID = big.NewInt(18)
|
||||
genesis.Config.EIP150Hash = common.Hash{}
|
||||
|
||||
genesis.Alloc = core.GenesisAlloc{}
|
||||
for _, faucet := range faucets {
|
||||
genesis.Alloc[crypto.PubkeyToAddress(faucet.PublicKey)] = core.GenesisAccount{
|
||||
Balance: new(big.Int).Exp(big.NewInt(2), big.NewInt(128), nil),
|
||||
}
|
||||
}
|
||||
return genesis
|
||||
}
|
||||
|
||||
func makeMiner(genesis *core.Genesis) (*node.Node, *eth.Ethereum, error) {
|
||||
// Define the basic configurations for the Ethereum node
|
||||
datadir, _ := ioutil.TempDir("", "")
|
||||
|
||||
config := &node.Config{
|
||||
Name: "turbo-geth",
|
||||
Version: params.Version,
|
||||
DataDir: datadir,
|
||||
P2P: p2p.Config{
|
||||
ListenAddr: "0.0.0.0:0",
|
||||
NoDiscovery: true,
|
||||
MaxPeers: 25,
|
||||
},
|
||||
}
|
||||
// Create the node and configure a full Ethereum node on it
|
||||
stack, err := node.New(config)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
ethBackend, err := eth.New(stack, ðconfig.Config{
|
||||
Genesis: genesis,
|
||||
NetworkID: genesis.Config.ChainID.Uint64(),
|
||||
DatabaseCache: 256,
|
||||
DatabaseHandles: 256,
|
||||
TxPool: core.DefaultTxPoolConfig,
|
||||
GPO: eth.DefaultConfig.GPO,
|
||||
Ethash: eth.DefaultConfig.Ethash,
|
||||
Miner: params.MiningConfig{
|
||||
GasFloor: genesis.GasLimit * 9 / 10,
|
||||
GasCeil: genesis.GasLimit * 11 / 10,
|
||||
GasPrice: big.NewInt(1),
|
||||
Recommit: time.Second,
|
||||
},
|
||||
BlocksBeforePruning: 100,
|
||||
BlocksToPrune: 10,
|
||||
PruningTimeout: time.Second,
|
||||
})
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
err = stack.Start()
|
||||
return stack, ethBackend, err
|
||||
}); err != nil {
|
||||
return nil, fmt.Errorf("cannot register stress test miner. config %v", ethConfig)
|
||||
}
|
||||
// Start the node and return if successful
|
||||
return stack, stack.Start()
|
||||
}
|
@ -1,77 +0,0 @@
|
||||
package miner
|
||||
|
||||
import (
|
||||
"sync"
|
||||
|
||||
"github.com/ledgerwatch/turbo-geth/common"
|
||||
"github.com/ledgerwatch/turbo-geth/core/types"
|
||||
)
|
||||
|
||||
type miningUncles struct {
|
||||
localUncles map[common.Hash]*types.Block // A set of side blocks generated locally as the possible uncle blocks.
|
||||
remoteUncles map[common.Hash]*types.Block // A set of side blocks as the possible uncle blocks.
|
||||
sync.RWMutex
|
||||
}
|
||||
|
||||
func newUncles() *miningUncles {
|
||||
return &miningUncles{
|
||||
localUncles: make(map[common.Hash]*types.Block),
|
||||
remoteUncles: make(map[common.Hash]*types.Block),
|
||||
}
|
||||
}
|
||||
|
||||
func (u *miningUncles) getLocal(hash common.Hash) (*types.Block, bool) {
|
||||
u.RLock()
|
||||
defer u.RUnlock()
|
||||
b, ok := u.localUncles[hash]
|
||||
return b, ok
|
||||
}
|
||||
|
||||
func (u *miningUncles) setLocal(b *types.Block) {
|
||||
u.Lock()
|
||||
defer u.Unlock()
|
||||
u.localUncles[b.Hash()] = b
|
||||
}
|
||||
|
||||
func (u *miningUncles) getRemote(hash common.Hash) (*types.Block, bool) {
|
||||
u.RLock()
|
||||
defer u.RUnlock()
|
||||
b, ok := u.remoteUncles[hash]
|
||||
return b, ok
|
||||
}
|
||||
|
||||
func (u *miningUncles) setRemote(b *types.Block) {
|
||||
u.Lock()
|
||||
defer u.Unlock()
|
||||
u.remoteUncles[b.Hash()] = b
|
||||
}
|
||||
|
||||
func (u *miningUncles) get(hash common.Hash) (*types.Block, bool) {
|
||||
u.RLock()
|
||||
defer u.RUnlock()
|
||||
|
||||
uncle, exist := u.getLocal(hash)
|
||||
if !exist {
|
||||
uncle, exist = u.getRemote(hash)
|
||||
}
|
||||
if !exist {
|
||||
return nil, false
|
||||
}
|
||||
|
||||
return uncle, true
|
||||
}
|
||||
|
||||
func (u *miningUncles) has(hash common.Hash) bool {
|
||||
u.RLock()
|
||||
defer u.RUnlock()
|
||||
_, ok := u.localUncles[hash]
|
||||
if ok {
|
||||
return true
|
||||
}
|
||||
|
||||
_, ok = u.remoteUncles[hash]
|
||||
if ok {
|
||||
return ok
|
||||
}
|
||||
return ok
|
||||
}
|
1126
miner/worker.go
1126
miner/worker.go
File diff suppressed because it is too large
Load Diff
@ -28,7 +28,6 @@ import (
|
||||
"sync"
|
||||
|
||||
"github.com/ledgerwatch/turbo-geth/ethdb"
|
||||
"github.com/ledgerwatch/turbo-geth/event"
|
||||
"github.com/ledgerwatch/turbo-geth/log"
|
||||
"github.com/ledgerwatch/turbo-geth/migrations"
|
||||
"github.com/ledgerwatch/turbo-geth/p2p"
|
||||
@ -38,7 +37,6 @@ import (
|
||||
|
||||
// Node is a container on which services can be registered.
|
||||
type Node struct {
|
||||
eventmux *event.TypeMux
|
||||
config *Config
|
||||
log log.Logger
|
||||
dirLock fileutil.Releaser // prevents concurrent use of instance directory
|
||||
@ -95,7 +93,6 @@ func New(conf *Config) (*Node, error) {
|
||||
node := &Node{
|
||||
config: conf,
|
||||
inprocHandler: rpc.NewServer(),
|
||||
eventmux: new(event.TypeMux),
|
||||
log: conf.Logger,
|
||||
stop: make(chan struct{}),
|
||||
server: &p2p.Server{Config: conf.P2P},
|
||||
@ -538,12 +535,6 @@ func (n *Node) WSEndpoint() string {
|
||||
return "ws://" + n.ws.listenAddr() + n.ws.wsConfig.prefix
|
||||
}
|
||||
|
||||
// EventMux retrieves the event multiplexer used by all the network services in
|
||||
// the current protocol stack.
|
||||
func (n *Node) EventMux() *event.TypeMux {
|
||||
return n.eventmux
|
||||
}
|
||||
|
||||
// OpenDatabase opens an existing database with the given name (or creates one if no
|
||||
// previous can be found) from within the node's instance directory. If the node is
|
||||
// ephemeral, a memory database is returned.
|
||||
|
Loading…
Reference in New Issue
Block a user