prysm-pulse/beacon-chain/simulator/service.go
2018-12-03 08:10:47 -08:00

357 lines
9.9 KiB
Go

// Package simulator defines the simulation utility to test the beacon-chain.
package simulator
import (
"bytes"
"context"
"fmt"
"github.com/ethereum/go-ethereum/common"
"github.com/golang/protobuf/proto"
"github.com/golang/protobuf/ptypes"
"github.com/prysmaticlabs/prysm/beacon-chain/core/types"
v "github.com/prysmaticlabs/prysm/beacon-chain/core/validators"
"github.com/prysmaticlabs/prysm/beacon-chain/db"
pb "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1"
"github.com/prysmaticlabs/prysm/shared/bitutil"
"github.com/prysmaticlabs/prysm/shared/event"
"github.com/prysmaticlabs/prysm/shared/p2p"
"github.com/prysmaticlabs/prysm/shared/params"
"github.com/prysmaticlabs/prysm/shared/slotticker"
"github.com/sirupsen/logrus"
)
var log = logrus.WithField("prefix", "simulator")
type p2pAPI interface {
Subscribe(msg proto.Message, channel chan p2p.Message) event.Subscription
Send(msg proto.Message, peer p2p.Peer)
Broadcast(msg proto.Message)
}
type powChainService interface {
LatestBlockHash() common.Hash
}
// Simulator struct.
type Simulator struct {
ctx context.Context
cancel context.CancelFunc
p2p p2pAPI
web3Service powChainService
beaconDB *db.BeaconDB
enablePOWChain bool
blockRequestChan chan p2p.Message
blockBySlotChan chan p2p.Message
stateReqChan chan p2p.Message
chainHeadRequestChan chan p2p.Message
}
// Config options for the simulator service.
type Config struct {
BlockRequestBuf int
BlockSlotBuf int
ChainHeadRequestBuf int
StateReqBuf int
P2P p2pAPI
Web3Service powChainService
BeaconDB *db.BeaconDB
EnablePOWChain bool
}
// DefaultConfig options for the simulator.
func DefaultConfig() *Config {
return &Config{
BlockRequestBuf: 100,
BlockSlotBuf: 100,
StateReqBuf: 100,
ChainHeadRequestBuf: 100,
}
}
// NewSimulator creates a simulator instance for a syncer to consume fake, generated blocks.
func NewSimulator(ctx context.Context, cfg *Config) *Simulator {
ctx, cancel := context.WithCancel(ctx)
return &Simulator{
ctx: ctx,
cancel: cancel,
p2p: cfg.P2P,
web3Service: cfg.Web3Service,
beaconDB: cfg.BeaconDB,
enablePOWChain: cfg.EnablePOWChain,
blockRequestChan: make(chan p2p.Message, cfg.BlockRequestBuf),
blockBySlotChan: make(chan p2p.Message, cfg.BlockSlotBuf),
stateReqChan: make(chan p2p.Message, cfg.StateReqBuf),
chainHeadRequestChan: make(chan p2p.Message, cfg.ChainHeadRequestBuf),
}
}
// Start the sim.
func (sim *Simulator) Start() {
log.Info("Starting service")
genesisTime, err := sim.beaconDB.GetGenesisTime()
if err != nil {
log.Fatal(err)
return
}
currentSlot, err := sim.beaconDB.GetSimulatorSlot()
if err != nil {
log.Fatal(err)
return
}
slotTicker := slotticker.GetSimulatorTicker(genesisTime, params.BeaconConfig().SlotDuration, currentSlot)
go func() {
sim.run(slotTicker.C())
close(sim.blockRequestChan)
close(sim.blockBySlotChan)
slotTicker.Done()
}()
}
// Stop the sim.
func (sim *Simulator) Stop() error {
defer sim.cancel()
log.Info("Stopping service")
return nil
}
func (sim *Simulator) run(slotInterval <-chan uint64) {
chainHdReqSub := sim.p2p.Subscribe(&pb.ChainHeadRequest{}, sim.chainHeadRequestChan)
blockReqSub := sim.p2p.Subscribe(&pb.BeaconBlockRequest{}, sim.blockRequestChan)
blockBySlotSub := sim.p2p.Subscribe(&pb.BeaconBlockRequestBySlotNumber{}, sim.blockBySlotChan)
stateReqSub := sim.p2p.Subscribe(&pb.BeaconStateRequest{}, sim.stateReqChan)
defer blockReqSub.Unsubscribe()
defer blockBySlotSub.Unsubscribe()
defer stateReqSub.Unsubscribe()
defer chainHdReqSub.Unsubscribe()
lastBlock, err := sim.beaconDB.GetChainHead()
if err != nil {
log.Errorf("Could not fetch latest block: %v", err)
return
}
lastHash, err := lastBlock.Hash()
if err != nil {
log.Errorf("Could not get hash of the latest block: %v", err)
}
broadcastedBlocksByHash := map[[32]byte]*types.Block{}
broadcastedBlocksBySlot := map[uint64]*types.Block{}
for {
select {
case <-sim.ctx.Done():
log.Debug("Simulator context closed, exiting goroutine")
return
case msg := <-sim.chainHeadRequestChan:
log.Debug("Received chain head request")
if err := sim.SendChainHead(msg.Peer); err != nil {
log.Errorf("Unable to send chain head response %v", err)
}
case slot := <-slotInterval:
block, err := sim.generateBlock(slot, lastHash)
if err != nil {
log.Error(err)
continue
}
hash, err := block.Hash()
if err != nil {
log.Errorf("Could not hash simulated block: %v", err)
continue
}
sim.p2p.Broadcast(&pb.BeaconBlockAnnounce{
Hash: hash[:],
SlotNumber: slot,
})
log.WithFields(logrus.Fields{
"hash": fmt.Sprintf("%#x", hash),
"slot": slot,
}).Debug("Broadcast block hash and slot")
sim.SaveSimulatorSlot(slot)
broadcastedBlocksByHash[hash] = block
broadcastedBlocksBySlot[slot] = block
lastHash = hash
case msg := <-sim.blockBySlotChan:
data := msg.Data.(*pb.BeaconBlockRequestBySlotNumber)
block := broadcastedBlocksBySlot[data.GetSlotNumber()]
if block == nil {
log.WithFields(logrus.Fields{
"slot": fmt.Sprintf("%d", data.GetSlotNumber()),
}).Debug("Requested block not found:")
continue
}
log.WithFields(logrus.Fields{
"slot": fmt.Sprintf("%d", data.GetSlotNumber()),
}).Debug("Responding to full block request")
// Sends the full block body to the requester.
res := &pb.BeaconBlockResponse{Block: block.Proto(), Attestation: &pb.AggregatedAttestation{
Slot: block.SlotNumber(),
AttesterBitfield: []byte{byte(255)},
}}
sim.p2p.Send(res, msg.Peer)
case msg := <-sim.blockRequestChan:
data := msg.Data.(*pb.BeaconBlockRequest)
var hash [32]byte
copy(hash[:], data.Hash)
block := broadcastedBlocksByHash[hash]
if block == nil {
log.WithFields(logrus.Fields{
"hash": fmt.Sprintf("%#x", hash),
}).Debug("Requested block not found:")
continue
}
log.WithFields(logrus.Fields{
"hash": fmt.Sprintf("%#x", hash),
}).Debug("Responding to full block request")
// Sends the full block body to the requester.
res := &pb.BeaconBlockResponse{Block: block.Proto(), Attestation: &pb.AggregatedAttestation{
Slot: block.SlotNumber(),
AttesterBitfield: []byte{byte(255)},
}}
sim.p2p.Send(res, msg.Peer)
case msg := <-sim.stateReqChan:
data := msg.Data.(*pb.BeaconStateRequest)
beaconState, err := sim.beaconDB.GetState()
if err != nil {
log.Errorf("Could not retrieve beacon state: %v", err)
continue
}
hash, err := beaconState.Hash()
if err != nil {
log.Errorf("Could not hash beacon state: %v", err)
continue
}
if !bytes.Equal(data.GetHash(), hash[:]) {
log.WithFields(logrus.Fields{
"hash": fmt.Sprintf("%#x", data.GetHash()),
}).Debug("Requested beacon state is of a different hash")
continue
}
log.WithFields(logrus.Fields{
"hash": fmt.Sprintf("%#x", hash),
}).Debug("Responding to full beacon state request")
// Sends the full crystallized state to the requester.
res := &pb.BeaconStateResponse{
BeaconState: beaconState.Proto(),
}
sim.p2p.Send(res, msg.Peer)
}
}
}
// generateBlock generates fake blocks for the simulator.
func (sim *Simulator) generateBlock(slot uint64, lastHash [32]byte) (*types.Block, error) {
beaconState, err := sim.beaconDB.GetState()
if err != nil {
return nil, fmt.Errorf("could not retrieve beacon state: %v", err)
}
stateHash, err := beaconState.Hash()
if err != nil {
return nil, fmt.Errorf("could not hash beacon state: %v", err)
}
var powChainRef []byte
if sim.enablePOWChain {
powChainRef = sim.web3Service.LatestBlockHash().Bytes()
} else {
powChainRef = []byte{byte(slot)}
}
parentSlot := slot - 1
committees, err := v.GetShardAndCommitteesForSlot(
beaconState.ShardAndCommitteesForSlots(),
beaconState.LastStateRecalculationSlot(),
parentSlot,
)
if err != nil {
return nil, fmt.Errorf("failed to get shard committee: %v", err)
}
parentHash := make([]byte, 32)
copy(parentHash, lastHash[:])
shardCommittees := committees.ArrayShardAndCommittee
attestations := make([]*pb.AggregatedAttestation, len(shardCommittees))
// Create attestations for all committees of the previous block.
// Ensure that all attesters have voted by calling FillBitfield.
for i, shardCommittee := range shardCommittees {
shardID := shardCommittee.Shard
numAttesters := len(shardCommittee.Committee)
attestations[i] = &pb.AggregatedAttestation{
Slot: parentSlot,
AttesterBitfield: bitutil.FillBitfield(numAttesters),
JustifiedBlockHash: parentHash,
Shard: shardID,
}
}
block := types.NewBlock(&pb.BeaconBlock{
Slot: slot,
Timestamp: ptypes.TimestampNow(),
CandidatePowReceiptRootHash32: powChainRef,
StateRootHash32: stateHash[:],
AncestorHash32S: [][]byte{parentHash},
RandaoRevealHash32: params.BeaconConfig().SimulatedBlockRandao[:],
Attestations: attestations,
})
return block, nil
}
// SendChainHead sends the latest head of the local chain
// to the peer who requested it.
func (sim *Simulator) SendChainHead(peer p2p.Peer) error {
block, err := sim.beaconDB.GetChainHead()
if err != nil {
return err
}
hash, err := block.Hash()
if err != nil {
return err
}
res := &pb.ChainHeadResponse{
Hash: hash[:],
Slot: block.SlotNumber(),
Block: block.Proto(),
}
sim.p2p.Send(res, peer)
log.WithFields(logrus.Fields{
"hash": fmt.Sprintf("%#x", hash),
}).Debug("Responding to chain head request")
return nil
}
// SaveSimulatorSlot persists the current slot of the simulator.
func (sim *Simulator) SaveSimulatorSlot(slot uint64) {
err := sim.beaconDB.SaveSimulatorSlot(slot)
if err != nil {
log.Error(err)
}
}