mirror of
https://gitlab.com/pulsechaincom/erigon-pulse.git
synced 2024-12-22 03:30:37 +00:00
parent
44d9fba09a
commit
d29a3c86b4
@ -28,6 +28,7 @@ import (
|
||||
"runtime"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/gorilla/websocket"
|
||||
@ -89,6 +90,49 @@ type Service struct {
|
||||
|
||||
pongCh chan struct{} // Pong notifications are fed into this channel
|
||||
histCh chan []uint64 // History request block numbers are fed into this channel
|
||||
|
||||
}
|
||||
|
||||
// connWrapper is a wrapper to prevent concurrent-write or concurrent-read on the
|
||||
// websocket.
|
||||
// From Gorilla websocket docs:
|
||||
// Connections support one concurrent reader and one concurrent writer.
|
||||
// Applications are responsible for ensuring that no more than one goroutine calls the write methods
|
||||
// - NextWriter, SetWriteDeadline, WriteMessage, WriteJSON, EnableWriteCompression, SetCompressionLevel
|
||||
// concurrently and that no more than one goroutine calls the read methods
|
||||
// - NextReader, SetReadDeadline, ReadMessage, ReadJSON, SetPongHandler, SetPingHandler
|
||||
// concurrently.
|
||||
// The Close and WriteControl methods can be called concurrently with all other methods.
|
||||
//
|
||||
// The connWrapper uses a single mutex for both reading and writing.
|
||||
type connWrapper struct {
|
||||
conn *websocket.Conn
|
||||
mu sync.Mutex
|
||||
}
|
||||
|
||||
func newConnectionWrapper(conn *websocket.Conn) *connWrapper {
|
||||
return &connWrapper{conn: conn}
|
||||
}
|
||||
|
||||
// WriteJSON wraps corresponding method on the websocket but is safe for concurrent calling
|
||||
func (w *connWrapper) WriteJSON(v interface{}) error {
|
||||
w.mu.Lock()
|
||||
defer w.mu.Unlock()
|
||||
return w.conn.WriteJSON(v)
|
||||
}
|
||||
|
||||
// ReadJSON wraps corresponding method on the websocket but is safe for concurrent calling
|
||||
func (w *connWrapper) ReadJSON(v interface{}) error {
|
||||
w.mu.Lock()
|
||||
defer w.mu.Unlock()
|
||||
return w.conn.ReadJSON(v)
|
||||
}
|
||||
|
||||
// Close wraps corresponding method on the websocket but is safe for concurrent calling
|
||||
func (w *connWrapper) Close() error {
|
||||
// The Close and WriteControl methods can be called concurrently with all other methods,
|
||||
// so the mutex is not used here
|
||||
return w.conn.Close()
|
||||
}
|
||||
|
||||
// New returns a monitoring service ready for stats reporting.
|
||||
@ -200,17 +244,19 @@ func (s *Service) loop() {
|
||||
case <-errTimer.C:
|
||||
// Establish a websocket connection to the server on any supported URL
|
||||
var (
|
||||
conn *websocket.Conn
|
||||
conn *connWrapper
|
||||
err error
|
||||
)
|
||||
dialer := websocket.Dialer{HandshakeTimeout: 5 * time.Second}
|
||||
header := make(http.Header)
|
||||
header.Set("origin", "http://localhost")
|
||||
for _, url := range urls {
|
||||
conn, _, err = dialer.Dial(url, header)
|
||||
if err == nil {
|
||||
c, _, e := dialer.Dial(url, header)
|
||||
if e == nil {
|
||||
conn = newConnectionWrapper(c)
|
||||
break
|
||||
}
|
||||
err = e
|
||||
}
|
||||
if err != nil {
|
||||
log.Warn("Stats server unreachable", "err", err)
|
||||
@ -278,7 +324,7 @@ func (s *Service) loop() {
|
||||
// from the network socket. If any of them match an active request, it forwards
|
||||
// it, if they themselves are requests it initiates a reply, and lastly it drops
|
||||
// unknown packets.
|
||||
func (s *Service) readLoop(conn *websocket.Conn) {
|
||||
func (s *Service) readLoop(conn *connWrapper) {
|
||||
// If the read loop exists, close the connection
|
||||
defer conn.Close()
|
||||
|
||||
@ -387,7 +433,7 @@ type authMsg struct {
|
||||
}
|
||||
|
||||
// login tries to authorize the client at the remote server.
|
||||
func (s *Service) login(conn *websocket.Conn) error {
|
||||
func (s *Service) login(conn *connWrapper) error {
|
||||
// Construct and send the login authentication
|
||||
infos := s.server.NodeInfo()
|
||||
|
||||
@ -429,7 +475,7 @@ func (s *Service) login(conn *websocket.Conn) error {
|
||||
// report collects all possible data to report and send it to the stats server.
|
||||
// This should only be used on reconnects or rarely to avoid overloading the
|
||||
// server. Use the individual methods for reporting subscribed events.
|
||||
func (s *Service) report(conn *websocket.Conn) error {
|
||||
func (s *Service) report(conn *connWrapper) error {
|
||||
if err := s.reportLatency(conn); err != nil {
|
||||
return err
|
||||
}
|
||||
@ -447,7 +493,7 @@ func (s *Service) report(conn *websocket.Conn) error {
|
||||
|
||||
// reportLatency sends a ping request to the server, measures the RTT time and
|
||||
// finally sends a latency update.
|
||||
func (s *Service) reportLatency(conn *websocket.Conn) error {
|
||||
func (s *Service) reportLatency(conn *connWrapper) error {
|
||||
// Send the current time to the ethstats server
|
||||
start := time.Now()
|
||||
|
||||
@ -516,7 +562,7 @@ func (s uncleStats) MarshalJSON() ([]byte, error) {
|
||||
}
|
||||
|
||||
// reportBlock retrieves the current chain head and reports it to the stats server.
|
||||
func (s *Service) reportBlock(conn *websocket.Conn, block *types.Block) error {
|
||||
func (s *Service) reportBlock(conn *connWrapper, block *types.Block) error {
|
||||
// Gather the block details from the header or block chain
|
||||
details := s.assembleBlockStats(block)
|
||||
|
||||
@ -582,7 +628,7 @@ func (s *Service) assembleBlockStats(block *types.Block) *blockStats {
|
||||
|
||||
// reportHistory retrieves the most recent batch of blocks and reports it to the
|
||||
// stats server.
|
||||
func (s *Service) reportHistory(conn *websocket.Conn, list []uint64) error {
|
||||
func (s *Service) reportHistory(conn *connWrapper, list []uint64) error {
|
||||
// Figure out the indexes that need reporting
|
||||
indexes := make([]uint64, 0, historyUpdateRange)
|
||||
if len(list) > 0 {
|
||||
@ -643,7 +689,7 @@ type pendStats struct {
|
||||
|
||||
// reportPending retrieves the current number of pending transactions and reports
|
||||
// it to the stats server.
|
||||
func (s *Service) reportPending(conn *websocket.Conn) error {
|
||||
func (s *Service) reportPending(conn *connWrapper) error {
|
||||
// Retrieve the pending count from the local blockchain
|
||||
pending, _ := s.backend.Stats()
|
||||
// Assemble the transaction stats and send it to the server
|
||||
@ -674,7 +720,7 @@ type nodeStats struct {
|
||||
|
||||
// reportStats retrieves various stats about the node at the networking and
|
||||
// mining layer and reports it to the stats server.
|
||||
func (s *Service) reportStats(conn *websocket.Conn) error {
|
||||
func (s *Service) reportStats(conn *connWrapper) error {
|
||||
// Gather the syncing and mining infos from the local miner instance
|
||||
var (
|
||||
mining bool
|
||||
|
Loading…
Reference in New Issue
Block a user