diff --git a/go.mod b/go.mod index 56c0f3db2..b5e1b9a16 100644 --- a/go.mod +++ b/go.mod @@ -3,7 +3,7 @@ module github.com/ledgerwatch/erigon-lib go 1.16 require ( - github.com/VictoriaMetrics/metrics v1.17.3 + github.com/VictoriaMetrics/metrics v1.18.0 github.com/c2h5oh/datasize v0.0.0-20200825124411-48ed595a09d2 github.com/go-stack/stack v1.8.1 github.com/golang/protobuf v1.5.2 @@ -11,7 +11,7 @@ require ( github.com/grpc-ecosystem/go-grpc-middleware v1.3.0 github.com/hashicorp/golang-lru v0.5.5-0.20210104140557-80c98217689d github.com/holiman/uint256 v1.2.0 - github.com/ledgerwatch/log/v3 v3.3.0 + github.com/ledgerwatch/log/v3 v3.3.1 github.com/ledgerwatch/secp256k1 v0.0.0-20210626115225-cd5cd00ed72d github.com/matryer/moq v0.2.3 github.com/spaolacci/murmur3 v1.1.0 @@ -19,7 +19,6 @@ require ( github.com/torquem-ch/mdbx-go v0.18.1 github.com/ugorji/go/codec v1.1.13 go.uber.org/atomic v1.9.0 - golang.org/dl v0.0.0-20210909185531-e2a88a019121 // indirect golang.org/x/crypto v0.0.0-20210616213533-5ff15b29337e google.golang.org/grpc v1.39.1 google.golang.org/grpc/cmd/protoc-gen-go-grpc v1.1.0 diff --git a/go.sum b/go.sum index 71d894703..fdfc971e5 100644 --- a/go.sum +++ b/go.sum @@ -1,8 +1,8 @@ cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= cloud.google.com/go v0.34.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= -github.com/VictoriaMetrics/metrics v1.17.3 h1:QPUakR6JRy8BhL2C2kOgYKLuoPDwtJQ+7iKIZSjt1A4= -github.com/VictoriaMetrics/metrics v1.17.3/go.mod h1:Z1tSfPfngDn12bTfZSCqArT3OPY3u88J12hSoOhuiRE= +github.com/VictoriaMetrics/metrics v1.18.0 h1:vov5NxDHRSXFbdiH4dYLYEjKLoAXXSQ7hcnG8TSD9JQ= +github.com/VictoriaMetrics/metrics v1.18.0/go.mod h1:ArjwVz7WpgpegX/JpB0zpNF2h2232kErkEnzH1sxMmA= github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY= github.com/c2h5oh/datasize v0.0.0-20200825124411-48ed595a09d2 h1:t8KYCwSKsOEZBFELI4Pn/phbp38iJ1RRAkDFNin1aak= github.com/c2h5oh/datasize v0.0.0-20200825124411-48ed595a09d2/go.mod h1:S/7n9copUssQ56c7aAgHqftWO4LTf4xY6CGWt8Bc+3M= @@ -64,17 +64,17 @@ github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515/go.mod h1:+0opPa2QZZtGFBFZlji/RkVcI2GknAs/DXo4wKdlNEc= -github.com/ledgerwatch/log/v3 v3.3.0 h1:k8N/3NQLILr8CKCMyza261vLFKU7VA+nMNNb0wVyQSc= -github.com/ledgerwatch/log/v3 v3.3.0/go.mod h1:J58eOHHrIYHxl7LKkRsb/0YibKwtLfauUryl5SLRGm0= +github.com/ledgerwatch/log/v3 v3.3.1 h1:HmvLeTEvtCtqSvtu4t/a5MAdcLfeBcbIeowXbLYuzLc= +github.com/ledgerwatch/log/v3 v3.3.1/go.mod h1:S3VJqhhVX32rbp1JyyvhJou12twtFwNEPESBgpbNkRk= github.com/ledgerwatch/secp256k1 v0.0.0-20210626115225-cd5cd00ed72d h1:/IKMrJdfRsoYNc36PXqP4xMH3vhW/8IQyBKGQbKZUno= github.com/ledgerwatch/secp256k1 v0.0.0-20210626115225-cd5cd00ed72d/go.mod h1:SPmqJFciiF/Q0mPt2jVs2dTr/1TZBTIA+kPMmKgBAak= github.com/matryer/moq v0.2.3 h1:Q06vEqnBYjjfx5KKgHfYRKE/lvlRu+Nj+xodG4YdHnU= github.com/matryer/moq v0.2.3/go.mod h1:9RtPYjTnH1bSBIkpvtHkFN7nbWAnO7oRpdJkEIn6UtE= -github.com/mattn/go-colorable v0.1.8 h1:c1ghPdyEDarC70ftn0y+A/Ee++9zz8ljHG1b13eJ0s8= -github.com/mattn/go-colorable v0.1.8/go.mod h1:u6P/XSegPjTcexA+o6vUJrdnUu04hMope9wVRipJSqc= +github.com/mattn/go-colorable v0.1.9 h1:sqDoxXbdeALODt0DAeJCVp38ps9ZogZEAXjus69YV3U= +github.com/mattn/go-colorable v0.1.9/go.mod h1:u6P/XSegPjTcexA+o6vUJrdnUu04hMope9wVRipJSqc= github.com/mattn/go-isatty v0.0.12/go.mod h1:cbi8OIDigv2wuxKPP5vlRcQ1OAZbq2CE4Kysco4FUpU= -github.com/mattn/go-isatty v0.0.13 h1:qdl+GuBjcsKKDco5BsxPJlId98mSWNKqYA+Co0SC1yA= -github.com/mattn/go-isatty v0.0.13/go.mod h1:cbi8OIDigv2wuxKPP5vlRcQ1OAZbq2CE4Kysco4FUpU= +github.com/mattn/go-isatty v0.0.14 h1:yVuAays6BHfxijgZPzw+3Zlu5yQgKGP2/hcQbHb7S9Y= +github.com/mattn/go-isatty v0.0.14/go.mod h1:7GGIvUiUoEMVVmxf/4nioHXj79iQHKdU27kJ6hsGG94= github.com/opentracing/opentracing-go v1.1.0/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o= github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= @@ -98,10 +98,10 @@ github.com/ugorji/go v1.1.13 h1:nB3O5kBSQGjEQAcfe1aLUYuxmXdFKmYgBZhY32rQb6Q= github.com/ugorji/go v1.1.13/go.mod h1:jxau1n+/wyTGLQoCkjok9r5zFa/FxT6eI5HiHKQszjc= github.com/ugorji/go/codec v1.1.13 h1:013LbFhocBoIqgHeIHKlV4JWYhqogATYWZhIcH0WHn4= github.com/ugorji/go/codec v1.1.13/go.mod h1:oNVt3Dq+FO91WNQ/9JnHKQP2QJxTzoN7wCBFCq1OeuU= -github.com/valyala/fastrand v1.0.0 h1:LUKT9aKer2dVQNUi3waewTbKV+7H17kvWFNKs2ObdkI= -github.com/valyala/fastrand v1.0.0/go.mod h1:HWqCzkrkg6QXT8V2EXWvXCoow7vLwOFN002oeRzjapQ= -github.com/valyala/histogram v1.1.2 h1:vOk5VrGjMBIoPR5k6wA8vBaC8toeJ8XO0yfRjFEc1h8= -github.com/valyala/histogram v1.1.2/go.mod h1:CZAr6gK9dbD7hYx2s8WSPh0p5x5wETjC+2b3PJVtEdg= +github.com/valyala/fastrand v1.1.0 h1:f+5HkLW4rsgzdNoleUOB69hyT9IlD2ZQh9GyDMfb5G8= +github.com/valyala/fastrand v1.1.0/go.mod h1:HWqCzkrkg6QXT8V2EXWvXCoow7vLwOFN002oeRzjapQ= +github.com/valyala/histogram v1.2.0 h1:wyYGAZZt3CpwUiIb9AU/Zbllg1llXyrtApRS815OLoQ= +github.com/valyala/histogram v1.2.0/go.mod h1:Hb4kBwb4UxsaNbbbh+RRz8ZR6pdodR57tzWUS3BUzXY= github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.1.32/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= @@ -111,8 +111,6 @@ go.uber.org/atomic v1.9.0 h1:ECmE8Bn/WFTYwEW/bpKD3M8VtR/zQVbavAoalC1PYyE= go.uber.org/atomic v1.9.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0= go.uber.org/zap v1.10.0/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q= -golang.org/dl v0.0.0-20210909185531-e2a88a019121 h1:fZ6ug6KBaLGJtlXN5TVaEEZ545CrhlY0LiG2l+GMs0Q= -golang.org/dl v0.0.0-20210909185531-e2a88a019121/go.mod h1:IUMfjQLJQd4UTqG1Z90tenwKoCX93Gn3MAQJMOSBsDQ= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= @@ -157,8 +155,9 @@ golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c h1:F1jZWGFhYfh0Ci55sIpILtKKK8p3i2/krTr0H1rg74I= golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20210917161153-d61c044b1678 h1:J27LZFQBFoihqXoegpscI10HpjZ7B5WQLLKL2FZXQKw= +golang.org/x/sys v0.0.0-20210917161153-d61c044b1678/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.3 h1:cokOdA+Jmi5PJGXLlLllQSgYigAEfHXJAERHVMaCc2k= diff --git a/gointerfaces/grpcutil/utils.go b/gointerfaces/grpcutil/utils.go index 2928aef68..7445a4af4 100644 --- a/gointerfaces/grpcutil/utils.go +++ b/gointerfaces/grpcutil/utils.go @@ -60,8 +60,6 @@ func NewServer(rateLimit uint32, creds credentials.TransportCredentials) *grpc.S // unaryInterceptors = append(unaryInterceptors, grpc_prometheus.UnaryServerInterceptor) //} - var grpcServer *grpc.Server - reflection.Register(grpcServer) //cpus := uint32(runtime.GOMAXPROCS(-1)) opts := []grpc.ServerOption{ //grpc.NumStreamWorkers(cpus), // reduce amount of goroutines @@ -78,7 +76,8 @@ func NewServer(rateLimit uint32, creds credentials.TransportCredentials) *grpc.S grpc.UnaryInterceptor(grpc_middleware.ChainUnaryServer(unaryInterceptors...)), grpc.Creds(creds), } - grpcServer = grpc.NewServer(opts...) + grpcServer := grpc.NewServer(opts...) + reflection.Register(grpcServer) //if metrics.Enabled { // grpc_prometheus.Register(grpcServer) diff --git a/txpool/grpc_server.go b/txpool/grpc_server.go index 20e1ede6a..4686c619c 100644 --- a/txpool/grpc_server.go +++ b/txpool/grpc_server.go @@ -290,8 +290,6 @@ func StartGrpc(txPoolServer txpool_proto.TxpoolServer, miningServer txpool_proto // unaryInterceptors = append(unaryInterceptors, grpc_prometheus.UnaryServerInterceptor) //} - var grpcServer *grpc.Server - reflection.Register(grpcServer) // Register reflection service on gRPC server. //cpus := uint32(runtime.GOMAXPROCS(-1)) opts := []grpc.ServerOption{ //grpc.NumStreamWorkers(cpus), // reduce amount of goroutines @@ -310,7 +308,8 @@ func StartGrpc(txPoolServer txpool_proto.TxpoolServer, miningServer txpool_proto } else { opts = append(opts, grpc.Creds(*creds)) } - grpcServer = grpc.NewServer(opts...) + grpcServer := grpc.NewServer(opts...) + reflection.Register(grpcServer) // Register reflection service on gRPC server. if txPoolServer != nil { txpool_proto.RegisterTxpoolServer(grpcServer, txPoolServer) } diff --git a/txpool/pool.go b/txpool/pool.go index d150d39c9..8ade1072f 100644 --- a/txpool/pool.go +++ b/txpool/pool.go @@ -222,6 +222,7 @@ type TxPool struct { senders *sendersBatch _cache kvcache.Cache byNonce *ByNonce // senderID => (sorted map of tx nonce => *metaTx) + promoted Hashes // pre-allocated buffer to write promoted to pending pool txn hashes // batch processing of remote transactions // handling works fast without batching, but batching allow: @@ -252,9 +253,9 @@ func New(newTxs chan Hashes, coreDB kv.RoDB, cfg Config, cache kvcache.Cache, ru discardReasonsLRU: discardHistory, byNonce: &ByNonce{btree.New(32)}, recentlyConnectedPeers: &recentlyConnectedPeers{}, - pending: NewPendingSubPool(PendingSubPool), - baseFee: NewSubPool(BaseFeeSubPool), - queued: NewSubPool(QueuedSubPool), + pending: NewPendingSubPool(PendingSubPool, cfg.PendingSubPoolLimit), + baseFee: NewSubPool(BaseFeeSubPool, cfg.BaseFeeSubPoolLimit), + queued: NewSubPool(QueuedSubPool, cfg.QueuedSubPoolLimit), newTxs: newTxs, _cache: cache, senders: newSendersCache(), @@ -265,6 +266,7 @@ func New(newTxs chan Hashes, coreDB kv.RoDB, cfg Config, cache kvcache.Cache, ru senderID: 1, unprocessedRemoteTxs: &TxSlots{}, unprocessedRemoteByHash: map[string]int{}, + promoted: make(Hashes, 0, 32*1024), }, nil } @@ -331,46 +333,21 @@ func (p *TxPool) OnNewBlock(ctx context.Context, stateChanges *remote.StateChang return err } - for _, changesList := range stateChanges.ChangeBatch { - for _, change := range changesList.Changes { - switch change.Action { - case remote.Action_UPSERT, remote.Action_UPSERT_CODE: - if change.Incarnation > 0 { - continue - } - addr := gointerfaces.ConvertH160toAddress(change.Address) - id, ok := p.senders.id(string(addr[:])) - if !ok { - continue - } - nonce, balance, err := p.senders.info(cache, viewID, coreTx, id) - if err != nil { - return err - } - onSenderChange(id, nonce, balance, p.byNonce, protocolBaseFee, baseFee, p.pending, p.baseFee, p.queued) - } - } - } //log.Debug("[txpool] new block", "unwinded", len(unwindTxs.txs), "mined", len(minedTxs.txs), "baseFee", baseFee, "blockHeight", blockHeight) - if err := addTxs(p.lastSeenBlock.Load(), cache, viewID, coreTx, p.cfg, p.senders, unwindTxs, protocolBaseFee, baseFee, p.pending, p.baseFee, p.queued, p.byNonce, p.byHash, p.addLocked, p.discardLocked); err != nil { + + p.pending.captureAddedHashes(&p.promoted) + if err := addTxs(p.lastSeenBlock.Load(), stateChanges, cache, viewID, coreTx, p.senders, unwindTxs, protocolBaseFee, baseFee, p.pending, p.baseFee, p.queued, p.byNonce, p.byHash, p.addLocked, p.discardLocked); err != nil { return err } + p.pending.added = nil if p.started.CAS(false, true) { log.Info("[txpool] Started") } - notifyNewTxs := make(Hashes, 0, 32*len(unwindTxs.txs)) - for i := range unwindTxs.txs { - _, ok := p.byHash[string(unwindTxs.txs[i].idHash[:])] - if !ok { - continue - } - notifyNewTxs = append(notifyNewTxs, unwindTxs.txs[i].idHash[:]...) - } - if len(notifyNewTxs) > 0 { + if p.promoted.Len() > 0 { select { - case p.newTxs <- notifyNewTxs: + case p.newTxs <- common.Copy(p.promoted): default: } } @@ -414,24 +391,17 @@ func (p *TxPool) processRemoteTxs(ctx context.Context) error { return err } - if err := addTxs(p.lastSeenBlock.Load(), cache, viewID, coreTx, p.cfg, p.senders, newTxs, p.protocolBaseFee.Load(), p.currentBaseFee.Load(), p.pending, p.baseFee, p.queued, p.byNonce, p.byHash, p.addLocked, p.discardLocked); err != nil { + p.pending.captureAddedHashes(&p.promoted) + if err := addTxs(p.lastSeenBlock.Load(), nil, cache, viewID, coreTx, p.senders, newTxs, p.protocolBaseFee.Load(), p.currentBaseFee.Load(), p.pending, p.baseFee, p.queued, p.byNonce, p.byHash, p.addLocked, p.discardLocked); err != nil { return err } + p.pending.added = nil - // notify about all non-dropped txs - notifyNewTxs := make(Hashes, 0, 32*len(newTxs.txs)) - for i := range newTxs.txs { - _, ok := p.byHash[string(newTxs.txs[i].idHash[:])] - if !ok { - continue - } - notifyNewTxs = append(notifyNewTxs, newTxs.txs[i].idHash[:]...) - } - if len(notifyNewTxs) > 0 { + if p.promoted.Len() > 0 { select { case <-ctx.Done(): return nil - case p.newTxs <- notifyNewTxs: + case p.newTxs <- common.Copy(p.promoted): default: } } @@ -588,22 +558,16 @@ func (p *TxPool) AddLocalTxs(ctx context.Context, newTxs TxSlots) ([]DiscardReas if err = p.senders.onNewTxs(newTxs); err != nil { return nil, err } - if err := addTxs(p.lastSeenBlock.Load(), p._cache, viewID, coreTx, p.cfg, p.senders, newTxs, p.protocolBaseFee.Load(), p.currentBaseFee.Load(), p.pending, p.baseFee, p.queued, p.byNonce, p.byHash, p.addLocked, p.discardLocked); err != nil { + + p.pending.captureAddedHashes(&p.promoted) + if err := addTxs(p.lastSeenBlock.Load(), nil, p._cache, viewID, coreTx, p.senders, newTxs, p.protocolBaseFee.Load(), p.currentBaseFee.Load(), p.pending, p.baseFee, p.queued, p.byNonce, p.byHash, p.addLocked, p.discardLocked); err != nil { return nil, err } + p.pending.added = nil - // notify about all non-dropped txs - notifyNewTxs := make(Hashes, 0, 32*len(newTxs.txs)) - for i := range newTxs.txs { - _, ok := p.byHash[string(newTxs.txs[i].idHash[:])] - if !ok { - continue - } - notifyNewTxs = append(notifyNewTxs, newTxs.txs[i].idHash[:]...) - } - if len(notifyNewTxs) > 0 { + if p.promoted.Len() > 0 { select { - case p.newTxs <- notifyNewTxs: + case p.newTxs <- common.Copy(p.promoted): default: } } @@ -628,7 +592,7 @@ func (p *TxPool) cache() kvcache.Cache { defer p.lock.RUnlock() return p._cache } -func addTxs(blockNum uint64, cache kvcache.Cache, viewID kvcache.ViewID, coreTx kv.Tx, cfg Config, senders *sendersBatch, newTxs TxSlots, protocolBaseFee, currentBaseFee uint64, pending *PendingPool, baseFee, queued *SubPool, byNonce *ByNonce, byHash map[string]*metaTx, add func(*metaTx) bool, discard func(*metaTx, DiscardReason)) error { +func addTxs(blockNum uint64, stateChanges *remote.StateChangeBatch, cache kvcache.Cache, viewID kvcache.ViewID, coreTx kv.Tx, senders *sendersBatch, newTxs TxSlots, protocolBaseFee, currentBaseFee uint64, pending *PendingPool, baseFee, queued *SubPool, byNonce *ByNonce, byHash map[string]*metaTx, add func(*metaTx) bool, discard func(*metaTx, DiscardReason)) error { if ASSERT { for i := range newTxs.txs { if newTxs.txs[i].senderID == 0 { @@ -636,7 +600,6 @@ func addTxs(blockNum uint64, cache kvcache.Cache, viewID kvcache.ViewID, coreTx } } } - //defer func(t time.Time) { fmt.Printf("pool.go:611: %s\n", time.Since(t)) }(time.Now()) // This can be thought of a reverse operation from the one described before. // When a block that was deemed "the best" of its height, is no longer deemed "the best", the // transactions contained in it, are now viable for inclusion in other blocks, and therefore should @@ -646,20 +609,50 @@ func addTxs(blockNum uint64, cache kvcache.Cache, viewID kvcache.ViewID, coreTx // they effective lose their priority over the "remote" transactions. In order to prevent that, // somehow the fact that certain transactions were local, needs to be remembered for some // time (up to some "immutability threshold"). - changedSenders := unsafeAddToPendingPool(blockNum, newTxs, byHash, add) - for id := range changedSenders { - nonce, balance, err := senders.info(cache, viewID, coreTx, id) + changedSenders := map[uint64]struct{}{} + for i, txn := range newTxs.txs { + if _, ok := byHash[string(txn.idHash[:])]; ok { + continue + } + mt := newMetaTx(txn, newTxs.isLocal[i], blockNum) + if !add(mt) { + continue + } + changedSenders[mt.Tx.senderID] = struct{}{} + } + + if stateChanges != nil { + // re-calc all transactions of changed senders + for _, changesList := range stateChanges.ChangeBatch { + for _, change := range changesList.Changes { + switch change.Action { + case remote.Action_UPSERT, remote.Action_UPSERT_CODE: + if change.Incarnation > 0 { + continue + } + addr := gointerfaces.ConvertH160toAddress(change.Address) + id, ok := senders.id(string(addr[:])) + if !ok { + continue + } + changedSenders[id] = struct{}{} + } + } + } + } + + for senderID := range changedSenders { + nonce, balance, err := senders.info(cache, viewID, coreTx, senderID) if err != nil { return err } - onSenderChange(id, nonce, balance, byNonce, protocolBaseFee, currentBaseFee, pending, baseFee, queued) + onSenderChange(senderID, nonce, balance, byNonce, protocolBaseFee, currentBaseFee, pending, baseFee, queued) } - //defer func(t time.Time) { fmt.Printf("pool.go:630: %s\n", time.Since(t)) }(time.Now()) //pending.EnforceWorstInvariants() //baseFee.EnforceInvariants() //queued.EnforceInvariants() - promote(pending, baseFee, queued, cfg, discard) + promote(pending, baseFee, queued, discard) //pending.EnforceWorstInvariants() pending.EnforceBestInvariants() @@ -707,7 +700,7 @@ func (p *TxPool) addLocked(mt *metaTx) bool { if mt.subPool&IsLocal != 0 { p.isLocalLRU.Add(string(mt.Tx.idHash[:]), struct{}{}) } - p.pending.Add(mt) + p.queued.Add(mt) return true } func (p *TxPool) discardLocked(mt *metaTx, reason DiscardReason) { @@ -767,22 +760,6 @@ func removeMined(byNonce *ByNonce, minedTxs []*TxSlot, pending *PendingPool, bas return nil } -// unwind -func unsafeAddToPendingPool(blockNum uint64, newTxs TxSlots, byHash map[string]*metaTx, add func(*metaTx) bool) (changedSenders map[uint64]struct{}) { - changedSenders = map[uint64]struct{}{} - for i, txn := range newTxs.txs { - if _, ok := byHash[string(txn.idHash[:])]; ok { - continue - } - mt := newMetaTx(txn, newTxs.isLocal[i], blockNum) - - if add(mt) { - changedSenders[txn.senderID] = struct{}{} - } - } - return changedSenders -} - func onSenderChange(senderID uint64, senderNonce uint64, senderBalance uint256.Int, byNonce *ByNonce, protocolBaseFee, currentBaseFee uint64, pending *PendingPool, baseFee, queued *SubPool) { noGapsNonce := senderNonce cumulativeRequiredBalance := uint256.NewInt(0) @@ -856,7 +833,7 @@ func onSenderChange(senderID uint64, senderNonce uint64, senderBalance uint256.I }) } -func promote(pending *PendingPool, baseFee, queued *SubPool, cfg Config, discard func(*metaTx, DiscardReason)) { +func promote(pending *PendingPool, baseFee, queued *SubPool, discard func(*metaTx, DiscardReason)) { //1. If top element in the worst green queue has subPool != 0b1111 (binary), it needs to be removed from the green pool. // If subPool < 0b1000 (not satisfying minimum fee), discard. // If subPool == 0b1110, demote to the yellow pool, otherwise demote to the red pool. @@ -876,7 +853,7 @@ func promote(pending *PendingPool, baseFee, queued *SubPool, cfg Config, discard } //2. If top element in the worst green queue has subPool == 0b1111, but there is not enough room in the pool, discard. - for worst := pending.Worst(); pending.Len() > cfg.PendingSubPoolLimit; worst = pending.Worst() { + for worst := pending.Worst(); pending.Len() > pending.limit; worst = pending.Worst() { if worst.subPool >= 0b11111 { // TODO: here must 'subPool == 0b1111' or 'subPool <= 0b1111' ? break } @@ -905,7 +882,7 @@ func promote(pending *PendingPool, baseFee, queued *SubPool, cfg Config, discard } //5. If the top element in the worst yellow queue has subPool == 0x1110, but there is not enough room in the pool, discard. - for worst := baseFee.Worst(); baseFee.Len() > cfg.BaseFeeSubPoolLimit; worst = baseFee.Worst() { + for worst := baseFee.Worst(); baseFee.Len() > baseFee.limit; worst = baseFee.Worst() { if worst.subPool >= 0b11110 { break } @@ -930,12 +907,11 @@ func promote(pending *PendingPool, baseFee, queued *SubPool, cfg Config, discard if worst.subPool >= 0b10000 { break } - discard(queued.PopWorst(), FeeTooLow) } //8. If the top element in the worst red queue has subPool >= 0b100, but there is not enough room in the pool, discard. - for _ = queued.Worst(); queued.Len() > cfg.QueuedSubPoolLimit; _ = queued.Worst() { + for _ = queued.Worst(); queued.Len() > queued.limit; _ = queued.Worst() { discard(queued.PopWorst(), QueuedPoolOverflow) } } @@ -1065,6 +1041,7 @@ func (p *TxPool) flushLocked(tx kv.RwTx) (err error) { delete(p.senders.senderIDs, addr) } } + //fmt.Printf("del:%d,%d,%d\n", p.deletedTxs[i].Tx.senderID, p.deletedTxs[i].Tx.nonce, p.deletedTxs[i].Tx.tip) if err := tx.Delete(kv.PoolTransaction, p.deletedTxs[i].Tx.idHash[:], nil); err != nil { return err } @@ -1202,7 +1179,7 @@ func (p *TxPool) fromDB(ctx context.Context, tx kv.Tx, coreTx kv.Tx) error { if err != nil { return err } - if err := addTxs(0, p._cache, viewID, coreTx, p.cfg, p.senders, txs, protocolBaseFee, currentBaseFee, p.pending, p.baseFee, p.queued, p.byNonce, p.byHash, p.addLocked, p.discardLocked); err != nil { + if err := addTxs(p.lastSeenBlock.Load(), nil, p._cache, viewID, coreTx, p.senders, txs, protocolBaseFee, currentBaseFee, p.pending, p.baseFee, p.queued, p.byNonce, p.byHash, p.addLocked, p.discardLocked); err != nil { return err } p.currentBaseFee.Store(currentBaseFee) @@ -1261,11 +1238,14 @@ func (p *TxPool) printDebug(prefix string) { fmt.Printf("\tsenderID=%d, nonce=%d, tip=%d\n", j.Tx.senderID, j.Tx.nonce, j.Tx.tip) } fmt.Printf("%s.pool.queues.len: %d,%d,%d\n", prefix, p.pending.Len(), p.baseFee.Len(), p.queued.Len()) - for i := range p.pending.best { - p.pending.best[i].Tx.printDebug(fmt.Sprintf("%s.pending: %b", prefix, p.pending.best[i].subPool)) + for _, mt := range p.pending.best { + mt.Tx.printDebug(fmt.Sprintf("%s.pending: %b,%d,%d,%d", prefix, mt.subPool, mt.Tx.senderID, mt.Tx.nonce, mt.Tx.tip)) } - for i := range *p.queued.best { - (*p.queued.best)[i].Tx.printDebug(fmt.Sprintf("%s.queued : %b", prefix, (*p.queued.best)[i].subPool)) + for _, mt := range *p.baseFee.best { + mt.Tx.printDebug(fmt.Sprintf("%s.baseFee : %b,%d,%d,%d", prefix, mt.subPool, mt.Tx.senderID, mt.Tx.nonce, mt.Tx.tip)) + } + for _, mt := range *p.queued.best { + mt.Tx.printDebug(fmt.Sprintf("%s.queued : %b,%d,%d,%d", prefix, mt.subPool, mt.Tx.senderID, mt.Tx.nonce, mt.Tx.tip)) } } func (p *TxPool) logStats() { @@ -1523,13 +1503,20 @@ func (b *ByNonce) replaceOrInsert(mt *metaTx) *metaTx { } type PendingPool struct { + limit int t SubPoolType best bestSlice worst *WorstQueue + added *Hashes } -func NewPendingSubPool(t SubPoolType) *PendingPool { - return &PendingPool{t: t, best: []*metaTx{}, worst: &WorstQueue{}} +func NewPendingSubPool(t SubPoolType, limit int) *PendingPool { + return &PendingPool{limit: limit, t: t, best: []*metaTx{}, worst: &WorstQueue{}} +} + +func (p *PendingPool) captureAddedHashes(to *Hashes) { + p.added = to + *p.added = (*p.added)[:0] } // bestSlice - is similar to best queue, but with O(n log n) complexity and @@ -1602,11 +1589,17 @@ func (p *PendingPool) UnsafeRemove(i *metaTx) { p.best = p.best.UnsafeRemove(i) } func (p *PendingPool) UnsafeAdd(i *metaTx) { + if p.added != nil { + *p.added = append(*p.added, i.Tx.idHash[:]...) + } i.currentSubPool = p.t p.worst.Push(i) p.best = p.best.UnsafeAdd(i) } func (p *PendingPool) Add(i *metaTx) { + if p.added != nil { + *p.added = append(*p.added, i.Tx.idHash[:]...) + } i.currentSubPool = p.t heap.Push(p.worst, i) p.best = p.best.UnsafeAdd(i) @@ -1621,13 +1614,14 @@ func (p *PendingPool) DebugPrint(prefix string) { } type SubPool struct { + limit int t SubPoolType best *BestQueue worst *WorstQueue } -func NewSubPool(t SubPoolType) *SubPool { - return &SubPool{t: t, best: &BestQueue{}, worst: &WorstQueue{}} +func NewSubPool(t SubPoolType, limit int) *SubPool { + return &SubPool{limit: limit, t: t, best: &BestQueue{}, worst: &WorstQueue{}} } func (p *SubPool) EnforceInvariants() { diff --git a/txpool/pool_fuzz_test.go b/txpool/pool_fuzz_test.go index 1220cd2e6..1015b1267 100644 --- a/txpool/pool_fuzz_test.go +++ b/txpool/pool_fuzz_test.go @@ -34,7 +34,7 @@ import ( // gotip test -trimpath -v -fuzz=Fuzz -fuzztime=10s ./txpool func init() { - log.Root().SetHandler(log.LvlFilterHandler(log.LvlInfo, log.StderrHandler)) + //log.Root().SetHandler(log.LvlFilterHandler(log.LvlInfo, log.StderrHandler)) } func FuzzTwoQueue(f *testing.F) { @@ -44,7 +44,7 @@ func FuzzTwoQueue(f *testing.F) { t.Parallel() assert := assert.New(t) { - sub := NewPendingSubPool(PendingSubPool) + sub := NewPendingSubPool(PendingSubPool, 1024) for _, i := range in { sub.UnsafeAdd(&metaTx{subPool: SubPoolMarker(i & 0b11111), Tx: &TxSlot{nonce: 1, value: *uint256.NewInt(1)}}) } @@ -65,7 +65,7 @@ func FuzzTwoQueue(f *testing.F) { } } { - sub := NewSubPool(BaseFeeSubPool) + sub := NewSubPool(BaseFeeSubPool, 1024) for _, i := range in { sub.Add(&metaTx{subPool: SubPoolMarker(i & 0b11111), Tx: &TxSlot{nonce: 1, value: *uint256.NewInt(1)}}) } @@ -98,7 +98,7 @@ func FuzzTwoQueue(f *testing.F) { } { - sub := NewSubPool(QueuedSubPool) + sub := NewSubPool(QueuedSubPool, 1024) for _, i := range in { sub.Add(&metaTx{subPool: SubPoolMarker(i & 0b11111), Tx: &TxSlot{nonce: 1, value: *uint256.NewInt(1)}}) } @@ -304,9 +304,9 @@ func splitDataset(in TxSlots) (TxSlots, TxSlots, TxSlots, TxSlots) { func FuzzOnNewBlocks(f *testing.F) { var u64 = [1 * 4]byte{1} var senderAddr = [1 + 1 + 1]byte{1} - f.Add(u64[:], u64[:], u64[:], u64[:], senderAddr[:], 12) - f.Add(u64[:], u64[:], u64[:], u64[:], senderAddr[:], 14) - f.Add(u64[:], u64[:], u64[:], u64[:], senderAddr[:], 123) + f.Add(u64[:], u64[:], u64[:], u64[:], senderAddr[:], uint8(12)) + f.Add(u64[:], u64[:], u64[:], u64[:], senderAddr[:], uint8(14)) + f.Add(u64[:], u64[:], u64[:], u64[:], senderAddr[:], uint8(123)) f.Fuzz(func(t *testing.T, txNonce, values, tips, feeCap, senderAddr []byte, currentBaseFee1 uint8) { //t.Parallel() ctx := context.Background() @@ -328,7 +328,7 @@ func FuzzOnNewBlocks(f *testing.F) { err := txs.Valid() assert.NoError(err) - var prevTotal int + var prevHashes Hashes ch := make(chan Hashes, 100) @@ -348,10 +348,6 @@ func FuzzOnNewBlocks(f *testing.F) { pool.senders.senderID = uint64(len(senderIDs)) check := func(unwindTxs, minedTxs TxSlots, msg string) { pending, baseFee, queued := pool.pending, pool.baseFee, pool.queued - //if pending.Len() > 5 && baseFee.Len() > 5 && queued.Len() > 5 { - // fmt.Printf("len %s: %d,%d,%d\n", msg, pending.Len(), baseFee.Len(), queued.Len()) - //} - best, worst := pending.Best(), pending.Worst() assert.LessOrEqual(pending.Len(), cfg.PendingSubPoolLimit) assert.False(worst != nil && best == nil, msg) @@ -362,10 +358,6 @@ func FuzzOnNewBlocks(f *testing.F) { for _, tx := range pending.best { i := tx.Tx if tx.subPool&NoNonceGaps > 0 { - //for id := range senders { - // fmt.Printf("now: %d, %d\n", id, senders[id].nonce) - //} - //fmt.Printf("?? %d,%d\n", i.senderID, senders[i.senderID].nonce) assert.GreaterOrEqual(i.nonce, senders[i.senderID].nonce, msg, i.senderID) } if tx.subPool&EnoughBalance > 0 { @@ -481,6 +473,7 @@ func FuzzOnNewBlocks(f *testing.F) { checkNotify := func(unwindTxs, minedTxs TxSlots, msg string) { pending, baseFee, queued := pool.pending, pool.baseFee, pool.queued + _, _ = baseFee, queued select { case newHashes := <-ch: //assert.Equal(len(txs1.txs), newHashes.Len()) @@ -505,9 +498,11 @@ func FuzzOnNewBlocks(f *testing.F) { assert.False(foundInMined, msg) } default: // no notifications - means pools must be unchanged or drop some txs - assert.GreaterOrEqual(prevTotal, pending.Len()+baseFee.Len()+queued.Len(), msg) + pendingHashes := copyHashes(pending) + require.Zero(extractNewHashes(pendingHashes, prevHashes).Len()) } - prevTotal = pending.Len() + baseFee.Len() + queued.Len() + prevHashes = copyHashes(pending) + _ = prevHashes } //TODO: check that id=>addr and addr=>id mappings have same len @@ -540,9 +535,8 @@ func FuzzOnNewBlocks(f *testing.F) { }) } // go to first fork - //fmt.Printf("ll1: %d,%d,%d\n", pool.pending.Len(), pool.baseFee.Len(), pool.queued.Len()) txs1, txs2, p2pReceived, txs3 := splitDataset(txs) - err = pool.OnNewBlock(ctx, change, txs1, TxSlots{}, nil) + err = pool.OnNewBlock(ctx, change, txs1, TxSlots{}, tx) assert.NoError(err) check(txs1, TxSlots{}, "fork1") checkNotify(txs1, TxSlots{}, "fork1") @@ -554,7 +548,7 @@ func FuzzOnNewBlocks(f *testing.F) { {BlockHeight: 1, BlockHash: h1, PrevBlockHeight: 0, PrevBlockHash: h1, ProtocolBaseFee: currentBaseFee}, }, } - err = pool.OnNewBlock(ctx, change, TxSlots{}, txs2, nil) + err = pool.OnNewBlock(ctx, change, TxSlots{}, txs2, tx) check(TxSlots{}, txs2, "fork1 mined") checkNotify(TxSlots{}, txs2, "fork1 mined") @@ -565,7 +559,7 @@ func FuzzOnNewBlocks(f *testing.F) { {BlockHeight: 0, BlockHash: h1, Direction: remote.Direction_UNWIND, PrevBlockHeight: 1, PrevBlockHash: h1, ProtocolBaseFee: currentBaseFee}, }, } - err = pool.OnNewBlock(ctx, change, txs2, TxSlots{}, nil) + err = pool.OnNewBlock(ctx, change, txs2, TxSlots{}, tx) assert.NoError(err) check(txs2, TxSlots{}, "fork2") checkNotify(txs2, TxSlots{}, "fork2") @@ -576,7 +570,7 @@ func FuzzOnNewBlocks(f *testing.F) { {BlockHeight: 1, BlockHash: h22, PrevBlockHeight: 0, PrevBlockHash: h1, ProtocolBaseFee: currentBaseFee}, }, } - err = pool.OnNewBlock(ctx, change, TxSlots{}, txs3, nil) + err = pool.OnNewBlock(ctx, change, TxSlots{}, txs3, tx) assert.NoError(err) check(TxSlots{}, txs3, "fork2 mined") checkNotify(TxSlots{}, txs3, "fork2 mined") @@ -610,7 +604,7 @@ func FuzzOnNewBlocks(f *testing.F) { assert.Equal(pool.pending.Len(), p2.pending.Len()) assert.Equal(pool.baseFee.Len(), p2.baseFee.Len()) - assert.Equal(pool.queued.Len(), p2.queued.Len()) + require.Equal(pool.queued.Len(), p2.queued.Len()) assert.Equal(pool.currentBaseFee.Load(), p2.currentBaseFee.Load()) assert.Equal(pool.protocolBaseFee.Load(), p2.protocolBaseFee.Load()) }) @@ -622,3 +616,27 @@ func bigEndian(n uint64) []byte { binary.BigEndian.PutUint64(num[:], n) return num[:] } + +func copyHashes(p *PendingPool) (hashes Hashes) { + for i := range p.best { + hashes = append(hashes, p.best[i].Tx.idHash[:]...) + } + return hashes +} + +//extractNewHashes - extract from h1 hashes which do not exist in h2 +func extractNewHashes(h1, h2 Hashes) (result Hashes) { + for i := 0; i < h1.Len(); i++ { + found := false + for j := 0; j < h2.Len(); j++ { + if bytes.Equal(h1.At(i), h2.At(j)) { + found = true + break + } + } + if !found { + result = append(result, h1.At(i)...) + } + } + return result +} diff --git a/txpool/pool_test.go b/txpool/pool_test.go index 23d97c0dd..1ad3456d7 100644 --- a/txpool/pool_test.go +++ b/txpool/pool_test.go @@ -23,7 +23,7 @@ import ( func BenchmarkName(b *testing.B) { txs := make([]*metaTx, 10_000) - p := NewSubPool(BaseFeeSubPool) + p := NewSubPool(BaseFeeSubPool, 1024) for i := 0; i < len(txs); i++ { txs[i] = &metaTx{Tx: &TxSlot{}} } @@ -41,7 +41,7 @@ func BenchmarkName(b *testing.B) { func BenchmarkName2(b *testing.B) { txs := make([]*metaTx, 10_000) - p := NewSubPool(BaseFeeSubPool) + p := NewSubPool(BaseFeeSubPool, 1024) for i := 0; i < len(txs); i++ { txs[i] = &metaTx{Tx: &TxSlot{}} }