From 09ca832a5fb2fb878faf440d45e07e3da59811fb Mon Sep 17 00:00:00 2001 From: Yutaro Mori Date: Thu, 26 Jul 2018 01:57:44 +0900 Subject: [PATCH] beacon: Beginning the Sync Service for the Beacon Chain (#322) --- .gitignore | 7 +- beacon-chain/blockchain/core.go | 1 - beacon-chain/blockchain/service.go | 18 +- beacon-chain/blockchain/service_test.go | 4 +- beacon-chain/network/BUILD.bazel | 12 ++ beacon-chain/network/service.go | 62 +++++++ beacon-chain/node/BUILD.bazel | 2 + beacon-chain/node/node.go | 32 ++++ beacon-chain/sync/BUILD.bazel | 24 +++ beacon-chain/sync/service.go | 134 ++++++++++++++ beacon-chain/sync/service_test.go | 231 ++++++++++++++++++++++++ beacon-chain/types/BUILD.bazel | 2 + beacon-chain/types/block.go | 11 ++ shared/testutil/BUILD.bazel | 9 + shared/testutil/log.go | 37 ++++ 15 files changed, 579 insertions(+), 7 deletions(-) create mode 100644 beacon-chain/network/BUILD.bazel create mode 100644 beacon-chain/network/service.go create mode 100644 beacon-chain/sync/BUILD.bazel create mode 100644 beacon-chain/sync/service.go create mode 100644 beacon-chain/sync/service_test.go create mode 100644 shared/testutil/BUILD.bazel create mode 100644 shared/testutil/log.go diff --git a/.gitignore b/.gitignore index d213aedcd..29ce9bf8b 100644 --- a/.gitignore +++ b/.gitignore @@ -1,5 +1,8 @@ # Ignore bazel directories bazel-* -build + .DS_Store -.gitattributes \ No newline at end of file +.gitattributes + +# delve debugger output (not sure how to get rid of these) +**/debug.test diff --git a/beacon-chain/blockchain/core.go b/beacon-chain/blockchain/core.go index fb2c2f272..58b64508e 100644 --- a/beacon-chain/blockchain/core.go +++ b/beacon-chain/blockchain/core.go @@ -142,7 +142,6 @@ func (b *BeaconChain) persist() error { // computeNewActiveState computes a new active state for every beacon block. func (b *BeaconChain) computeNewActiveState(seed common.Hash) (*types.ActiveState, error) { - attesters, proposer, err := b.getAttestersProposer(seed) if err != nil { return nil, err diff --git a/beacon-chain/blockchain/service.go b/beacon-chain/blockchain/service.go index f9176b4a0..0243c3251 100644 --- a/beacon-chain/blockchain/service.go +++ b/beacon-chain/blockchain/service.go @@ -2,6 +2,7 @@ package blockchain import ( "context" + "hash" "github.com/prysmaticlabs/prysm/beacon-chain/database" "github.com/prysmaticlabs/prysm/beacon-chain/powchain" @@ -31,7 +32,7 @@ func NewChainService(ctx context.Context, beaconDB *database.BeaconDB, web3Servi // Start a blockchain service's main event loop. func (c *ChainService) Start() { - log.Infof("Starting blockchain service") + log.Infof("Starting service") beaconChain, err := NewBeaconChain(c.beaconDB.DB()) if err != nil { @@ -44,10 +45,23 @@ func (c *ChainService) Start() { // Stop the blockchain service's main event loop and associated goroutines. func (c *ChainService) Stop() error { defer c.cancel() - log.Info("Stopping blockchain service") + log.Info("Stopping service") return nil } +// ProcessBlock accepts a new block for inclusion in the chain. +func (c *ChainService) ProcessBlock(b *types.Block) error { + c.latestBeaconBlock <- b + return nil +} + +// ContainsBlock checks if a block for the hash exists in the chain. +// This method must be safe to call from a goroutine +func (c *ChainService) ContainsBlock(h hash.Hash) bool { + // TODO + return false +} + // updateActiveState receives a beacon block, computes a new active state and writes it to db. func (c *ChainService) updateActiveState() { for { diff --git a/beacon-chain/blockchain/service_test.go b/beacon-chain/blockchain/service_test.go index 8063924e7..78fe6df04 100644 --- a/beacon-chain/blockchain/service_test.go +++ b/beacon-chain/blockchain/service_test.go @@ -46,7 +46,7 @@ func TestStartStop(t *testing.T) { } msg = hook.AllEntries()[1].Message - want = "Starting blockchain service" + want = "Starting service" if msg != want { t.Errorf("incorrect log, expected %s, got %s", want, msg) } @@ -58,7 +58,7 @@ func TestStartStop(t *testing.T) { } msg = hook.AllEntries()[3].Message - want = "Stopping blockchain service" + want = "Stopping service" if msg != want { t.Errorf("incorrect log, expected %s, got %s", want, msg) } diff --git a/beacon-chain/network/BUILD.bazel b/beacon-chain/network/BUILD.bazel new file mode 100644 index 000000000..db27f9401 --- /dev/null +++ b/beacon-chain/network/BUILD.bazel @@ -0,0 +1,12 @@ +load("@io_bazel_rules_go//go:def.bzl", "go_library") + +go_library( + name = "go_default_library", + srcs = ["service.go"], + importpath = "github.com/prysmaticlabs/prysm/beacon-chain/network", + visibility = ["//visibility:public"], + deps = [ + "//beacon-chain/types:go_default_library", + "@com_github_sirupsen_logrus//:go_default_library", + ], +) diff --git a/beacon-chain/network/service.go b/beacon-chain/network/service.go new file mode 100644 index 000000000..8c7f2faa6 --- /dev/null +++ b/beacon-chain/network/service.go @@ -0,0 +1,62 @@ +package network + +import ( + "hash" + + "github.com/prysmaticlabs/prysm/beacon-chain/types" + "github.com/sirupsen/logrus" +) + +var log = logrus.WithField("prefix", "network") + +// Service is the middleware between the application-agnostic p2p service and subscribers to the network. +type Service struct { + syncService SyncService +} + +// SyncService is the interface for the sync service. +type SyncService interface { + ReceiveBlockHash(hash.Hash) + ReceiveBlock(*types.Block) error +} + +// NewNetworkService instantiates a new network service. +func NewNetworkService() *Service { + return &Service{} +} + +// SetSyncService sets a concrete value for the sync service. +func (ns *Service) SetSyncService(ss SyncService) { + ns.syncService = ss +} + +// Start launches the service's goroutine. +func (ns *Service) Start() { + log.Info("Starting service") + go run() +} + +// Stop kills the service's goroutine (unimplemented). +func (ns *Service) Stop() error { + log.Info("Stopping service") + return nil +} + +// BroadcastBlockHash sends the block hash to other peers in the network. +func (ns *Service) BroadcastBlockHash(h hash.Hash) error { + return nil +} + +// BroadcastBlock sends the block to other peers in the network. +func (ns *Service) BroadcastBlock(b *types.Block) error { + return nil +} + +// RequestBlock requests the contents of the block given the block hash. +func (ns *Service) RequestBlock(hash.Hash) error { + return nil +} + +func run() { + select {} +} diff --git a/beacon-chain/node/BUILD.bazel b/beacon-chain/node/BUILD.bazel index 5dfeac491..0621d8f98 100644 --- a/beacon-chain/node/BUILD.bazel +++ b/beacon-chain/node/BUILD.bazel @@ -8,7 +8,9 @@ go_library( deps = [ "//beacon-chain/blockchain:go_default_library", "//beacon-chain/database:go_default_library", + "//beacon-chain/network:go_default_library", "//beacon-chain/powchain:go_default_library", + "//beacon-chain/sync:go_default_library", "//beacon-chain/utils:go_default_library", "//shared:go_default_library", "//shared/cmd:go_default_library", diff --git a/beacon-chain/node/node.go b/beacon-chain/node/node.go index e3721c9c3..005b97e98 100644 --- a/beacon-chain/node/node.go +++ b/beacon-chain/node/node.go @@ -11,7 +11,9 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/prysmaticlabs/prysm/beacon-chain/blockchain" "github.com/prysmaticlabs/prysm/beacon-chain/database" + "github.com/prysmaticlabs/prysm/beacon-chain/network" "github.com/prysmaticlabs/prysm/beacon-chain/powchain" + rbcSync "github.com/prysmaticlabs/prysm/beacon-chain/sync" "github.com/prysmaticlabs/prysm/beacon-chain/utils" "github.com/prysmaticlabs/prysm/shared" "github.com/prysmaticlabs/prysm/shared/cmd" @@ -57,6 +59,14 @@ func New(ctx *cli.Context) (*BeaconNode, error) { return nil, err } + if err := beacon.registerNetworkService(); err != nil { + return nil, err + } + + if err := beacon.registerSyncService(); err != nil { + return nil, err + } + return beacon, nil } @@ -140,3 +150,25 @@ func (b *BeaconNode) registerPOWChainService() error { } return b.services.RegisterService(web3Service) } + +func (b *BeaconNode) registerNetworkService() error { + networkService := network.NewNetworkService() + + return b.services.RegisterService(networkService) +} + +func (b *BeaconNode) registerSyncService() error { + var chainService *blockchain.ChainService + b.services.FetchService(&chainService) + + var networkService *network.Service + b.services.FetchService(&networkService) + + syncService := rbcSync.NewSyncService(context.Background(), rbcSync.DefaultConfig()) + syncService.SetChainService(chainService) + syncService.SetNetworkService(networkService) + + networkService.SetSyncService(syncService) + + return b.services.RegisterService(syncService) +} diff --git a/beacon-chain/sync/BUILD.bazel b/beacon-chain/sync/BUILD.bazel new file mode 100644 index 000000000..08e177dcd --- /dev/null +++ b/beacon-chain/sync/BUILD.bazel @@ -0,0 +1,24 @@ +load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") + +go_library( + name = "go_default_library", + srcs = ["service.go"], + importpath = "github.com/prysmaticlabs/prysm/beacon-chain/sync", + visibility = ["//visibility:public"], + deps = [ + "//beacon-chain/types:go_default_library", + "@com_github_sirupsen_logrus//:go_default_library", + ], +) + +go_test( + name = "go_default_test", + srcs = ["service_test.go"], + embed = [":go_default_library"], + deps = [ + "//beacon-chain/types:go_default_library", + "//shared/testutil:go_default_library", + "@com_github_sirupsen_logrus//hooks/test:go_default_library", + "@org_golang_x_crypto//blake2b:go_default_library", + ], +) diff --git a/beacon-chain/sync/service.go b/beacon-chain/sync/service.go new file mode 100644 index 000000000..7ac242c8d --- /dev/null +++ b/beacon-chain/sync/service.go @@ -0,0 +1,134 @@ +package sync + +import ( + "context" + "hash" + + "github.com/prysmaticlabs/prysm/beacon-chain/types" + "github.com/sirupsen/logrus" +) + +var log = logrus.WithField("prefix", "sync") + +// Service is the gateway and the bridge between the p2p network and the local beacon chain. +// In broad terms, a new block is synced in 4 steps: +// 1. Receive a block hash from a peer +// 2. Request the block for the hash from the network +// 3. Receive the block +// 4. Forward block to the beacon service for full validation +// +// In addition, Service will handle the following responsibilities: +// * Decide which messages are forwarded to other peers +// * Filter redundant data and unwanted data +// * Drop peers that send invalid data +// * Trottle incoming requests +type Service struct { + ctx context.Context + cancel context.CancelFunc + networkService NetworkService + chainService ChainService + hashBuf chan hash.Hash + blockBuf chan *types.Block +} + +// Config allows the channel's buffer sizes to be changed +type Config struct { + HashBufferSize int + BlockBufferSize int +} + +// DefaultConfig provides the default configuration for a sync service +func DefaultConfig() Config { + return Config{100, 100} +} + +// NetworkService is the interface for the p2p network. +type NetworkService interface { + BroadcastBlockHash(hash.Hash) error + BroadcastBlock(*types.Block) error + RequestBlock(hash.Hash) error +} + +// ChainService is the interface for the local beacon chain. +type ChainService interface { + ProcessBlock(*types.Block) error + ContainsBlock(hash.Hash) bool +} + +// NewSyncService accepts a context and returns a new Service. +func NewSyncService(ctx context.Context, cfg Config) *Service { + ctx, cancel := context.WithCancel(ctx) + return &Service{ + ctx: ctx, + cancel: cancel, + hashBuf: make(chan hash.Hash, cfg.HashBufferSize), + blockBuf: make(chan *types.Block, cfg.BlockBufferSize), + } +} + +// SetNetworkService sets a concrete value for the p2p layer. +func (ss *Service) SetNetworkService(ps NetworkService) { + ss.networkService = ps +} + +// SetChainService sets a concrete value for the local beacon chain. +func (ss *Service) SetChainService(cs ChainService) { + ss.chainService = cs +} + +// Start begins the block processing goroutine. +func (ss *Service) Start() { + log.Info("Starting service") + go run(ss.ctx.Done(), ss.hashBuf, ss.blockBuf, ss.networkService, ss.chainService) +} + +// Stop kills the block processing goroutine, but does not wait until the goroutine exits. +func (ss *Service) Stop() error { + log.Info("Stopping service") + ss.cancel() + return nil +} + +// ReceiveBlockHash accepts a block hash. +// New hashes are forwarded to other peers in the network (unimplemented), and +// the contents of the block are requested if the local chain doesn't have the block. +func (ss *Service) ReceiveBlockHash(h hash.Hash) { + if ss.chainService.ContainsBlock(h) { + return + } + + ss.hashBuf <- h + ss.networkService.BroadcastBlockHash(h) +} + +// ReceiveBlock accepts a block to potentially be included in the local chain. +// The service will filter blocks that have not been requested (unimplemented). +func (ss *Service) ReceiveBlock(b *types.Block) error { + h, err := b.Hash() + if err != nil { + return err + } + + if ss.chainService.ContainsBlock(h) { + return nil + } + + ss.blockBuf <- b + ss.networkService.BroadcastBlock(b) + + return nil +} + +func run(done <-chan struct{}, hashBuf <-chan hash.Hash, blockBuf <-chan *types.Block, ps NetworkService, cs ChainService) { + for { + select { + case <-done: + log.Infof("exiting goroutine") + return + case h := <-hashBuf: + ps.RequestBlock(h) + case b := <-blockBuf: + cs.ProcessBlock(b) + } + } +} diff --git a/beacon-chain/sync/service_test.go b/beacon-chain/sync/service_test.go new file mode 100644 index 000000000..64263afaf --- /dev/null +++ b/beacon-chain/sync/service_test.go @@ -0,0 +1,231 @@ +package sync + +import ( + "bytes" + "context" + "fmt" + "hash" + "testing" + + "github.com/prysmaticlabs/prysm/beacon-chain/types" + "github.com/prysmaticlabs/prysm/shared/testutil" + logTest "github.com/sirupsen/logrus/hooks/test" + "golang.org/x/crypto/blake2b" +) + +var testLog = log.WithField("prefix", "sync_test") + +type MockNetworkService struct{} + +func (ns *MockNetworkService) BroadcastBlockHash(h hash.Hash) error { + testLog.Infof("broadcasting hash: %x", h.Sum(nil)) + return nil +} + +func (ns *MockNetworkService) BroadcastBlock(b *types.Block) error { + h, err := b.Hash() + if err != nil { + return err + } + + testLog.Infof("broadcasting block: %x", h.Sum(nil)) + return nil +} + +func (ns *MockNetworkService) RequestBlock(h hash.Hash) error { + testLog.Infof("requesting block: %x", h.Sum(nil)) + return nil +} + +// MockChainService implements a simplified local chain that stores blocks in a slice +type MockChainService struct { + processedHashes []hash.Hash +} + +func (ms *MockChainService) ProcessBlock(b *types.Block) error { + h, err := b.Hash() + if err != nil { + return err + } + + testLog.Infof("forwarding block: %x", h.Sum(nil)) + if ms.processedHashes == nil { + ms.processedHashes = []hash.Hash{} + } + + ms.processedHashes = append(ms.processedHashes, h) + return nil +} + +func (ms *MockChainService) ContainsBlock(h hash.Hash) bool { + for _, h1 := range ms.processedHashes { + if bytes.Equal(h.Sum(nil), h1.Sum(nil)) { + return true + } + } + + return false +} + +func TestProcessBlockHash(t *testing.T) { + hook := logTest.NewGlobal() + + // set the channel's buffer to 0 to make channel interactions blocking + cfg := Config{HashBufferSize: 0, BlockBufferSize: 0} + ss := NewSyncService(context.Background(), cfg) + + ns := MockNetworkService{} + cs := MockChainService{} + ss.SetNetworkService(&ns) + ss.SetChainService(&cs) + + exitRoutine := make(chan bool) + + go func() { + run(ss.ctx.Done(), ss.hashBuf, ss.blockBuf, &ns, &cs) + exitRoutine <- true + }() + + h, err := blake2b.New256(nil) + if err != nil { + t.Errorf("failed to intialize hash: %v", err) + } + + // if a new hash is processed + ss.ReceiveBlockHash(h) + + ss.cancel() + <-exitRoutine + + // sync service requests the contents of the block and broadcasts the hash to peers + testutil.AssertLogsContain(t, hook, fmt.Sprintf("requesting block: %x", h.Sum(nil))) + testutil.AssertLogsContain(t, hook, fmt.Sprintf("broadcasting hash: %x", h.Sum(nil))) +} + +func TestProcessBlock(t *testing.T) { + hook := logTest.NewGlobal() + + cfg := Config{HashBufferSize: 0, BlockBufferSize: 0} + ss := NewSyncService(context.Background(), cfg) + + ns := MockNetworkService{} + cs := MockChainService{} + + ss.SetNetworkService(&ns) + ss.SetChainService(&cs) + + exitRoutine := make(chan bool) + + go func() { + run(ss.ctx.Done(), ss.hashBuf, ss.blockBuf, &ns, &cs) + exitRoutine <- true + }() + + b := types.NewBlock(0) + h, err := b.Hash() + if err != nil { + t.Fatal(err) + } + + // if the hash and the block are processed in order + ss.ReceiveBlockHash(h) + ss.ReceiveBlock(b) + ss.cancel() + <-exitRoutine + + // sync service broadcasts the block and forwards the block to to the local chain + testutil.AssertLogsContain(t, hook, fmt.Sprintf("broadcasting block: %x", h.Sum(nil))) + testutil.AssertLogsContain(t, hook, fmt.Sprintf("forwarding block: %x", h.Sum(nil))) +} + +func TestProcessMultipleBlocks(t *testing.T) { + hook := logTest.NewGlobal() + + cfg := Config{HashBufferSize: 0, BlockBufferSize: 0} + ss := NewSyncService(context.Background(), cfg) + + ns := MockNetworkService{} + cs := MockChainService{} + + ss.SetNetworkService(&ns) + ss.SetChainService(&cs) + + exitRoutine := make(chan bool) + + go func() { + run(ss.ctx.Done(), ss.hashBuf, ss.blockBuf, &ns, &cs) + exitRoutine <- true + }() + + b1 := types.NewBlock(0) + h1, err := b1.Hash() + if err != nil { + t.Fatal(err) + } + + b2 := types.NewBlock(1) + h2, err := b2.Hash() + if err != nil { + t.Fatal(err) + } + + if bytes.Equal(h1.Sum(nil), h2.Sum(nil)) { + t.Fatalf("two blocks should not have the same hash:\n%x\n%x", h1.Sum(nil), h2.Sum(nil)) + } + + // if two different blocks are submitted + ss.ReceiveBlockHash(h1) + ss.ReceiveBlock(b1) + ss.ReceiveBlockHash(h2) + ss.ReceiveBlock(b2) + ss.cancel() + <-exitRoutine + + // both blocks are processed + testutil.AssertLogsContain(t, hook, fmt.Sprintf("broadcasting block: %x", h1.Sum(nil))) + testutil.AssertLogsContain(t, hook, fmt.Sprintf("forwarding block: %x", h1.Sum(nil))) + testutil.AssertLogsContain(t, hook, fmt.Sprintf("broadcasting block: %x", h2.Sum(nil))) + testutil.AssertLogsContain(t, hook, fmt.Sprintf("forwarding block: %x", h2.Sum(nil))) +} + +func TestProcessSameBlock(t *testing.T) { + hook := logTest.NewGlobal() + + cfg := Config{HashBufferSize: 0, BlockBufferSize: 0} + ss := NewSyncService(context.Background(), cfg) + + ns := MockNetworkService{} + cs := MockChainService{} + + ss.SetNetworkService(&ns) + ss.SetChainService(&cs) + + exitRoutine := make(chan bool) + + go func() { + run(ss.ctx.Done(), ss.hashBuf, ss.blockBuf, &ns, &cs) + exitRoutine <- true + }() + + b := types.NewBlock(0) + h, err := b.Hash() + if err != nil { + t.Fatal(err) + } + + // if the same block is processed twice + ss.ReceiveBlockHash(h) + ss.ReceiveBlock(b) + ss.ReceiveBlockHash(h) + // there's a tricky race condition where the second hash can sneak into the goroutine + // before the first block inserts itself into the chain. therefore, its important + // for hook.Reset() to be called after the second ProcessBlockHash call + hook.Reset() + ss.ReceiveBlock(b) + ss.cancel() + <-exitRoutine + + // the block isn't processed the second time + testutil.AssertLogsDoNotContain(t, hook, fmt.Sprintf("broadcasting block: %x", h.Sum(nil))) + testutil.AssertLogsDoNotContain(t, hook, fmt.Sprintf("forwarding block: %x", h.Sum(nil))) +} diff --git a/beacon-chain/types/BUILD.bazel b/beacon-chain/types/BUILD.bazel index a658a971e..b202e7883 100644 --- a/beacon-chain/types/BUILD.bazel +++ b/beacon-chain/types/BUILD.bazel @@ -11,5 +11,7 @@ go_library( deps = [ "@com_github_ethereum_go_ethereum//common:go_default_library", "@com_github_ethereum_go_ethereum//p2p/enr:go_default_library", + "@com_github_ethereum_go_ethereum//rlp:go_default_library", + "@org_golang_x_crypto//blake2b:go_default_library", ], ) diff --git a/beacon-chain/types/block.go b/beacon-chain/types/block.go index 510bd235c..633f5ec03 100644 --- a/beacon-chain/types/block.go +++ b/beacon-chain/types/block.go @@ -5,6 +5,8 @@ import ( "time" "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/rlp" + "golang.org/x/crypto/blake2b" ) // Block defines a beacon chain core primitive. @@ -12,6 +14,15 @@ type Block struct { data *Data } +// Hash generates the BLAKE2b hash of the block +func (b Block) Hash() (hash.Hash, error) { + data, err := rlp.EncodeToBytes(b.Data()) + if err != nil { + return nil, err + } + return blake2b.New256(data) +} + // Data getter makes the block's properties read-only. func (b *Block) Data() *Data { return b.data diff --git a/shared/testutil/BUILD.bazel b/shared/testutil/BUILD.bazel new file mode 100644 index 000000000..6bb7f4ed1 --- /dev/null +++ b/shared/testutil/BUILD.bazel @@ -0,0 +1,9 @@ +load("@io_bazel_rules_go//go:def.bzl", "go_library") + +go_library( + name = "go_default_library", + srcs = ["log.go"], + importpath = "github.com/prysmaticlabs/prysm/shared/testutil", + visibility = ["//visibility:public"], + deps = ["@com_github_sirupsen_logrus//hooks/test:go_default_library"], +) diff --git a/shared/testutil/log.go b/shared/testutil/log.go new file mode 100644 index 000000000..9cf1041b5 --- /dev/null +++ b/shared/testutil/log.go @@ -0,0 +1,37 @@ +package testutil + +import ( + "strings" + "testing" + + "github.com/sirupsen/logrus/hooks/test" +) + +// AssertLogsContain checks that the desired string is a subset of the current log output. +// Set exitOnFail to true to immediately exit the test on failure +func AssertLogsContain(t *testing.T, hook *test.Hook, want string) { + assertLogs(t, hook, want, true) +} + +// AssertLogsDoNotContain is the inverse check of AssertLogsContain +func AssertLogsDoNotContain(t *testing.T, hook *test.Hook, want string) { + assertLogs(t, hook, want, false) +} + +func assertLogs(t *testing.T, hook *test.Hook, want string, flag bool) { + t.Logf("scanning for: %s", want) + entries := hook.AllEntries() + match := false + for _, e := range entries { + if strings.Contains(e.Message, want) { + match = true + } + t.Logf("log: %s", e.Message) + } + + if flag && !match { + t.Fatalf("log not found: %s", want) + } else if !flag && match { + t.Fatalf("unwanted log found: %s", want) + } +}