erigon-pulse/p2p/discv5/ticket.go

955 lines
25 KiB
Go

// Copyright 2016 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package discv5
import (
"bytes"
"encoding/binary"
"fmt"
"math"
"math/rand"
"sort"
"time"
"github.com/ledgerwatch/turbo-geth/common"
"github.com/ledgerwatch/turbo-geth/common/mclock"
"github.com/ledgerwatch/turbo-geth/crypto"
"github.com/ledgerwatch/turbo-geth/log"
)
// Tuning parameters for ticket collection, topic registration, topic queries
// and topic radius detection.
const (
// ticketTimeBucketLen is the width of one ticket time bucket; registration
// times are divided by it to obtain a timeBucket index.
ticketTimeBucketLen = time.Minute
// timeWindow is the number of consecutive buckets, starting at
// lastBucketFetched, that ticketsInWindow scans.
timeWindow = 10 // * ticketTimeBucketLen
// wantTicketsInWindow is the number of tickets removeExcessTickets keeps
// inside the window; the rest are dropped.
wantTicketsInWindow = 10
// collectFrequency is how far a topic's nextLookup is pushed forward for
// every ticket reference added.
collectFrequency = time.Second * 30
// registerFrequency is how far a topic's nextReg is pushed forward for
// every completed registration.
registerFrequency = time.Second * 60
// maxCollectDebt bounds how far nextLookup may lag behind the current time,
// in units of collectFrequency.
maxCollectDebt = 10
// maxRegisterDebt bounds how far nextReg may lag behind the current time,
// in units of registerFrequency.
maxRegisterDebt = 5
// keepTicketConst and keepTicketExp define the randomized wait-time limit
// in addTicket: a ticket is kept if its wait is below
// keepTicketConst + keepTicketExp*rnd, rnd being an exponentially
// distributed random value capped at 10.
keepTicketConst = time.Minute * 10
keepTicketExp = time.Minute * 5
// targetWaitTime is the reference wait time that topicRadius.adjustWithTicket
// uses to convert a ticket's wait period into an "inside" weight.
targetWaitTime = time.Minute * 10
// topicQueryTimeout is the maximum age of a sent query for which
// gotTopicNodes still accepts a reply; it is also the minimum interval
// between two cleanupTopicQueries passes.
topicQueryTimeout = time.Second * 5
// topicQueryResend is the minimum interval before the same topic is queried
// again on the same node, and the age after which sent queries are purged.
topicQueryResend = time.Minute
// topic radius detection
// maxRadius is the initial value of a topic's radius and minRadius.
maxRadius = 0xffffffffffffffff
// radiusTC is the time constant of the exponential decay applied to bucket
// weights; it is also the minimum interval between two radius-lookup
// requests recorded for the same node.
radiusTC = time.Minute * 20
// radiusBucketsPerBit is the number of radius buckets per bit of
// log2 distance.
radiusBucketsPerBit = 8
// minSlope: recalcRadius stops scanning at the first bucket i whose
// cumulative value falls below i*minSlope.
minSlope = 1
// minPeakSize: needMoreLookups reports true while the peak value exceeds
// the largest value in the inspected range by less than this amount.
minPeakSize = 40
// maxNoAdjust: buckets whose trNoAdjust weight has reached this value are
// not chosen by chooseLookupBucket.
maxNoAdjust = 20
// lookupWidth is the number of buckets on either side of the peak bucket
// that are candidates for radius lookups.
lookupWidth = 8
// minRightSum is the accumulated inside+outside weight, summed from the
// last bucket backwards, at which recalcRadius fixes minRadius.
minRightSum = 20
// searchForceQuery is the number of consecutive radius lookups after which
// nextSearchLookup forces a regular lookup.
searchForceQuery = 4
)
// timeBucket is an absolute monotonic time (mclock.AbsTime) divided by
// ticketTimeBucketLen, i.e. expressed in whole minutes.
// It is used as the key of the per-topic ticket buckets.
type timeBucket int
// ticket holds the wait periods a registrar handed out for a set of topics.
// topics and regTime are parallel slices: regTime[i] belongs to topics[i].
type ticket struct {
// topics lists the topics covered by this ticket.
topics []Topic
regTime []mclock.AbsTime // Per-topic local absolute time when the ticket can be used.
// The serial number that was issued by the server.
serial uint32
// Used by registrar, tracks absolute time when the ticket was created.
// pongToTicket sets it to the local time passed in by the caller.
issueTime mclock.AbsTime
// Fields used only by registrants
node *Node // the registrar node that signed this ticket
refCnt int // tracks number of topics that will be registered using this ticket
pong []byte // encoded pong packet signed by the registrar
}
// ticketRef refers to a single topic in a ticket.
// Field order matters: some call sites build it with an unkeyed literal.
type ticketRef struct {
t *ticket // the ticket being referenced
idx int // index of the topic in t.topics and t.regTime
}
// topic returns the topic this reference selects inside its ticket.
func (ref ticketRef) topic() Topic {
	tk, i := ref.t, ref.idx
	return tk.topics[i]
}
// topicRegTime returns the local absolute time at which the referenced topic
// of the ticket becomes usable.
func (ref ticketRef) topicRegTime() mclock.AbsTime {
	tk, i := ref.t, ref.idx
	return tk.regTime[i]
}
// pongToTicket builds a ticket from a received pong packet.
//
// localTime is used as the ticket's issue time and as the base to which the
// pong's wait periods (in seconds) are added to obtain per-topic registration
// times. topics must match the pong both in count (one wait period per topic)
// and in hash (rlpHash(topics) == pong.TopicHash); otherwise an error is
// returned. The packet payload must be a *pong, the type assertion panics
// if it is not.
func pongToTicket(localTime mclock.AbsTime, topics []Topic, node *Node, p *ingressPacket) (*ticket, error) {
	pongData := p.data.(*pong)
	wps := pongData.WaitPeriods
	if len(topics) != len(wps) {
		// "got" is what the remote sent, "want" is what we asked for.
		return nil, fmt.Errorf("bad wait period list: got %d values, want %d", len(wps), len(topics))
	}
	if rlpHash(topics) != pongData.TopicHash {
		return nil, fmt.Errorf("bad topic hash")
	}
	t := &ticket{
		issueTime: localTime,
		node:      node,
		topics:    topics,
		pong:      p.rawData,
		regTime:   make([]mclock.AbsTime, len(wps)),
	}
	// Convert wait periods to local absolute time.
	for i, wp := range wps {
		t.regTime[i] = localTime + mclock.AbsTime(time.Second*time.Duration(wp))
	}
	return t, nil
}
// ticketToPong copies the ticket's data into the given pong packet: the issue
// time in seconds goes into Expiration, the hash of the topic list into
// TopicHash, the serial into TicketSerial, and the per-topic wait periods
// (registration time minus issue time, in whole seconds) into WaitPeriods.
func ticketToPong(t *ticket, pong *pong) {
	waits := make([]uint32, len(t.regTime))
	for i := range t.regTime {
		period := time.Duration(t.regTime[i] - t.issueTime)
		waits[i] = uint32(period / time.Second)
	}
	pong.Expiration = uint64(t.issueTime / mclock.AbsTime(time.Second))
	pong.TopicHash = rlpHash(t.topics)
	pong.TicketSerial = t.serial
	pong.WaitPeriods = waits
}
// ticketStore keeps the state needed to collect tickets, register topics and
// search for topics: per-topic radius detectors, per-topic ticket buckets,
// per-node tickets and outstanding requests.
type ticketStore struct {
// radius detector and target address generator
// exists for both searched and registered topics
radius map[Topic]*topicRadius
// Contains buckets (for each absolute minute) of tickets
// that can be used in that minute.
// This is only set if the topic is being registered.
tickets map[Topic]*topicTickets
regQueue []Topic // Topic registration queue for round robin attempts
regSet map[Topic]struct{} // Topic registration queue contents for fast filling
// nodes maps a registrar node to the ticket currently held for it; entries
// are dropped when the ticket's refCnt reaches zero.
nodes map[*Node]*ticket
// nodeLastReq records the most recent lookup request sent to each node.
nodeLastReq map[*Node]reqInfo
// lastBucketFetched is the bucket at which nextRegisterableTicket resumes
// scanning and at which ticketsInWindow starts.
lastBucketFetched timeBucket
// nextTicketCached caches the result of nextRegisterableTicket; it is reset
// whenever a ticket reference is removed or a ticket is stored.
nextTicketCached *ticketRef
// nextTicketReg is not referenced in the code visible here.
nextTicketReg mclock.AbsTime
// searchTopicMap holds the result channel of every topic being searched.
searchTopicMap map[Topic]searchTopic
// nextTopicQueryCleanup is the earliest time cleanupTopicQueries runs again.
nextTopicQueryCleanup mclock.AbsTime
// queriesSent tracks topic queries sent to each node, keyed by query hash.
queriesSent map[*Node]map[common.Hash]sentQuery
}
// searchTopic is the per-topic search state kept in ticketStore.searchTopicMap.
type searchTopic struct {
// foundChn receives nodes returned by topic queries (see gotTopicNodes).
foundChn chan<- *Node
}
// sentQuery records a topic query that has been sent to a node.
type sentQuery struct {
sent mclock.AbsTime // time at which the query was recorded
lookup lookupInfo // lookup that triggered the query
}
// topicTickets holds the collected tickets and pacing state of one topic that
// is being registered.
type topicTickets struct {
// buckets groups ticket references by the time bucket of their
// registration time.
buckets map[timeBucket][]ticketRef
// nextLookup: nextRegisterLookup only picks the topic once this time has
// passed; addTicketRef pushes it forward.
nextLookup mclock.AbsTime
// nextReg: nextFilteredTicket only accepts tickets whose registration time
// is not before this; ticketRegistered pushes it forward.
nextReg mclock.AbsTime
}
// newTicketStore returns an empty ticket store with all of its maps allocated.
func newTicketStore() *ticketStore {
	store := new(ticketStore)
	store.radius = make(map[Topic]*topicRadius)
	store.tickets = make(map[Topic]*topicTickets)
	store.regSet = make(map[Topic]struct{})
	store.nodes = make(map[*Node]*ticket)
	store.nodeLastReq = make(map[*Node]reqInfo)
	store.searchTopicMap = make(map[Topic]searchTopic)
	store.queriesSent = make(map[*Node]map[common.Hash]sentQuery)
	return store
}
// addTopic begins tracking the given topic by creating its radius detector
// if there is none yet. When register is set, an empty ticket collection is
// created for the topic as well (unless one already exists), so that tickets
// can be gathered for registration.
func (s *ticketStore) addTopic(topic Topic, register bool) {
	log.Trace("Adding discovery topic", "topic", topic, "register", register)

	if rad := s.radius[topic]; rad == nil {
		s.radius[topic] = newTopicRadius(topic)
	}
	if !register {
		return
	}
	if existing := s.tickets[topic]; existing == nil {
		s.tickets[topic] = &topicTickets{buckets: map[timeBucket][]ticketRef{}}
	}
}
// addSearchTopic starts tracking t (without registering it) and remembers
// foundChn as the channel for search results. A channel that is already set
// for the topic is left in place.
func (s *ticketStore) addSearchTopic(t Topic, foundChn chan<- *Node) {
	s.addTopic(t, false)
	if current := s.searchTopicMap[t]; current.foundChn != nil {
		return
	}
	s.searchTopicMap[t] = searchTopic{foundChn: foundChn}
}
// removeSearchTopic forgets the search state of t, if it has a result channel.
func (s *ticketStore) removeSearchTopic(t Topic) {
	if s.searchTopicMap[t].foundChn == nil {
		return
	}
	delete(s.searchTopicMap, t)
}
// removeRegisterTopic drops every ticket reference held for the topic and
// then the topic's ticket collection itself. Tickets whose reference count
// falls to zero are also removed from the per-node bookkeeping.
func (s *ticketStore) removeRegisterTopic(topic Topic) {
	log.Trace("Removing discovery topic", "topic", topic)

	collected := s.tickets[topic]
	if collected == nil {
		log.Warn("Removing non-existent discovery topic", "topic", topic)
		return
	}
	for _, refs := range collected.buckets {
		for i := range refs {
			tk := refs[i].t
			tk.refCnt--
			if tk.refCnt != 0 {
				continue
			}
			// Last reference gone: forget the node's ticket and request.
			delete(s.nodes, tk.node)
			delete(s.nodeLastReq, tk.node)
		}
	}
	delete(s.tickets, topic)
}
// regTopicSet returns the topics that are currently being registered, in
// map iteration order.
func (s *ticketStore) regTopicSet() []Topic {
	out := make([]Topic, len(s.tickets))
	n := 0
	for topic := range s.tickets {
		out[n] = topic
		n++
	}
	return out[:n]
}
// nextRegisterLookup picks the target of the next ticket-collection lookup.
//
// Topics are served round robin from regQueue. The first queued topic that
// still exists and whose nextLookup time has passed yields a lookup target
// together with a short delay. If no topic qualifies, an empty lookupInfo and
// a longer sleep delay are returned.
func (s *ticketStore) nextRegisterLookup() (lookupInfo, time.Duration) {
	// Enqueue every registered topic that is not waiting in the queue yet.
	for topic := range s.tickets {
		if _, queued := s.regSet[topic]; queued {
			continue
		}
		s.regQueue = append(s.regQueue, topic)
		s.regSet[topic] = struct{}{}
	}
	// Drain the queue until a topic that needs more tickets shows up.
	for len(s.regQueue) > 0 {
		topic := s.regQueue[0]
		s.regQueue = s.regQueue[1:]
		delete(s.regSet, topic)

		collected := s.tickets[topic]
		if collected == nil {
			// Topic was removed while it was queued.
			continue
		}
		if collected.nextLookup >= mclock.Now() {
			continue
		}
		next := s.radius[topic].nextTarget(false)
		delay := 100 * time.Millisecond
		log.Trace("Found discovery topic to register", "topic", topic, "target", next.target, "delay", delay)
		return next, delay
	}
	// Nothing to do right now, tell the caller to sleep.
	delay := 40 * time.Second
	log.Trace("No topic found to register", "delay", delay)
	return lookupInfo{}, delay
}
// nextSearchLookup returns the next lookup target for a searched topic. It
// counts consecutive radius lookups and forces a regular lookup once
// searchForceQuery of them have been issued in a row.
func (s *ticketStore) nextSearchLookup(topic Topic) lookupInfo {
	rad := s.radius[topic]
	force := rad.radiusLookupCnt >= searchForceQuery
	info := rad.nextTarget(force)
	if !info.radiusLookup {
		rad.radiusLookupCnt = 0
		return info
	}
	rad.radiusLookupCnt++
	return info
}
// ticketsInWindow collects the ticket references of a topic that fall into
// the timeWindow buckets starting at lastBucketFetched. It returns nil if the
// topic is not being registered.
func (s *ticketStore) ticketsInWindow(topic Topic) []ticketRef {
	collected := s.tickets[topic]
	if collected == nil {
		log.Warn("Listing non-existing discovery tickets", "topic", topic)
		return nil
	}
	// Concatenate the buckets of the window in chronological order.
	var refs []ticketRef
	first := s.lastBucketFetched
	for b := first; b < first+timeWindow; b++ {
		refs = append(refs, collected.buckets[b]...)
	}
	log.Trace("Retrieved discovery registration tickets", "topic", topic, "from", s.lastBucketFetched, "tickets", len(refs))
	return refs
}
// removeExcessTickets trims the tickets of topic t inside the registration
// window down to wantTicketsInWindow, keeping those with the shortest wait
// times.
func (s *ticketStore) removeExcessTickets(t Topic) {
	refs := s.ticketsInWindow(t)
	if len(refs) > wantTicketsInWindow {
		sort.Sort(ticketRefByWaitTime(refs))
		for i := wantTicketsInWindow; i < len(refs); i++ {
			s.removeTicketRef(refs[i])
		}
	}
}
// ticketRefByWaitTime implements sort.Interface, ordering ticket references
// by ascending wait time.
type ticketRefByWaitTime []ticketRef

// Len returns how many ticket references the collection holds.
func (s ticketRefByWaitTime) Len() int { return len(s) }
// waitTime returns the span between the ticket's issue time and the
// registration time of the referenced topic.
func (ref ticketRef) waitTime() mclock.AbsTime {
	tk := ref.t
	return tk.regTime[ref.idx] - tk.issueTime
}
// Less reports whether element i has a strictly shorter wait time than
// element j and must therefore sort before it.
func (s ticketRefByWaitTime) Less(i, j int) bool {
	left, right := s[i].waitTime(), s[j].waitTime()
	return left < right
}
// Swap exchanges the elements at positions i and j.
func (s ticketRefByWaitTime) Swap(i, j int) {
	tmp := s[i]
	s[i] = s[j]
	s[j] = tmp
}
// addTicketRef stores r in the bucket matching its registration time, bumps
// the ticket's reference count and postpones the topic's next collection
// lookup by collectFrequency. The lookup time is first raised to at least
// now - collectFrequency*maxCollectDebt so the accumulated debt stays bounded.
// References to topics that are not being registered are ignored.
func (s *ticketStore) addTicketRef(r ticketRef) {
	topic := r.topic()
	collected := s.tickets[topic]
	if collected == nil {
		log.Warn("Adding ticket to non-existent topic", "topic", topic)
		return
	}
	bucket := timeBucket(r.topicRegTime() / mclock.AbsTime(ticketTimeBucketLen))
	collected.buckets[bucket] = append(collected.buckets[bucket], r)
	r.t.refCnt++

	next := collected.nextLookup
	if floor := mclock.Now() - mclock.AbsTime(collectFrequency)*maxCollectDebt; next < floor {
		next = floor
	}
	collected.nextLookup = next + mclock.AbsTime(collectFrequency)
	//s.removeExcessTickets(topic)
}
// nextFilteredTicket returns the next registerable ticket whose registration
// time is not earlier than the topic's nextReg limit, together with its wait
// time. Tickets failing that check (or belonging to a topic that is no longer
// registered) are removed and the search continues. A nil ticket is returned
// when nextRegisterableTicket has nothing left.
func (s *ticketStore) nextFilteredTicket() (*ticketRef, time.Duration) {
	now := mclock.Now()
	for {
		ref, wait := s.nextRegisterableTicket()
		if ref == nil {
			return ref, wait
		}
		log.Trace("Found discovery ticket to register", "node", ref.t.node, "serial", ref.t.serial, "wait", wait)

		collected := s.tickets[ref.topic()]
		if collected != nil && now+mclock.AbsTime(wait) >= collected.nextReg {
			return ref, wait
		}
		s.removeTicketRef(*ref)
	}
}
// ticketRegistered records that the ticket reference has been used for a
// registration. The topic's nextReg time is raised to at least
// now - registerFrequency*maxRegisterDebt and then advanced by
// registerFrequency; afterwards the reference is removed from the store.
//
// If the topic is no longer being registered there is no pacing state to
// update; the reference is still removed so the cached next ticket is reset.
func (s *ticketStore) ticketRegistered(ref ticketRef) {
	topic := ref.topic()
	if tickets := s.tickets[topic]; tickets == nil {
		// The topic was removed in the meantime; dereferencing would panic.
		log.Warn("Registered ticket for non-existent discovery topic", "topic", topic)
	} else {
		floor := mclock.Now() - mclock.AbsTime(registerFrequency)*maxRegisterDebt
		if tickets.nextReg < floor {
			tickets.nextReg = floor
		}
		tickets.nextReg += mclock.AbsTime(registerFrequency)
	}
	s.removeTicketRef(ref)
}
// nextRegisterableTicket returns the next ticket that can be used
// to register.
//
// If the returned wait time <= zero the ticket can be used. For a positive
// wait time, the caller should requery the next ticket later.
//
// A ticket can be returned more than once with <= zero wait time in case
// the ticket contains multiple topics.
//
// The result is cached in s.nextTicketCached until the cache is reset
// elsewhere. When no topic holds any ticket bucket, nil and a zero wait are
// returned.
func (s *ticketStore) nextRegisterableTicket() (*ticketRef, time.Duration) {
now := mclock.Now()
// Serve the cached reference if there is one; only the wait is recomputed.
if s.nextTicketCached != nil {
return s.nextTicketCached, time.Duration(s.nextTicketCached.topicRegTime() - now)
}
// Scan buckets forward from the last fetched one. There is no upper bound on
// the bucket index: the loop ends only by finding a ticket or by finding that
// no topic has any bucket at all.
// NOTE(review): if tickets only exist in buckets before lastBucketFetched this
// loop would not terminate - confirm callers keep lastBucketFetched low enough.
for bucket := s.lastBucketFetched; ; bucket++ {
var (
empty = true // true if there are no tickets
nextTicket ticketRef // uninitialized if this bucket is empty
)
for _, tickets := range s.tickets {
//s.removeExcessTickets(topic)
if len(tickets.buckets) != 0 {
empty = false
list := tickets.buckets[bucket]
// Pick the reference with the earliest registration time in this bucket
// across all topics.
for _, ref := range list {
//debugLog(fmt.Sprintf(" nrt bucket = %d node = %x sn = %v wait = %v", bucket, ref.t.node.ID[:8], ref.t.serial, time.Duration(ref.topicRegTime()-now)))
if nextTicket.t == nil || ref.topicRegTime() < nextTicket.topicRegTime() {
nextTicket = ref
}
}
}
}
if empty {
return nil, 0
}
if nextTicket.t != nil {
s.nextTicketCached = &nextTicket
return &nextTicket, time.Duration(nextTicket.topicRegTime() - now)
}
// This bucket held nothing; remember it so later scans resume from here.
s.lastBucketFetched = bucket
}
}
// removeTicketRef removes a single ticket reference from the ticket store.
//
// The cached next ticket is always invalidated. If the topic is not being
// registered the call is otherwise a no-op. The reference is located in the
// bucket of its registration time by ticket identity; the bucket is deleted
// when it becomes empty, and the ticket is dropped from the per-node maps
// once its reference count reaches zero.
//
// It panics if the topic is registered but the reference is not present in
// its bucket, because that means the store's bookkeeping is corrupt.
func (s *ticketStore) removeTicketRef(ref ticketRef) {
	log.Trace("Removing discovery ticket reference", "node", ref.t.node.ID, "serial", ref.t.serial)

	// Make nextRegisterableTicket return the next available ticket.
	s.nextTicketCached = nil

	topic := ref.topic()
	tickets := s.tickets[topic]
	if tickets == nil {
		log.Trace("Removing tickets from unknown topic", "topic", topic)
		return
	}
	bucket := timeBucket(ref.t.regTime[ref.idx] / mclock.AbsTime(ticketTimeBucketLen))
	list := tickets.buckets[bucket]
	idx := -1
	for i, bt := range list {
		if bt.t == ref.t {
			idx = i
			break
		}
	}
	if idx == -1 {
		// A nil panic value is indistinguishable from "no panic" for
		// recover() on older Go versions; use a descriptive value instead.
		panic("discv5: removing ticket reference that is not in its bucket")
	}
	list = append(list[:idx], list[idx+1:]...)
	if len(list) != 0 {
		tickets.buckets[bucket] = list
	} else {
		delete(tickets.buckets, bucket)
	}
	ref.t.refCnt--
	if ref.t.refCnt == 0 {
		delete(s.nodes, ref.t.node)
		delete(s.nodeLastReq, ref.t.node)
	}
}
// lookupInfo describes a lookup issued on behalf of a topic.
type lookupInfo struct {
target common.Hash // hash the lookup is aimed at
topic Topic // topic the lookup was generated for
radiusLookup bool // true if the lookup probes the topic radius (see nextTarget)
}
// reqInfo records the last request sent to a node after a lookup.
type reqInfo struct {
pingHash []byte // value returned by the ping callback; nil for search lookups
lookup lookupInfo // lookup that caused the request
time mclock.AbsTime // time at which the request was recorded
}
// findIdx returns the position of topic within t.topics, or -1 if the ticket
// does not cover that topic.
func (t *ticket) findIdx(topic Topic) int {
	for i := 0; i < len(t.topics); i++ {
		if t.topics[i] == topic {
			return i
		}
	}
	return -1
}
// registerLookupDone processes the result of a registration lookup. The first
// node is always considered; further nodes only if the XOR distance between
// their hash prefix and the lookup target prefix is below the topic's
// minRadius. For radius lookups a node is pinged unless a request to it was
// recorded within the last radiusTC; for regular lookups a node is pinged
// only if no ticket is held for it. Every ping is recorded in nodeLastReq.
func (s *ticketStore) registerLookupDone(lookup lookupInfo, nodes []*Node, ping func(n *Node) []byte) {
	now := mclock.Now()
	targetPrefix := binary.BigEndian.Uint64(lookup.target[:8])
	for i, n := range nodes {
		if i != 0 && (binary.BigEndian.Uint64(n.sha[:8])^targetPrefix) >= s.radius[lookup.topic].minRadius {
			continue
		}
		switch {
		case lookup.radiusLookup:
			lastReq, seen := s.nodeLastReq[n]
			if !seen || time.Duration(now-lastReq.time) > radiusTC {
				s.nodeLastReq[n] = reqInfo{pingHash: ping(n), lookup: lookup, time: now}
			}
		case s.nodes[n] == nil:
			s.nodeLastReq[n] = reqInfo{pingHash: ping(n), lookup: lookup, time: now}
		}
	}
}
// searchLookupDone processes the result of a search lookup. Nodes are
// filtered the same way as in registerLookupDone (first node always, others
// only inside minRadius). For radius lookups the request is recorded in
// nodeLastReq without a ping hash unless one was recorded within radiusTC.
// Independently of the lookup kind, every accepted node that may be queried
// for the topic is sent a topic query, which is tracked if it returned a hash.
func (s *ticketStore) searchLookupDone(lookup lookupInfo, nodes []*Node, query func(n *Node, topic Topic) []byte) {
	now := mclock.Now()
	targetPrefix := binary.BigEndian.Uint64(lookup.target[:8])
	for i, n := range nodes {
		if i != 0 && (binary.BigEndian.Uint64(n.sha[:8])^targetPrefix) >= s.radius[lookup.topic].minRadius {
			continue
		}
		if lookup.radiusLookup {
			lastReq, seen := s.nodeLastReq[n]
			if !seen || time.Duration(now-lastReq.time) > radiusTC {
				s.nodeLastReq[n] = reqInfo{pingHash: nil, lookup: lookup, time: now}
			}
		}
		if !s.canQueryTopic(n, lookup.topic) {
			continue
		}
		if hash := query(n, lookup.topic); hash != nil {
			s.addTopicQuery(common.BytesToHash(hash), n, lookup)
		}
	}
}
// adjustWithTicket feeds the ticket into the radius detector of each of its
// topics that is being tracked.
func (s *ticketStore) adjustWithTicket(now mclock.AbsTime, targetHash common.Hash, t *ticket) {
	for i := range t.topics {
		rad, tracked := s.radius[t.topics[i]]
		if !tracked {
			continue
		}
		rad.adjustWithTicket(now, targetHash, ticketRef{t: t, idx: i})
	}
}
// addTicket handles a ticket received in reply to a ping.
//
// The ticket is ignored unless pingHash matches the last request recorded for
// the node. It always updates the radius detectors. For regular lookups to a
// node without a stored ticket, and if the ticket covers the looked-up topic,
// the ticket is then considered for registration: it is kept when its wait
// time is below a randomized limit (keepTicketConst plus keepTicketExp times
// an exponentially distributed value capped at 10). A ticket that ends up
// referenced is stored for its node and the cached next ticket is reset.
func (s *ticketStore) addTicket(localTime mclock.AbsTime, pingHash []byte, ticket *ticket) {
	log.Trace("Adding discovery ticket", "node", ticket.node.ID, "serial", ticket.serial)

	req, known := s.nodeLastReq[ticket.node]
	if !known || !bytes.Equal(pingHash, req.pingHash) {
		return
	}
	s.adjustWithTicket(localTime, req.lookup.target, ticket)

	if req.lookup.radiusLookup {
		return
	}
	if s.nodes[ticket.node] != nil {
		return
	}
	topic := req.lookup.topic
	idx := ticket.findIdx(topic)
	if idx == -1 {
		return
	}
	// Move the scan start back if this ticket arrived in an earlier bucket.
	current := timeBucket(localTime / mclock.AbsTime(ticketTimeBucketLen))
	if s.lastBucketFetched == 0 || current < s.lastBucketFetched {
		s.lastBucketFetched = current
	}
	if _, registering := s.tickets[topic]; registering {
		wait := ticket.regTime[idx] - localTime
		rnd := rand.ExpFloat64()
		if rnd > 10 {
			rnd = 10
		}
		limit := float64(keepTicketConst) + float64(keepTicketExp)*rnd
		if float64(wait) < limit {
			// use the ticket to register this topic
			s.addTicketRef(ticketRef{t: ticket, idx: idx})
		}
	}
	if ticket.refCnt <= 0 {
		return
	}
	s.nextTicketCached = nil
	s.nodes[ticket.node] = ticket
}
// getNodeTicket returns the ticket stored for node, or nil if there is none.
// The lookup is traced together with the ticket's serial (nil when absent).
func (s *ticketStore) getNodeTicket(node *Node) *ticket {
	held := s.nodes[node]
	var serial interface{}
	if held != nil {
		serial = held.serial
	}
	log.Trace("Retrieving node ticket", "node", node.ID, "serial", serial)
	return held
}
// canQueryTopic reports whether node may be queried for topic, i.e. whether
// no query for that topic was sent to it within the last topicQueryResend.
func (s *ticketStore) canQueryTopic(node *Node, topic Topic) bool {
	sent := s.queriesSent[node]
	if sent == nil {
		return true
	}
	cutoff := mclock.Now() - mclock.AbsTime(topicQueryResend)
	for _, q := range sent {
		if q.lookup.topic == topic && q.sent > cutoff {
			return false
		}
	}
	return true
}
// addTopicQuery records a topic query sent to node under its hash and then
// gives the periodic cleanup of old queries a chance to run.
func (s *ticketStore) addTopicQuery(hash common.Hash, node *Node, lookup lookupInfo) {
	now := mclock.Now()
	perNode := s.queriesSent[node]
	if perNode == nil {
		perNode = map[common.Hash]sentQuery{}
		s.queriesSent[node] = perNode
	}
	perNode[hash] = sentQuery{sent: now, lookup: lookup}
	s.cleanupTopicQueries(now)
}
// cleanupTopicQueries purges sent queries older than topicQueryResend and
// drops nodes left without any query. It does nothing before
// nextTopicQueryCleanup and schedules the next pass topicQueryTimeout later.
func (s *ticketStore) cleanupTopicQueries(now mclock.AbsTime) {
	if now < s.nextTopicQueryCleanup {
		return
	}
	oldest := now - mclock.AbsTime(topicQueryResend)
	for node, perNode := range s.queriesSent {
		for hash := range perNode {
			if perNode[hash].sent < oldest {
				delete(perNode, hash)
			}
		}
		if len(perNode) == 0 {
			delete(s.queriesSent, node)
		}
	}
	s.nextTopicQueryCleanup = now + mclock.AbsTime(topicQueryTimeout)
}
// gotTopicNodes handles the reply to a topic query.
//
// It returns true if no matching query is on record for the sender or the
// reply arrived more than topicQueryTimeout after the query was sent. In all
// other cases it returns false after updating the topic's radius detector
// (counting the reply as "inside" if it contains any node) and forwarding the
// returned nodes to the topic's result channel. Forwarding never blocks and
// stops at the first node the channel cannot accept. Unspecified or loopback
// addresses are replaced by the sender's address.
func (s *ticketStore) gotTopicNodes(from *Node, hash common.Hash, nodes []rpcNode) (timeout bool) {
	now := mclock.Now()

	// Indexing a missing (nil) inner map simply reports the query as unknown.
	q, found := s.queriesSent[from][hash]
	if !found {
		return true
	}
	if now > q.sent+mclock.AbsTime(topicQueryTimeout) {
		return true
	}
	var inside float64
	if len(nodes) > 0 {
		inside = 1
	}
	s.radius[q.lookup.topic].adjust(now, q.lookup.target, from.sha, inside)

	out := s.searchTopicMap[q.lookup.topic].foundChn
	if out == nil {
		return false
	}
	for i := range nodes {
		addr := nodes[i].IP
		if addr.IsUnspecified() || addr.IsLoopback() {
			addr = from.IP
		}
		found := NewNode(nodes[i].ID, addr, nodes[i].UDP, nodes[i].TCP)
		select {
		case out <- found:
		default:
			return false
		}
	}
	return false
}
// topicRadius is the radius detector and lookup target generator of a topic.
type topicRadius struct {
topic Topic // topic this detector belongs to
topicHashPrefix uint64 // first 8 bytes (big endian) of the Keccak256 hash of the topic
// radius is the detected radius used by isInRadius and nextTarget; minRadius
// is the distance limit applied to lookup results. Both start at maxRadius
// and are updated by recalcRadius.
radius, minRadius uint64
buckets []topicRadiusBucket // statistics per distance bucket, grown by recalcRadius
converged bool // set once recalcRadius needs no further radius lookup
radiusLookupCnt int // consecutive radius lookups counted by nextSearchLookup
}
// topicRadiusEvent indexes the weight counters of a topicRadiusBucket.
type topicRadiusEvent int
const (
trOutside topicRadiusEvent = iota // response counted as outside the radius
trInside // response counted as inside the radius
trNoAdjust // radius lookup that timed out without an adjustment
trCount // number of event kinds; used as the array length
)
// topicRadiusBucket holds the statistics of one distance bucket.
type topicRadiusBucket struct {
weights [trCount]float64 // exponentially decaying event counters
lastTime mclock.AbsTime // time of the last decay update
value float64 // running sum of outside-inside weights, set by recalcRadius
lookupSent map[common.Hash]mclock.AbsTime // pending radius lookup targets and their send times
}
// update brings the bucket up to date for time now: all weights decay
// exponentially with time constant radiusTC, and every pending lookup older
// than respTimeout is dropped and counted as a trNoAdjust event. Calling it
// again with the same time is a no-op.
func (b *topicRadiusBucket) update(now mclock.AbsTime) {
	if b.lastTime == now {
		return
	}
	decay := math.Exp(-float64(now-b.lastTime) / float64(radiusTC))
	for i := range b.weights {
		b.weights[i] *= decay
	}
	b.lastTime = now

	for target, sentAt := range b.lookupSent {
		if now-sentAt <= mclock.AbsTime(respTimeout) {
			continue
		}
		b.weights[trNoAdjust] += 1
		delete(b.lookupSent, target)
	}
}
// adjust updates the bucket for time now and then adds one unit of weight,
// split between trInside and trOutside according to inside: values <= 0
// count fully as outside, values >= 1 fully as inside, anything in between
// proportionally.
func (b *topicRadiusBucket) adjust(now mclock.AbsTime, inside float64) {
	b.update(now)
	switch {
	case inside <= 0:
		b.weights[trOutside] += 1
	case inside >= 1:
		b.weights[trInside] += 1
	default:
		b.weights[trInside] += inside
		b.weights[trOutside] += 1 - inside
	}
}
// newTopicRadius creates the radius detector of topic t. The hash prefix is
// taken from the Keccak256 hash of the topic, and both radius values start
// at maxRadius.
func newTopicRadius(t Topic) *topicRadius {
	hash := crypto.Keccak256Hash([]byte(t))
	rad := &topicRadius{
		topic:     t,
		radius:    maxRadius,
		minRadius: maxRadius,
	}
	rad.topicHashPrefix = binary.BigEndian.Uint64(hash[:8])
	return rad
}
// getBucketIdx maps an address hash to its distance bucket. The index is
// (64 - log2(distance)) * radiusBucketsPerBit, where distance is the XOR of
// the hash prefix and the topic hash prefix (log2 is taken as 0 for a zero
// distance), clamped to [0, 64*radiusBucketsPerBit-1].
func (r *topicRadius) getBucketIdx(addrHash common.Hash) int {
	dist := binary.BigEndian.Uint64(addrHash[:8]) ^ r.topicHashPrefix
	var log2 float64
	if dist != 0 {
		log2 = math.Log2(float64(dist))
	}
	idx := int((64 - log2) * radiusBucketsPerBit)
	last := 64*radiusBucketsPerBit - 1
	switch {
	case idx > last:
		return last
	case idx < 0:
		return 0
	}
	return idx
}
// targetForBucket generates a random lookup target whose XOR distance from
// the topic hash prefix lies in the distance range of the given bucket. The
// distance saturates at the maximum uint64 if the addition wraps around. The
// remaining 24 bytes of the target are random.
func (r *topicRadius) targetForBucket(bucket int) common.Hash {
	lo := math.Pow(2, 64-float64(bucket+1)/radiusBucketsPerBit)
	hi := math.Pow(2, 64-float64(bucket)/radiusBucketsPerBit)

	base := uint64(lo)
	dist := base + randUint64n(uint64(hi-lo))
	if dist < base {
		// Wrapped around: clamp to the largest distance.
		dist = ^uint64(0)
	}
	var target common.Hash
	binary.BigEndian.PutUint64(target[:8], r.topicHashPrefix^dist)
	globalRandRead(target[8:])
	return target
}
// globalRandRead fills b with pseudo-random bytes drawn from the global
// math/rand source. An empty or nil slice is left untouched.
//
// Each draw supplies seven bytes. rand.Int63 is used rather than rand.Int
// because the latter only carries 31 random bits on 32-bit platforms, which
// would leave bytes 4-6 of every seven-byte group zero.
func globalRandRead(b []byte) {
	var (
		val  int64 // random bits not yet consumed
		left int   // number of unused bytes remaining in val
	)
	for i := range b {
		if left == 0 {
			val = rand.Int63()
			left = 7
		}
		b[i] = byte(val)
		val >>= 8
		left--
	}
}
// isInRadius reports whether the XOR distance between the address hash
// prefix and the topic hash prefix is strictly below the current radius.
func (r *topicRadius) isInRadius(addrHash common.Hash) bool {
	dist := binary.BigEndian.Uint64(addrHash[:8]) ^ r.topicHashPrefix
	return r.radius > dist
}
// chooseLookupBucket picks a random bucket index in [a, b] (a is clamped to
// zero) for the next radius lookup. A bucket is eligible if it does not exist
// yet or if its trNoAdjust weight is below maxNoAdjust. It returns -1 when the
// range is empty or contains no eligible bucket.
func (r *topicRadius) chooseLookupBucket(a, b int) int {
	if a < 0 {
		a = 0
	}
	if a > b {
		return -1
	}
	eligible := func(i int) bool {
		return i >= len(r.buckets) || r.buckets[i].weights[trNoAdjust] < maxNoAdjust
	}
	count := 0
	for i := a; i <= b; i++ {
		if eligible(i) {
			count++
		}
	}
	if count == 0 {
		return -1
	}
	// Select the skip-th eligible bucket.
	skip := randUint(uint32(count))
	for i := a; i <= b; i++ {
		if !eligible(i) {
			continue
		}
		if skip == 0 {
			return i
		}
		skip--
	}
	// Unreachable as long as randUint returns a value below count. A nil
	// panic value would be indistinguishable from "no panic" for recover()
	// on older Go versions, so use a descriptive one.
	panic("discv5: chooseLookupBucket ran out of eligible buckets")
}
// needMoreLookups reports whether maxValue exceeds the largest bucket value
// found in the index range [a, b] by less than minPeakSize. Values are only
// compared against a floor of zero, so an empty range yields
// maxValue < minPeakSize.
//
// a is clamped to zero. If b points past the last bucket it is clamped to the
// last bucket, and that bucket is inspected even when the clamped range is
// empty. With no buckets at all nothing is inspected.
func (r *topicRadius) needMoreLookups(a, b int, maxValue float64) bool {
	var peak float64
	if a < 0 {
		a = 0
	}
	if last := len(r.buckets) - 1; b > last {
		b = last
		// last is -1 when there are no buckets; indexing would panic.
		if b >= 0 && r.buckets[b].value > peak {
			peak = r.buckets[b].value
		}
	}
	for i := a; i <= b; i++ {
		if r.buckets[i].value > peak {
			peak = r.buckets[i].value
		}
	}
	return maxValue-peak < minPeakSize
}
// recalcRadius re-evaluates the bucket statistics and decides whether another
// radius lookup is needed.
//
// It returns the bucket index to probe next in radiusLookup, or -1 if no
// further radius lookup is needed at the moment. Only in the latter case is
// the radius result set (and stored in r.radius, with r.converged set to
// true); otherwise radius is returned as zero. r.minRadius is updated on every
// call, and r.buckets may be extended.
func (r *topicRadius) recalcRadius() (radius uint64, radiusLookup int) {
maxBucket := 0
maxValue := float64(0)
now := mclock.Now()
v := float64(0)
// Decay all buckets to the current time and store in each bucket the running
// sum of (outside - inside) weights up to and including it.
for i := range r.buckets {
r.buckets[i].update(now)
v += r.buckets[i].weights[trOutside] - r.buckets[i].weights[trInside]
r.buckets[i].value = v
//fmt.Printf("%v %v | ", v, r.buckets[i].weights[trNoAdjust])
}
//fmt.Println()
// Find the peak of the running sum. The scan stops at the first bucket whose
// value falls below i*minSlope; maxBucket is one past the peak bucket.
slopeCross := -1
for i, b := range r.buckets {
v := b.value
if v < float64(i)*minSlope {
slopeCross = i
break
}
if v > maxValue {
maxValue = v
maxBucket = i + 1
}
}
// Walk backwards from the last bucket until at least minRightSum events have
// been accumulated; the bucket reached determines minRadius.
minRadBucket := len(r.buckets)
sum := float64(0)
for minRadBucket > 0 && sum < minRightSum {
minRadBucket--
b := r.buckets[minRadBucket]
sum += b.weights[trInside] + b.weights[trOutside]
}
r.minRadius = uint64(math.Pow(2, 64-float64(minRadBucket)/radiusBucketsPerBit))
// Candidate lookup in the lookupWidth buckets just before the peak.
lookupLeft := -1
if r.needMoreLookups(0, maxBucket-lookupWidth-1, maxValue) {
lookupLeft = r.chooseLookupBucket(maxBucket-lookupWidth, maxBucket-1)
}
// Candidate lookup in the lookupWidth buckets starting at the peak; the
// bucket slice is grown first so that every candidate index exists.
lookupRight := -1
if slopeCross != maxBucket && (minRadBucket <= maxBucket || r.needMoreLookups(maxBucket+lookupWidth, len(r.buckets)-1, maxValue)) {
for len(r.buckets) <= maxBucket+lookupWidth {
r.buckets = append(r.buckets, topicRadiusBucket{lookupSent: make(map[common.Hash]mclock.AbsTime)})
}
lookupRight = r.chooseLookupBucket(maxBucket, maxBucket+lookupWidth-1)
}
// Use whichever candidate exists; pick one at random if both do.
if lookupLeft == -1 {
radiusLookup = lookupRight
} else {
if lookupRight == -1 {
radiusLookup = lookupLeft
} else {
if randUint(2) == 0 {
radiusLookup = lookupLeft
} else {
radiusLookup = lookupRight
}
}
}
//fmt.Println("mb", maxBucket, "sc", slopeCross, "mrb", minRadBucket, "ll", lookupLeft, "lr", lookupRight, "mv", maxValue)
if radiusLookup == -1 {
// no more radius lookups needed at the moment, return a radius
r.converged = true
// The radius bucket is the smaller of the peak bucket and minRadBucket;
// bucket zero means the maximum radius.
rad := maxBucket
if minRadBucket < rad {
rad = minRadBucket
}
radius = ^uint64(0)
if rad > 0 {
radius = uint64(math.Pow(2, 64-float64(rad)/radiusBucketsPerBit))
}
r.radius = radius
}
return
}
// nextTarget returns the next lookup target for the topic.
//
// Unless forceRegular is set, the radius is recalculated first; if that asks
// for a radius lookup, a target inside the requested bucket is generated,
// recorded as pending in that bucket and returned with radiusLookup set.
// Otherwise a regular target is produced whose distance from the topic hash
// prefix is derived from two random draws, one below the radius and one below
// twice the radius extension (half the radius, capped at the room left up to
// maxRadius).
func (r *topicRadius) nextTarget(forceRegular bool) lookupInfo {
	if !forceRegular {
		if _, bucket := r.recalcRadius(); bucket != -1 {
			target := r.targetForBucket(bucket)
			r.buckets[bucket].lookupSent[target] = mclock.Now()
			return lookupInfo{target: target, topic: r.topic, radiusLookup: true}
		}
	}
	ext := r.radius / 2
	if room := maxRadius - r.radius; ext > room {
		ext = room
	}
	dist := randUint64n(r.radius) + randUint64n(2*ext)
	if dist > ext {
		dist -= ext
	} else {
		dist = ext - dist
	}
	var target common.Hash
	binary.BigEndian.PutUint64(target[:8], r.topicHashPrefix^dist)
	globalRandRead(target[8:])
	return lookupInfo{target: target, topic: r.topic, radiusLookup: false}
}
// adjustWithTicket converts the wait period of the referenced ticket topic
// into an "inside" weight (wait/targetWaitTime - 0.5, clamped to [0, 1]) and
// applies it to the bucket of the ticket's node.
func (r *topicRadius) adjustWithTicket(now mclock.AbsTime, targetHash common.Hash, t ticketRef) {
	wait := t.waitTime()
	inside := float64(wait)/float64(targetWaitTime) - 0.5
	switch {
	case inside > 1:
		inside = 1
	case inside < 0:
		inside = 0
	}
	r.adjust(now, targetHash, t.t.node.sha, inside)
}
// adjust applies an inside/outside observation for addrHash to its distance
// bucket and clears the pending lookup for targetHash from that bucket.
// Observations for buckets that have not been allocated yet are ignored.
func (r *topicRadius) adjust(now mclock.AbsTime, targetHash, addrHash common.Hash, inside float64) {
	idx := r.getBucketIdx(addrHash)
	if idx < len(r.buckets) {
		bkt := &r.buckets[idx]
		bkt.adjust(now, inside)
		delete(bkt.lookupSent, targetHash)
	}
}