2019-01-30 19:48:50 +00:00
|
|
|
// Package operations defines the life-cycle of beacon block operations.
|
|
|
|
package operations
|
|
|
|
|
|
|
|
import (
|
|
|
|
"context"
|
2019-08-10 20:13:04 +00:00
|
|
|
"sync"
|
2019-01-30 19:48:50 +00:00
|
|
|
|
2019-12-06 20:06:37 +00:00
|
|
|
"github.com/dgraph-io/ristretto"
|
2019-11-27 05:08:18 +00:00
|
|
|
ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1"
|
2019-01-30 19:48:50 +00:00
|
|
|
"github.com/prysmaticlabs/prysm/beacon-chain/db"
|
2019-10-17 06:46:07 +00:00
|
|
|
dbpb "github.com/prysmaticlabs/prysm/proto/beacon/db"
|
2019-01-30 19:48:50 +00:00
|
|
|
"github.com/prysmaticlabs/prysm/shared/event"
|
2019-03-10 22:53:28 +00:00
|
|
|
handler "github.com/prysmaticlabs/prysm/shared/messagehandler"
|
2019-02-15 19:27:45 +00:00
|
|
|
"github.com/prysmaticlabs/prysm/shared/params"
|
2019-01-30 19:48:50 +00:00
|
|
|
)
|
|
|
|
|
2019-03-19 23:07:49 +00:00
|
|
|
// OperationFeeds inteface defines the informational feeds from the operations
|
|
|
|
// service.
|
|
|
|
type OperationFeeds interface {
|
|
|
|
IncomingProcessedBlockFeed() *event.Feed
|
2019-11-18 17:19:03 +00:00
|
|
|
Pool
|
2019-03-19 23:07:49 +00:00
|
|
|
}
|
|
|
|
|
2019-01-30 19:48:50 +00:00
|
|
|
// Service represents a service that handles the internal
|
|
|
|
// logic of beacon block operations.
|
|
|
|
type Service struct {
|
2019-02-18 23:34:49 +00:00
|
|
|
ctx context.Context
|
|
|
|
cancel context.CancelFunc
|
2019-08-21 16:04:00 +00:00
|
|
|
beaconDB db.Database
|
2019-02-18 23:34:49 +00:00
|
|
|
incomingProcessedBlockFeed *event.Feed
|
2019-07-22 14:03:57 +00:00
|
|
|
incomingProcessedBlock chan *ethpb.BeaconBlock
|
2019-02-18 23:34:49 +00:00
|
|
|
error error
|
2019-10-17 06:46:07 +00:00
|
|
|
attestationPool map[[32]byte]*dbpb.AttestationContainer
|
|
|
|
recentAttestationBitlist *recentAttestationMultiMap
|
2019-11-22 05:11:38 +00:00
|
|
|
attestationPoolLock sync.RWMutex
|
2019-12-06 20:06:37 +00:00
|
|
|
attestationLockCache *ristretto.Cache
|
2019-01-30 19:48:50 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
// Config options for the service.
|
|
|
|
type Config struct {
|
2019-08-21 16:04:00 +00:00
|
|
|
BeaconDB db.Database
|
2019-01-30 19:48:50 +00:00
|
|
|
}
|
|
|
|
|
2019-09-07 02:39:14 +00:00
|
|
|
// NewService instantiates a new operation service instance that will
|
2019-01-30 19:48:50 +00:00
|
|
|
// be registered into a running beacon node.
|
2019-09-07 02:39:14 +00:00
|
|
|
func NewService(ctx context.Context, cfg *Config) *Service {
|
2019-01-30 19:48:50 +00:00
|
|
|
ctx, cancel := context.WithCancel(ctx)
|
2019-12-06 20:06:37 +00:00
|
|
|
attLockCache, _ := ristretto.NewCache(&ristretto.Config{
|
|
|
|
NumCounters: 500,
|
|
|
|
MaxCost: 500,
|
|
|
|
BufferItems: 64,
|
|
|
|
})
|
2019-01-30 19:48:50 +00:00
|
|
|
return &Service{
|
2019-02-18 23:34:49 +00:00
|
|
|
ctx: ctx,
|
|
|
|
cancel: cancel,
|
|
|
|
beaconDB: cfg.BeaconDB,
|
|
|
|
incomingProcessedBlockFeed: new(event.Feed),
|
2019-07-22 14:03:57 +00:00
|
|
|
incomingProcessedBlock: make(chan *ethpb.BeaconBlock, params.BeaconConfig().DefaultBufferSize),
|
2019-10-17 06:46:07 +00:00
|
|
|
attestationPool: make(map[[32]byte]*dbpb.AttestationContainer),
|
|
|
|
recentAttestationBitlist: newRecentAttestationMultiMap(),
|
2019-12-06 20:06:37 +00:00
|
|
|
attestationLockCache: attLockCache,
|
2019-01-30 19:48:50 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2019-02-18 23:34:49 +00:00
|
|
|
// Start an beacon block operation pool service's main event loop.
|
2019-01-30 19:48:50 +00:00
|
|
|
func (s *Service) Start() {
|
2019-02-18 23:34:49 +00:00
|
|
|
go s.removeOperations()
|
2019-01-30 19:48:50 +00:00
|
|
|
}
|
|
|
|
|
2019-02-18 23:34:49 +00:00
|
|
|
// Stop the beacon block operation pool service's main event loop
|
2019-01-30 19:48:50 +00:00
|
|
|
// and associated goroutines.
|
|
|
|
func (s *Service) Stop() error {
|
|
|
|
defer s.cancel()
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
2019-01-31 18:54:24 +00:00
|
|
|
// Status returns the current service error if there's any.
|
|
|
|
func (s *Service) Status() error {
|
|
|
|
if s.error != nil {
|
|
|
|
return s.error
|
2019-01-30 19:48:50 +00:00
|
|
|
}
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
2019-02-18 23:34:49 +00:00
|
|
|
// removeOperations removes the processed operations from operation pool and DB.
|
|
|
|
func (s *Service) removeOperations() {
|
|
|
|
incomingBlockSub := s.incomingProcessedBlockFeed.Subscribe(s.incomingProcessedBlock)
|
|
|
|
defer incomingBlockSub.Unsubscribe()
|
|
|
|
|
|
|
|
for {
|
2019-09-03 18:06:35 +00:00
|
|
|
ctx := context.TODO()
|
2019-02-18 23:34:49 +00:00
|
|
|
select {
|
2019-10-01 20:05:17 +00:00
|
|
|
case err := <-incomingBlockSub.Err():
|
|
|
|
log.WithError(err).Error("Subscription to incoming block sub failed")
|
2019-09-25 18:01:55 +00:00
|
|
|
return
|
2019-02-18 23:34:49 +00:00
|
|
|
case <-s.ctx.Done():
|
2019-10-01 20:05:17 +00:00
|
|
|
log.Debug("Context closed, exiting goroutine")
|
2019-09-25 18:01:55 +00:00
|
|
|
return
|
2019-02-18 23:34:49 +00:00
|
|
|
// Listen for processed block from the block chain service.
|
|
|
|
case block := <-s.incomingProcessedBlock:
|
2019-09-03 18:06:35 +00:00
|
|
|
handler.SafelyHandleMessage(ctx, s.handleProcessedBlock, block)
|
2019-01-30 19:48:50 +00:00
|
|
|
}
|
2019-02-18 23:34:49 +00:00
|
|
|
}
|
|
|
|
}
|