mirror of
https://gitlab.com/pulsechaincom/go-pulse.git
synced 2025-01-16 07:18:20 +00:00
30cd5c1854
Package p2p/enode provides a generalized representation of p2p nodes which can contain arbitrary information in key/value pairs. It is also the new home for the node database. The "v4" identity scheme is also moved here from p2p/enr to remove the dependency on Ethereum crypto from that package. Record signature handling is changed significantly. The identity scheme registry is removed and acceptable schemes must be passed to any method that needs identity. This means records must now be validated explicitly after decoding. The enode API is designed to make signature handling easy and safe: most APIs around the codebase work with enode.Node, which is a wrapper around a valid record. Going from enr.Record to enode.Node requires a valid signature. * p2p/discover: port to p2p/enode This ports the discovery code to the new node representation in p2p/enode. The wire protocol is unchanged; this can be considered a refactoring change. The Kademlia table can now deal with nodes using an arbitrary identity scheme. This requires a few incompatible API changes: - Table.Lookup is not available anymore. It used to take a public key as argument because v4 protocol requires one. Its replacement is LookupRandom. - Table.Resolve takes *enode.Node instead of NodeID. This is also for v4 protocol compatibility because nodes cannot be looked up by ID alone. - Types Node and NodeID are gone. Further commits in the series will be fixes all over the codebase to deal with those removals. * p2p: port to p2p/enode and discovery changes This adapts package p2p to the changes in p2p/discover. All uses of discover.Node and discover.NodeID are replaced by their equivalents from p2p/enode. New API is added to retrieve the enode.Node instance of a peer. The behavior of Server.Self with discovery disabled is improved. It now tries much harder to report a working IP address, falling back to 127.0.0.1 if no suitable address can be determined through other means. 
These changes were needed for tests of other packages later in the series. * p2p/simulations, p2p/testing: port to p2p/enode No surprises here, mostly replacements of discover.Node, discover.NodeID with their new equivalents. The 'interesting' API changes are: - testing.ProtocolSession tracks complete nodes, not just their IDs. - adapters.NodeConfig has a new method to create a complete node. These changes were needed to make swarm tests work. Note that the NodeID change makes the code incompatible with old simulation snapshots. * whisper/whisperv5, whisper/whisperv6: port to p2p/enode This port was easy because whisper uses []byte for node IDs and URL strings in the API. * eth: port to p2p/enode Again, easy to port because eth uses strings for node IDs and doesn't care about node information in any way. * les: port to p2p/enode Apart from replacing discover.NodeID with enode.ID, most changes are in the server pool code. It now deals with complete nodes instead of (Pubkey, IP, Port) triples. The database format is unchanged for now, but we should probably change it to use the node database later. * node: port to p2p/enode This change simply replaces discover.Node and discover.NodeID with their new equivalents. * swarm/network: port to p2p/enode Swarm has its own node address representation, BzzAddr, containing both an overlay address (the hash of a secp256k1 public key) and an underlay address (enode:// URL). There are no changes to the BzzAddr format in this commit, but certain operations such as creating a BzzAddr from a node ID are now impossible because node IDs aren't public keys anymore. Most swarm-related changes in the series remove uses of NewAddrFromNodeID, replacing it with NewAddr which takes a complete node as argument. ToOverlayAddr is removed because we can just use the node ID directly.
946 lines
29 KiB
Go
946 lines
29 KiB
Go
// Copyright 2018 The go-ethereum Authors
|
|
// This file is part of the go-ethereum library.
|
|
//
|
|
// The go-ethereum library is free software: you can redistribute it and/or modify
|
|
// it under the terms of the GNU Lesser General Public License as published by
|
|
// the Free Software Foundation, either version 3 of the License, or
|
|
// (at your option) any later version.
|
|
//
|
|
// The go-ethereum library is distributed in the hope that it will be useful,
|
|
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
// GNU Lesser General Public License for more details.
|
|
//
|
|
// You should have received a copy of the GNU Lesser General Public License
|
|
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
|
|
|
|
package pss
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"crypto/ecdsa"
|
|
"crypto/rand"
|
|
"errors"
|
|
"fmt"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/ethereum/go-ethereum/common"
|
|
"github.com/ethereum/go-ethereum/crypto"
|
|
"github.com/ethereum/go-ethereum/metrics"
|
|
"github.com/ethereum/go-ethereum/p2p"
|
|
"github.com/ethereum/go-ethereum/p2p/enode"
|
|
"github.com/ethereum/go-ethereum/p2p/protocols"
|
|
"github.com/ethereum/go-ethereum/rpc"
|
|
"github.com/ethereum/go-ethereum/swarm/log"
|
|
"github.com/ethereum/go-ethereum/swarm/network"
|
|
"github.com/ethereum/go-ethereum/swarm/pot"
|
|
"github.com/ethereum/go-ethereum/swarm/storage"
|
|
whisper "github.com/ethereum/go-ethereum/whisper/whisperv5"
|
|
)
|
|
|
|
// Defaults and fixed parameters used throughout the pss implementation.
const (
	// Timing.
	DefaultMsgTTL         = 120 * time.Second // default value for PssParams.MsgTTL
	defaultDigestCacheTTL = 10 * time.Second  // default value for PssParams.CacheTTL
	defaultCleanInterval  = 10 * time.Minute  // period of the symkey garbage collection ticker

	// Sizes and capacities.
	defaultPaddingByteSize     = 16
	defaultSymKeyCacheCapacity = 512
	digestLength               = 32 // byte length of digest used for pss cache (currently same as swarm chunk hash)
	defaultMaxMsgSize          = 1 << 20
	defaultOutboxCapacity      = 100000
	hasherCount                = 8 // number of hashers put into the hash pool at construction

	// Whisper envelope parameters.
	defaultWhisperWorkTime = 3
	defaultWhisperPoW      = 1e-10

	// devp2p protocol identification.
	pssProtocolName = "pss"
	pssVersion      = 2
)
|
|
|
|
var (
|
|
addressLength = len(pot.Address{})
|
|
)
|
|
|
|
// pssCacheEntry is the value stored in the forward cache for a message digest.
//
// The cache is used for preventing backwards routing. It will also be
// instrumental in the flood guard mechanism and the mailbox implementation.
type pssCacheEntry struct {
	// expiresAt is the time after which the entry no longer counts as a cache hit.
	expiresAt time.Time
}
|
|
|
|
// abstraction to enable access to p2p.protocols.Peer.Send
|
|
type senderPeer interface {
|
|
Info() *p2p.PeerInfo
|
|
ID() enode.ID
|
|
Address() []byte
|
|
Send(context.Context, interface{}) error
|
|
}
|
|
|
|
// per-key peer related information
|
|
// member `protected` prevents garbage collection of the instance
|
|
type pssPeer struct {
|
|
lastSeen time.Time
|
|
address *PssAddress
|
|
protected bool
|
|
}
|
|
|
|
// PssParams holds the configuration parameters for a Pss instance.
type PssParams struct {
	// MsgTTL is added to the current time to compute the expiry of outgoing messages.
	MsgTTL time.Duration
	// CacheTTL is how long a message digest is kept in the forward cache.
	CacheTTL time.Duration
	// privateKey is the key of the pss node; set it using WithPrivateKey.
	privateKey *ecdsa.PrivateKey
	// SymKeyCacheCapacity is the size of the symmetric key decryption cache.
	SymKeyCacheCapacity int
	// AllowRaw, if true, enables sending and receiving messages without builtin pss encryption.
	AllowRaw bool
}
|
|
|
|
// Sane defaults for Pss
|
|
func NewPssParams() *PssParams {
|
|
return &PssParams{
|
|
MsgTTL: DefaultMsgTTL,
|
|
CacheTTL: defaultDigestCacheTTL,
|
|
SymKeyCacheCapacity: defaultSymKeyCacheCapacity,
|
|
}
|
|
}
|
|
|
|
func (params *PssParams) WithPrivateKey(privatekey *ecdsa.PrivateKey) *PssParams {
|
|
params.privateKey = privatekey
|
|
return params
|
|
}
|
|
|
|
// Toplevel pss object, takes care of message sending, receiving, decryption and encryption, message handler dispatchers and message forwarding.
|
|
//
|
|
// Implements node.Service
|
|
type Pss struct {
|
|
*network.Kademlia // we can get the Kademlia address from this
|
|
privateKey *ecdsa.PrivateKey // pss can have it's own independent key
|
|
w *whisper.Whisper // key and encryption backend
|
|
auxAPIs []rpc.API // builtins (handshake, test) can add APIs
|
|
|
|
// sending and forwarding
|
|
fwdPool map[string]*protocols.Peer // keep track of all peers sitting on the pssmsg routing layer
|
|
fwdPoolMu sync.RWMutex
|
|
fwdCache map[pssDigest]pssCacheEntry // checksum of unique fields from pssmsg mapped to expiry, cache to determine whether to drop msg
|
|
fwdCacheMu sync.RWMutex
|
|
cacheTTL time.Duration // how long to keep messages in fwdCache (not implemented)
|
|
msgTTL time.Duration
|
|
paddingByteSize int
|
|
capstring string
|
|
outbox chan *PssMsg
|
|
|
|
// keys and peers
|
|
pubKeyPool map[string]map[Topic]*pssPeer // mapping of hex public keys to peer address by topic.
|
|
pubKeyPoolMu sync.RWMutex
|
|
symKeyPool map[string]map[Topic]*pssPeer // mapping of symkeyids to peer address by topic.
|
|
symKeyPoolMu sync.RWMutex
|
|
symKeyDecryptCache []*string // fast lookup of symkeys recently used for decryption; last used is on top of stack
|
|
symKeyDecryptCacheCursor int // modular cursor pointing to last used, wraps on symKeyDecryptCache array
|
|
symKeyDecryptCacheCapacity int // max amount of symkeys to keep.
|
|
|
|
// message handling
|
|
handlers map[Topic]map[*Handler]bool // topic and version based pss payload handlers. See pss.Handle()
|
|
handlersMu sync.RWMutex
|
|
allowRaw bool
|
|
hashPool sync.Pool
|
|
|
|
// process
|
|
quitC chan struct{}
|
|
}
|
|
|
|
func (p *Pss) String() string {
|
|
return fmt.Sprintf("pss: addr %x, pubkey %v", p.BaseAddr(), common.ToHex(crypto.FromECDSAPub(&p.privateKey.PublicKey)))
|
|
}
|
|
|
|
// Creates a new Pss instance.
|
|
//
|
|
// In addition to params, it takes a swarm network Kademlia
|
|
// and a FileStore storage for message cache storage.
|
|
func NewPss(k *network.Kademlia, params *PssParams) (*Pss, error) {
|
|
if params.privateKey == nil {
|
|
return nil, errors.New("missing private key for pss")
|
|
}
|
|
cap := p2p.Cap{
|
|
Name: pssProtocolName,
|
|
Version: pssVersion,
|
|
}
|
|
ps := &Pss{
|
|
Kademlia: k,
|
|
privateKey: params.privateKey,
|
|
w: whisper.New(&whisper.DefaultConfig),
|
|
quitC: make(chan struct{}),
|
|
|
|
fwdPool: make(map[string]*protocols.Peer),
|
|
fwdCache: make(map[pssDigest]pssCacheEntry),
|
|
cacheTTL: params.CacheTTL,
|
|
msgTTL: params.MsgTTL,
|
|
paddingByteSize: defaultPaddingByteSize,
|
|
capstring: cap.String(),
|
|
outbox: make(chan *PssMsg, defaultOutboxCapacity),
|
|
|
|
pubKeyPool: make(map[string]map[Topic]*pssPeer),
|
|
symKeyPool: make(map[string]map[Topic]*pssPeer),
|
|
symKeyDecryptCache: make([]*string, params.SymKeyCacheCapacity),
|
|
symKeyDecryptCacheCapacity: params.SymKeyCacheCapacity,
|
|
|
|
handlers: make(map[Topic]map[*Handler]bool),
|
|
allowRaw: params.AllowRaw,
|
|
hashPool: sync.Pool{
|
|
New: func() interface{} {
|
|
return storage.MakeHashFunc(storage.DefaultHash)()
|
|
},
|
|
},
|
|
}
|
|
|
|
for i := 0; i < hasherCount; i++ {
|
|
hashfunc := storage.MakeHashFunc(storage.DefaultHash)()
|
|
ps.hashPool.Put(hashfunc)
|
|
}
|
|
|
|
return ps, nil
|
|
}
|
|
|
|
/////////////////////////////////////////////////////////////////////
|
|
// SECTION: node.Service interface
|
|
/////////////////////////////////////////////////////////////////////
|
|
|
|
func (p *Pss) Start(srv *p2p.Server) error {
|
|
go func() {
|
|
ticker := time.NewTicker(defaultCleanInterval)
|
|
cacheTicker := time.NewTicker(p.cacheTTL)
|
|
defer ticker.Stop()
|
|
defer cacheTicker.Stop()
|
|
for {
|
|
select {
|
|
case <-cacheTicker.C:
|
|
p.cleanFwdCache()
|
|
case <-ticker.C:
|
|
p.cleanKeys()
|
|
case <-p.quitC:
|
|
return
|
|
}
|
|
}
|
|
}()
|
|
go func() {
|
|
for {
|
|
select {
|
|
case msg := <-p.outbox:
|
|
err := p.forward(msg)
|
|
if err != nil {
|
|
log.Error(err.Error())
|
|
metrics.GetOrRegisterCounter("pss.forward.err", nil).Inc(1)
|
|
}
|
|
case <-p.quitC:
|
|
return
|
|
}
|
|
}
|
|
}()
|
|
log.Info("Started Pss")
|
|
log.Info("Loaded EC keys", "pubkey", common.ToHex(crypto.FromECDSAPub(p.PublicKey())), "secp256", common.ToHex(crypto.CompressPubkey(p.PublicKey())))
|
|
return nil
|
|
}
|
|
|
|
func (p *Pss) Stop() error {
|
|
log.Info("Pss shutting down")
|
|
close(p.quitC)
|
|
return nil
|
|
}
|
|
|
|
var pssSpec = &protocols.Spec{
|
|
Name: pssProtocolName,
|
|
Version: pssVersion,
|
|
MaxMsgSize: defaultMaxMsgSize,
|
|
Messages: []interface{}{
|
|
PssMsg{},
|
|
},
|
|
}
|
|
|
|
func (p *Pss) Protocols() []p2p.Protocol {
|
|
return []p2p.Protocol{
|
|
{
|
|
Name: pssSpec.Name,
|
|
Version: pssSpec.Version,
|
|
Length: pssSpec.Length(),
|
|
Run: p.Run,
|
|
},
|
|
}
|
|
}
|
|
|
|
func (p *Pss) Run(peer *p2p.Peer, rw p2p.MsgReadWriter) error {
|
|
pp := protocols.NewPeer(peer, rw, pssSpec)
|
|
p.fwdPoolMu.Lock()
|
|
p.fwdPool[peer.Info().ID] = pp
|
|
p.fwdPoolMu.Unlock()
|
|
return pp.Run(p.handlePssMsg)
|
|
}
|
|
|
|
func (p *Pss) APIs() []rpc.API {
|
|
apis := []rpc.API{
|
|
{
|
|
Namespace: "pss",
|
|
Version: "1.0",
|
|
Service: NewAPI(p),
|
|
Public: true,
|
|
},
|
|
}
|
|
apis = append(apis, p.auxAPIs...)
|
|
return apis
|
|
}
|
|
|
|
// add API methods to the pss API
|
|
// must be run before node is started
|
|
func (p *Pss) addAPI(api rpc.API) {
|
|
p.auxAPIs = append(p.auxAPIs, api)
|
|
}
|
|
|
|
// Returns the swarm Kademlia address of the pss node
|
|
func (p *Pss) BaseAddr() []byte {
|
|
return p.Kademlia.BaseAddr()
|
|
}
|
|
|
|
// Returns the pss node's public key
|
|
func (p *Pss) PublicKey() *ecdsa.PublicKey {
|
|
return &p.privateKey.PublicKey
|
|
}
|
|
|
|
/////////////////////////////////////////////////////////////////////
|
|
// SECTION: Message handling
|
|
/////////////////////////////////////////////////////////////////////
|
|
|
|
// Links a handler function to a Topic
|
|
//
|
|
// All incoming messages with an envelope Topic matching the
|
|
// topic specified will be passed to the given Handler function.
|
|
//
|
|
// There may be an arbitrary number of handler functions per topic.
|
|
//
|
|
// Returns a deregister function which needs to be called to
|
|
// deregister the handler,
|
|
func (p *Pss) Register(topic *Topic, handler Handler) func() {
|
|
p.handlersMu.Lock()
|
|
defer p.handlersMu.Unlock()
|
|
handlers := p.handlers[*topic]
|
|
if handlers == nil {
|
|
handlers = make(map[*Handler]bool)
|
|
p.handlers[*topic] = handlers
|
|
}
|
|
handlers[&handler] = true
|
|
return func() { p.deregister(topic, &handler) }
|
|
}
|
|
func (p *Pss) deregister(topic *Topic, h *Handler) {
|
|
p.handlersMu.Lock()
|
|
defer p.handlersMu.Unlock()
|
|
handlers := p.handlers[*topic]
|
|
if len(handlers) == 1 {
|
|
delete(p.handlers, *topic)
|
|
return
|
|
}
|
|
delete(handlers, h)
|
|
}
|
|
|
|
// get all registered handlers for respective topics
|
|
func (p *Pss) getHandlers(topic Topic) map[*Handler]bool {
|
|
p.handlersMu.RLock()
|
|
defer p.handlersMu.RUnlock()
|
|
return p.handlers[topic]
|
|
}
|
|
|
|
// Filters incoming messages for processing or forwarding.
|
|
// Check if address partially matches
|
|
// If yes, it CAN be for us, and we process it
|
|
// Only passes error to pss protocol handler if payload is not valid pssmsg
|
|
func (p *Pss) handlePssMsg(ctx context.Context, msg interface{}) error {
|
|
metrics.GetOrRegisterCounter("pss.handlepssmsg", nil).Inc(1)
|
|
|
|
pssmsg, ok := msg.(*PssMsg)
|
|
|
|
if !ok {
|
|
return fmt.Errorf("invalid message type. Expected *PssMsg, got %T ", msg)
|
|
}
|
|
if int64(pssmsg.Expire) < time.Now().Unix() {
|
|
metrics.GetOrRegisterCounter("pss.expire", nil).Inc(1)
|
|
log.Warn("pss filtered expired message", "from", common.ToHex(p.Kademlia.BaseAddr()), "to", common.ToHex(pssmsg.To))
|
|
return nil
|
|
}
|
|
if p.checkFwdCache(pssmsg) {
|
|
log.Trace("pss relay block-cache match (process)", "from", common.ToHex(p.Kademlia.BaseAddr()), "to", (common.ToHex(pssmsg.To)))
|
|
return nil
|
|
}
|
|
p.addFwdCache(pssmsg)
|
|
|
|
if !p.isSelfPossibleRecipient(pssmsg) {
|
|
log.Trace("pss was for someone else :'( ... forwarding", "pss", common.ToHex(p.BaseAddr()))
|
|
return p.enqueue(pssmsg)
|
|
}
|
|
|
|
log.Trace("pss for us, yay! ... let's process!", "pss", common.ToHex(p.BaseAddr()))
|
|
if err := p.process(pssmsg); err != nil {
|
|
qerr := p.enqueue(pssmsg)
|
|
if qerr != nil {
|
|
return fmt.Errorf("process fail: processerr %v, queueerr: %v", err, qerr)
|
|
}
|
|
}
|
|
return nil
|
|
|
|
}
|
|
|
|
// Entry point to processing a message for which the current node can be the intended recipient.
|
|
// Attempts symmetric and asymmetric decryption with stored keys.
|
|
// Dispatches message to all handlers matching the message topic
|
|
func (p *Pss) process(pssmsg *PssMsg) error {
|
|
metrics.GetOrRegisterCounter("pss.process", nil).Inc(1)
|
|
|
|
var err error
|
|
var recvmsg *whisper.ReceivedMessage
|
|
var payload []byte
|
|
var from *PssAddress
|
|
var asymmetric bool
|
|
var keyid string
|
|
var keyFunc func(envelope *whisper.Envelope) (*whisper.ReceivedMessage, string, *PssAddress, error)
|
|
|
|
envelope := pssmsg.Payload
|
|
psstopic := Topic(envelope.Topic)
|
|
if pssmsg.isRaw() {
|
|
if !p.allowRaw {
|
|
return errors.New("raw message support disabled")
|
|
}
|
|
payload = pssmsg.Payload.Data
|
|
} else {
|
|
if pssmsg.isSym() {
|
|
keyFunc = p.processSym
|
|
} else {
|
|
asymmetric = true
|
|
keyFunc = p.processAsym
|
|
}
|
|
|
|
recvmsg, keyid, from, err = keyFunc(envelope)
|
|
if err != nil {
|
|
return errors.New("Decryption failed")
|
|
}
|
|
payload = recvmsg.Payload
|
|
}
|
|
|
|
if len(pssmsg.To) < addressLength {
|
|
if err := p.enqueue(pssmsg); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
p.executeHandlers(psstopic, payload, from, asymmetric, keyid)
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
func (p *Pss) executeHandlers(topic Topic, payload []byte, from *PssAddress, asymmetric bool, keyid string) {
|
|
handlers := p.getHandlers(topic)
|
|
peer := p2p.NewPeer(enode.ID{}, fmt.Sprintf("%x", from), []p2p.Cap{})
|
|
for f := range handlers {
|
|
err := (*f)(payload, peer, asymmetric, keyid)
|
|
if err != nil {
|
|
log.Warn("Pss handler %p failed: %v", f, err)
|
|
}
|
|
}
|
|
}
|
|
|
|
// will return false if using partial address
|
|
func (p *Pss) isSelfRecipient(msg *PssMsg) bool {
|
|
return bytes.Equal(msg.To, p.Kademlia.BaseAddr())
|
|
}
|
|
|
|
// test match of leftmost bytes in given message to node's Kademlia address
|
|
func (p *Pss) isSelfPossibleRecipient(msg *PssMsg) bool {
|
|
local := p.Kademlia.BaseAddr()
|
|
return bytes.Equal(msg.To, local[:len(msg.To)])
|
|
}
|
|
|
|
/////////////////////////////////////////////////////////////////////
|
|
// SECTION: Encryption
|
|
/////////////////////////////////////////////////////////////////////
|
|
|
|
// Links a peer ECDSA public key to a topic
|
|
//
|
|
// This is required for asymmetric message exchange
|
|
// on the given topic
|
|
//
|
|
// The value in `address` will be used as a routing hint for the
|
|
// public key / topic association
|
|
func (p *Pss) SetPeerPublicKey(pubkey *ecdsa.PublicKey, topic Topic, address *PssAddress) error {
|
|
pubkeybytes := crypto.FromECDSAPub(pubkey)
|
|
if len(pubkeybytes) == 0 {
|
|
return fmt.Errorf("invalid public key: %v", pubkey)
|
|
}
|
|
pubkeyid := common.ToHex(pubkeybytes)
|
|
psp := &pssPeer{
|
|
address: address,
|
|
}
|
|
p.pubKeyPoolMu.Lock()
|
|
if _, ok := p.pubKeyPool[pubkeyid]; !ok {
|
|
p.pubKeyPool[pubkeyid] = make(map[Topic]*pssPeer)
|
|
}
|
|
p.pubKeyPool[pubkeyid][topic] = psp
|
|
p.pubKeyPoolMu.Unlock()
|
|
log.Trace("added pubkey", "pubkeyid", pubkeyid, "topic", topic, "address", common.ToHex(*address))
|
|
return nil
|
|
}
|
|
|
|
// Automatically generate a new symkey for a topic and address hint
|
|
func (p *Pss) GenerateSymmetricKey(topic Topic, address *PssAddress, addToCache bool) (string, error) {
|
|
keyid, err := p.w.GenerateSymKey()
|
|
if err != nil {
|
|
return "", err
|
|
}
|
|
p.addSymmetricKeyToPool(keyid, topic, address, addToCache, false)
|
|
return keyid, nil
|
|
}
|
|
|
|
// Links a peer symmetric key (arbitrary byte sequence) to a topic
|
|
//
|
|
// This is required for symmetrically encrypted message exchange
|
|
// on the given topic
|
|
//
|
|
// The key is stored in the whisper backend.
|
|
//
|
|
// If addtocache is set to true, the key will be added to the cache of keys
|
|
// used to attempt symmetric decryption of incoming messages.
|
|
//
|
|
// Returns a string id that can be used to retrieve the key bytes
|
|
// from the whisper backend (see pss.GetSymmetricKey())
|
|
func (p *Pss) SetSymmetricKey(key []byte, topic Topic, address *PssAddress, addtocache bool) (string, error) {
|
|
return p.setSymmetricKey(key, topic, address, addtocache, true)
|
|
}
|
|
|
|
func (p *Pss) setSymmetricKey(key []byte, topic Topic, address *PssAddress, addtocache bool, protected bool) (string, error) {
|
|
keyid, err := p.w.AddSymKeyDirect(key)
|
|
if err != nil {
|
|
return "", err
|
|
}
|
|
p.addSymmetricKeyToPool(keyid, topic, address, addtocache, protected)
|
|
return keyid, nil
|
|
}
|
|
|
|
// adds a symmetric key to the pss key pool, and optionally adds the key
|
|
// to the collection of keys used to attempt symmetric decryption of
|
|
// incoming messages
|
|
func (p *Pss) addSymmetricKeyToPool(keyid string, topic Topic, address *PssAddress, addtocache bool, protected bool) {
|
|
psp := &pssPeer{
|
|
address: address,
|
|
protected: protected,
|
|
}
|
|
p.symKeyPoolMu.Lock()
|
|
if _, ok := p.symKeyPool[keyid]; !ok {
|
|
p.symKeyPool[keyid] = make(map[Topic]*pssPeer)
|
|
}
|
|
p.symKeyPool[keyid][topic] = psp
|
|
p.symKeyPoolMu.Unlock()
|
|
if addtocache {
|
|
p.symKeyDecryptCacheCursor++
|
|
p.symKeyDecryptCache[p.symKeyDecryptCacheCursor%cap(p.symKeyDecryptCache)] = &keyid
|
|
}
|
|
key, _ := p.GetSymmetricKey(keyid)
|
|
log.Trace("added symkey", "symkeyid", keyid, "symkey", common.ToHex(key), "topic", topic, "address", fmt.Sprintf("%p", address), "cache", addtocache)
|
|
}
|
|
|
|
// Returns a symmetric key byte seqyence stored in the whisper backend
|
|
// by its unique id
|
|
//
|
|
// Passes on the error value from the whisper backend
|
|
func (p *Pss) GetSymmetricKey(symkeyid string) ([]byte, error) {
|
|
symkey, err := p.w.GetSymKey(symkeyid)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return symkey, nil
|
|
}
|
|
|
|
// Returns all recorded topic and address combination for a specific public key
|
|
func (p *Pss) GetPublickeyPeers(keyid string) (topic []Topic, address []PssAddress, err error) {
|
|
p.pubKeyPoolMu.RLock()
|
|
defer p.pubKeyPoolMu.RUnlock()
|
|
for t, peer := range p.pubKeyPool[keyid] {
|
|
topic = append(topic, t)
|
|
address = append(address, *peer.address)
|
|
}
|
|
|
|
return topic, address, nil
|
|
}
|
|
|
|
func (p *Pss) getPeerAddress(keyid string, topic Topic) (PssAddress, error) {
|
|
p.pubKeyPoolMu.RLock()
|
|
defer p.pubKeyPoolMu.RUnlock()
|
|
if peers, ok := p.pubKeyPool[keyid]; ok {
|
|
if t, ok := peers[topic]; ok {
|
|
return *t.address, nil
|
|
}
|
|
}
|
|
return nil, fmt.Errorf("peer with pubkey %s, topic %x not found", keyid, topic)
|
|
}
|
|
|
|
// Attempt to decrypt, validate and unpack a
|
|
// symmetrically encrypted message
|
|
// If successful, returns the unpacked whisper ReceivedMessage struct
|
|
// encapsulating the decrypted message, and the whisper backend id
|
|
// of the symmetric key used to decrypt the message.
|
|
// It fails if decryption of the message fails or if the message is corrupted
|
|
func (p *Pss) processSym(envelope *whisper.Envelope) (*whisper.ReceivedMessage, string, *PssAddress, error) {
|
|
metrics.GetOrRegisterCounter("pss.process.sym", nil).Inc(1)
|
|
|
|
for i := p.symKeyDecryptCacheCursor; i > p.symKeyDecryptCacheCursor-cap(p.symKeyDecryptCache) && i > 0; i-- {
|
|
symkeyid := p.symKeyDecryptCache[i%cap(p.symKeyDecryptCache)]
|
|
symkey, err := p.w.GetSymKey(*symkeyid)
|
|
if err != nil {
|
|
continue
|
|
}
|
|
recvmsg, err := envelope.OpenSymmetric(symkey)
|
|
if err != nil {
|
|
continue
|
|
}
|
|
if !recvmsg.Validate() {
|
|
return nil, "", nil, fmt.Errorf("symmetrically encrypted message has invalid signature or is corrupt")
|
|
}
|
|
p.symKeyPoolMu.Lock()
|
|
from := p.symKeyPool[*symkeyid][Topic(envelope.Topic)].address
|
|
p.symKeyPoolMu.Unlock()
|
|
p.symKeyDecryptCacheCursor++
|
|
p.symKeyDecryptCache[p.symKeyDecryptCacheCursor%cap(p.symKeyDecryptCache)] = symkeyid
|
|
return recvmsg, *symkeyid, from, nil
|
|
}
|
|
return nil, "", nil, fmt.Errorf("could not decrypt message")
|
|
}
|
|
|
|
// Attempt to decrypt, validate and unpack an
|
|
// asymmetrically encrypted message
|
|
// If successful, returns the unpacked whisper ReceivedMessage struct
|
|
// encapsulating the decrypted message, and the byte representation of
|
|
// the public key used to decrypt the message.
|
|
// It fails if decryption of message fails, or if the message is corrupted
|
|
func (p *Pss) processAsym(envelope *whisper.Envelope) (*whisper.ReceivedMessage, string, *PssAddress, error) {
|
|
metrics.GetOrRegisterCounter("pss.process.asym", nil).Inc(1)
|
|
|
|
recvmsg, err := envelope.OpenAsymmetric(p.privateKey)
|
|
if err != nil {
|
|
return nil, "", nil, fmt.Errorf("could not decrypt message: %s", err)
|
|
}
|
|
// check signature (if signed), strip padding
|
|
if !recvmsg.Validate() {
|
|
return nil, "", nil, fmt.Errorf("invalid message")
|
|
}
|
|
pubkeyid := common.ToHex(crypto.FromECDSAPub(recvmsg.Src))
|
|
var from *PssAddress
|
|
p.pubKeyPoolMu.Lock()
|
|
if p.pubKeyPool[pubkeyid][Topic(envelope.Topic)] != nil {
|
|
from = p.pubKeyPool[pubkeyid][Topic(envelope.Topic)].address
|
|
}
|
|
p.pubKeyPoolMu.Unlock()
|
|
return recvmsg, pubkeyid, from, nil
|
|
}
|
|
|
|
// Symkey garbage collection
|
|
// a key is removed if:
|
|
// - it is not marked as protected
|
|
// - it is not in the incoming decryption cache
|
|
func (p *Pss) cleanKeys() (count int) {
|
|
for keyid, peertopics := range p.symKeyPool {
|
|
var expiredtopics []Topic
|
|
for topic, psp := range peertopics {
|
|
if psp.protected {
|
|
continue
|
|
}
|
|
|
|
var match bool
|
|
for i := p.symKeyDecryptCacheCursor; i > p.symKeyDecryptCacheCursor-cap(p.symKeyDecryptCache) && i > 0; i-- {
|
|
cacheid := p.symKeyDecryptCache[i%cap(p.symKeyDecryptCache)]
|
|
if *cacheid == keyid {
|
|
match = true
|
|
}
|
|
}
|
|
if !match {
|
|
expiredtopics = append(expiredtopics, topic)
|
|
}
|
|
}
|
|
for _, topic := range expiredtopics {
|
|
p.symKeyPoolMu.Lock()
|
|
delete(p.symKeyPool[keyid], topic)
|
|
log.Trace("symkey cleanup deletion", "symkeyid", keyid, "topic", topic, "val", p.symKeyPool[keyid])
|
|
p.symKeyPoolMu.Unlock()
|
|
count++
|
|
}
|
|
}
|
|
return
|
|
}
|
|
|
|
/////////////////////////////////////////////////////////////////////
|
|
// SECTION: Message sending
|
|
/////////////////////////////////////////////////////////////////////
|
|
|
|
func (p *Pss) enqueue(msg *PssMsg) error {
|
|
select {
|
|
case p.outbox <- msg:
|
|
return nil
|
|
default:
|
|
}
|
|
|
|
metrics.GetOrRegisterCounter("pss.enqueue.outbox.full", nil).Inc(1)
|
|
return errors.New("outbox full")
|
|
}
|
|
|
|
// Send a raw message (any encryption is responsibility of calling client)
|
|
//
|
|
// Will fail if raw messages are disallowed
|
|
func (p *Pss) SendRaw(address PssAddress, topic Topic, msg []byte) error {
|
|
if !p.allowRaw {
|
|
return errors.New("Raw messages not enabled")
|
|
}
|
|
pssMsgParams := &msgParams{
|
|
raw: true,
|
|
}
|
|
payload := &whisper.Envelope{
|
|
Data: msg,
|
|
Topic: whisper.TopicType(topic),
|
|
}
|
|
pssMsg := newPssMsg(pssMsgParams)
|
|
pssMsg.To = address
|
|
pssMsg.Expire = uint32(time.Now().Add(p.msgTTL).Unix())
|
|
pssMsg.Payload = payload
|
|
p.addFwdCache(pssMsg)
|
|
return p.enqueue(pssMsg)
|
|
}
|
|
|
|
// Send a message using symmetric encryption
|
|
//
|
|
// Fails if the key id does not match any of the stored symmetric keys
|
|
func (p *Pss) SendSym(symkeyid string, topic Topic, msg []byte) error {
|
|
symkey, err := p.GetSymmetricKey(symkeyid)
|
|
if err != nil {
|
|
return fmt.Errorf("missing valid send symkey %s: %v", symkeyid, err)
|
|
}
|
|
p.symKeyPoolMu.Lock()
|
|
psp, ok := p.symKeyPool[symkeyid][topic]
|
|
p.symKeyPoolMu.Unlock()
|
|
if !ok {
|
|
return fmt.Errorf("invalid topic '%s' for symkey '%s'", topic.String(), symkeyid)
|
|
} else if psp.address == nil {
|
|
return fmt.Errorf("no address hint for topic '%s' symkey '%s'", topic.String(), symkeyid)
|
|
}
|
|
err = p.send(*psp.address, topic, msg, false, symkey)
|
|
return err
|
|
}
|
|
|
|
// Send a message using asymmetric encryption
|
|
//
|
|
// Fails if the key id does not match any in of the stored public keys
|
|
func (p *Pss) SendAsym(pubkeyid string, topic Topic, msg []byte) error {
|
|
if _, err := crypto.UnmarshalPubkey(common.FromHex(pubkeyid)); err != nil {
|
|
return fmt.Errorf("Cannot unmarshal pubkey: %x", pubkeyid)
|
|
}
|
|
p.pubKeyPoolMu.Lock()
|
|
psp, ok := p.pubKeyPool[pubkeyid][topic]
|
|
p.pubKeyPoolMu.Unlock()
|
|
if !ok {
|
|
return fmt.Errorf("invalid topic '%s' for pubkey '%s'", topic.String(), pubkeyid)
|
|
} else if psp.address == nil {
|
|
return fmt.Errorf("no address hint for topic '%s' pubkey '%s'", topic.String(), pubkeyid)
|
|
}
|
|
go func() {
|
|
p.send(*psp.address, topic, msg, true, common.FromHex(pubkeyid))
|
|
}()
|
|
return nil
|
|
}
|
|
|
|
// Send is payload agnostic, and will accept any byte slice as payload
|
|
// It generates an whisper envelope for the specified recipient and topic,
|
|
// and wraps the message payload in it.
|
|
// TODO: Implement proper message padding
|
|
func (p *Pss) send(to []byte, topic Topic, msg []byte, asymmetric bool, key []byte) error {
|
|
metrics.GetOrRegisterCounter("pss.send", nil).Inc(1)
|
|
|
|
if key == nil || bytes.Equal(key, []byte{}) {
|
|
return fmt.Errorf("Zero length key passed to pss send")
|
|
}
|
|
padding := make([]byte, p.paddingByteSize)
|
|
c, err := rand.Read(padding)
|
|
if err != nil {
|
|
return err
|
|
} else if c < p.paddingByteSize {
|
|
return fmt.Errorf("invalid padding length: %d", c)
|
|
}
|
|
wparams := &whisper.MessageParams{
|
|
TTL: defaultWhisperTTL,
|
|
Src: p.privateKey,
|
|
Topic: whisper.TopicType(topic),
|
|
WorkTime: defaultWhisperWorkTime,
|
|
PoW: defaultWhisperPoW,
|
|
Payload: msg,
|
|
Padding: padding,
|
|
}
|
|
if asymmetric {
|
|
pk, err := crypto.UnmarshalPubkey(key)
|
|
if err != nil {
|
|
return fmt.Errorf("Cannot unmarshal pubkey: %x", key)
|
|
}
|
|
wparams.Dst = pk
|
|
} else {
|
|
wparams.KeySym = key
|
|
}
|
|
// set up outgoing message container, which does encryption and envelope wrapping
|
|
woutmsg, err := whisper.NewSentMessage(wparams)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to generate whisper message encapsulation: %v", err)
|
|
}
|
|
// performs encryption.
|
|
// Does NOT perform / performs negligible PoW due to very low difficulty setting
|
|
// after this the message is ready for sending
|
|
envelope, err := woutmsg.Wrap(wparams)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to perform whisper encryption: %v", err)
|
|
}
|
|
log.Trace("pssmsg whisper done", "env", envelope, "wparams payload", common.ToHex(wparams.Payload), "to", common.ToHex(to), "asym", asymmetric, "key", common.ToHex(key))
|
|
|
|
// prepare for devp2p transport
|
|
pssMsgParams := &msgParams{
|
|
sym: !asymmetric,
|
|
}
|
|
pssMsg := newPssMsg(pssMsgParams)
|
|
pssMsg.To = to
|
|
pssMsg.Expire = uint32(time.Now().Add(p.msgTTL).Unix())
|
|
pssMsg.Payload = envelope
|
|
return p.enqueue(pssMsg)
|
|
}
|
|
|
|
// Forwards a pss message to the peer(s) closest to the to recipient address in the PssMsg struct
|
|
// The recipient address can be of any length, and the byte slice will be matched to the MSB slice
|
|
// of the peer address of the equivalent length.
|
|
func (p *Pss) forward(msg *PssMsg) error {
|
|
metrics.GetOrRegisterCounter("pss.forward", nil).Inc(1)
|
|
|
|
to := make([]byte, addressLength)
|
|
copy(to[:len(msg.To)], msg.To)
|
|
|
|
// send with kademlia
|
|
// find the closest peer to the recipient and attempt to send
|
|
sent := 0
|
|
p.Kademlia.EachConn(to, 256, func(sp *network.Peer, po int, isproxbin bool) bool {
|
|
info := sp.Info()
|
|
|
|
// check if the peer is running pss
|
|
var ispss bool
|
|
for _, cap := range info.Caps {
|
|
if cap == p.capstring {
|
|
ispss = true
|
|
break
|
|
}
|
|
}
|
|
if !ispss {
|
|
log.Trace("peer doesn't have matching pss capabilities, skipping", "peer", info.Name, "caps", info.Caps)
|
|
return true
|
|
}
|
|
|
|
// get the protocol peer from the forwarding peer cache
|
|
sendMsg := fmt.Sprintf("MSG TO %x FROM %x VIA %x", to, p.BaseAddr(), sp.Address())
|
|
p.fwdPoolMu.RLock()
|
|
pp := p.fwdPool[sp.Info().ID]
|
|
p.fwdPoolMu.RUnlock()
|
|
|
|
// attempt to send the message
|
|
err := pp.Send(context.TODO(), msg)
|
|
if err != nil {
|
|
metrics.GetOrRegisterCounter("pss.pp.send.error", nil).Inc(1)
|
|
log.Error(err.Error())
|
|
return true
|
|
}
|
|
sent++
|
|
log.Trace(fmt.Sprintf("%v: successfully forwarded", sendMsg))
|
|
|
|
// continue forwarding if:
|
|
// - if the peer is end recipient but the full address has not been disclosed
|
|
// - if the peer address matches the partial address fully
|
|
// - if the peer is in proxbin
|
|
if len(msg.To) < addressLength && bytes.Equal(msg.To, sp.Address()[:len(msg.To)]) {
|
|
log.Trace(fmt.Sprintf("Pss keep forwarding: Partial address + full partial match"))
|
|
return true
|
|
} else if isproxbin {
|
|
log.Trace(fmt.Sprintf("%x is in proxbin, keep forwarding", common.ToHex(sp.Address())))
|
|
return true
|
|
}
|
|
// at this point we stop forwarding, and the state is as follows:
|
|
// - the peer is end recipient and we have full address
|
|
// - we are not in proxbin (directed routing)
|
|
// - partial addresses don't fully match
|
|
return false
|
|
})
|
|
|
|
if sent == 0 {
|
|
log.Debug("unable to forward to any peers")
|
|
if err := p.enqueue(msg); err != nil {
|
|
metrics.GetOrRegisterCounter("pss.forward.enqueue.error", nil).Inc(1)
|
|
log.Error(err.Error())
|
|
return err
|
|
}
|
|
}
|
|
|
|
// cache the message
|
|
p.addFwdCache(msg)
|
|
return nil
|
|
}
|
|
|
|
/////////////////////////////////////////////////////////////////////
|
|
// SECTION: Caching
|
|
/////////////////////////////////////////////////////////////////////
|
|
|
|
// cleanFwdCache is used to periodically remove expired entries from the forward cache
|
|
func (p *Pss) cleanFwdCache() {
|
|
metrics.GetOrRegisterCounter("pss.cleanfwdcache", nil).Inc(1)
|
|
p.fwdCacheMu.Lock()
|
|
defer p.fwdCacheMu.Unlock()
|
|
for k, v := range p.fwdCache {
|
|
if v.expiresAt.Before(time.Now()) {
|
|
delete(p.fwdCache, k)
|
|
}
|
|
}
|
|
}
|
|
|
|
// add a message to the cache
|
|
func (p *Pss) addFwdCache(msg *PssMsg) error {
|
|
metrics.GetOrRegisterCounter("pss.addfwdcache", nil).Inc(1)
|
|
|
|
var entry pssCacheEntry
|
|
var ok bool
|
|
|
|
p.fwdCacheMu.Lock()
|
|
defer p.fwdCacheMu.Unlock()
|
|
|
|
digest := p.digest(msg)
|
|
if entry, ok = p.fwdCache[digest]; !ok {
|
|
entry = pssCacheEntry{}
|
|
}
|
|
entry.expiresAt = time.Now().Add(p.cacheTTL)
|
|
p.fwdCache[digest] = entry
|
|
return nil
|
|
}
|
|
|
|
// check if message is in the cache
|
|
func (p *Pss) checkFwdCache(msg *PssMsg) bool {
|
|
p.fwdCacheMu.Lock()
|
|
defer p.fwdCacheMu.Unlock()
|
|
|
|
digest := p.digest(msg)
|
|
entry, ok := p.fwdCache[digest]
|
|
if ok {
|
|
if entry.expiresAt.After(time.Now()) {
|
|
log.Trace("unexpired cache", "digest", fmt.Sprintf("%x", digest))
|
|
metrics.GetOrRegisterCounter("pss.checkfwdcache.unexpired", nil).Inc(1)
|
|
return true
|
|
}
|
|
metrics.GetOrRegisterCounter("pss.checkfwdcache.expired", nil).Inc(1)
|
|
}
|
|
return false
|
|
}
|
|
|
|
// Digest of message
|
|
func (p *Pss) digest(msg *PssMsg) pssDigest {
|
|
hasher := p.hashPool.Get().(storage.SwarmHash)
|
|
defer p.hashPool.Put(hasher)
|
|
hasher.Reset()
|
|
hasher.Write(msg.serialize())
|
|
digest := pssDigest{}
|
|
key := hasher.Sum(nil)
|
|
copy(digest[:], key[:digestLength])
|
|
return digest
|
|
}
|