go-pulse/swarm/pss/pss.go
Felix Lange 30cd5c1854
all: new p2p node representation (#17643)
Package p2p/enode provides a generalized representation of p2p nodes
which can contain arbitrary information in key/value pairs. It is also
the new home for the node database. The "v4" identity scheme is also
moved here from p2p/enr to remove the dependency on Ethereum crypto from
that package.

Record signature handling is changed significantly. The identity scheme
registry is removed and acceptable schemes must be passed to any method
that needs identity. This means records must now be validated explicitly
after decoding.

The enode API is designed to make signature handling easy and safe: most
APIs around the codebase work with enode.Node, which is a wrapper around
a valid record. Going from enr.Record to enode.Node requires a valid
signature.
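
A minimal sketch of the "validate once, then wrap" pattern described above. It does not use the real p2p/enr or p2p/enode API: the types `record` and `node`, the functions `newNode` and `signedRecord`, and the use of ed25519 in place of the secp256k1-based "v4" scheme are all assumptions, chosen so the example runs with the standard library only.

```go
package main

import (
	"crypto/ed25519"
	"errors"
	"fmt"
)

// record is a hypothetical stand-in for a signed node record.
type record struct {
	pubkey  ed25519.PublicKey
	payload []byte
	sig     []byte
}

// node wraps a record whose signature has been checked.
// Code outside this package could only obtain one through newNode.
type node struct {
	r record
}

// newNode verifies the record signature and returns the wrapper on success.
func newNode(r record) (*node, error) {
	if len(r.pubkey) != ed25519.PublicKeySize {
		return nil, errors.New("invalid public key size")
	}
	if !ed25519.Verify(r.pubkey, r.payload, r.sig) {
		return nil, errors.New("invalid signature")
	}
	return &node{r: r}, nil
}

// signedRecord builds a correctly signed record from a fixed all-zero seed.
// The fixed seed is for illustration only and must never be used for real keys.
func signedRecord(payload []byte) record {
	seed := make([]byte, ed25519.SeedSize)
	priv := ed25519.NewKeyFromSeed(seed)
	pub := priv.Public().(ed25519.PublicKey)
	return record{pubkey: pub, payload: payload, sig: ed25519.Sign(priv, payload)}
}

func main() {
	r := signedRecord([]byte("ip=10.0.0.1"))
	_, err := newNode(r)
	fmt.Println("valid record accepted:", err == nil)

	r.payload = []byte("ip=10.0.0.2") // tamper after signing
	_, err = newNode(r)
	fmt.Println("tampered record rejected:", err != nil)
}
```

Because callers only ever hold a `*node`, they do not need to re-check signatures; the check happens once at the boundary where a raw record is decoded.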

* p2p/discover: port to p2p/enode

This ports the discovery code to the new node representation in
p2p/enode. The wire protocol is unchanged, so this can be considered a
refactoring change. The Kademlia table can now deal with nodes using an
arbitrary identity scheme. This requires a few incompatible API changes:

  - Table.Lookup is not available anymore. It used to take a public key
    as argument because v4 protocol requires one. Its replacement is
    LookupRandom.
  - Table.Resolve takes *enode.Node instead of NodeID. This is also for
    v4 protocol compatibility because nodes cannot be looked up by ID
    alone.
  - Types Node and NodeID are gone. Further commits in the series will be
    fixes all over the codebase to deal with those removals.

* p2p: port to p2p/enode and discovery changes

This adapts package p2p to the changes in p2p/discover. All uses of
discover.Node and discover.NodeID are replaced by their equivalents from
p2p/enode.

New API is added to retrieve the enode.Node instance of a peer. The
behavior of Server.Self with discovery disabled is improved. It now
tries much harder to report a working IP address, falling back to
127.0.0.1 if no suitable address can be determined through other means.
These changes were needed for tests of other packages later in the
series.
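
The fallback behavior described above can be illustrated with a small sketch. This is not the Server.Self implementation: the function `pickIP`, its string-based input, and the rule for what counts as a usable address are all assumptions made for illustration.

```go
package main

import (
	"fmt"
	"net"
)

// pickIP returns the first usable candidate, or 127.0.0.1 if there is none.
// "Usable" here means the string parses as an IP and is not the unspecified
// address (0.0.0.0 or ::); the real selection criteria may differ.
func pickIP(candidates []string) net.IP {
	for _, s := range candidates {
		ip := net.ParseIP(s)
		if ip != nil && !ip.IsUnspecified() {
			return ip
		}
	}
	return net.IPv4(127, 0, 0, 1)
}

func main() {
	fmt.Println(pickIP([]string{"0.0.0.0", "192.0.2.7"}))
	fmt.Println(pickIP(nil))
}
```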

* p2p/simulations, p2p/testing: port to p2p/enode

No surprises here, mostly replacements of discover.Node, discover.NodeID
with their new equivalents. The 'interesting' API changes are:

 - testing.ProtocolSession tracks complete nodes, not just their IDs.
 - adapters.NodeConfig has a new method to create a complete node.

These changes were needed to make swarm tests work.

Note that the NodeID change makes the code incompatible with old
simulation snapshots.

* whisper/whisperv5, whisper/whisperv6: port to p2p/enode

This port was easy because whisper uses []byte for node IDs and
URL strings in the API.

* eth: port to p2p/enode

Again, easy to port because eth uses strings for node IDs and doesn't
care about node information in any way.

* les: port to p2p/enode

Apart from replacing discover.NodeID with enode.ID, most changes are in
the server pool code. It now deals with complete nodes instead
of (Pubkey, IP, Port) triples. The database format is unchanged for now,
but we should probably change it to use the node database later.

* node: port to p2p/enode

This change simply replaces discover.Node and discover.NodeID with their
new equivalents.

* swarm/network: port to p2p/enode

Swarm has its own node address representation, BzzAddr, containing both
an overlay address (the hash of a secp256k1 public key) and an underlay
address (enode:// URL).

There are no changes to the BzzAddr format in this commit, but certain
operations such as creating a BzzAddr from a node ID are now impossible
because node IDs aren't public keys anymore.

Most swarm-related changes in the series remove uses of
NewAddrFromNodeID, replacing it with NewAddr which takes a complete node
as argument. ToOverlayAddr is removed because we can just use the node
ID directly.
2018-09-25 00:59:00 +02:00


// Copyright 2018 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package pss
import (
"bytes"
"context"
"crypto/ecdsa"
"crypto/rand"
"errors"
"fmt"
"sync"
"time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/metrics"
"github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/ethereum/go-ethereum/p2p/protocols"
"github.com/ethereum/go-ethereum/rpc"
"github.com/ethereum/go-ethereum/swarm/log"
"github.com/ethereum/go-ethereum/swarm/network"
"github.com/ethereum/go-ethereum/swarm/pot"
"github.com/ethereum/go-ethereum/swarm/storage"
whisper "github.com/ethereum/go-ethereum/whisper/whisperv5"
)
const (
defaultPaddingByteSize = 16
DefaultMsgTTL = time.Second * 120
defaultDigestCacheTTL = time.Second * 10
defaultSymKeyCacheCapacity = 512
digestLength = 32 // byte length of digest used for pss cache (currently same as swarm chunk hash)
defaultWhisperWorkTime = 3
defaultWhisperPoW = 0.0000000001
defaultMaxMsgSize = 1024 * 1024
defaultCleanInterval = time.Second * 60 * 10
defaultOutboxCapacity = 100000
pssProtocolName = "pss"
pssVersion = 2
hasherCount = 8
)
var (
addressLength = len(pot.Address{})
)
// cache is used for preventing backwards routing
// will also be instrumental in flood guard mechanism
// and mailbox implementation
type pssCacheEntry struct {
expiresAt time.Time
}
// abstraction to enable access to p2p.protocols.Peer.Send
type senderPeer interface {
Info() *p2p.PeerInfo
ID() enode.ID
Address() []byte
Send(context.Context, interface{}) error
}
// per-key peer related information
// member `protected` prevents garbage collection of the instance
type pssPeer struct {
lastSeen time.Time
address *PssAddress
protected bool
}
// Pss configuration parameters
type PssParams struct {
MsgTTL time.Duration
CacheTTL time.Duration
privateKey *ecdsa.PrivateKey
SymKeyCacheCapacity int
AllowRaw bool // If true, enables sending and receiving messages without builtin pss encryption
}
// Sane defaults for Pss
func NewPssParams() *PssParams {
return &PssParams{
MsgTTL: DefaultMsgTTL,
CacheTTL: defaultDigestCacheTTL,
SymKeyCacheCapacity: defaultSymKeyCacheCapacity,
}
}
func (params *PssParams) WithPrivateKey(privatekey *ecdsa.PrivateKey) *PssParams {
params.privateKey = privatekey
return params
}
// Toplevel pss object, takes care of message sending, receiving, decryption and encryption, message handler dispatchers and message forwarding.
//
// Implements node.Service
type Pss struct {
*network.Kademlia // we can get the Kademlia address from this
privateKey *ecdsa.PrivateKey // pss can have its own independent key
w *whisper.Whisper // key and encryption backend
auxAPIs []rpc.API // builtins (handshake, test) can add APIs
// sending and forwarding
fwdPool map[string]*protocols.Peer // keep track of all peers sitting on the pssmsg routing layer
fwdPoolMu sync.RWMutex
fwdCache map[pssDigest]pssCacheEntry // checksum of unique fields from pssmsg mapped to expiry, cache to determine whether to drop msg
fwdCacheMu sync.RWMutex
cacheTTL time.Duration // how long to keep messages in fwdCache; expired entries are removed by cleanFwdCache
msgTTL time.Duration
paddingByteSize int
capstring string
outbox chan *PssMsg
// keys and peers
pubKeyPool map[string]map[Topic]*pssPeer // mapping of hex public keys to peer address by topic.
pubKeyPoolMu sync.RWMutex
symKeyPool map[string]map[Topic]*pssPeer // mapping of symkeyids to peer address by topic.
symKeyPoolMu sync.RWMutex
symKeyDecryptCache []*string // fast lookup of symkeys recently used for decryption; last used is on top of stack
symKeyDecryptCacheCursor int // modular cursor pointing to last used, wraps on symKeyDecryptCache array
symKeyDecryptCacheCapacity int // max amount of symkeys to keep.
// message handling
handlers map[Topic]map[*Handler]bool // topic and version based pss payload handlers. See pss.Handle()
handlersMu sync.RWMutex
allowRaw bool
hashPool sync.Pool
// process
quitC chan struct{}
}
func (p *Pss) String() string {
return fmt.Sprintf("pss: addr %x, pubkey %v", p.BaseAddr(), common.ToHex(crypto.FromECDSAPub(&p.privateKey.PublicKey)))
}
// Creates a new Pss instance.
//
// In addition to params, it takes a swarm network Kademlia, which
// provides the node's overlay address and the peers used for routing.
func NewPss(k *network.Kademlia, params *PssParams) (*Pss, error) {
if params.privateKey == nil {
return nil, errors.New("missing private key for pss")
}
cap := p2p.Cap{
Name: pssProtocolName,
Version: pssVersion,
}
ps := &Pss{
Kademlia: k,
privateKey: params.privateKey,
w: whisper.New(&whisper.DefaultConfig),
quitC: make(chan struct{}),
fwdPool: make(map[string]*protocols.Peer),
fwdCache: make(map[pssDigest]pssCacheEntry),
cacheTTL: params.CacheTTL,
msgTTL: params.MsgTTL,
paddingByteSize: defaultPaddingByteSize,
capstring: cap.String(),
outbox: make(chan *PssMsg, defaultOutboxCapacity),
pubKeyPool: make(map[string]map[Topic]*pssPeer),
symKeyPool: make(map[string]map[Topic]*pssPeer),
symKeyDecryptCache: make([]*string, params.SymKeyCacheCapacity),
symKeyDecryptCacheCapacity: params.SymKeyCacheCapacity,
handlers: make(map[Topic]map[*Handler]bool),
allowRaw: params.AllowRaw,
hashPool: sync.Pool{
New: func() interface{} {
return storage.MakeHashFunc(storage.DefaultHash)()
},
},
}
for i := 0; i < hasherCount; i++ {
hashfunc := storage.MakeHashFunc(storage.DefaultHash)()
ps.hashPool.Put(hashfunc)
}
return ps, nil
}
/////////////////////////////////////////////////////////////////////
// SECTION: node.Service interface
/////////////////////////////////////////////////////////////////////
func (p *Pss) Start(srv *p2p.Server) error {
go func() {
ticker := time.NewTicker(defaultCleanInterval)
cacheTicker := time.NewTicker(p.cacheTTL)
defer ticker.Stop()
defer cacheTicker.Stop()
for {
select {
case <-cacheTicker.C:
p.cleanFwdCache()
case <-ticker.C:
p.cleanKeys()
case <-p.quitC:
return
}
}
}()
go func() {
for {
select {
case msg := <-p.outbox:
err := p.forward(msg)
if err != nil {
log.Error(err.Error())
metrics.GetOrRegisterCounter("pss.forward.err", nil).Inc(1)
}
case <-p.quitC:
return
}
}
}()
log.Info("Started Pss")
log.Info("Loaded EC keys", "pubkey", common.ToHex(crypto.FromECDSAPub(p.PublicKey())), "secp256", common.ToHex(crypto.CompressPubkey(p.PublicKey())))
return nil
}
func (p *Pss) Stop() error {
log.Info("Pss shutting down")
close(p.quitC)
return nil
}
var pssSpec = &protocols.Spec{
Name: pssProtocolName,
Version: pssVersion,
MaxMsgSize: defaultMaxMsgSize,
Messages: []interface{}{
PssMsg{},
},
}
func (p *Pss) Protocols() []p2p.Protocol {
return []p2p.Protocol{
{
Name: pssSpec.Name,
Version: pssSpec.Version,
Length: pssSpec.Length(),
Run: p.Run,
},
}
}
func (p *Pss) Run(peer *p2p.Peer, rw p2p.MsgReadWriter) error {
pp := protocols.NewPeer(peer, rw, pssSpec)
p.fwdPoolMu.Lock()
p.fwdPool[peer.Info().ID] = pp
p.fwdPoolMu.Unlock()
return pp.Run(p.handlePssMsg)
}
func (p *Pss) APIs() []rpc.API {
apis := []rpc.API{
{
Namespace: "pss",
Version: "1.0",
Service: NewAPI(p),
Public: true,
},
}
apis = append(apis, p.auxAPIs...)
return apis
}
// add API methods to the pss API
// must be run before node is started
func (p *Pss) addAPI(api rpc.API) {
p.auxAPIs = append(p.auxAPIs, api)
}
// Returns the swarm Kademlia address of the pss node
func (p *Pss) BaseAddr() []byte {
return p.Kademlia.BaseAddr()
}
// Returns the pss node's public key
func (p *Pss) PublicKey() *ecdsa.PublicKey {
return &p.privateKey.PublicKey
}
/////////////////////////////////////////////////////////////////////
// SECTION: Message handling
/////////////////////////////////////////////////////////////////////
// Links a handler function to a Topic
//
// All incoming messages with an envelope Topic matching the
// topic specified will be passed to the given Handler function.
//
// There may be an arbitrary number of handler functions per topic.
//
// Returns a deregister function which needs to be called to
// deregister the handler.
func (p *Pss) Register(topic *Topic, handler Handler) func() {
p.handlersMu.Lock()
defer p.handlersMu.Unlock()
handlers := p.handlers[*topic]
if handlers == nil {
handlers = make(map[*Handler]bool)
p.handlers[*topic] = handlers
}
handlers[&handler] = true
return func() { p.deregister(topic, &handler) }
}
func (p *Pss) deregister(topic *Topic, h *Handler) {
p.handlersMu.Lock()
defer p.handlersMu.Unlock()
handlers := p.handlers[*topic]
if len(handlers) == 1 {
delete(p.handlers, *topic)
return
}
delete(handlers, h)
}
// get all registered handlers for respective topics
func (p *Pss) getHandlers(topic Topic) map[*Handler]bool {
p.handlersMu.RLock()
defer p.handlersMu.RUnlock()
return p.handlers[topic]
}
// Filters incoming messages for processing or forwarding.
// Check if address partially matches
// If yes, it CAN be for us, and we process it
// Only passes error to pss protocol handler if payload is not valid pssmsg
func (p *Pss) handlePssMsg(ctx context.Context, msg interface{}) error {
metrics.GetOrRegisterCounter("pss.handlepssmsg", nil).Inc(1)
pssmsg, ok := msg.(*PssMsg)
if !ok {
return fmt.Errorf("invalid message type. Expected *PssMsg, got %T", msg)
}
if int64(pssmsg.Expire) < time.Now().Unix() {
metrics.GetOrRegisterCounter("pss.expire", nil).Inc(1)
log.Warn("pss filtered expired message", "from", common.ToHex(p.Kademlia.BaseAddr()), "to", common.ToHex(pssmsg.To))
return nil
}
if p.checkFwdCache(pssmsg) {
log.Trace("pss relay block-cache match (process)", "from", common.ToHex(p.Kademlia.BaseAddr()), "to", (common.ToHex(pssmsg.To)))
return nil
}
p.addFwdCache(pssmsg)
if !p.isSelfPossibleRecipient(pssmsg) {
log.Trace("pss was for someone else :'( ... forwarding", "pss", common.ToHex(p.BaseAddr()))
return p.enqueue(pssmsg)
}
log.Trace("pss for us, yay! ... let's process!", "pss", common.ToHex(p.BaseAddr()))
if err := p.process(pssmsg); err != nil {
qerr := p.enqueue(pssmsg)
if qerr != nil {
return fmt.Errorf("process fail: processerr %v, queueerr: %v", err, qerr)
}
}
return nil
}
// Entry point to processing a message for which the current node can be the intended recipient.
// Attempts symmetric and asymmetric decryption with stored keys.
// Dispatches message to all handlers matching the message topic
func (p *Pss) process(pssmsg *PssMsg) error {
metrics.GetOrRegisterCounter("pss.process", nil).Inc(1)
var err error
var recvmsg *whisper.ReceivedMessage
var payload []byte
var from *PssAddress
var asymmetric bool
var keyid string
var keyFunc func(envelope *whisper.Envelope) (*whisper.ReceivedMessage, string, *PssAddress, error)
envelope := pssmsg.Payload
psstopic := Topic(envelope.Topic)
if pssmsg.isRaw() {
if !p.allowRaw {
return errors.New("raw message support disabled")
}
payload = pssmsg.Payload.Data
} else {
if pssmsg.isSym() {
keyFunc = p.processSym
} else {
asymmetric = true
keyFunc = p.processAsym
}
recvmsg, keyid, from, err = keyFunc(envelope)
if err != nil {
return errors.New("Decryption failed")
}
payload = recvmsg.Payload
}
if len(pssmsg.To) < addressLength {
if err := p.enqueue(pssmsg); err != nil {
return err
}
}
p.executeHandlers(psstopic, payload, from, asymmetric, keyid)
return nil
}
func (p *Pss) executeHandlers(topic Topic, payload []byte, from *PssAddress, asymmetric bool, keyid string) {
handlers := p.getHandlers(topic)
peer := p2p.NewPeer(enode.ID{}, fmt.Sprintf("%x", from), []p2p.Cap{})
for f := range handlers {
err := (*f)(payload, peer, asymmetric, keyid)
if err != nil {
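// log.Warn takes a message followed by key/value pairs, not a format string
log.Warn("Pss handler failed", "handler", fmt.Sprintf("%p", f), "err", err)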
}
}
}
// will return false if using partial address
func (p *Pss) isSelfRecipient(msg *PssMsg) bool {
return bytes.Equal(msg.To, p.Kademlia.BaseAddr())
}
// test match of leftmost bytes in given message to node's Kademlia address
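func (p *Pss) isSelfPossibleRecipient(msg *PssMsg) bool {
local := p.Kademlia.BaseAddr()
// an address longer than ours can never match; this also avoids slicing out of range
if len(msg.To) > len(local) {
return false
}
return bytes.Equal(msg.To, local[:len(msg.To)])
}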
/////////////////////////////////////////////////////////////////////
// SECTION: Encryption
/////////////////////////////////////////////////////////////////////
// Links a peer ECDSA public key to a topic
//
// This is required for asymmetric message exchange
// on the given topic
//
// The value in `address` will be used as a routing hint for the
// public key / topic association
func (p *Pss) SetPeerPublicKey(pubkey *ecdsa.PublicKey, topic Topic, address *PssAddress) error {
pubkeybytes := crypto.FromECDSAPub(pubkey)
if len(pubkeybytes) == 0 {
return fmt.Errorf("invalid public key: %v", pubkey)
}
pubkeyid := common.ToHex(pubkeybytes)
psp := &pssPeer{
address: address,
}
p.pubKeyPoolMu.Lock()
if _, ok := p.pubKeyPool[pubkeyid]; !ok {
p.pubKeyPool[pubkeyid] = make(map[Topic]*pssPeer)
}
p.pubKeyPool[pubkeyid][topic] = psp
p.pubKeyPoolMu.Unlock()
log.Trace("added pubkey", "pubkeyid", pubkeyid, "topic", topic, "address", common.ToHex(*address))
return nil
}
// Automatically generate a new symkey for a topic and address hint
func (p *Pss) GenerateSymmetricKey(topic Topic, address *PssAddress, addToCache bool) (string, error) {
keyid, err := p.w.GenerateSymKey()
if err != nil {
return "", err
}
p.addSymmetricKeyToPool(keyid, topic, address, addToCache, false)
return keyid, nil
}
// Links a peer symmetric key (arbitrary byte sequence) to a topic
//
// This is required for symmetrically encrypted message exchange
// on the given topic
//
// The key is stored in the whisper backend.
//
// If addtocache is set to true, the key will be added to the cache of keys
// used to attempt symmetric decryption of incoming messages.
//
// Returns a string id that can be used to retrieve the key bytes
// from the whisper backend (see pss.GetSymmetricKey())
func (p *Pss) SetSymmetricKey(key []byte, topic Topic, address *PssAddress, addtocache bool) (string, error) {
return p.setSymmetricKey(key, topic, address, addtocache, true)
}
func (p *Pss) setSymmetricKey(key []byte, topic Topic, address *PssAddress, addtocache bool, protected bool) (string, error) {
keyid, err := p.w.AddSymKeyDirect(key)
if err != nil {
return "", err
}
p.addSymmetricKeyToPool(keyid, topic, address, addtocache, protected)
return keyid, nil
}
// adds a symmetric key to the pss key pool, and optionally adds the key
// to the collection of keys used to attempt symmetric decryption of
// incoming messages
func (p *Pss) addSymmetricKeyToPool(keyid string, topic Topic, address *PssAddress, addtocache bool, protected bool) {
psp := &pssPeer{
address: address,
protected: protected,
}
p.symKeyPoolMu.Lock()
if _, ok := p.symKeyPool[keyid]; !ok {
p.symKeyPool[keyid] = make(map[Topic]*pssPeer)
}
p.symKeyPool[keyid][topic] = psp
p.symKeyPoolMu.Unlock()
if addtocache {
p.symKeyDecryptCacheCursor++
p.symKeyDecryptCache[p.symKeyDecryptCacheCursor%cap(p.symKeyDecryptCache)] = &keyid
}
key, _ := p.GetSymmetricKey(keyid)
log.Trace("added symkey", "symkeyid", keyid, "symkey", common.ToHex(key), "topic", topic, "address", fmt.Sprintf("%p", address), "cache", addtocache)
}
// Returns a symmetric key byte sequence stored in the whisper backend
// by its unique id
//
// Passes on the error value from the whisper backend
func (p *Pss) GetSymmetricKey(symkeyid string) ([]byte, error) {
symkey, err := p.w.GetSymKey(symkeyid)
if err != nil {
return nil, err
}
return symkey, nil
}
// Returns all recorded topic and address combinations for a specific public key
func (p *Pss) GetPublickeyPeers(keyid string) (topic []Topic, address []PssAddress, err error) {
p.pubKeyPoolMu.RLock()
defer p.pubKeyPoolMu.RUnlock()
for t, peer := range p.pubKeyPool[keyid] {
topic = append(topic, t)
address = append(address, *peer.address)
}
return topic, address, nil
}
func (p *Pss) getPeerAddress(keyid string, topic Topic) (PssAddress, error) {
p.pubKeyPoolMu.RLock()
defer p.pubKeyPoolMu.RUnlock()
if peers, ok := p.pubKeyPool[keyid]; ok {
if t, ok := peers[topic]; ok {
return *t.address, nil
}
}
return nil, fmt.Errorf("peer with pubkey %s, topic %x not found", keyid, topic)
}
// Attempt to decrypt, validate and unpack a
// symmetrically encrypted message
// If successful, returns the unpacked whisper ReceivedMessage struct
// encapsulating the decrypted message, and the whisper backend id
// of the symmetric key used to decrypt the message.
// It fails if decryption of the message fails or if the message is corrupted
func (p *Pss) processSym(envelope *whisper.Envelope) (*whisper.ReceivedMessage, string, *PssAddress, error) {
metrics.GetOrRegisterCounter("pss.process.sym", nil).Inc(1)
for i := p.symKeyDecryptCacheCursor; i > p.symKeyDecryptCacheCursor-cap(p.symKeyDecryptCache) && i > 0; i-- {
symkeyid := p.symKeyDecryptCache[i%cap(p.symKeyDecryptCache)]
symkey, err := p.w.GetSymKey(*symkeyid)
if err != nil {
continue
}
recvmsg, err := envelope.OpenSymmetric(symkey)
if err != nil {
continue
}
if !recvmsg.Validate() {
return nil, "", nil, fmt.Errorf("symmetrically encrypted message has invalid signature or is corrupt")
}
p.symKeyPoolMu.Lock()
from := p.symKeyPool[*symkeyid][Topic(envelope.Topic)].address
p.symKeyPoolMu.Unlock()
p.symKeyDecryptCacheCursor++
p.symKeyDecryptCache[p.symKeyDecryptCacheCursor%cap(p.symKeyDecryptCache)] = symkeyid
return recvmsg, *symkeyid, from, nil
}
return nil, "", nil, fmt.Errorf("could not decrypt message")
}
// Attempt to decrypt, validate and unpack an
// asymmetrically encrypted message
// If successful, returns the unpacked whisper ReceivedMessage struct
// encapsulating the decrypted message, and the byte representation of
// the public key used to decrypt the message.
// It fails if decryption of message fails, or if the message is corrupted
func (p *Pss) processAsym(envelope *whisper.Envelope) (*whisper.ReceivedMessage, string, *PssAddress, error) {
metrics.GetOrRegisterCounter("pss.process.asym", nil).Inc(1)
recvmsg, err := envelope.OpenAsymmetric(p.privateKey)
if err != nil {
return nil, "", nil, fmt.Errorf("could not decrypt message: %s", err)
}
// check signature (if signed), strip padding
if !recvmsg.Validate() {
return nil, "", nil, fmt.Errorf("invalid message")
}
pubkeyid := common.ToHex(crypto.FromECDSAPub(recvmsg.Src))
var from *PssAddress
p.pubKeyPoolMu.Lock()
if p.pubKeyPool[pubkeyid][Topic(envelope.Topic)] != nil {
from = p.pubKeyPool[pubkeyid][Topic(envelope.Topic)].address
}
p.pubKeyPoolMu.Unlock()
return recvmsg, pubkeyid, from, nil
}
// Symkey garbage collection
// a key is removed if:
// - it is not marked as protected
// - it is not in the incoming decryption cache
func (p *Pss) cleanKeys() (count int) {
for keyid, peertopics := range p.symKeyPool {
var expiredtopics []Topic
for topic, psp := range peertopics {
if psp.protected {
continue
}
var match bool
for i := p.symKeyDecryptCacheCursor; i > p.symKeyDecryptCacheCursor-cap(p.symKeyDecryptCache) && i > 0; i-- {
cacheid := p.symKeyDecryptCache[i%cap(p.symKeyDecryptCache)]
if *cacheid == keyid {
match = true
}
}
if !match {
expiredtopics = append(expiredtopics, topic)
}
}
for _, topic := range expiredtopics {
p.symKeyPoolMu.Lock()
delete(p.symKeyPool[keyid], topic)
log.Trace("symkey cleanup deletion", "symkeyid", keyid, "topic", topic, "val", p.symKeyPool[keyid])
p.symKeyPoolMu.Unlock()
count++
}
}
return
}
/////////////////////////////////////////////////////////////////////
// SECTION: Message sending
/////////////////////////////////////////////////////////////////////
func (p *Pss) enqueue(msg *PssMsg) error {
select {
case p.outbox <- msg:
return nil
default:
}
metrics.GetOrRegisterCounter("pss.enqueue.outbox.full", nil).Inc(1)
return errors.New("outbox full")
}
// Send a raw message (any encryption is responsibility of calling client)
//
// Will fail if raw messages are disallowed
func (p *Pss) SendRaw(address PssAddress, topic Topic, msg []byte) error {
if !p.allowRaw {
return errors.New("Raw messages not enabled")
}
pssMsgParams := &msgParams{
raw: true,
}
payload := &whisper.Envelope{
Data: msg,
Topic: whisper.TopicType(topic),
}
pssMsg := newPssMsg(pssMsgParams)
pssMsg.To = address
pssMsg.Expire = uint32(time.Now().Add(p.msgTTL).Unix())
pssMsg.Payload = payload
p.addFwdCache(pssMsg)
return p.enqueue(pssMsg)
}
// Send a message using symmetric encryption
//
// Fails if the key id does not match any of the stored symmetric keys
func (p *Pss) SendSym(symkeyid string, topic Topic, msg []byte) error {
symkey, err := p.GetSymmetricKey(symkeyid)
if err != nil {
return fmt.Errorf("missing valid send symkey %s: %v", symkeyid, err)
}
p.symKeyPoolMu.Lock()
psp, ok := p.symKeyPool[symkeyid][topic]
p.symKeyPoolMu.Unlock()
if !ok {
return fmt.Errorf("invalid topic '%s' for symkey '%s'", topic.String(), symkeyid)
} else if psp.address == nil {
return fmt.Errorf("no address hint for topic '%s' symkey '%s'", topic.String(), symkeyid)
}
err = p.send(*psp.address, topic, msg, false, symkey)
return err
}
// Send a message using asymmetric encryption
//
// Fails if the key id does not match any of the stored public keys
func (p *Pss) SendAsym(pubkeyid string, topic Topic, msg []byte) error {
if _, err := crypto.UnmarshalPubkey(common.FromHex(pubkeyid)); err != nil {
return fmt.Errorf("Cannot unmarshal pubkey: %s", pubkeyid)
}
p.pubKeyPoolMu.Lock()
psp, ok := p.pubKeyPool[pubkeyid][topic]
p.pubKeyPoolMu.Unlock()
if !ok {
return fmt.Errorf("invalid topic '%s' for pubkey '%s'", topic.String(), pubkeyid)
} else if psp.address == nil {
return fmt.Errorf("no address hint for topic '%s' pubkey '%s'", topic.String(), pubkeyid)
}
go func() {
p.send(*psp.address, topic, msg, true, common.FromHex(pubkeyid))
}()
return nil
}
// Send is payload agnostic, and will accept any byte slice as payload
// It generates a whisper envelope for the specified recipient and topic,
// and wraps the message payload in it.
// TODO: Implement proper message padding
func (p *Pss) send(to []byte, topic Topic, msg []byte, asymmetric bool, key []byte) error {
metrics.GetOrRegisterCounter("pss.send", nil).Inc(1)
if key == nil || bytes.Equal(key, []byte{}) {
return fmt.Errorf("Zero length key passed to pss send")
}
padding := make([]byte, p.paddingByteSize)
c, err := rand.Read(padding)
if err != nil {
return err
} else if c < p.paddingByteSize {
return fmt.Errorf("invalid padding length: %d", c)
}
wparams := &whisper.MessageParams{
TTL: defaultWhisperTTL,
Src: p.privateKey,
Topic: whisper.TopicType(topic),
WorkTime: defaultWhisperWorkTime,
PoW: defaultWhisperPoW,
Payload: msg,
Padding: padding,
}
if asymmetric {
pk, err := crypto.UnmarshalPubkey(key)
if err != nil {
return fmt.Errorf("Cannot unmarshal pubkey: %x", key)
}
wparams.Dst = pk
} else {
wparams.KeySym = key
}
// set up outgoing message container, which does encryption and envelope wrapping
woutmsg, err := whisper.NewSentMessage(wparams)
if err != nil {
return fmt.Errorf("failed to generate whisper message encapsulation: %v", err)
}
// performs encryption.
// Does NOT perform / performs negligible PoW due to very low difficulty setting
// after this the message is ready for sending
envelope, err := woutmsg.Wrap(wparams)
if err != nil {
return fmt.Errorf("failed to perform whisper encryption: %v", err)
}
log.Trace("pssmsg whisper done", "env", envelope, "wparams payload", common.ToHex(wparams.Payload), "to", common.ToHex(to), "asym", asymmetric, "key", common.ToHex(key))
// prepare for devp2p transport
pssMsgParams := &msgParams{
sym: !asymmetric,
}
pssMsg := newPssMsg(pssMsgParams)
pssMsg.To = to
pssMsg.Expire = uint32(time.Now().Add(p.msgTTL).Unix())
pssMsg.Payload = envelope
return p.enqueue(pssMsg)
}
// Forwards a pss message to the peer(s) closest to the recipient address in the PssMsg struct
// The recipient address can be of any length, and the byte slice will be matched to the MSB slice
// of the peer address of the equivalent length.
func (p *Pss) forward(msg *PssMsg) error {
metrics.GetOrRegisterCounter("pss.forward", nil).Inc(1)
to := make([]byte, addressLength)
copy(to[:len(msg.To)], msg.To)
// send with kademlia
// find the closest peer to the recipient and attempt to send
sent := 0
p.Kademlia.EachConn(to, 256, func(sp *network.Peer, po int, isproxbin bool) bool {
info := sp.Info()
// check if the peer is running pss
var ispss bool
for _, cap := range info.Caps {
if cap == p.capstring {
ispss = true
break
}
}
if !ispss {
log.Trace("peer doesn't have matching pss capabilities, skipping", "peer", info.Name, "caps", info.Caps)
return true
}
// get the protocol peer from the forwarding peer cache
sendMsg := fmt.Sprintf("MSG TO %x FROM %x VIA %x", to, p.BaseAddr(), sp.Address())
p.fwdPoolMu.RLock()
pp := p.fwdPool[sp.Info().ID]
p.fwdPoolMu.RUnlock()
// attempt to send the message
err := pp.Send(context.TODO(), msg)
if err != nil {
metrics.GetOrRegisterCounter("pss.pp.send.error", nil).Inc(1)
log.Error(err.Error())
return true
}
sent++
log.Trace(fmt.Sprintf("%v: successfully forwarded", sendMsg))
// continue forwarding if:
// - if the peer is end recipient but the full address has not been disclosed
// - if the peer address matches the partial address fully
// - if the peer is in proxbin
if len(msg.To) < addressLength && bytes.Equal(msg.To, sp.Address()[:len(msg.To)]) {
log.Trace("Pss keep forwarding: Partial address + full partial match")
return true
} else if isproxbin {
log.Trace(fmt.Sprintf("%s is in proxbin, keep forwarding", common.ToHex(sp.Address())))
return true
}
// at this point we stop forwarding, and the state is as follows:
// - the peer is end recipient and we have full address
// - we are not in proxbin (directed routing)
// - partial addresses don't fully match
return false
})
if sent == 0 {
log.Debug("unable to forward to any peers")
if err := p.enqueue(msg); err != nil {
metrics.GetOrRegisterCounter("pss.forward.enqueue.error", nil).Inc(1)
log.Error(err.Error())
return err
}
}
// cache the message
p.addFwdCache(msg)
return nil
}
/////////////////////////////////////////////////////////////////////
// SECTION: Caching
/////////////////////////////////////////////////////////////////////
// cleanFwdCache is used to periodically remove expired entries from the forward cache
func (p *Pss) cleanFwdCache() {
metrics.GetOrRegisterCounter("pss.cleanfwdcache", nil).Inc(1)
p.fwdCacheMu.Lock()
defer p.fwdCacheMu.Unlock()
for k, v := range p.fwdCache {
if v.expiresAt.Before(time.Now()) {
delete(p.fwdCache, k)
}
}
}
// add a message to the cache
func (p *Pss) addFwdCache(msg *PssMsg) error {
metrics.GetOrRegisterCounter("pss.addfwdcache", nil).Inc(1)
var entry pssCacheEntry
var ok bool
p.fwdCacheMu.Lock()
defer p.fwdCacheMu.Unlock()
digest := p.digest(msg)
if entry, ok = p.fwdCache[digest]; !ok {
entry = pssCacheEntry{}
}
entry.expiresAt = time.Now().Add(p.cacheTTL)
p.fwdCache[digest] = entry
return nil
}
// check if message is in the cache
func (p *Pss) checkFwdCache(msg *PssMsg) bool {
p.fwdCacheMu.Lock()
defer p.fwdCacheMu.Unlock()
digest := p.digest(msg)
entry, ok := p.fwdCache[digest]
if ok {
if entry.expiresAt.After(time.Now()) {
log.Trace("unexpired cache", "digest", fmt.Sprintf("%x", digest))
metrics.GetOrRegisterCounter("pss.checkfwdcache.unexpired", nil).Inc(1)
return true
}
metrics.GetOrRegisterCounter("pss.checkfwdcache.expired", nil).Inc(1)
}
return false
}
// Digest of message
func (p *Pss) digest(msg *PssMsg) pssDigest {
hasher := p.hashPool.Get().(storage.SwarmHash)
defer p.hashPool.Put(hasher)
hasher.Reset()
hasher.Write(msg.serialize())
digest := pssDigest{}
key := hasher.Sum(nil)
copy(digest[:], key[:digestLength])
return digest
}
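
The forward cache above (addFwdCache, checkFwdCache, cleanFwdCache) can be exercised in isolation. The following is a minimal sketch of the same idea, not the pss code itself: the type `ttlCache` and its methods are hypothetical names, SHA-256 stands in for the swarm hash used by digest, and timestamps are plain Unix seconds passed in by the caller.

```go
package main

import (
	"crypto/sha256"
	"fmt"
	"sync"
)

// digest is the fixed-size key under which a message is remembered.
type digest [sha256.Size]byte

// ttlCache remembers message digests until their expiry time.
type ttlCache struct {
	mu      sync.Mutex
	ttl     int64            // lifetime of an entry, in seconds
	entries map[digest]int64 // digest -> expiry (Unix seconds)
}

func newTTLCache(ttlSeconds int64) *ttlCache {
	return &ttlCache{ttl: ttlSeconds, entries: make(map[digest]int64)}
}

// add records msg, or refreshes its expiry if it is already present.
func (c *ttlCache) add(msg []byte, now int64) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.entries[digest(sha256.Sum256(msg))] = now + c.ttl
}

// check reports whether msg is present and has not yet expired.
func (c *ttlCache) check(msg []byte, now int64) bool {
	c.mu.Lock()
	defer c.mu.Unlock()
	exp, ok := c.entries[digest(sha256.Sum256(msg))]
	return ok && exp > now
}

// clean removes expired entries and returns how many were dropped.
func (c *ttlCache) clean(now int64) int {
	c.mu.Lock()
	defer c.mu.Unlock()
	removed := 0
	for k, exp := range c.entries {
		if exp < now {
			delete(c.entries, k)
			removed++
		}
	}
	return removed
}

func main() {
	c := newTTLCache(10)
	c.add([]byte("msg"), 100)
	fmt.Println("seen at t=105:", c.check([]byte("msg"), 105))
	fmt.Println("seen at t=111:", c.check([]byte("msg"), 111))
	fmt.Println("removed at t=111:", c.clean(111))
}
```

Passing the current time as a parameter, rather than calling time.Now() inside the methods as the code above does, is a choice made only to keep the example deterministic.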