prysm-pulse/beacon-chain/sync/initial-sync/fsm.go
Nishant Das caf9bdbc6f
Use Block Interface Across Prysm (#8918)
* commit initial work

* checkpoint current work

* gaz

* checkpoint

* req/resp changes

* initial-sync

* finally works

* fix error

* fix bugs

* fix issue

* fix issues

* fix refs

* tests

* more text fixes

* more text fixes

* more text fixes

* fix tests

* fix tests

* tests

* finally fix builds

* finally

* comments

* fix fuzz

* share common library

* fix

* fix

* add in more defensive nil checks

* add in more defensive nil checks

* imports

* Apply suggestions from code review

Co-authored-by: terence tsao <terence@prysmaticlabs.com>

* Apply suggestions from code review

Co-authored-by: terence tsao <terence@prysmaticlabs.com>

* Update shared/interfaces/block_interface.go

Co-authored-by: terence tsao <terence@prysmaticlabs.com>

* Update shared/interfaces/block_wrapper.go

Co-authored-by: terence tsao <terence@prysmaticlabs.com>

* Update shared/interfaces/block_interface.go

Co-authored-by: terence tsao <terence@prysmaticlabs.com>

* imports

* fix bad changes

* fix

* terence's review

* terence's review

* fmt

* Update beacon-chain/rpc/beacon/blocks.go

Co-authored-by: Radosław Kapka <rkapka@wp.pl>

* fix tests

* fix

* fix all tests

Co-authored-by: terence tsao <terence@prysmaticlabs.com>
Co-authored-by: Radosław Kapka <rkapka@wp.pl>
2021-05-26 16:19:54 +00:00

221 lines
5.9 KiB
Go

package initialsync
import (
"errors"
"fmt"
"sort"
"time"
"github.com/libp2p/go-libp2p-core/peer"
types "github.com/prysmaticlabs/eth2-types"
"github.com/prysmaticlabs/prysm/beacon-chain/core/helpers"
"github.com/prysmaticlabs/prysm/shared/interfaces"
"github.com/prysmaticlabs/prysm/shared/timeutils"
)
const (
stateNew stateID = iota
stateScheduled
stateDataParsed
stateSkipped
stateSent
)
const (
eventTick eventID = iota
eventDataReceived
)
// stateID is unique handle for a state.
type stateID uint8
// eventID is unique handle for an event.
type eventID uint8
// stateMachineManager is a collection of managed FSMs.
type stateMachineManager struct {
keys []types.Slot
machines map[types.Slot]*stateMachine
handlers map[stateID]map[eventID]eventHandlerFn
}
// stateMachine holds a state of a single block processing FSM.
// Each FSM allows deterministic state transitions: State(S) x Event(E) -> Actions (A), State(S').
type stateMachine struct {
smm *stateMachineManager
start types.Slot
state stateID
pid peer.ID
blocks []interfaces.SignedBeaconBlock
updated time.Time
}
// eventHandlerFn is an event handler function's signature.
type eventHandlerFn func(m *stateMachine, data interface{}) (newState stateID, err error)
// newStateMachineManager returns fully initialized state machine manager.
func newStateMachineManager() *stateMachineManager {
return &stateMachineManager{
keys: make([]types.Slot, 0, lookaheadSteps),
machines: make(map[types.Slot]*stateMachine, lookaheadSteps),
handlers: make(map[stateID]map[eventID]eventHandlerFn),
}
}
// addHandler attaches an event handler to a state event.
func (smm *stateMachineManager) addEventHandler(event eventID, state stateID, fn eventHandlerFn) {
if _, ok := smm.handlers[state]; !ok {
smm.handlers[state] = make(map[eventID]eventHandlerFn)
}
if _, ok := smm.handlers[state][event]; !ok {
smm.handlers[state][event] = fn
}
}
// addStateMachine allocates memory for new FSM.
func (smm *stateMachineManager) addStateMachine(startSlot types.Slot) *stateMachine {
smm.machines[startSlot] = &stateMachine{
smm: smm,
start: startSlot,
state: stateNew,
blocks: []interfaces.SignedBeaconBlock{},
updated: timeutils.Now(),
}
smm.recalculateMachineAttribs()
return smm.machines[startSlot]
}
// removeStateMachine frees memory of a processed/finished FSM.
func (smm *stateMachineManager) removeStateMachine(startSlot types.Slot) error {
if _, ok := smm.machines[startSlot]; !ok {
return fmt.Errorf("state for machine %v is not found", startSlot)
}
smm.machines[startSlot].blocks = nil
delete(smm.machines, startSlot)
smm.recalculateMachineAttribs()
return nil
}
// removeAllStateMachines removes all managed machines.
func (smm *stateMachineManager) removeAllStateMachines() error {
for _, key := range smm.keys {
if err := smm.removeStateMachine(key); err != nil {
return err
}
}
smm.recalculateMachineAttribs()
return nil
}
// recalculateMachineAttribs updates cached attributes, which are used for efficiency.
func (smm *stateMachineManager) recalculateMachineAttribs() {
keys := make([]types.Slot, 0, lookaheadSteps)
for key := range smm.machines {
keys = append(keys, key)
}
sort.Slice(keys, func(i, j int) bool {
return keys[i] < keys[j]
})
smm.keys = keys
}
// findStateMachine returns a state machine for a given start slot (if exists).
func (smm *stateMachineManager) findStateMachine(startSlot types.Slot) (*stateMachine, bool) {
fsm, ok := smm.machines[startSlot]
return fsm, ok
}
// highestStartSlot returns the start slot for the latest known state machine.
func (smm *stateMachineManager) highestStartSlot() (types.Slot, error) {
if len(smm.keys) == 0 {
return 0, errors.New("no state machine exist")
}
key := smm.keys[len(smm.keys)-1]
return smm.machines[key].start, nil
}
// allMachinesInState checks whether all registered state machines are in the same state.
func (smm *stateMachineManager) allMachinesInState(state stateID) bool {
if len(smm.machines) == 0 {
return false
}
for _, fsm := range smm.machines {
if fsm.state != state {
return false
}
}
return true
}
// String returns human readable representation of a FSM collection.
func (smm *stateMachineManager) String() string {
return fmt.Sprintf("%v", smm.machines)
}
// setState updates the current state of a given state machine.
func (m *stateMachine) setState(name stateID) {
if m.state == name {
return
}
m.state = name
m.updated = timeutils.Now()
}
// trigger invokes the event handler on a given state machine.
func (m *stateMachine) trigger(event eventID, data interface{}) error {
handlers, ok := m.smm.handlers[m.state]
if !ok {
return fmt.Errorf("no event handlers registered for event: %v, state: %v", event, m.state)
}
if handlerFn, ok := handlers[event]; ok {
state, err := handlerFn(m, data)
if err != nil {
return err
}
m.setState(state)
}
return nil
}
// isFirst checks whether a given machine has the lowest start slot.
func (m *stateMachine) isFirst() bool {
return m.start == m.smm.keys[0]
}
// isLast checks whether a given machine has the highest start slot.
func (m *stateMachine) isLast() bool {
return m.start == m.smm.keys[len(m.smm.keys)-1]
}
// String returns human-readable representation of a FSM state.
func (m *stateMachine) String() string {
return fmt.Sprintf("{%d:%s}", helpers.SlotToEpoch(m.start), m.state)
}
// String returns human-readable representation of a state.
func (s stateID) String() string {
states := map[stateID]string{
stateNew: "new",
stateScheduled: "scheduled",
stateDataParsed: "dataParsed",
stateSkipped: "skipped",
stateSent: "sent",
}
if _, ok := states[s]; !ok {
return "stateUnknown"
}
return states[s]
}
// String returns human-readable representation of an event.
func (e eventID) String() string {
events := map[eventID]string{
eventTick: "tick",
eventDataReceived: "dataReceived",
}
if _, ok := events[e]; !ok {
return "eventUnknown"
}
return events[e]
}