beacon: Beginning the Sync Service for the Beacon Chain (#322)

This commit is contained in:
Yutaro Mori 2018-07-26 01:57:44 +09:00 committed by Raul Jordan
parent b8fe4228af
commit 09ca832a5f
15 changed files with 579 additions and 7 deletions

5
.gitignore vendored
View File

@ -1,5 +1,8 @@
# Ignore bazel directories
bazel-*
build
.DS_Store
.gitattributes
# delve debugger output (not sure how to get rid of these)
**/debug.test

View File

@ -142,7 +142,6 @@ func (b *BeaconChain) persist() error {
// computeNewActiveState computes a new active state for every beacon block.
func (b *BeaconChain) computeNewActiveState(seed common.Hash) (*types.ActiveState, error) {
attesters, proposer, err := b.getAttestersProposer(seed)
if err != nil {
return nil, err

View File

@ -2,6 +2,7 @@ package blockchain
import (
"context"
"hash"
"github.com/prysmaticlabs/prysm/beacon-chain/database"
"github.com/prysmaticlabs/prysm/beacon-chain/powchain"
@ -31,7 +32,7 @@ func NewChainService(ctx context.Context, beaconDB *database.BeaconDB, web3Servi
// Start a blockchain service's main event loop.
func (c *ChainService) Start() {
log.Infof("Starting blockchain service")
log.Infof("Starting service")
beaconChain, err := NewBeaconChain(c.beaconDB.DB())
if err != nil {
@ -44,10 +45,23 @@ func (c *ChainService) Start() {
// Stop the blockchain service's main event loop and associated goroutines.
func (c *ChainService) Stop() error {
defer c.cancel()
log.Info("Stopping blockchain service")
log.Info("Stopping service")
return nil
}
// ProcessBlock accepts a new block for inclusion in the chain.
func (c *ChainService) ProcessBlock(b *types.Block) error {
c.latestBeaconBlock <- b
return nil
}
// ContainsBlock checks if a block for the hash exists in the chain.
// This method must be safe to call from a goroutine
func (c *ChainService) ContainsBlock(h hash.Hash) bool {
// TODO
return false
}
// updateActiveState receives a beacon block, computes a new active state and writes it to db.
func (c *ChainService) updateActiveState() {
for {

View File

@ -46,7 +46,7 @@ func TestStartStop(t *testing.T) {
}
msg = hook.AllEntries()[1].Message
want = "Starting blockchain service"
want = "Starting service"
if msg != want {
t.Errorf("incorrect log, expected %s, got %s", want, msg)
}
@ -58,7 +58,7 @@ func TestStartStop(t *testing.T) {
}
msg = hook.AllEntries()[3].Message
want = "Stopping blockchain service"
want = "Stopping service"
if msg != want {
t.Errorf("incorrect log, expected %s, got %s", want, msg)
}

View File

@ -0,0 +1,12 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library")
go_library(
name = "go_default_library",
srcs = ["service.go"],
importpath = "github.com/prysmaticlabs/prysm/beacon-chain/network",
visibility = ["//visibility:public"],
deps = [
"//beacon-chain/types:go_default_library",
"@com_github_sirupsen_logrus//:go_default_library",
],
)

View File

@ -0,0 +1,62 @@
package network
import (
"hash"
"github.com/prysmaticlabs/prysm/beacon-chain/types"
"github.com/sirupsen/logrus"
)
var log = logrus.WithField("prefix", "network")
// Service is the middleware between the application-agnostic p2p service and subscribers to the network.
type Service struct {
syncService SyncService
}
// SyncService is the interface for the sync service.
type SyncService interface {
ReceiveBlockHash(hash.Hash)
ReceiveBlock(*types.Block) error
}
// NewNetworkService instantiates a new network service.
func NewNetworkService() *Service {
return &Service{}
}
// SetSyncService sets a concrete value for the sync service.
func (ns *Service) SetSyncService(ss SyncService) {
ns.syncService = ss
}
// Start launches the service's goroutine.
func (ns *Service) Start() {
log.Info("Starting service")
go run()
}
// Stop kills the service's goroutine (unimplemented).
func (ns *Service) Stop() error {
log.Info("Stopping service")
return nil
}
// BroadcastBlockHash sends the block hash to other peers in the network.
func (ns *Service) BroadcastBlockHash(h hash.Hash) error {
return nil
}
// BroadcastBlock sends the block to other peers in the network.
func (ns *Service) BroadcastBlock(b *types.Block) error {
return nil
}
// RequestBlock requests the contents of the block given the block hash.
func (ns *Service) RequestBlock(hash.Hash) error {
return nil
}
func run() {
select {}
}

View File

@ -8,7 +8,9 @@ go_library(
deps = [
"//beacon-chain/blockchain:go_default_library",
"//beacon-chain/database:go_default_library",
"//beacon-chain/network:go_default_library",
"//beacon-chain/powchain:go_default_library",
"//beacon-chain/sync:go_default_library",
"//beacon-chain/utils:go_default_library",
"//shared:go_default_library",
"//shared/cmd:go_default_library",

View File

@ -11,7 +11,9 @@ import (
"github.com/ethereum/go-ethereum/common"
"github.com/prysmaticlabs/prysm/beacon-chain/blockchain"
"github.com/prysmaticlabs/prysm/beacon-chain/database"
"github.com/prysmaticlabs/prysm/beacon-chain/network"
"github.com/prysmaticlabs/prysm/beacon-chain/powchain"
rbcSync "github.com/prysmaticlabs/prysm/beacon-chain/sync"
"github.com/prysmaticlabs/prysm/beacon-chain/utils"
"github.com/prysmaticlabs/prysm/shared"
"github.com/prysmaticlabs/prysm/shared/cmd"
@ -57,6 +59,14 @@ func New(ctx *cli.Context) (*BeaconNode, error) {
return nil, err
}
if err := beacon.registerNetworkService(); err != nil {
return nil, err
}
if err := beacon.registerSyncService(); err != nil {
return nil, err
}
return beacon, nil
}
@ -140,3 +150,25 @@ func (b *BeaconNode) registerPOWChainService() error {
}
return b.services.RegisterService(web3Service)
}
func (b *BeaconNode) registerNetworkService() error {
networkService := network.NewNetworkService()
return b.services.RegisterService(networkService)
}
func (b *BeaconNode) registerSyncService() error {
var chainService *blockchain.ChainService
b.services.FetchService(&chainService)
var networkService *network.Service
b.services.FetchService(&networkService)
syncService := rbcSync.NewSyncService(context.Background(), rbcSync.DefaultConfig())
syncService.SetChainService(chainService)
syncService.SetNetworkService(networkService)
networkService.SetSyncService(syncService)
return b.services.RegisterService(syncService)
}

View File

@ -0,0 +1,24 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
go_library(
name = "go_default_library",
srcs = ["service.go"],
importpath = "github.com/prysmaticlabs/prysm/beacon-chain/sync",
visibility = ["//visibility:public"],
deps = [
"//beacon-chain/types:go_default_library",
"@com_github_sirupsen_logrus//:go_default_library",
],
)
go_test(
name = "go_default_test",
srcs = ["service_test.go"],
embed = [":go_default_library"],
deps = [
"//beacon-chain/types:go_default_library",
"//shared/testutil:go_default_library",
"@com_github_sirupsen_logrus//hooks/test:go_default_library",
"@org_golang_x_crypto//blake2b:go_default_library",
],
)

View File

@ -0,0 +1,134 @@
package sync
import (
"context"
"hash"
"github.com/prysmaticlabs/prysm/beacon-chain/types"
"github.com/sirupsen/logrus"
)
var log = logrus.WithField("prefix", "sync")
// Service is the gateway and the bridge between the p2p network and the local beacon chain.
// In broad terms, a new block is synced in 4 steps:
// 1. Receive a block hash from a peer
// 2. Request the block for the hash from the network
// 3. Receive the block
// 4. Forward block to the beacon service for full validation
//
// In addition, Service will handle the following responsibilities:
// * Decide which messages are forwarded to other peers
// * Filter redundant data and unwanted data
// * Drop peers that send invalid data
// * Trottle incoming requests
type Service struct {
ctx context.Context
cancel context.CancelFunc
networkService NetworkService
chainService ChainService
hashBuf chan hash.Hash
blockBuf chan *types.Block
}
// Config allows the channel's buffer sizes to be changed
type Config struct {
HashBufferSize int
BlockBufferSize int
}
// DefaultConfig provides the default configuration for a sync service
func DefaultConfig() Config {
return Config{100, 100}
}
// NetworkService is the interface for the p2p network.
type NetworkService interface {
BroadcastBlockHash(hash.Hash) error
BroadcastBlock(*types.Block) error
RequestBlock(hash.Hash) error
}
// ChainService is the interface for the local beacon chain.
type ChainService interface {
ProcessBlock(*types.Block) error
ContainsBlock(hash.Hash) bool
}
// NewSyncService accepts a context and returns a new Service.
func NewSyncService(ctx context.Context, cfg Config) *Service {
ctx, cancel := context.WithCancel(ctx)
return &Service{
ctx: ctx,
cancel: cancel,
hashBuf: make(chan hash.Hash, cfg.HashBufferSize),
blockBuf: make(chan *types.Block, cfg.BlockBufferSize),
}
}
// SetNetworkService sets a concrete value for the p2p layer.
func (ss *Service) SetNetworkService(ps NetworkService) {
ss.networkService = ps
}
// SetChainService sets a concrete value for the local beacon chain.
func (ss *Service) SetChainService(cs ChainService) {
ss.chainService = cs
}
// Start begins the block processing goroutine.
func (ss *Service) Start() {
log.Info("Starting service")
go run(ss.ctx.Done(), ss.hashBuf, ss.blockBuf, ss.networkService, ss.chainService)
}
// Stop kills the block processing goroutine, but does not wait until the goroutine exits.
func (ss *Service) Stop() error {
log.Info("Stopping service")
ss.cancel()
return nil
}
// ReceiveBlockHash accepts a block hash.
// New hashes are forwarded to other peers in the network (unimplemented), and
// the contents of the block are requested if the local chain doesn't have the block.
func (ss *Service) ReceiveBlockHash(h hash.Hash) {
if ss.chainService.ContainsBlock(h) {
return
}
ss.hashBuf <- h
ss.networkService.BroadcastBlockHash(h)
}
// ReceiveBlock accepts a block to potentially be included in the local chain.
// The service will filter blocks that have not been requested (unimplemented).
func (ss *Service) ReceiveBlock(b *types.Block) error {
h, err := b.Hash()
if err != nil {
return err
}
if ss.chainService.ContainsBlock(h) {
return nil
}
ss.blockBuf <- b
ss.networkService.BroadcastBlock(b)
return nil
}
func run(done <-chan struct{}, hashBuf <-chan hash.Hash, blockBuf <-chan *types.Block, ps NetworkService, cs ChainService) {
for {
select {
case <-done:
log.Infof("exiting goroutine")
return
case h := <-hashBuf:
ps.RequestBlock(h)
case b := <-blockBuf:
cs.ProcessBlock(b)
}
}
}

View File

@ -0,0 +1,231 @@
package sync
import (
"bytes"
"context"
"fmt"
"hash"
"testing"
"github.com/prysmaticlabs/prysm/beacon-chain/types"
"github.com/prysmaticlabs/prysm/shared/testutil"
logTest "github.com/sirupsen/logrus/hooks/test"
"golang.org/x/crypto/blake2b"
)
var testLog = log.WithField("prefix", "sync_test")
type MockNetworkService struct{}
func (ns *MockNetworkService) BroadcastBlockHash(h hash.Hash) error {
testLog.Infof("broadcasting hash: %x", h.Sum(nil))
return nil
}
func (ns *MockNetworkService) BroadcastBlock(b *types.Block) error {
h, err := b.Hash()
if err != nil {
return err
}
testLog.Infof("broadcasting block: %x", h.Sum(nil))
return nil
}
func (ns *MockNetworkService) RequestBlock(h hash.Hash) error {
testLog.Infof("requesting block: %x", h.Sum(nil))
return nil
}
// MockChainService implements a simplified local chain that stores blocks in a slice
type MockChainService struct {
processedHashes []hash.Hash
}
func (ms *MockChainService) ProcessBlock(b *types.Block) error {
h, err := b.Hash()
if err != nil {
return err
}
testLog.Infof("forwarding block: %x", h.Sum(nil))
if ms.processedHashes == nil {
ms.processedHashes = []hash.Hash{}
}
ms.processedHashes = append(ms.processedHashes, h)
return nil
}
func (ms *MockChainService) ContainsBlock(h hash.Hash) bool {
for _, h1 := range ms.processedHashes {
if bytes.Equal(h.Sum(nil), h1.Sum(nil)) {
return true
}
}
return false
}
func TestProcessBlockHash(t *testing.T) {
hook := logTest.NewGlobal()
// set the channel's buffer to 0 to make channel interactions blocking
cfg := Config{HashBufferSize: 0, BlockBufferSize: 0}
ss := NewSyncService(context.Background(), cfg)
ns := MockNetworkService{}
cs := MockChainService{}
ss.SetNetworkService(&ns)
ss.SetChainService(&cs)
exitRoutine := make(chan bool)
go func() {
run(ss.ctx.Done(), ss.hashBuf, ss.blockBuf, &ns, &cs)
exitRoutine <- true
}()
h, err := blake2b.New256(nil)
if err != nil {
t.Errorf("failed to intialize hash: %v", err)
}
// if a new hash is processed
ss.ReceiveBlockHash(h)
ss.cancel()
<-exitRoutine
// sync service requests the contents of the block and broadcasts the hash to peers
testutil.AssertLogsContain(t, hook, fmt.Sprintf("requesting block: %x", h.Sum(nil)))
testutil.AssertLogsContain(t, hook, fmt.Sprintf("broadcasting hash: %x", h.Sum(nil)))
}
func TestProcessBlock(t *testing.T) {
hook := logTest.NewGlobal()
cfg := Config{HashBufferSize: 0, BlockBufferSize: 0}
ss := NewSyncService(context.Background(), cfg)
ns := MockNetworkService{}
cs := MockChainService{}
ss.SetNetworkService(&ns)
ss.SetChainService(&cs)
exitRoutine := make(chan bool)
go func() {
run(ss.ctx.Done(), ss.hashBuf, ss.blockBuf, &ns, &cs)
exitRoutine <- true
}()
b := types.NewBlock(0)
h, err := b.Hash()
if err != nil {
t.Fatal(err)
}
// if the hash and the block are processed in order
ss.ReceiveBlockHash(h)
ss.ReceiveBlock(b)
ss.cancel()
<-exitRoutine
// sync service broadcasts the block and forwards the block to to the local chain
testutil.AssertLogsContain(t, hook, fmt.Sprintf("broadcasting block: %x", h.Sum(nil)))
testutil.AssertLogsContain(t, hook, fmt.Sprintf("forwarding block: %x", h.Sum(nil)))
}
func TestProcessMultipleBlocks(t *testing.T) {
hook := logTest.NewGlobal()
cfg := Config{HashBufferSize: 0, BlockBufferSize: 0}
ss := NewSyncService(context.Background(), cfg)
ns := MockNetworkService{}
cs := MockChainService{}
ss.SetNetworkService(&ns)
ss.SetChainService(&cs)
exitRoutine := make(chan bool)
go func() {
run(ss.ctx.Done(), ss.hashBuf, ss.blockBuf, &ns, &cs)
exitRoutine <- true
}()
b1 := types.NewBlock(0)
h1, err := b1.Hash()
if err != nil {
t.Fatal(err)
}
b2 := types.NewBlock(1)
h2, err := b2.Hash()
if err != nil {
t.Fatal(err)
}
if bytes.Equal(h1.Sum(nil), h2.Sum(nil)) {
t.Fatalf("two blocks should not have the same hash:\n%x\n%x", h1.Sum(nil), h2.Sum(nil))
}
// if two different blocks are submitted
ss.ReceiveBlockHash(h1)
ss.ReceiveBlock(b1)
ss.ReceiveBlockHash(h2)
ss.ReceiveBlock(b2)
ss.cancel()
<-exitRoutine
// both blocks are processed
testutil.AssertLogsContain(t, hook, fmt.Sprintf("broadcasting block: %x", h1.Sum(nil)))
testutil.AssertLogsContain(t, hook, fmt.Sprintf("forwarding block: %x", h1.Sum(nil)))
testutil.AssertLogsContain(t, hook, fmt.Sprintf("broadcasting block: %x", h2.Sum(nil)))
testutil.AssertLogsContain(t, hook, fmt.Sprintf("forwarding block: %x", h2.Sum(nil)))
}
func TestProcessSameBlock(t *testing.T) {
hook := logTest.NewGlobal()
cfg := Config{HashBufferSize: 0, BlockBufferSize: 0}
ss := NewSyncService(context.Background(), cfg)
ns := MockNetworkService{}
cs := MockChainService{}
ss.SetNetworkService(&ns)
ss.SetChainService(&cs)
exitRoutine := make(chan bool)
go func() {
run(ss.ctx.Done(), ss.hashBuf, ss.blockBuf, &ns, &cs)
exitRoutine <- true
}()
b := types.NewBlock(0)
h, err := b.Hash()
if err != nil {
t.Fatal(err)
}
// if the same block is processed twice
ss.ReceiveBlockHash(h)
ss.ReceiveBlock(b)
ss.ReceiveBlockHash(h)
// there's a tricky race condition where the second hash can sneak into the goroutine
// before the first block inserts itself into the chain. therefore, its important
// for hook.Reset() to be called after the second ProcessBlockHash call
hook.Reset()
ss.ReceiveBlock(b)
ss.cancel()
<-exitRoutine
// the block isn't processed the second time
testutil.AssertLogsDoNotContain(t, hook, fmt.Sprintf("broadcasting block: %x", h.Sum(nil)))
testutil.AssertLogsDoNotContain(t, hook, fmt.Sprintf("forwarding block: %x", h.Sum(nil)))
}

View File

@ -11,5 +11,7 @@ go_library(
deps = [
"@com_github_ethereum_go_ethereum//common:go_default_library",
"@com_github_ethereum_go_ethereum//p2p/enr:go_default_library",
"@com_github_ethereum_go_ethereum//rlp:go_default_library",
"@org_golang_x_crypto//blake2b:go_default_library",
],
)

View File

@ -5,6 +5,8 @@ import (
"time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/rlp"
"golang.org/x/crypto/blake2b"
)
// Block defines a beacon chain core primitive.
@ -12,6 +14,15 @@ type Block struct {
data *Data
}
// Hash generates the BLAKE2b hash of the block
func (b Block) Hash() (hash.Hash, error) {
data, err := rlp.EncodeToBytes(b.Data())
if err != nil {
return nil, err
}
return blake2b.New256(data)
}
// Data getter makes the block's properties read-only.
func (b *Block) Data() *Data {
return b.data

View File

@ -0,0 +1,9 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library")
go_library(
name = "go_default_library",
srcs = ["log.go"],
importpath = "github.com/prysmaticlabs/prysm/shared/testutil",
visibility = ["//visibility:public"],
deps = ["@com_github_sirupsen_logrus//hooks/test:go_default_library"],
)

37
shared/testutil/log.go Normal file
View File

@ -0,0 +1,37 @@
package testutil
import (
"strings"
"testing"
"github.com/sirupsen/logrus/hooks/test"
)
// AssertLogsContain checks that the desired string is a subset of the current log output.
// Set exitOnFail to true to immediately exit the test on failure
func AssertLogsContain(t *testing.T, hook *test.Hook, want string) {
assertLogs(t, hook, want, true)
}
// AssertLogsDoNotContain is the inverse check of AssertLogsContain
func AssertLogsDoNotContain(t *testing.T, hook *test.Hook, want string) {
assertLogs(t, hook, want, false)
}
func assertLogs(t *testing.T, hook *test.Hook, want string, flag bool) {
t.Logf("scanning for: %s", want)
entries := hook.AllEntries()
match := false
for _, e := range entries {
if strings.Contains(e.Message, want) {
match = true
}
t.Logf("log: %s", e.Message)
}
if flag && !match {
t.Fatalf("log not found: %s", want)
} else if !flag && match {
t.Fatalf("unwanted log found: %s", want)
}
}