Pool: correct new pending txs notifications (#85)

This commit is contained in:
Alex Sharov 2021-09-21 16:39:41 +07:00 committed by GitHub
parent 13b0978d86
commit 4862356290
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 155 additions and 147 deletions

5
go.mod
View File

@ -3,7 +3,7 @@ module github.com/ledgerwatch/erigon-lib
go 1.16
require (
github.com/VictoriaMetrics/metrics v1.17.3
github.com/VictoriaMetrics/metrics v1.18.0
github.com/c2h5oh/datasize v0.0.0-20200825124411-48ed595a09d2
github.com/go-stack/stack v1.8.1
github.com/golang/protobuf v1.5.2
@ -11,7 +11,7 @@ require (
github.com/grpc-ecosystem/go-grpc-middleware v1.3.0
github.com/hashicorp/golang-lru v0.5.5-0.20210104140557-80c98217689d
github.com/holiman/uint256 v1.2.0
github.com/ledgerwatch/log/v3 v3.3.0
github.com/ledgerwatch/log/v3 v3.3.1
github.com/ledgerwatch/secp256k1 v0.0.0-20210626115225-cd5cd00ed72d
github.com/matryer/moq v0.2.3
github.com/spaolacci/murmur3 v1.1.0
@ -19,7 +19,6 @@ require (
github.com/torquem-ch/mdbx-go v0.18.1
github.com/ugorji/go/codec v1.1.13
go.uber.org/atomic v1.9.0
golang.org/dl v0.0.0-20210909185531-e2a88a019121 // indirect
golang.org/x/crypto v0.0.0-20210616213533-5ff15b29337e
google.golang.org/grpc v1.39.1
google.golang.org/grpc/cmd/protoc-gen-go-grpc v1.1.0

29
go.sum
View File

@ -1,8 +1,8 @@
cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
cloud.google.com/go v0.34.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/VictoriaMetrics/metrics v1.17.3 h1:QPUakR6JRy8BhL2C2kOgYKLuoPDwtJQ+7iKIZSjt1A4=
github.com/VictoriaMetrics/metrics v1.17.3/go.mod h1:Z1tSfPfngDn12bTfZSCqArT3OPY3u88J12hSoOhuiRE=
github.com/VictoriaMetrics/metrics v1.18.0 h1:vov5NxDHRSXFbdiH4dYLYEjKLoAXXSQ7hcnG8TSD9JQ=
github.com/VictoriaMetrics/metrics v1.18.0/go.mod h1:ArjwVz7WpgpegX/JpB0zpNF2h2232kErkEnzH1sxMmA=
github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY=
github.com/c2h5oh/datasize v0.0.0-20200825124411-48ed595a09d2 h1:t8KYCwSKsOEZBFELI4Pn/phbp38iJ1RRAkDFNin1aak=
github.com/c2h5oh/datasize v0.0.0-20200825124411-48ed595a09d2/go.mod h1:S/7n9copUssQ56c7aAgHqftWO4LTf4xY6CGWt8Bc+3M=
@ -64,17 +64,17 @@ github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515/go.mod h1:+0opPa2QZZtGFBFZlji/RkVcI2GknAs/DXo4wKdlNEc=
github.com/ledgerwatch/log/v3 v3.3.0 h1:k8N/3NQLILr8CKCMyza261vLFKU7VA+nMNNb0wVyQSc=
github.com/ledgerwatch/log/v3 v3.3.0/go.mod h1:J58eOHHrIYHxl7LKkRsb/0YibKwtLfauUryl5SLRGm0=
github.com/ledgerwatch/log/v3 v3.3.1 h1:HmvLeTEvtCtqSvtu4t/a5MAdcLfeBcbIeowXbLYuzLc=
github.com/ledgerwatch/log/v3 v3.3.1/go.mod h1:S3VJqhhVX32rbp1JyyvhJou12twtFwNEPESBgpbNkRk=
github.com/ledgerwatch/secp256k1 v0.0.0-20210626115225-cd5cd00ed72d h1:/IKMrJdfRsoYNc36PXqP4xMH3vhW/8IQyBKGQbKZUno=
github.com/ledgerwatch/secp256k1 v0.0.0-20210626115225-cd5cd00ed72d/go.mod h1:SPmqJFciiF/Q0mPt2jVs2dTr/1TZBTIA+kPMmKgBAak=
github.com/matryer/moq v0.2.3 h1:Q06vEqnBYjjfx5KKgHfYRKE/lvlRu+Nj+xodG4YdHnU=
github.com/matryer/moq v0.2.3/go.mod h1:9RtPYjTnH1bSBIkpvtHkFN7nbWAnO7oRpdJkEIn6UtE=
github.com/mattn/go-colorable v0.1.8 h1:c1ghPdyEDarC70ftn0y+A/Ee++9zz8ljHG1b13eJ0s8=
github.com/mattn/go-colorable v0.1.8/go.mod h1:u6P/XSegPjTcexA+o6vUJrdnUu04hMope9wVRipJSqc=
github.com/mattn/go-colorable v0.1.9 h1:sqDoxXbdeALODt0DAeJCVp38ps9ZogZEAXjus69YV3U=
github.com/mattn/go-colorable v0.1.9/go.mod h1:u6P/XSegPjTcexA+o6vUJrdnUu04hMope9wVRipJSqc=
github.com/mattn/go-isatty v0.0.12/go.mod h1:cbi8OIDigv2wuxKPP5vlRcQ1OAZbq2CE4Kysco4FUpU=
github.com/mattn/go-isatty v0.0.13 h1:qdl+GuBjcsKKDco5BsxPJlId98mSWNKqYA+Co0SC1yA=
github.com/mattn/go-isatty v0.0.13/go.mod h1:cbi8OIDigv2wuxKPP5vlRcQ1OAZbq2CE4Kysco4FUpU=
github.com/mattn/go-isatty v0.0.14 h1:yVuAays6BHfxijgZPzw+3Zlu5yQgKGP2/hcQbHb7S9Y=
github.com/mattn/go-isatty v0.0.14/go.mod h1:7GGIvUiUoEMVVmxf/4nioHXj79iQHKdU27kJ6hsGG94=
github.com/opentracing/opentracing-go v1.1.0/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o=
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
@ -98,10 +98,10 @@ github.com/ugorji/go v1.1.13 h1:nB3O5kBSQGjEQAcfe1aLUYuxmXdFKmYgBZhY32rQb6Q=
github.com/ugorji/go v1.1.13/go.mod h1:jxau1n+/wyTGLQoCkjok9r5zFa/FxT6eI5HiHKQszjc=
github.com/ugorji/go/codec v1.1.13 h1:013LbFhocBoIqgHeIHKlV4JWYhqogATYWZhIcH0WHn4=
github.com/ugorji/go/codec v1.1.13/go.mod h1:oNVt3Dq+FO91WNQ/9JnHKQP2QJxTzoN7wCBFCq1OeuU=
github.com/valyala/fastrand v1.0.0 h1:LUKT9aKer2dVQNUi3waewTbKV+7H17kvWFNKs2ObdkI=
github.com/valyala/fastrand v1.0.0/go.mod h1:HWqCzkrkg6QXT8V2EXWvXCoow7vLwOFN002oeRzjapQ=
github.com/valyala/histogram v1.1.2 h1:vOk5VrGjMBIoPR5k6wA8vBaC8toeJ8XO0yfRjFEc1h8=
github.com/valyala/histogram v1.1.2/go.mod h1:CZAr6gK9dbD7hYx2s8WSPh0p5x5wETjC+2b3PJVtEdg=
github.com/valyala/fastrand v1.1.0 h1:f+5HkLW4rsgzdNoleUOB69hyT9IlD2ZQh9GyDMfb5G8=
github.com/valyala/fastrand v1.1.0/go.mod h1:HWqCzkrkg6QXT8V2EXWvXCoow7vLwOFN002oeRzjapQ=
github.com/valyala/histogram v1.2.0 h1:wyYGAZZt3CpwUiIb9AU/Zbllg1llXyrtApRS815OLoQ=
github.com/valyala/histogram v1.2.0/go.mod h1:Hb4kBwb4UxsaNbbbh+RRz8ZR6pdodR57tzWUS3BUzXY=
github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.1.32/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
@ -111,8 +111,6 @@ go.uber.org/atomic v1.9.0 h1:ECmE8Bn/WFTYwEW/bpKD3M8VtR/zQVbavAoalC1PYyE=
go.uber.org/atomic v1.9.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0=
go.uber.org/zap v1.10.0/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q=
golang.org/dl v0.0.0-20210909185531-e2a88a019121 h1:fZ6ug6KBaLGJtlXN5TVaEEZ545CrhlY0LiG2l+GMs0Q=
golang.org/dl v0.0.0-20210909185531-e2a88a019121/go.mod h1:IUMfjQLJQd4UTqG1Z90tenwKoCX93Gn3MAQJMOSBsDQ=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
@ -157,8 +155,9 @@ golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c h1:F1jZWGFhYfh0Ci55sIpILtKKK8p3i2/krTr0H1rg74I=
golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20210917161153-d61c044b1678 h1:J27LZFQBFoihqXoegpscI10HpjZ7B5WQLLKL2FZXQKw=
golang.org/x/sys v0.0.0-20210917161153-d61c044b1678/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.3 h1:cokOdA+Jmi5PJGXLlLllQSgYigAEfHXJAERHVMaCc2k=

View File

@ -60,8 +60,6 @@ func NewServer(rateLimit uint32, creds credentials.TransportCredentials) *grpc.S
// unaryInterceptors = append(unaryInterceptors, grpc_prometheus.UnaryServerInterceptor)
//}
var grpcServer *grpc.Server
reflection.Register(grpcServer)
//cpus := uint32(runtime.GOMAXPROCS(-1))
opts := []grpc.ServerOption{
//grpc.NumStreamWorkers(cpus), // reduce amount of goroutines
@ -78,7 +76,8 @@ func NewServer(rateLimit uint32, creds credentials.TransportCredentials) *grpc.S
grpc.UnaryInterceptor(grpc_middleware.ChainUnaryServer(unaryInterceptors...)),
grpc.Creds(creds),
}
grpcServer = grpc.NewServer(opts...)
grpcServer := grpc.NewServer(opts...)
reflection.Register(grpcServer)
//if metrics.Enabled {
// grpc_prometheus.Register(grpcServer)

View File

@ -290,8 +290,6 @@ func StartGrpc(txPoolServer txpool_proto.TxpoolServer, miningServer txpool_proto
// unaryInterceptors = append(unaryInterceptors, grpc_prometheus.UnaryServerInterceptor)
//}
var grpcServer *grpc.Server
reflection.Register(grpcServer) // Register reflection service on gRPC server.
//cpus := uint32(runtime.GOMAXPROCS(-1))
opts := []grpc.ServerOption{
//grpc.NumStreamWorkers(cpus), // reduce amount of goroutines
@ -310,7 +308,8 @@ func StartGrpc(txPoolServer txpool_proto.TxpoolServer, miningServer txpool_proto
} else {
opts = append(opts, grpc.Creds(*creds))
}
grpcServer = grpc.NewServer(opts...)
grpcServer := grpc.NewServer(opts...)
reflection.Register(grpcServer) // Register reflection service on gRPC server.
if txPoolServer != nil {
txpool_proto.RegisterTxpoolServer(grpcServer, txPoolServer)
}

View File

@ -222,6 +222,7 @@ type TxPool struct {
senders *sendersBatch
_cache kvcache.Cache
byNonce *ByNonce // senderID => (sorted map of tx nonce => *metaTx)
promoted Hashes // pre-allocated buffer to write promoted to pending pool txn hashes
// batch processing of remote transactions
// handling works fast without batching, but batching allow:
@ -252,9 +253,9 @@ func New(newTxs chan Hashes, coreDB kv.RoDB, cfg Config, cache kvcache.Cache, ru
discardReasonsLRU: discardHistory,
byNonce: &ByNonce{btree.New(32)},
recentlyConnectedPeers: &recentlyConnectedPeers{},
pending: NewPendingSubPool(PendingSubPool),
baseFee: NewSubPool(BaseFeeSubPool),
queued: NewSubPool(QueuedSubPool),
pending: NewPendingSubPool(PendingSubPool, cfg.PendingSubPoolLimit),
baseFee: NewSubPool(BaseFeeSubPool, cfg.BaseFeeSubPoolLimit),
queued: NewSubPool(QueuedSubPool, cfg.QueuedSubPoolLimit),
newTxs: newTxs,
_cache: cache,
senders: newSendersCache(),
@ -265,6 +266,7 @@ func New(newTxs chan Hashes, coreDB kv.RoDB, cfg Config, cache kvcache.Cache, ru
senderID: 1,
unprocessedRemoteTxs: &TxSlots{},
unprocessedRemoteByHash: map[string]int{},
promoted: make(Hashes, 0, 32*1024),
}, nil
}
@ -331,46 +333,21 @@ func (p *TxPool) OnNewBlock(ctx context.Context, stateChanges *remote.StateChang
return err
}
for _, changesList := range stateChanges.ChangeBatch {
for _, change := range changesList.Changes {
switch change.Action {
case remote.Action_UPSERT, remote.Action_UPSERT_CODE:
if change.Incarnation > 0 {
continue
}
addr := gointerfaces.ConvertH160toAddress(change.Address)
id, ok := p.senders.id(string(addr[:]))
if !ok {
continue
}
nonce, balance, err := p.senders.info(cache, viewID, coreTx, id)
if err != nil {
return err
}
onSenderChange(id, nonce, balance, p.byNonce, protocolBaseFee, baseFee, p.pending, p.baseFee, p.queued)
}
}
}
//log.Debug("[txpool] new block", "unwinded", len(unwindTxs.txs), "mined", len(minedTxs.txs), "baseFee", baseFee, "blockHeight", blockHeight)
if err := addTxs(p.lastSeenBlock.Load(), cache, viewID, coreTx, p.cfg, p.senders, unwindTxs, protocolBaseFee, baseFee, p.pending, p.baseFee, p.queued, p.byNonce, p.byHash, p.addLocked, p.discardLocked); err != nil {
p.pending.captureAddedHashes(&p.promoted)
if err := addTxs(p.lastSeenBlock.Load(), stateChanges, cache, viewID, coreTx, p.senders, unwindTxs, protocolBaseFee, baseFee, p.pending, p.baseFee, p.queued, p.byNonce, p.byHash, p.addLocked, p.discardLocked); err != nil {
return err
}
p.pending.added = nil
if p.started.CAS(false, true) {
log.Info("[txpool] Started")
}
notifyNewTxs := make(Hashes, 0, 32*len(unwindTxs.txs))
for i := range unwindTxs.txs {
_, ok := p.byHash[string(unwindTxs.txs[i].idHash[:])]
if !ok {
continue
}
notifyNewTxs = append(notifyNewTxs, unwindTxs.txs[i].idHash[:]...)
}
if len(notifyNewTxs) > 0 {
if p.promoted.Len() > 0 {
select {
case p.newTxs <- notifyNewTxs:
case p.newTxs <- common.Copy(p.promoted):
default:
}
}
@ -414,24 +391,17 @@ func (p *TxPool) processRemoteTxs(ctx context.Context) error {
return err
}
if err := addTxs(p.lastSeenBlock.Load(), cache, viewID, coreTx, p.cfg, p.senders, newTxs, p.protocolBaseFee.Load(), p.currentBaseFee.Load(), p.pending, p.baseFee, p.queued, p.byNonce, p.byHash, p.addLocked, p.discardLocked); err != nil {
p.pending.captureAddedHashes(&p.promoted)
if err := addTxs(p.lastSeenBlock.Load(), nil, cache, viewID, coreTx, p.senders, newTxs, p.protocolBaseFee.Load(), p.currentBaseFee.Load(), p.pending, p.baseFee, p.queued, p.byNonce, p.byHash, p.addLocked, p.discardLocked); err != nil {
return err
}
p.pending.added = nil
// notify about all non-dropped txs
notifyNewTxs := make(Hashes, 0, 32*len(newTxs.txs))
for i := range newTxs.txs {
_, ok := p.byHash[string(newTxs.txs[i].idHash[:])]
if !ok {
continue
}
notifyNewTxs = append(notifyNewTxs, newTxs.txs[i].idHash[:]...)
}
if len(notifyNewTxs) > 0 {
if p.promoted.Len() > 0 {
select {
case <-ctx.Done():
return nil
case p.newTxs <- notifyNewTxs:
case p.newTxs <- common.Copy(p.promoted):
default:
}
}
@ -588,22 +558,16 @@ func (p *TxPool) AddLocalTxs(ctx context.Context, newTxs TxSlots) ([]DiscardReas
if err = p.senders.onNewTxs(newTxs); err != nil {
return nil, err
}
if err := addTxs(p.lastSeenBlock.Load(), p._cache, viewID, coreTx, p.cfg, p.senders, newTxs, p.protocolBaseFee.Load(), p.currentBaseFee.Load(), p.pending, p.baseFee, p.queued, p.byNonce, p.byHash, p.addLocked, p.discardLocked); err != nil {
p.pending.captureAddedHashes(&p.promoted)
if err := addTxs(p.lastSeenBlock.Load(), nil, p._cache, viewID, coreTx, p.senders, newTxs, p.protocolBaseFee.Load(), p.currentBaseFee.Load(), p.pending, p.baseFee, p.queued, p.byNonce, p.byHash, p.addLocked, p.discardLocked); err != nil {
return nil, err
}
p.pending.added = nil
// notify about all non-dropped txs
notifyNewTxs := make(Hashes, 0, 32*len(newTxs.txs))
for i := range newTxs.txs {
_, ok := p.byHash[string(newTxs.txs[i].idHash[:])]
if !ok {
continue
}
notifyNewTxs = append(notifyNewTxs, newTxs.txs[i].idHash[:]...)
}
if len(notifyNewTxs) > 0 {
if p.promoted.Len() > 0 {
select {
case p.newTxs <- notifyNewTxs:
case p.newTxs <- common.Copy(p.promoted):
default:
}
}
@ -628,7 +592,7 @@ func (p *TxPool) cache() kvcache.Cache {
defer p.lock.RUnlock()
return p._cache
}
func addTxs(blockNum uint64, cache kvcache.Cache, viewID kvcache.ViewID, coreTx kv.Tx, cfg Config, senders *sendersBatch, newTxs TxSlots, protocolBaseFee, currentBaseFee uint64, pending *PendingPool, baseFee, queued *SubPool, byNonce *ByNonce, byHash map[string]*metaTx, add func(*metaTx) bool, discard func(*metaTx, DiscardReason)) error {
func addTxs(blockNum uint64, stateChanges *remote.StateChangeBatch, cache kvcache.Cache, viewID kvcache.ViewID, coreTx kv.Tx, senders *sendersBatch, newTxs TxSlots, protocolBaseFee, currentBaseFee uint64, pending *PendingPool, baseFee, queued *SubPool, byNonce *ByNonce, byHash map[string]*metaTx, add func(*metaTx) bool, discard func(*metaTx, DiscardReason)) error {
if ASSERT {
for i := range newTxs.txs {
if newTxs.txs[i].senderID == 0 {
@ -636,7 +600,6 @@ func addTxs(blockNum uint64, cache kvcache.Cache, viewID kvcache.ViewID, coreTx
}
}
}
//defer func(t time.Time) { fmt.Printf("pool.go:611: %s\n", time.Since(t)) }(time.Now())
// This can be thought of a reverse operation from the one described before.
// When a block that was deemed "the best" of its height, is no longer deemed "the best", the
// transactions contained in it, are now viable for inclusion in other blocks, and therefore should
@ -646,20 +609,50 @@ func addTxs(blockNum uint64, cache kvcache.Cache, viewID kvcache.ViewID, coreTx
// they effective lose their priority over the "remote" transactions. In order to prevent that,
// somehow the fact that certain transactions were local, needs to be remembered for some
// time (up to some "immutability threshold").
changedSenders := unsafeAddToPendingPool(blockNum, newTxs, byHash, add)
for id := range changedSenders {
nonce, balance, err := senders.info(cache, viewID, coreTx, id)
changedSenders := map[uint64]struct{}{}
for i, txn := range newTxs.txs {
if _, ok := byHash[string(txn.idHash[:])]; ok {
continue
}
mt := newMetaTx(txn, newTxs.isLocal[i], blockNum)
if !add(mt) {
continue
}
changedSenders[mt.Tx.senderID] = struct{}{}
}
if stateChanges != nil {
// re-calc all transactions of changed senders
for _, changesList := range stateChanges.ChangeBatch {
for _, change := range changesList.Changes {
switch change.Action {
case remote.Action_UPSERT, remote.Action_UPSERT_CODE:
if change.Incarnation > 0 {
continue
}
addr := gointerfaces.ConvertH160toAddress(change.Address)
id, ok := senders.id(string(addr[:]))
if !ok {
continue
}
changedSenders[id] = struct{}{}
}
}
}
}
for senderID := range changedSenders {
nonce, balance, err := senders.info(cache, viewID, coreTx, senderID)
if err != nil {
return err
}
onSenderChange(id, nonce, balance, byNonce, protocolBaseFee, currentBaseFee, pending, baseFee, queued)
onSenderChange(senderID, nonce, balance, byNonce, protocolBaseFee, currentBaseFee, pending, baseFee, queued)
}
//defer func(t time.Time) { fmt.Printf("pool.go:630: %s\n", time.Since(t)) }(time.Now())
//pending.EnforceWorstInvariants()
//baseFee.EnforceInvariants()
//queued.EnforceInvariants()
promote(pending, baseFee, queued, cfg, discard)
promote(pending, baseFee, queued, discard)
//pending.EnforceWorstInvariants()
pending.EnforceBestInvariants()
@ -707,7 +700,7 @@ func (p *TxPool) addLocked(mt *metaTx) bool {
if mt.subPool&IsLocal != 0 {
p.isLocalLRU.Add(string(mt.Tx.idHash[:]), struct{}{})
}
p.pending.Add(mt)
p.queued.Add(mt)
return true
}
func (p *TxPool) discardLocked(mt *metaTx, reason DiscardReason) {
@ -767,22 +760,6 @@ func removeMined(byNonce *ByNonce, minedTxs []*TxSlot, pending *PendingPool, bas
return nil
}
// unwind
func unsafeAddToPendingPool(blockNum uint64, newTxs TxSlots, byHash map[string]*metaTx, add func(*metaTx) bool) (changedSenders map[uint64]struct{}) {
changedSenders = map[uint64]struct{}{}
for i, txn := range newTxs.txs {
if _, ok := byHash[string(txn.idHash[:])]; ok {
continue
}
mt := newMetaTx(txn, newTxs.isLocal[i], blockNum)
if add(mt) {
changedSenders[txn.senderID] = struct{}{}
}
}
return changedSenders
}
func onSenderChange(senderID uint64, senderNonce uint64, senderBalance uint256.Int, byNonce *ByNonce, protocolBaseFee, currentBaseFee uint64, pending *PendingPool, baseFee, queued *SubPool) {
noGapsNonce := senderNonce
cumulativeRequiredBalance := uint256.NewInt(0)
@ -856,7 +833,7 @@ func onSenderChange(senderID uint64, senderNonce uint64, senderBalance uint256.I
})
}
func promote(pending *PendingPool, baseFee, queued *SubPool, cfg Config, discard func(*metaTx, DiscardReason)) {
func promote(pending *PendingPool, baseFee, queued *SubPool, discard func(*metaTx, DiscardReason)) {
//1. If top element in the worst green queue has subPool != 0b1111 (binary), it needs to be removed from the green pool.
// If subPool < 0b1000 (not satisfying minimum fee), discard.
// If subPool == 0b1110, demote to the yellow pool, otherwise demote to the red pool.
@ -876,7 +853,7 @@ func promote(pending *PendingPool, baseFee, queued *SubPool, cfg Config, discard
}
//2. If top element in the worst green queue has subPool == 0b1111, but there is not enough room in the pool, discard.
for worst := pending.Worst(); pending.Len() > cfg.PendingSubPoolLimit; worst = pending.Worst() {
for worst := pending.Worst(); pending.Len() > pending.limit; worst = pending.Worst() {
if worst.subPool >= 0b11111 { // TODO: here must 'subPool == 0b1111' or 'subPool <= 0b1111' ?
break
}
@ -905,7 +882,7 @@ func promote(pending *PendingPool, baseFee, queued *SubPool, cfg Config, discard
}
//5. If the top element in the worst yellow queue has subPool == 0x1110, but there is not enough room in the pool, discard.
for worst := baseFee.Worst(); baseFee.Len() > cfg.BaseFeeSubPoolLimit; worst = baseFee.Worst() {
for worst := baseFee.Worst(); baseFee.Len() > baseFee.limit; worst = baseFee.Worst() {
if worst.subPool >= 0b11110 {
break
}
@ -930,12 +907,11 @@ func promote(pending *PendingPool, baseFee, queued *SubPool, cfg Config, discard
if worst.subPool >= 0b10000 {
break
}
discard(queued.PopWorst(), FeeTooLow)
}
//8. If the top element in the worst red queue has subPool >= 0b100, but there is not enough room in the pool, discard.
for _ = queued.Worst(); queued.Len() > cfg.QueuedSubPoolLimit; _ = queued.Worst() {
for _ = queued.Worst(); queued.Len() > queued.limit; _ = queued.Worst() {
discard(queued.PopWorst(), QueuedPoolOverflow)
}
}
@ -1065,6 +1041,7 @@ func (p *TxPool) flushLocked(tx kv.RwTx) (err error) {
delete(p.senders.senderIDs, addr)
}
}
//fmt.Printf("del:%d,%d,%d\n", p.deletedTxs[i].Tx.senderID, p.deletedTxs[i].Tx.nonce, p.deletedTxs[i].Tx.tip)
if err := tx.Delete(kv.PoolTransaction, p.deletedTxs[i].Tx.idHash[:], nil); err != nil {
return err
}
@ -1202,7 +1179,7 @@ func (p *TxPool) fromDB(ctx context.Context, tx kv.Tx, coreTx kv.Tx) error {
if err != nil {
return err
}
if err := addTxs(0, p._cache, viewID, coreTx, p.cfg, p.senders, txs, protocolBaseFee, currentBaseFee, p.pending, p.baseFee, p.queued, p.byNonce, p.byHash, p.addLocked, p.discardLocked); err != nil {
if err := addTxs(p.lastSeenBlock.Load(), nil, p._cache, viewID, coreTx, p.senders, txs, protocolBaseFee, currentBaseFee, p.pending, p.baseFee, p.queued, p.byNonce, p.byHash, p.addLocked, p.discardLocked); err != nil {
return err
}
p.currentBaseFee.Store(currentBaseFee)
@ -1261,11 +1238,14 @@ func (p *TxPool) printDebug(prefix string) {
fmt.Printf("\tsenderID=%d, nonce=%d, tip=%d\n", j.Tx.senderID, j.Tx.nonce, j.Tx.tip)
}
fmt.Printf("%s.pool.queues.len: %d,%d,%d\n", prefix, p.pending.Len(), p.baseFee.Len(), p.queued.Len())
for i := range p.pending.best {
p.pending.best[i].Tx.printDebug(fmt.Sprintf("%s.pending: %b", prefix, p.pending.best[i].subPool))
for _, mt := range p.pending.best {
mt.Tx.printDebug(fmt.Sprintf("%s.pending: %b,%d,%d,%d", prefix, mt.subPool, mt.Tx.senderID, mt.Tx.nonce, mt.Tx.tip))
}
for i := range *p.queued.best {
(*p.queued.best)[i].Tx.printDebug(fmt.Sprintf("%s.queued : %b", prefix, (*p.queued.best)[i].subPool))
for _, mt := range *p.baseFee.best {
mt.Tx.printDebug(fmt.Sprintf("%s.baseFee : %b,%d,%d,%d", prefix, mt.subPool, mt.Tx.senderID, mt.Tx.nonce, mt.Tx.tip))
}
for _, mt := range *p.queued.best {
mt.Tx.printDebug(fmt.Sprintf("%s.queued : %b,%d,%d,%d", prefix, mt.subPool, mt.Tx.senderID, mt.Tx.nonce, mt.Tx.tip))
}
}
func (p *TxPool) logStats() {
@ -1523,13 +1503,20 @@ func (b *ByNonce) replaceOrInsert(mt *metaTx) *metaTx {
}
type PendingPool struct {
limit int
t SubPoolType
best bestSlice
worst *WorstQueue
added *Hashes
}
func NewPendingSubPool(t SubPoolType) *PendingPool {
return &PendingPool{t: t, best: []*metaTx{}, worst: &WorstQueue{}}
func NewPendingSubPool(t SubPoolType, limit int) *PendingPool {
return &PendingPool{limit: limit, t: t, best: []*metaTx{}, worst: &WorstQueue{}}
}
func (p *PendingPool) captureAddedHashes(to *Hashes) {
p.added = to
*p.added = (*p.added)[:0]
}
// bestSlice - is similar to best queue, but with O(n log n) complexity and
@ -1602,11 +1589,17 @@ func (p *PendingPool) UnsafeRemove(i *metaTx) {
p.best = p.best.UnsafeRemove(i)
}
func (p *PendingPool) UnsafeAdd(i *metaTx) {
if p.added != nil {
*p.added = append(*p.added, i.Tx.idHash[:]...)
}
i.currentSubPool = p.t
p.worst.Push(i)
p.best = p.best.UnsafeAdd(i)
}
func (p *PendingPool) Add(i *metaTx) {
if p.added != nil {
*p.added = append(*p.added, i.Tx.idHash[:]...)
}
i.currentSubPool = p.t
heap.Push(p.worst, i)
p.best = p.best.UnsafeAdd(i)
@ -1621,13 +1614,14 @@ func (p *PendingPool) DebugPrint(prefix string) {
}
type SubPool struct {
limit int
t SubPoolType
best *BestQueue
worst *WorstQueue
}
func NewSubPool(t SubPoolType) *SubPool {
return &SubPool{t: t, best: &BestQueue{}, worst: &WorstQueue{}}
func NewSubPool(t SubPoolType, limit int) *SubPool {
return &SubPool{limit: limit, t: t, best: &BestQueue{}, worst: &WorstQueue{}}
}
func (p *SubPool) EnforceInvariants() {

View File

@ -34,7 +34,7 @@ import (
// gotip test -trimpath -v -fuzz=Fuzz -fuzztime=10s ./txpool
func init() {
log.Root().SetHandler(log.LvlFilterHandler(log.LvlInfo, log.StderrHandler))
//log.Root().SetHandler(log.LvlFilterHandler(log.LvlInfo, log.StderrHandler))
}
func FuzzTwoQueue(f *testing.F) {
@ -44,7 +44,7 @@ func FuzzTwoQueue(f *testing.F) {
t.Parallel()
assert := assert.New(t)
{
sub := NewPendingSubPool(PendingSubPool)
sub := NewPendingSubPool(PendingSubPool, 1024)
for _, i := range in {
sub.UnsafeAdd(&metaTx{subPool: SubPoolMarker(i & 0b11111), Tx: &TxSlot{nonce: 1, value: *uint256.NewInt(1)}})
}
@ -65,7 +65,7 @@ func FuzzTwoQueue(f *testing.F) {
}
}
{
sub := NewSubPool(BaseFeeSubPool)
sub := NewSubPool(BaseFeeSubPool, 1024)
for _, i := range in {
sub.Add(&metaTx{subPool: SubPoolMarker(i & 0b11111), Tx: &TxSlot{nonce: 1, value: *uint256.NewInt(1)}})
}
@ -98,7 +98,7 @@ func FuzzTwoQueue(f *testing.F) {
}
{
sub := NewSubPool(QueuedSubPool)
sub := NewSubPool(QueuedSubPool, 1024)
for _, i := range in {
sub.Add(&metaTx{subPool: SubPoolMarker(i & 0b11111), Tx: &TxSlot{nonce: 1, value: *uint256.NewInt(1)}})
}
@ -304,9 +304,9 @@ func splitDataset(in TxSlots) (TxSlots, TxSlots, TxSlots, TxSlots) {
func FuzzOnNewBlocks(f *testing.F) {
var u64 = [1 * 4]byte{1}
var senderAddr = [1 + 1 + 1]byte{1}
f.Add(u64[:], u64[:], u64[:], u64[:], senderAddr[:], 12)
f.Add(u64[:], u64[:], u64[:], u64[:], senderAddr[:], 14)
f.Add(u64[:], u64[:], u64[:], u64[:], senderAddr[:], 123)
f.Add(u64[:], u64[:], u64[:], u64[:], senderAddr[:], uint8(12))
f.Add(u64[:], u64[:], u64[:], u64[:], senderAddr[:], uint8(14))
f.Add(u64[:], u64[:], u64[:], u64[:], senderAddr[:], uint8(123))
f.Fuzz(func(t *testing.T, txNonce, values, tips, feeCap, senderAddr []byte, currentBaseFee1 uint8) {
//t.Parallel()
ctx := context.Background()
@ -328,7 +328,7 @@ func FuzzOnNewBlocks(f *testing.F) {
err := txs.Valid()
assert.NoError(err)
var prevTotal int
var prevHashes Hashes
ch := make(chan Hashes, 100)
@ -348,10 +348,6 @@ func FuzzOnNewBlocks(f *testing.F) {
pool.senders.senderID = uint64(len(senderIDs))
check := func(unwindTxs, minedTxs TxSlots, msg string) {
pending, baseFee, queued := pool.pending, pool.baseFee, pool.queued
//if pending.Len() > 5 && baseFee.Len() > 5 && queued.Len() > 5 {
// fmt.Printf("len %s: %d,%d,%d\n", msg, pending.Len(), baseFee.Len(), queued.Len())
//}
best, worst := pending.Best(), pending.Worst()
assert.LessOrEqual(pending.Len(), cfg.PendingSubPoolLimit)
assert.False(worst != nil && best == nil, msg)
@ -362,10 +358,6 @@ func FuzzOnNewBlocks(f *testing.F) {
for _, tx := range pending.best {
i := tx.Tx
if tx.subPool&NoNonceGaps > 0 {
//for id := range senders {
// fmt.Printf("now: %d, %d\n", id, senders[id].nonce)
//}
//fmt.Printf("?? %d,%d\n", i.senderID, senders[i.senderID].nonce)
assert.GreaterOrEqual(i.nonce, senders[i.senderID].nonce, msg, i.senderID)
}
if tx.subPool&EnoughBalance > 0 {
@ -481,6 +473,7 @@ func FuzzOnNewBlocks(f *testing.F) {
checkNotify := func(unwindTxs, minedTxs TxSlots, msg string) {
pending, baseFee, queued := pool.pending, pool.baseFee, pool.queued
_, _ = baseFee, queued
select {
case newHashes := <-ch:
//assert.Equal(len(txs1.txs), newHashes.Len())
@ -505,9 +498,11 @@ func FuzzOnNewBlocks(f *testing.F) {
assert.False(foundInMined, msg)
}
default: // no notifications - means pools must be unchanged or drop some txs
assert.GreaterOrEqual(prevTotal, pending.Len()+baseFee.Len()+queued.Len(), msg)
pendingHashes := copyHashes(pending)
require.Zero(extractNewHashes(pendingHashes, prevHashes).Len())
}
prevTotal = pending.Len() + baseFee.Len() + queued.Len()
prevHashes = copyHashes(pending)
_ = prevHashes
}
//TODO: check that id=>addr and addr=>id mappings have same len
@ -540,9 +535,8 @@ func FuzzOnNewBlocks(f *testing.F) {
})
}
// go to first fork
//fmt.Printf("ll1: %d,%d,%d\n", pool.pending.Len(), pool.baseFee.Len(), pool.queued.Len())
txs1, txs2, p2pReceived, txs3 := splitDataset(txs)
err = pool.OnNewBlock(ctx, change, txs1, TxSlots{}, nil)
err = pool.OnNewBlock(ctx, change, txs1, TxSlots{}, tx)
assert.NoError(err)
check(txs1, TxSlots{}, "fork1")
checkNotify(txs1, TxSlots{}, "fork1")
@ -554,7 +548,7 @@ func FuzzOnNewBlocks(f *testing.F) {
{BlockHeight: 1, BlockHash: h1, PrevBlockHeight: 0, PrevBlockHash: h1, ProtocolBaseFee: currentBaseFee},
},
}
err = pool.OnNewBlock(ctx, change, TxSlots{}, txs2, nil)
err = pool.OnNewBlock(ctx, change, TxSlots{}, txs2, tx)
check(TxSlots{}, txs2, "fork1 mined")
checkNotify(TxSlots{}, txs2, "fork1 mined")
@ -565,7 +559,7 @@ func FuzzOnNewBlocks(f *testing.F) {
{BlockHeight: 0, BlockHash: h1, Direction: remote.Direction_UNWIND, PrevBlockHeight: 1, PrevBlockHash: h1, ProtocolBaseFee: currentBaseFee},
},
}
err = pool.OnNewBlock(ctx, change, txs2, TxSlots{}, nil)
err = pool.OnNewBlock(ctx, change, txs2, TxSlots{}, tx)
assert.NoError(err)
check(txs2, TxSlots{}, "fork2")
checkNotify(txs2, TxSlots{}, "fork2")
@ -576,7 +570,7 @@ func FuzzOnNewBlocks(f *testing.F) {
{BlockHeight: 1, BlockHash: h22, PrevBlockHeight: 0, PrevBlockHash: h1, ProtocolBaseFee: currentBaseFee},
},
}
err = pool.OnNewBlock(ctx, change, TxSlots{}, txs3, nil)
err = pool.OnNewBlock(ctx, change, TxSlots{}, txs3, tx)
assert.NoError(err)
check(TxSlots{}, txs3, "fork2 mined")
checkNotify(TxSlots{}, txs3, "fork2 mined")
@ -610,7 +604,7 @@ func FuzzOnNewBlocks(f *testing.F) {
assert.Equal(pool.pending.Len(), p2.pending.Len())
assert.Equal(pool.baseFee.Len(), p2.baseFee.Len())
assert.Equal(pool.queued.Len(), p2.queued.Len())
require.Equal(pool.queued.Len(), p2.queued.Len())
assert.Equal(pool.currentBaseFee.Load(), p2.currentBaseFee.Load())
assert.Equal(pool.protocolBaseFee.Load(), p2.protocolBaseFee.Load())
})
@ -622,3 +616,27 @@ func bigEndian(n uint64) []byte {
binary.BigEndian.PutUint64(num[:], n)
return num[:]
}
func copyHashes(p *PendingPool) (hashes Hashes) {
for i := range p.best {
hashes = append(hashes, p.best[i].Tx.idHash[:]...)
}
return hashes
}
//extractNewHashes - extract from h1 hashes which do not exist in h2
func extractNewHashes(h1, h2 Hashes) (result Hashes) {
for i := 0; i < h1.Len(); i++ {
found := false
for j := 0; j < h2.Len(); j++ {
if bytes.Equal(h1.At(i), h2.At(j)) {
found = true
break
}
}
if !found {
result = append(result, h1.At(i)...)
}
}
return result
}

View File

@ -23,7 +23,7 @@ import (
func BenchmarkName(b *testing.B) {
txs := make([]*metaTx, 10_000)
p := NewSubPool(BaseFeeSubPool)
p := NewSubPool(BaseFeeSubPool, 1024)
for i := 0; i < len(txs); i++ {
txs[i] = &metaTx{Tx: &TxSlot{}}
}
@ -41,7 +41,7 @@ func BenchmarkName(b *testing.B) {
func BenchmarkName2(b *testing.B) {
txs := make([]*metaTx, 10_000)
p := NewSubPool(BaseFeeSubPool)
p := NewSubPool(BaseFeeSubPool, 1024)
for i := 0; i < len(txs); i++ {
txs[i] = &metaTx{Tx: &TxSlot{}}
}