mirror of
https://gitlab.com/pulsechaincom/prysm-pulse.git
synced 2024-12-22 03:30:35 +00:00
Add Mutex on Top of p2p Feeds Map (#321)
* add mutex on top of p2p feeds map * lock read too * add concurrent write test and enable race detector * restrict race detection to concurrent feed map write test * gazelle fix * increase ram and I/O in bazel's local_resources * reverted local_resources changes * address review comments * do not disable sandboxing on travis * address review comments
This commit is contained in:
parent
8d42418d0d
commit
b8fe4228af
@ -1,9 +1,5 @@
|
||||
startup --host_jvm_args=-Xmx500m --host_jvm_args=-Xms500m
|
||||
|
||||
# Disable sandboxing since it may fail inside of Travis' containers because the
|
||||
# mount system call is not permitted.
|
||||
build --spawn_strategy=standalone --genrule_strategy=standalone
|
||||
|
||||
# Remote caching over Google Cloud Storage
|
||||
# TODO(#282): Enable remote caching/execution
|
||||
#build:remote --remote_http_cache=https://storage.googleapis.com/prysmatic-bazel-cache
|
||||
@ -12,7 +8,7 @@ build --spawn_strategy=standalone --genrule_strategy=standalone
|
||||
# Set some build options for travis container.
|
||||
build --local_resources=1536,1.5,0.5
|
||||
build --noshow_progress
|
||||
build --verbose_failures
|
||||
build --verbose_failures
|
||||
build --sandbox_debug
|
||||
build --test_output=errors
|
||||
build --flaky_test_attempts=5
|
||||
|
@ -2,8 +2,10 @@ load("@bazel_tools//tools/build_defs/repo:http.bzl", "http_archive")
|
||||
|
||||
http_archive(
|
||||
name = "io_bazel_rules_go",
|
||||
urls = ["https://github.com/bazelbuild/rules_go/releases/download/0.13.0/rules_go-0.13.0.tar.gz"],
|
||||
sha256 = "ba79c532ac400cefd1859cbc8a9829346aa69e3b99482cd5a54432092cbc3933",
|
||||
# in order to be able to enable race detection we need to use a version
|
||||
# < 0.13.0 until this bug is fixed: https://github.com/bazelbuild/rules_go/issues/1592
|
||||
urls = ["https://github.com/bazelbuild/rules_go/releases/download/0.12.1/rules_go-0.12.1.tar.gz"],
|
||||
sha256 = "8b68d0630d63d95dacc0016c3bb4b76154fe34fca93efd65d1c366de3fcb4294",
|
||||
)
|
||||
|
||||
http_archive(
|
||||
|
@ -54,3 +54,14 @@ go_test(
|
||||
"@com_github_sirupsen_logrus//hooks/test:go_default_library",
|
||||
],
|
||||
)
|
||||
|
||||
# by default gazelle tries to add all the test files to the first
|
||||
# go_test; we want to treat feed_concurrent_test.go differently
|
||||
# gazelle:exclude feed_concurrent_test.go
|
||||
|
||||
go_test(
|
||||
name = "go_feed_concurrent_write_test",
|
||||
srcs = ["feed_concurrent_test.go"],
|
||||
embed = [":go_default_library"],
|
||||
race = "on",
|
||||
)
|
||||
|
@ -35,8 +35,11 @@ func (s *Server) Feed(msg interface{}) *event.Feed {
|
||||
t = reflect.TypeOf(msg)
|
||||
}
|
||||
|
||||
s.mutex.Lock()
|
||||
defer s.mutex.Unlock()
|
||||
if s.feeds[t] == nil {
|
||||
s.feeds[t] = new(event.Feed)
|
||||
}
|
||||
|
||||
return s.feeds[t]
|
||||
}
|
||||
|
14
shared/p2p/feed_concurrent_test.go
Normal file
14
shared/p2p/feed_concurrent_test.go
Normal file
@ -0,0 +1,14 @@
|
||||
package p2p
|
||||
|
||||
import "testing"
|
||||
|
||||
func TestFeed_ConcurrentWrite(t *testing.T) {
|
||||
s, err := NewServer()
|
||||
if err != nil {
|
||||
t.Fatalf("could not create server %v", err)
|
||||
}
|
||||
|
||||
for i := 0; i < 5; i++ {
|
||||
go s.Feed("a")
|
||||
}
|
||||
}
|
@ -12,6 +12,7 @@ package p2p
|
||||
import (
|
||||
"context"
|
||||
"reflect"
|
||||
"sync"
|
||||
|
||||
"github.com/ethereum/go-ethereum/event"
|
||||
"github.com/golang/protobuf/proto"
|
||||
@ -33,6 +34,7 @@ type Sender interface {
|
||||
type Server struct {
|
||||
ctx context.Context
|
||||
cancel context.CancelFunc
|
||||
mutex *sync.Mutex
|
||||
feeds map[reflect.Type]*event.Feed
|
||||
host host.Host
|
||||
gsub *floodsub.PubSub
|
||||
@ -60,6 +62,7 @@ func NewServer() (*Server, error) {
|
||||
feeds: make(map[reflect.Type]*event.Feed),
|
||||
host: host,
|
||||
gsub: gsub,
|
||||
mutex: &sync.Mutex{},
|
||||
}, nil
|
||||
}
|
||||
|
||||
|
@ -4,6 +4,7 @@ import (
|
||||
"context"
|
||||
"io/ioutil"
|
||||
"reflect"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
@ -82,6 +83,7 @@ func TestSubscribeToTopic(t *testing.T) {
|
||||
gsub: gsub,
|
||||
host: h,
|
||||
feeds: make(map[reflect.Type]*event.Feed),
|
||||
mutex: &sync.Mutex{},
|
||||
}
|
||||
|
||||
feed := s.Feed(pb.CollationBodyRequest{})
|
||||
|
Loading…
Reference in New Issue
Block a user