allow multiple log subscriptions at the same time (#5358)

This commit is contained in:
hexoscott 2022-09-16 14:36:25 +01:00 committed by GitHub
parent a8a104c35c
commit cd8cad6a89
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 591 additions and 19 deletions

View File

@ -0,0 +1,245 @@
package privateapi
import (
"context"
"testing"
"github.com/ledgerwatch/erigon-lib/gointerfaces"
"github.com/ledgerwatch/erigon-lib/gointerfaces/remote"
types2 "github.com/ledgerwatch/erigon-lib/gointerfaces/types"
"google.golang.org/grpc"
"github.com/ledgerwatch/erigon/common"
)
var (
address1 = common.HexToHash("0xdac17f958d2ee523a2206206994597c13d831ec7")
topic1 = common.HexToHash("0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef")
address160 *types2.H160
topic1H256 *types2.H256
)
func init() {
var a common.Address
a.SetBytes(address1.Bytes())
address160 = gointerfaces.ConvertAddressToH160(a)
topic1H256 = gointerfaces.ConvertHashToH256(topic1)
}
type testServer struct {
received chan *remote.LogsFilterRequest
receiveCompleted chan struct{}
sent []*remote.SubscribeLogsReply
ctx context.Context
grpc.ServerStream
}
func (ts *testServer) Send(m *remote.SubscribeLogsReply) error {
ts.sent = append(ts.sent, m)
return nil
}
func (ts *testServer) Recv() (*remote.LogsFilterRequest, error) {
// notify complete when the last request has been processed
defer func() {
if len(ts.received) == 0 {
ts.receiveCompleted <- struct{}{}
}
}()
return <-ts.received, nil
}
func createLog() *remote.SubscribeLogsReply {
return &remote.SubscribeLogsReply{
Address: gointerfaces.ConvertAddressToH160([20]byte{}),
BlockHash: gointerfaces.ConvertHashToH256([32]byte{}),
BlockNumber: 0,
Data: []byte{},
LogIndex: 0,
Topics: []*types2.H256{gointerfaces.ConvertHashToH256([32]byte{99, 99})},
TransactionHash: gointerfaces.ConvertHashToH256([32]byte{}),
TransactionIndex: 0,
Removed: false,
}
}
func TestLogsFilter_EmptyFilter_DoesNotDistributeAnything(t *testing.T) {
events := NewEvents()
agg := NewLogsFilterAggregator(events)
srv := &testServer{
received: make(chan *remote.LogsFilterRequest, 256),
receiveCompleted: make(chan struct{}, 1),
sent: make([]*remote.SubscribeLogsReply, 0),
ctx: context.Background(),
ServerStream: nil,
}
req1 := &remote.LogsFilterRequest{
AllAddresses: false,
Addresses: nil,
AllTopics: false,
Topics: nil,
}
srv.received <- req1
go func() {
err := agg.subscribeLogs(srv)
if err != nil {
t.Error(err)
}
}()
<-srv.receiveCompleted
// now see if a log would be sent or not
log := createLog()
agg.distributeLogs([]*remote.SubscribeLogsReply{log})
if len(srv.sent) != 0 {
t.Error("expected the sent slice to be empty")
}
}
func TestLogsFilter_AllAddressesAndTopicsFilter_DistributesLogRegardless(t *testing.T) {
events := NewEvents()
agg := NewLogsFilterAggregator(events)
srv := &testServer{
received: make(chan *remote.LogsFilterRequest, 256),
receiveCompleted: make(chan struct{}, 1),
sent: make([]*remote.SubscribeLogsReply, 0),
ctx: context.Background(),
ServerStream: nil,
}
req1 := &remote.LogsFilterRequest{
AllAddresses: true,
Addresses: nil,
AllTopics: true,
Topics: nil,
}
srv.received <- req1
go func() {
err := agg.subscribeLogs(srv)
if err != nil {
t.Error(err)
}
}()
<-srv.receiveCompleted
// now see if a log would be sent or not
log := createLog()
agg.distributeLogs([]*remote.SubscribeLogsReply{log})
if len(srv.sent) != 1 {
t.Error("expected the sent slice to have the log present")
}
log = createLog()
log.Topics = []*types2.H256{topic1H256}
agg.distributeLogs([]*remote.SubscribeLogsReply{log})
if len(srv.sent) != 2 {
t.Error("expected any topic to be allowed through the filter")
}
log = createLog()
log.Address = address160
agg.distributeLogs([]*remote.SubscribeLogsReply{log})
if len(srv.sent) != 3 {
t.Error("expected any address to be allowed through the filter")
}
}
func TestLogsFilter_TopicFilter_OnlyAllowsThatTopicThrough(t *testing.T) {
events := NewEvents()
agg := NewLogsFilterAggregator(events)
srv := &testServer{
received: make(chan *remote.LogsFilterRequest, 256),
receiveCompleted: make(chan struct{}, 1),
sent: make([]*remote.SubscribeLogsReply, 0),
ctx: context.Background(),
ServerStream: nil,
}
req1 := &remote.LogsFilterRequest{
AllAddresses: true, // need to allow all addresses on the request else it will filter on them
Addresses: nil,
AllTopics: false,
Topics: []*types2.H256{topic1H256},
}
srv.received <- req1
go func() {
err := agg.subscribeLogs(srv)
if err != nil {
t.Error(err)
}
}()
<-srv.receiveCompleted
// now see if a log would be sent or not
log := createLog()
agg.distributeLogs([]*remote.SubscribeLogsReply{log})
if len(srv.sent) != 0 {
t.Error("the sent slice should be empty as the topic didn't match")
}
log = createLog()
log.Topics = []*types2.H256{topic1H256}
agg.distributeLogs([]*remote.SubscribeLogsReply{log})
if len(srv.sent) != 1 {
t.Error("expected the log to be distributed as the topic matched")
}
}
func TestLogsFilter_AddressFilter_OnlyAllowsThatAddressThrough(t *testing.T) {
events := NewEvents()
agg := NewLogsFilterAggregator(events)
srv := &testServer{
received: make(chan *remote.LogsFilterRequest, 256),
receiveCompleted: make(chan struct{}, 1),
sent: make([]*remote.SubscribeLogsReply, 0),
ctx: context.Background(),
ServerStream: nil,
}
req1 := &remote.LogsFilterRequest{
AllAddresses: false,
Addresses: []*types2.H160{address160},
AllTopics: true,
Topics: []*types2.H256{},
}
srv.received <- req1
go func() {
err := agg.subscribeLogs(srv)
if err != nil {
t.Error(err)
}
}()
<-srv.receiveCompleted
// now see if a log would be sent or not
log := createLog()
agg.distributeLogs([]*remote.SubscribeLogsReply{log})
if len(srv.sent) != 0 {
t.Error("the sent slice should be empty as the address didn't match")
}
log = createLog()
log.Address = address160
agg.distributeLogs([]*remote.SubscribeLogsReply{log})
if len(srv.sent) != 1 {
t.Error("expected the log to be distributed as the address matched")
}
}

View File

@ -397,9 +397,11 @@ func (ff *Filters) SubscribeLogs(out chan *types.Log, crit filters.FilterCriteri
}
f.topicsOriginal = crit.Topics
ff.logsSubs.addLogsFilters(f)
// if any filter in the aggregate needs all addresses or all topics then the global log subscription needs to
// allow all addresses or topics through
lfr := &remote.LogsFilterRequest{
AllAddresses: ff.logsSubs.aggLogsFilter.allAddrs == 1,
AllTopics: ff.logsSubs.aggLogsFilter.allTopics == 1,
AllAddresses: ff.logsSubs.aggLogsFilter.allAddrs >= 1,
AllTopics: ff.logsSubs.aggLogsFilter.allTopics >= 1,
}
addresses, topics := ff.logsSubs.getAggMaps()
@ -430,9 +432,11 @@ func (ff *Filters) loadLogsRequester() any {
func (ff *Filters) UnsubscribeLogs(id LogsSubID) bool {
isDeleted := ff.logsSubs.removeLogsFilter(id)
// if any filters in the aggregate need all addresses or all topics then the request to the central
// log subscription needs to honour this
lfr := &remote.LogsFilterRequest{
AllAddresses: ff.logsSubs.aggLogsFilter.allAddrs == 1,
AllTopics: ff.logsSubs.aggLogsFilter.allTopics == 1,
AllAddresses: ff.logsSubs.aggLogsFilter.allAddrs >= 1,
AllTopics: ff.logsSubs.aggLogsFilter.allTopics >= 1,
}
addresses, topics := ff.logsSubs.getAggMaps()
@ -539,21 +543,6 @@ func (ff *Filters) OnNewTx(reply *txpool.OnAddReply) {
}
func (ff *Filters) OnNewLogs(reply *remote.SubscribeLogsReply) {
lg := &types.Log{
Address: gointerfaces.ConvertH160toAddress(reply.Address),
Data: reply.Data,
BlockNumber: reply.BlockNumber,
TxHash: gointerfaces.ConvertH256ToHash(reply.TransactionHash),
TxIndex: uint(reply.TransactionIndex),
BlockHash: gointerfaces.ConvertH256ToHash(reply.BlockHash),
Index: uint(reply.LogIndex),
Removed: reply.Removed,
}
t := make([]common.Hash, 0)
for _, v := range reply.Topics {
t = append(t, gointerfaces.ConvertH256ToHash(v))
}
lg.Topics = t
ff.logsSubs.distributeLog(reply)
}

View File

@ -0,0 +1,338 @@
package rpchelper
import (
"context"
"testing"
"github.com/ledgerwatch/erigon-lib/gointerfaces"
"github.com/ledgerwatch/erigon-lib/gointerfaces/remote"
types2 "github.com/ledgerwatch/erigon-lib/gointerfaces/types"
"github.com/ledgerwatch/erigon/common"
"github.com/ledgerwatch/erigon/core/types"
"github.com/ledgerwatch/erigon/eth/filters"
)
func createLog() *remote.SubscribeLogsReply {
return &remote.SubscribeLogsReply{
Address: gointerfaces.ConvertAddressToH160([20]byte{}),
BlockHash: gointerfaces.ConvertHashToH256([32]byte{}),
BlockNumber: 0,
Data: []byte{},
LogIndex: 0,
Topics: []*types2.H256{gointerfaces.ConvertHashToH256([32]byte{99, 99})},
TransactionHash: gointerfaces.ConvertHashToH256([32]byte{}),
TransactionIndex: 0,
Removed: false,
}
}
var (
address1 = common.HexToAddress("0xdac17f958d2ee523a2206206994597c13d831ec7")
address1H160 = gointerfaces.ConvertAddressToH160(address1)
topic1 = common.HexToHash("0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef")
topic1H256 = gointerfaces.ConvertHashToH256(topic1)
)
func TestFilters_SingleSubscription_OnlyTopicsSubscribedAreBroadcast(t *testing.T) {
f := New(context.TODO(), nil, nil, nil, func() {})
subbedTopic := common.BytesToHash([]byte{10, 20})
criteria := filters.FilterCriteria{
Addresses: nil,
Topics: [][]common.Hash{{subbedTopic}},
}
outChan := make(chan *types.Log, 1)
f.SubscribeLogs(outChan, criteria)
// now create a log for some other topic and distribute it
log := createLog()
f.OnNewLogs(log)
if len(outChan) != 0 {
t.Error("expected the subscription channel to be empty")
}
// now a log that the subscription cares about
log.Topics = []*types2.H256{gointerfaces.ConvertHashToH256(subbedTopic)}
f.OnNewLogs(log)
if len(outChan) != 1 {
t.Error("expected a message in the channel for the subscribed topic")
}
}
func TestFilters_SingleSubscription_EmptyTopicsInCriteria_OnlyTopicsSubscribedAreBroadcast(t *testing.T) {
f := New(context.TODO(), nil, nil, nil, func() {})
var nilTopic common.Hash
subbedTopic := common.BytesToHash([]byte{10, 20})
criteria := filters.FilterCriteria{
Addresses: nil,
Topics: [][]common.Hash{{nilTopic, subbedTopic, nilTopic}},
}
outChan := make(chan *types.Log, 1)
f.SubscribeLogs(outChan, criteria)
// now create a log for some other topic and distribute it
log := createLog()
f.OnNewLogs(log)
if len(outChan) != 0 {
t.Error("expected the subscription channel to be empty")
}
// now a log that the subscription cares about
log.Topics = []*types2.H256{gointerfaces.ConvertHashToH256(subbedTopic)}
f.OnNewLogs(log)
if len(outChan) != 1 {
t.Error("expected a message in the channel for the subscribed topic")
}
}
func TestFilters_TwoSubscriptionsWithDifferentCriteria(t *testing.T) {
f := New(context.TODO(), nil, nil, nil, func() {})
criteria1 := filters.FilterCriteria{
Addresses: nil,
Topics: [][]common.Hash{},
}
criteria2 := filters.FilterCriteria{
Addresses: nil,
Topics: [][]common.Hash{{topic1}},
}
chan1 := make(chan *types.Log, 256)
chan2 := make(chan *types.Log, 256)
f.SubscribeLogs(chan1, criteria1)
f.SubscribeLogs(chan2, criteria2)
// now create a log for some other topic and distribute it
log := createLog()
f.OnNewLogs(log)
if len(chan1) != 1 {
t.Error("expected channel 1 to receive the log message, no filters")
}
if len(chan2) != 0 {
t.Error("expected channel 2 to be empty, it has a topic filter")
}
// now a log that the subscription cares about
log.Topics = []*types2.H256{gointerfaces.ConvertHashToH256(topic1)}
f.OnNewLogs(log)
if len(chan1) != 2 {
t.Error("expected the second log to be in the channel with no filters")
}
if len(chan2) != 1 {
t.Error("expected the channel with filters to receive the message as the filter matches")
}
}
func TestFilters_ThreeSubscriptionsWithDifferentCriteria(t *testing.T) {
f := New(context.TODO(), nil, nil, nil, func() {})
criteria1 := filters.FilterCriteria{
Addresses: nil,
Topics: [][]common.Hash{},
}
criteria2 := filters.FilterCriteria{
Addresses: nil,
Topics: [][]common.Hash{{topic1}},
}
criteria3 := filters.FilterCriteria{
Addresses: []common.Address{common.HexToAddress(address1.String())},
Topics: [][]common.Hash{},
}
chan1 := make(chan *types.Log, 256)
chan2 := make(chan *types.Log, 256)
chan3 := make(chan *types.Log, 256)
f.SubscribeLogs(chan1, criteria1)
f.SubscribeLogs(chan2, criteria2)
f.SubscribeLogs(chan3, criteria3)
// now create a log for some other topic and distribute it
log := createLog()
f.OnNewLogs(log)
if len(chan1) != 1 {
t.Error("expected channel 1 to receive the log message, no filters")
}
if len(chan2) != 0 {
t.Error("expected channel 2 to be empty, it has a topic filter")
}
if len(chan3) != 0 {
t.Error("expected channel 3 to be empty as the address doesn't match")
}
// now a log that the subscription cares about
var a common.Address
a.SetBytes(address1.Bytes())
log.Address = gointerfaces.ConvertAddressToH160(a)
f.OnNewLogs(log)
if len(chan1) != 2 {
t.Error("expected the second log to be in the channel with no filters")
}
if len(chan2) != 0 {
t.Error("expected the second channel to still be empty as no log has the correct topic yet")
}
if len(chan3) != 1 {
t.Error("expected the third channel to have 1 entry as the previous log address matched")
}
log = createLog()
log.Topics = []*types2.H256{topic1H256}
f.OnNewLogs(log)
if len(chan1) != 3 {
t.Error("expected the third log to be in the channel with no filters")
}
if len(chan2) != 1 {
t.Error("expected the third channel to contain a log as the topic matched")
}
if len(chan3) != 1 {
t.Error("expected the third channel to still have 1 as the address didn't match in the third log")
}
}
func TestFilters_SubscribeLogsGeneratesCorrectLogFilterRequest(t *testing.T) {
var lastFilterRequest *remote.LogsFilterRequest
loadRequester := func(r *remote.LogsFilterRequest) error {
lastFilterRequest = r
return nil
}
f := New(context.TODO(), nil, nil, nil, func() {})
f.logsRequestor.Store(loadRequester)
// first request has no filters
chan1 := make(chan *types.Log, 1)
criteria1 := filters.FilterCriteria{
Addresses: []common.Address{},
Topics: [][]common.Hash{},
}
id1 := f.SubscribeLogs(chan1, criteria1)
// request should have all addresses and topics enabled
if lastFilterRequest.AllAddresses == false {
t.Error("1: expected all addresses to be true")
}
if lastFilterRequest.AllTopics == false {
t.Error("1: expected all topics to be true")
}
// second request filters on an address
chan2 := make(chan *types.Log, 1)
criteria2 := filters.FilterCriteria{
Addresses: []common.Address{address1},
Topics: [][]common.Hash{},
}
id2 := f.SubscribeLogs(chan2, criteria2)
// request should have all addresses and all topics still and the new address
if lastFilterRequest.AllAddresses == false {
t.Error("2: expected all addresses to be true")
}
if lastFilterRequest.AllTopics == false {
t.Error("2: expected all topics to be true")
}
if len(lastFilterRequest.Addresses) != 1 && lastFilterRequest.Addresses[0] != address1H160 {
t.Error("2: expected the address to match the last request")
}
// third request filters on topic
chan3 := make(chan *types.Log, 1)
criteria3 := filters.FilterCriteria{
Addresses: []common.Address{},
Topics: [][]common.Hash{{topic1}},
}
id3 := f.SubscribeLogs(chan3, criteria3)
// request should have all addresses and all topics as well as the previous address and new topic
if lastFilterRequest.AllAddresses == false {
t.Error("3: expected all addresses to be true")
}
if lastFilterRequest.AllTopics == false {
t.Error("3: expected all topics to be true")
}
if len(lastFilterRequest.Addresses) != 1 && lastFilterRequest.Addresses[0] != address1H160 {
t.Error("3: expected the address to match the previous request")
}
if len(lastFilterRequest.Topics) != 1 && lastFilterRequest.Topics[0] != topic1H256 {
t.Error("3: expected the topics to match the last request")
}
// now start unsubscribing to check the state of things
// unsubscribing the first filter should leave us with all topics and all addresses 2 because request 2 and 3
// have empty addresses and topics between the two of them. Effectively the state should be the same as the
// subscription in step 3
f.UnsubscribeLogs(id1)
if lastFilterRequest.AllAddresses == false {
t.Error("4: expected all addresses to be true")
}
if lastFilterRequest.AllTopics == false {
t.Error("4: expected all topics to be true")
}
if len(lastFilterRequest.Addresses) != 1 && lastFilterRequest.Addresses[0] != address1H160 {
t.Error("4: expected an address to be present")
}
if len(lastFilterRequest.Topics) != 1 && lastFilterRequest.Topics[0] != topic1H256 {
t.Error("4: expected a topic to be present")
}
// unsubscribing the second filter should remove the all topics filter as the only subscription remaining
// specifies a topic. All addresses should be present still. The state should represent the single
// subscription in step 3
f.UnsubscribeLogs(id2)
if lastFilterRequest.AllAddresses == false {
t.Error("5: expected all addresses to be true")
}
if lastFilterRequest.AllTopics == true {
t.Error("5: expected all topics to be false")
}
if len(lastFilterRequest.Addresses) != 0 {
t.Error("5: expected addresses to be empty")
}
if len(lastFilterRequest.Topics) != 1 && lastFilterRequest.Topics[0] != topic1H256 {
t.Error("5: expected a topic to be present")
}
// unsubscribing the last filter should leave us with false for the all addresses and all topics
// and nothing in the address or topics lists
f.UnsubscribeLogs(id3)
if lastFilterRequest.AllAddresses == true {
t.Error("5: expected all addresses to be false")
}
if lastFilterRequest.AllTopics == true {
t.Error("5: expected all topics to be false")
}
if len(lastFilterRequest.Addresses) != 0 {
t.Error("5: expected addresses to be empty")
}
if len(lastFilterRequest.Topics) != 0 {
t.Error("5: expected topics to be empty")
}
}