mirror of
https://gitlab.com/pulsechaincom/go-pulse.git
synced 2025-01-15 06:48:20 +00:00
dcca613a0b
* swarm: initial instrumentation with go-metrics * swarm: initialise metrics collection and add ResettingTimer to HTTP requests * swarm: update metrics flags names. remove redundant Timer. * swarm: rename method for periodically updating gauges * swarm: finalise metrics after feedback * swarm/network: always init kad metrics containers * swarm/network: off-by-one index in metrics containers * swarm, metrics: resolved conflicts
233 lines
8.3 KiB
Go
233 lines
8.3 KiB
Go
// Copyright 2016 The go-ethereum Authors
|
|
// This file is part of the go-ethereum library.
|
|
//
|
|
// The go-ethereum library is free software: you can redistribute it and/or modify
|
|
// it under the terms of the GNU Lesser General Public License as published by
|
|
// the Free Software Foundation, either version 3 of the License, or
|
|
// (at your option) any later version.
|
|
//
|
|
// The go-ethereum library is distributed in the hope that it will be useful,
|
|
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
// GNU Lesser General Public License for more details.
|
|
//
|
|
// You should have received a copy of the GNU Lesser General Public License
|
|
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
|
|
|
|
package network
|
|
|
|
import (
|
|
"bytes"
|
|
"encoding/binary"
|
|
"fmt"
|
|
"time"
|
|
|
|
"github.com/ethereum/go-ethereum/log"
|
|
"github.com/ethereum/go-ethereum/metrics"
|
|
"github.com/ethereum/go-ethereum/swarm/storage"
|
|
)
|
|
|
|
//metrics variables
|
|
var (
|
|
syncReceiveCount = metrics.NewRegisteredCounter("network.sync.recv.count", nil)
|
|
syncReceiveIgnore = metrics.NewRegisteredCounter("network.sync.recv.ignore", nil)
|
|
syncSendCount = metrics.NewRegisteredCounter("network.sync.send.count", nil)
|
|
syncSendRefused = metrics.NewRegisteredCounter("network.sync.send.refused", nil)
|
|
syncSendNotFound = metrics.NewRegisteredCounter("network.sync.send.notfound", nil)
|
|
)
|
|
|
|
// Handler for storage/retrieval related protocol requests
|
|
// implements the StorageHandler interface used by the bzz protocol
|
|
type Depo struct {
|
|
hashfunc storage.SwarmHasher
|
|
localStore storage.ChunkStore
|
|
netStore storage.ChunkStore
|
|
}
|
|
|
|
func NewDepo(hash storage.SwarmHasher, localStore, remoteStore storage.ChunkStore) *Depo {
|
|
return &Depo{
|
|
hashfunc: hash,
|
|
localStore: localStore,
|
|
netStore: remoteStore, // entrypoint internal
|
|
}
|
|
}
|
|
|
|
// Handles UnsyncedKeysMsg after msg decoding - unsynced hashes upto sync state
|
|
// * the remote sync state is just stored and handled in protocol
|
|
// * filters through the new syncRequests and send the ones missing
|
|
// * back immediately as a deliveryRequest message
|
|
// * empty message just pings back for more (is this needed?)
|
|
// * strict signed sync states may be needed.
|
|
func (self *Depo) HandleUnsyncedKeysMsg(req *unsyncedKeysMsgData, p *peer) error {
|
|
unsynced := req.Unsynced
|
|
var missing []*syncRequest
|
|
var chunk *storage.Chunk
|
|
var err error
|
|
for _, req := range unsynced {
|
|
// skip keys that are found,
|
|
chunk, err = self.localStore.Get(req.Key[:])
|
|
if err != nil || chunk.SData == nil {
|
|
missing = append(missing, req)
|
|
}
|
|
}
|
|
log.Debug(fmt.Sprintf("Depo.HandleUnsyncedKeysMsg: received %v unsynced keys: %v missing. new state: %v", len(unsynced), len(missing), req.State))
|
|
log.Trace(fmt.Sprintf("Depo.HandleUnsyncedKeysMsg: received %v", unsynced))
|
|
// send delivery request with missing keys
|
|
err = p.deliveryRequest(missing)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
// set peers state to persist
|
|
p.syncState = req.State
|
|
return nil
|
|
}
|
|
|
|
// Handles deliveryRequestMsg
|
|
// * serves actual chunks asked by the remote peer
|
|
// by pushing to the delivery queue (sync db) of the correct priority
|
|
// (remote peer is free to reprioritize)
|
|
// * the message implies remote peer wants more, so trigger for
|
|
// * new outgoing unsynced keys message is fired
|
|
func (self *Depo) HandleDeliveryRequestMsg(req *deliveryRequestMsgData, p *peer) error {
|
|
deliver := req.Deliver
|
|
// queue the actual delivery of a chunk ()
|
|
log.Trace(fmt.Sprintf("Depo.HandleDeliveryRequestMsg: received %v delivery requests: %v", len(deliver), deliver))
|
|
for _, sreq := range deliver {
|
|
// TODO: look up in cache here or in deliveries
|
|
// priorities are taken from the message so the remote party can
|
|
// reprioritise to at their leisure
|
|
// r = self.pullCached(sreq.Key) // pulls and deletes from cache
|
|
Push(p, sreq.Key, sreq.Priority)
|
|
}
|
|
|
|
// sends it out as unsyncedKeysMsg
|
|
p.syncer.sendUnsyncedKeys()
|
|
return nil
|
|
}
|
|
|
|
// the entrypoint for store requests coming from the bzz wire protocol
|
|
// if key found locally, return. otherwise
|
|
// remote is untrusted, so hash is verified and chunk passed on to NetStore
|
|
func (self *Depo) HandleStoreRequestMsg(req *storeRequestMsgData, p *peer) {
|
|
var islocal bool
|
|
req.from = p
|
|
chunk, err := self.localStore.Get(req.Key)
|
|
switch {
|
|
case err != nil:
|
|
log.Trace(fmt.Sprintf("Depo.handleStoreRequest: %v not found locally. create new chunk/request", req.Key))
|
|
// not found in memory cache, ie., a genuine store request
|
|
// create chunk
|
|
syncReceiveCount.Inc(1)
|
|
chunk = storage.NewChunk(req.Key, nil)
|
|
|
|
case chunk.SData == nil:
|
|
// found chunk in memory store, needs the data, validate now
|
|
log.Trace(fmt.Sprintf("Depo.HandleStoreRequest: %v. request entry found", req))
|
|
|
|
default:
|
|
// data is found, store request ignored
|
|
// this should update access count?
|
|
syncReceiveIgnore.Inc(1)
|
|
log.Trace(fmt.Sprintf("Depo.HandleStoreRequest: %v found locally. ignore.", req))
|
|
islocal = true
|
|
//return
|
|
}
|
|
|
|
hasher := self.hashfunc()
|
|
hasher.Write(req.SData)
|
|
if !bytes.Equal(hasher.Sum(nil), req.Key) {
|
|
// data does not validate, ignore
|
|
// TODO: peer should be penalised/dropped?
|
|
log.Warn(fmt.Sprintf("Depo.HandleStoreRequest: chunk invalid. store request ignored: %v", req))
|
|
return
|
|
}
|
|
|
|
if islocal {
|
|
return
|
|
}
|
|
// update chunk with size and data
|
|
chunk.SData = req.SData // protocol validates that SData is minimum 9 bytes long (int64 size + at least one byte of data)
|
|
chunk.Size = int64(binary.LittleEndian.Uint64(req.SData[0:8]))
|
|
log.Trace(fmt.Sprintf("delivery of %v from %v", chunk, p))
|
|
chunk.Source = p
|
|
self.netStore.Put(chunk)
|
|
}
|
|
|
|
// entrypoint for retrieve requests coming from the bzz wire protocol
|
|
// checks swap balance - return if peer has no credit
|
|
func (self *Depo) HandleRetrieveRequestMsg(req *retrieveRequestMsgData, p *peer) {
|
|
req.from = p
|
|
// swap - record credit for 1 request
|
|
// note that only charge actual reqsearches
|
|
var err error
|
|
if p.swap != nil {
|
|
err = p.swap.Add(1)
|
|
}
|
|
if err != nil {
|
|
log.Warn(fmt.Sprintf("Depo.HandleRetrieveRequest: %v - cannot process request: %v", req.Key.Log(), err))
|
|
return
|
|
}
|
|
|
|
// call storage.NetStore#Get which
|
|
// blocks until local retrieval finished
|
|
// launches cloud retrieval
|
|
chunk, _ := self.netStore.Get(req.Key)
|
|
req = self.strategyUpdateRequest(chunk.Req, req)
|
|
// check if we can immediately deliver
|
|
if chunk.SData != nil {
|
|
log.Trace(fmt.Sprintf("Depo.HandleRetrieveRequest: %v - content found, delivering...", req.Key.Log()))
|
|
|
|
if req.MaxSize == 0 || int64(req.MaxSize) >= chunk.Size {
|
|
sreq := &storeRequestMsgData{
|
|
Id: req.Id,
|
|
Key: chunk.Key,
|
|
SData: chunk.SData,
|
|
requestTimeout: req.timeout, //
|
|
}
|
|
syncSendCount.Inc(1)
|
|
p.syncer.addRequest(sreq, DeliverReq)
|
|
} else {
|
|
syncSendRefused.Inc(1)
|
|
log.Trace(fmt.Sprintf("Depo.HandleRetrieveRequest: %v - content found, not wanted", req.Key.Log()))
|
|
}
|
|
} else {
|
|
syncSendNotFound.Inc(1)
|
|
log.Trace(fmt.Sprintf("Depo.HandleRetrieveRequest: %v - content not found locally. asked swarm for help. will get back", req.Key.Log()))
|
|
}
|
|
}
|
|
|
|
// add peer request the chunk and decides the timeout for the response if still searching
|
|
func (self *Depo) strategyUpdateRequest(rs *storage.RequestStatus, origReq *retrieveRequestMsgData) (req *retrieveRequestMsgData) {
|
|
log.Trace(fmt.Sprintf("Depo.strategyUpdateRequest: key %v", origReq.Key.Log()))
|
|
// we do not create an alternative one
|
|
req = origReq
|
|
if rs != nil {
|
|
self.addRequester(rs, req)
|
|
req.setTimeout(self.searchTimeout(rs, req))
|
|
}
|
|
return
|
|
}
|
|
|
|
// decides the timeout promise sent with the immediate peers response to a retrieve request
|
|
// if timeout is explicitly set and expired
|
|
func (self *Depo) searchTimeout(rs *storage.RequestStatus, req *retrieveRequestMsgData) (timeout *time.Time) {
|
|
reqt := req.getTimeout()
|
|
t := time.Now().Add(searchTimeout)
|
|
if reqt != nil && reqt.Before(t) {
|
|
return reqt
|
|
} else {
|
|
return &t
|
|
}
|
|
}
|
|
|
|
/*
|
|
adds a new peer to an existing open request
|
|
only add if less than requesterCount peers forwarded the same request id so far
|
|
note this is done irrespective of status (searching or found)
|
|
*/
|
|
func (self *Depo) addRequester(rs *storage.RequestStatus, req *retrieveRequestMsgData) {
|
|
log.Trace(fmt.Sprintf("Depo.addRequester: key %v - add peer to req.Id %v", req.Key.Log(), req.Id))
|
|
list := rs.Requesters[req.Id]
|
|
rs.Requesters[req.Id] = append(list, req)
|
|
}
|