mirror of
https://gitlab.com/pulsechaincom/prysm-pulse.git
synced 2024-12-22 03:30:35 +00:00
fix
Former-commit-id: 46792bf21cbfd109e0a912577c47e7c68c1fe09b [formerly 14503cf42f8cb0d70f7b09c9fdefd15a3deefaf7] Former-commit-id: 72a23597a36b67bd2babb278132fc64807e86ea0
This commit is contained in:
commit
31d4fd0678
2
.gitignore
vendored
2
.gitignore
vendored
@ -38,4 +38,4 @@ profile.cov
|
||||
/dashboard/assets/flow-typed
|
||||
/dashboard/assets/node_modules
|
||||
/dashboard/assets/stats.json
|
||||
/dashboard/assets/public/bundle.js
|
||||
/dashboard/assets/bundle.js
|
||||
|
@ -158,7 +158,7 @@ func makeFullNode(ctx *cli.Context) *node.Node {
|
||||
utils.RegisterEthService(stack, &cfg.Eth)
|
||||
|
||||
if ctx.GlobalBool(utils.DashboardEnabledFlag.Name) {
|
||||
utils.RegisterDashboardService(stack, &cfg.Dashboard)
|
||||
utils.RegisterDashboardService(stack, &cfg.Dashboard, gitCommit)
|
||||
}
|
||||
// Whisper must be explicitly enabled by specifying at least 1 whisper flag or in dev mode
|
||||
shhEnabled := enableWhisper(ctx)
|
||||
|
@ -278,9 +278,12 @@ func startNode(ctx *cli.Context, stack *node.Node) {
|
||||
// Start auxiliary services if enabled
|
||||
if ctx.GlobalBool(utils.MiningEnabledFlag.Name) || ctx.GlobalBool(utils.DeveloperFlag.Name) {
|
||||
// Mining only makes sense if a full Ethereum node is running
|
||||
if ctx.GlobalBool(utils.LightModeFlag.Name) || ctx.GlobalString(utils.SyncModeFlag.Name) == "light" {
|
||||
utils.Fatalf("Light clients do not support mining")
|
||||
}
|
||||
var ethereum *eth.Ethereum
|
||||
if err := stack.Service(ðereum); err != nil {
|
||||
utils.Fatalf("ethereum service not running: %v", err)
|
||||
utils.Fatalf("Ethereum service not running: %v", err)
|
||||
}
|
||||
// Use a reduced number of threads if requested
|
||||
if threads := ctx.GlobalInt(utils.MinerThreadsFlag.Name); threads > 0 {
|
||||
|
@ -1104,9 +1104,9 @@ func RegisterEthService(stack *node.Node, cfg *eth.Config) {
|
||||
}
|
||||
|
||||
// RegisterDashboardService adds a dashboard to the stack.
|
||||
func RegisterDashboardService(stack *node.Node, cfg *dashboard.Config) {
|
||||
func RegisterDashboardService(stack *node.Node, cfg *dashboard.Config, commit string) {
|
||||
stack.Register(func(ctx *node.ServiceContext) (node.Service, error) {
|
||||
return dashboard.New(cfg)
|
||||
return dashboard.New(cfg, commit)
|
||||
})
|
||||
}
|
||||
|
||||
|
@ -44,6 +44,11 @@ func (gp *GasPool) SubGas(amount uint64) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Gas returns the amount of gas remaining in the pool.
|
||||
func (gp *GasPool) Gas() uint64 {
|
||||
return uint64(*gp)
|
||||
}
|
||||
|
||||
func (gp *GasPool) String() string {
|
||||
return fmt.Sprintf("%d", *gp)
|
||||
}
|
||||
|
@ -20,7 +20,7 @@ Normally the dashboard assets are bundled into Geth via `go-bindata` to avoid ex
|
||||
|
||||
```
|
||||
$ (cd dashboard/assets && ./node_modules/.bin/webpack --watch)
|
||||
$ geth --dashboard --dashboard.assets=dashboard/assets/public --vmodule=dashboard=5
|
||||
$ geth --dashboard --dashboard.assets=dashboard/assets --vmodule=dashboard=5
|
||||
```
|
||||
|
||||
To bundle up the final UI into Geth, run `go generate`:
|
||||
|
@ -1 +1 @@
|
||||
043a5d72867d2c62b1605e251cb4e6c8387be6e9
|
||||
b2c12032341ac1818ee5f325c5e81f3bcaf6c775
|
@ -62,32 +62,4 @@ export type MenuProp = {|...ProvidedMenuProp, id: string|};
|
||||
// This way the mistyping is prevented.
|
||||
export const MENU: Map<string, {...MenuProp}> = new Map(menuSkeletons.map(({id, menu}) => ([id, {id, ...menu}])));
|
||||
|
||||
type ProvidedSampleProp = {|limit: number|};
|
||||
const sampleSkeletons: Array<{|id: string, sample: ProvidedSampleProp|}> = [
|
||||
{
|
||||
id: 'memory',
|
||||
sample: {
|
||||
limit: 200,
|
||||
},
|
||||
}, {
|
||||
id: 'traffic',
|
||||
sample: {
|
||||
limit: 200,
|
||||
},
|
||||
}, {
|
||||
id: 'logs',
|
||||
sample: {
|
||||
limit: 200,
|
||||
},
|
||||
},
|
||||
];
|
||||
export type SampleProp = {|...ProvidedSampleProp, id: string|};
|
||||
export const SAMPLE: Map<string, {...SampleProp}> = new Map(sampleSkeletons.map(({id, sample}) => ([id, {id, ...sample}])));
|
||||
|
||||
export const DURATION = 200;
|
||||
|
||||
export const LENS: Map<string, string> = new Map([
|
||||
'content',
|
||||
...menuSkeletons.map(({id}) => id),
|
||||
...sampleSkeletons.map(({id}) => id),
|
||||
].map(lens => [lens, lens]));
|
||||
|
@ -19,37 +19,99 @@
|
||||
import React, {Component} from 'react';
|
||||
|
||||
import withStyles from 'material-ui/styles/withStyles';
|
||||
import {lensPath, view, set} from 'ramda';
|
||||
|
||||
import Header from './Header';
|
||||
import Body from './Body';
|
||||
import {MENU, SAMPLE} from './Common';
|
||||
import type {Message, HomeMessage, LogsMessage, Chart} from '../types/message';
|
||||
import Footer from './Footer';
|
||||
import {MENU} from './Common';
|
||||
import type {Content} from '../types/content';
|
||||
|
||||
// appender appends an array (A) to the end of another array (B) in the state.
|
||||
// lens is the path of B in the state, samples is A, and limit is the maximum size of the changed array.
|
||||
// deepUpdate updates an object corresponding to the given update data, which has
|
||||
// the shape of the same structure as the original object. updater also has the same
|
||||
// structure, except that it contains functions where the original data needs to be
|
||||
// updated. These functions are used to handle the update.
|
||||
//
|
||||
// appender retrieves a function, which overrides the state's value at lens, and returns with the overridden state.
|
||||
const appender = (lens, samples, limit) => (state) => {
|
||||
const newSamples = [
|
||||
...view(lens, state), // retrieves a specific value of the state at the given path (lens).
|
||||
...samples,
|
||||
];
|
||||
// set is a function of ramda.js, which needs the path, the new value, the original state, and retrieves
|
||||
// the altered state.
|
||||
return set(
|
||||
lens,
|
||||
newSamples.slice(newSamples.length > limit ? newSamples.length - limit : 0),
|
||||
state
|
||||
);
|
||||
// Since the messages have the same shape as the state content, this approach allows
|
||||
// the generalization of the message handling. The only necessary thing is to set a
|
||||
// handler function for every path of the state in order to maximize the flexibility
|
||||
// of the update.
|
||||
const deepUpdate = (prev: Object, update: Object, updater: Object) => {
|
||||
if (typeof update === 'undefined') {
|
||||
// TODO (kurkomisi): originally this was deep copy, investigate it.
|
||||
return prev;
|
||||
}
|
||||
if (typeof updater === 'function') {
|
||||
return updater(prev, update);
|
||||
}
|
||||
const updated = {};
|
||||
Object.keys(prev).forEach((key) => {
|
||||
updated[key] = deepUpdate(prev[key], update[key], updater[key]);
|
||||
});
|
||||
|
||||
return updated;
|
||||
};
|
||||
// Lenses for specific data fields in the state, used for a clearer deep update.
|
||||
// NOTE: This solution will be changed very likely.
|
||||
const memoryLens = lensPath(['content', 'home', 'memory']);
|
||||
const trafficLens = lensPath(['content', 'home', 'traffic']);
|
||||
const logLens = lensPath(['content', 'logs', 'log']);
|
||||
// styles retrieves the styles for the Dashboard component.
|
||||
|
||||
// shouldUpdate returns the structure of a message. It is used to prevent unnecessary render
|
||||
// method triggerings. In the affected component's shouldComponentUpdate method it can be checked
|
||||
// whether the involved data was changed or not by checking the message structure.
|
||||
//
|
||||
// We could return the message itself too, but it's safer not to give access to it.
|
||||
const shouldUpdate = (msg: Object, updater: Object) => {
|
||||
const su = {};
|
||||
Object.keys(msg).forEach((key) => {
|
||||
su[key] = typeof updater[key] !== 'function' ? shouldUpdate(msg[key], updater[key]) : true;
|
||||
});
|
||||
|
||||
return su;
|
||||
};
|
||||
|
||||
// appender is a state update generalization function, which appends the update data
|
||||
// to the existing data. limit defines the maximum allowed size of the created array.
|
||||
const appender = <T>(limit: number) => (prev: Array<T>, update: Array<T>) => [...prev, ...update].slice(-limit);
|
||||
|
||||
// replacer is a state update generalization function, which replaces the original data.
|
||||
const replacer = <T>(prev: T, update: T) => update;
|
||||
|
||||
// defaultContent is the initial value of the state content.
|
||||
const defaultContent: Content = {
|
||||
general: {
|
||||
version: null,
|
||||
commit: null,
|
||||
},
|
||||
home: {
|
||||
memory: [],
|
||||
traffic: [],
|
||||
},
|
||||
chain: {},
|
||||
txpool: {},
|
||||
network: {},
|
||||
system: {},
|
||||
logs: {
|
||||
log: [],
|
||||
},
|
||||
};
|
||||
|
||||
// updaters contains the state update generalization functions for each path of the state.
|
||||
// TODO (kurkomisi): Define a tricky type which embraces the content and the handlers.
|
||||
const updaters = {
|
||||
general: {
|
||||
version: replacer,
|
||||
commit: replacer,
|
||||
},
|
||||
home: {
|
||||
memory: appender(200),
|
||||
traffic: appender(200),
|
||||
},
|
||||
chain: null,
|
||||
txpool: null,
|
||||
network: null,
|
||||
system: null,
|
||||
logs: {
|
||||
log: appender(200),
|
||||
},
|
||||
};
|
||||
|
||||
// styles returns the styles for the Dashboard component.
|
||||
const styles = theme => ({
|
||||
dashboard: {
|
||||
display: 'flex',
|
||||
@ -61,15 +123,18 @@ const styles = theme => ({
|
||||
overflow: 'hidden',
|
||||
},
|
||||
});
|
||||
|
||||
export type Props = {
|
||||
classes: Object,
|
||||
};
|
||||
|
||||
type State = {
|
||||
active: string, // active menu
|
||||
sideBar: boolean, // true if the sidebar is opened
|
||||
content: $Shape<Content>, // the visualized data
|
||||
shouldUpdate: Set<string> // labels for the components, which need to rerender based on the incoming message
|
||||
content: Content, // the visualized data
|
||||
shouldUpdate: Object // labels for the components, which need to rerender based on the incoming message
|
||||
};
|
||||
|
||||
// Dashboard is the main component, which renders the whole page, makes connection with the server and
|
||||
// listens for messages. When there is an incoming message, updates the page's content correspondingly.
|
||||
class Dashboard extends Component<Props, State> {
|
||||
@ -78,8 +143,8 @@ class Dashboard extends Component<Props, State> {
|
||||
this.state = {
|
||||
active: MENU.get('home').id,
|
||||
sideBar: true,
|
||||
content: {home: {memory: [], traffic: []}, logs: {log: []}},
|
||||
shouldUpdate: new Set(),
|
||||
content: defaultContent,
|
||||
shouldUpdate: {},
|
||||
};
|
||||
}
|
||||
|
||||
@ -91,13 +156,14 @@ class Dashboard extends Component<Props, State> {
|
||||
// reconnect establishes a websocket connection with the server, listens for incoming messages
|
||||
// and tries to reconnect on connection loss.
|
||||
reconnect = () => {
|
||||
this.setState({
|
||||
content: {home: {memory: [], traffic: []}, logs: {log: []}},
|
||||
});
|
||||
const server = new WebSocket(`${((window.location.protocol === 'https:') ? 'wss://' : 'ws://') + window.location.host}/api`);
|
||||
server.onopen = () => {
|
||||
this.setState({content: defaultContent, shouldUpdate: {}});
|
||||
};
|
||||
server.onmessage = (event) => {
|
||||
const msg: Message = JSON.parse(event.data);
|
||||
const msg: $Shape<Content> = JSON.parse(event.data);
|
||||
if (!msg) {
|
||||
console.error(`Incoming message is ${msg}`);
|
||||
return;
|
||||
}
|
||||
this.update(msg);
|
||||
@ -107,56 +173,12 @@ class Dashboard extends Component<Props, State> {
|
||||
};
|
||||
};
|
||||
|
||||
// samples retrieves the raw data of a chart field from the incoming message.
|
||||
samples = (chart: Chart) => {
|
||||
let s = [];
|
||||
if (chart.history) {
|
||||
s = chart.history.map(({value}) => (value || 0)); // traffic comes without value at the beginning
|
||||
}
|
||||
if (chart.new) {
|
||||
s = [...s, chart.new.value || 0];
|
||||
}
|
||||
return s;
|
||||
};
|
||||
|
||||
// handleHome changes the home-menu related part of the state.
|
||||
handleHome = (home: HomeMessage) => {
|
||||
this.setState((prevState) => {
|
||||
let newState = prevState;
|
||||
newState.shouldUpdate = new Set();
|
||||
if (home.memory) {
|
||||
newState = appender(memoryLens, this.samples(home.memory), SAMPLE.get('memory').limit)(newState);
|
||||
newState.shouldUpdate.add('memory');
|
||||
}
|
||||
if (home.traffic) {
|
||||
newState = appender(trafficLens, this.samples(home.traffic), SAMPLE.get('traffic').limit)(newState);
|
||||
newState.shouldUpdate.add('traffic');
|
||||
}
|
||||
return newState;
|
||||
});
|
||||
};
|
||||
|
||||
// handleLogs changes the logs-menu related part of the state.
|
||||
handleLogs = (logs: LogsMessage) => {
|
||||
this.setState((prevState) => {
|
||||
let newState = prevState;
|
||||
newState.shouldUpdate = new Set();
|
||||
if (logs.log) {
|
||||
newState = appender(logLens, [logs.log], SAMPLE.get('logs').limit)(newState);
|
||||
newState.shouldUpdate.add('logs');
|
||||
}
|
||||
return newState;
|
||||
});
|
||||
};
|
||||
|
||||
// update analyzes the incoming message, and updates the charts' content correspondingly.
|
||||
update = (msg: Message) => {
|
||||
if (msg.home) {
|
||||
this.handleHome(msg.home);
|
||||
}
|
||||
if (msg.logs) {
|
||||
this.handleLogs(msg.logs);
|
||||
}
|
||||
// update updates the content corresponding to the incoming message.
|
||||
update = (msg: $Shape<Content>) => {
|
||||
this.setState(prevState => ({
|
||||
content: deepUpdate(prevState.content, msg, updaters),
|
||||
shouldUpdate: shouldUpdate(msg, updaters),
|
||||
}));
|
||||
};
|
||||
|
||||
// changeContent sets the active label, which is used at the content rendering.
|
||||
@ -191,6 +213,13 @@ class Dashboard extends Component<Props, State> {
|
||||
content={this.state.content}
|
||||
shouldUpdate={this.state.shouldUpdate}
|
||||
/>
|
||||
<Footer
|
||||
opened={this.state.sideBar}
|
||||
openSideBar={this.openSideBar}
|
||||
closeSideBar={this.closeSideBar}
|
||||
general={this.state.content.general}
|
||||
shouldUpdate={this.state.shouldUpdate}
|
||||
/>
|
||||
</div>
|
||||
);
|
||||
}
|
||||
|
80
dashboard/assets/components/Footer.jsx
Normal file
80
dashboard/assets/components/Footer.jsx
Normal file
@ -0,0 +1,80 @@
|
||||
// @flow
|
||||
|
||||
// Copyright 2017 The go-ethereum Authors
|
||||
// This file is part of the go-ethereum library.
|
||||
//
|
||||
// The go-ethereum library is free software: you can redistribute it and/or modify
|
||||
// it under the terms of the GNU Lesser General Public License as published by
|
||||
// the Free Software Foundation, either version 3 of the License, or
|
||||
// (at your option) any later version.
|
||||
//
|
||||
// The go-ethereum library is distributed in the hope that it will be useful,
|
||||
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
// GNU Lesser General Public License for more details.
|
||||
//
|
||||
// You should have received a copy of the GNU Lesser General Public License
|
||||
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
import React, {Component} from 'react';
|
||||
|
||||
import withStyles from 'material-ui/styles/withStyles';
|
||||
import AppBar from 'material-ui/AppBar';
|
||||
import Toolbar from 'material-ui/Toolbar';
|
||||
import Typography from 'material-ui/Typography';
|
||||
|
||||
import type {General} from '../types/content';
|
||||
|
||||
// styles contains styles for the Header component.
|
||||
const styles = theme => ({
|
||||
footer: {
|
||||
backgroundColor: theme.palette.background.appBar,
|
||||
color: theme.palette.getContrastText(theme.palette.background.appBar),
|
||||
zIndex: theme.zIndex.appBar,
|
||||
},
|
||||
toolbar: {
|
||||
paddingLeft: theme.spacing.unit,
|
||||
paddingRight: theme.spacing.unit,
|
||||
display: 'flex',
|
||||
justifyContent: 'flex-end',
|
||||
},
|
||||
light: {
|
||||
color: 'rgba(255, 255, 255, 0.54)',
|
||||
},
|
||||
});
|
||||
export type Props = {
|
||||
general: General,
|
||||
classes: Object,
|
||||
};
|
||||
// TODO (kurkomisi): If the structure is appropriate, make an abstraction of the common parts with the Header.
|
||||
// Footer renders the header of the dashboard.
|
||||
class Footer extends Component<Props> {
|
||||
shouldComponentUpdate(nextProps) {
|
||||
return typeof nextProps.shouldUpdate.logs !== 'undefined';
|
||||
}
|
||||
|
||||
info = (about: string, data: string) => (
|
||||
<Typography type="caption" color="inherit">
|
||||
<span className={this.props.classes.light}>{about}</span> {data}
|
||||
</Typography>
|
||||
);
|
||||
|
||||
render() {
|
||||
const {classes, general} = this.props; // The classes property is injected by withStyles().
|
||||
const geth = general.version ? this.info('Geth', general.version) : null;
|
||||
const commit = general.commit ? this.info('Commit', general.commit.substring(0, 7)) : null;
|
||||
|
||||
return (
|
||||
<AppBar position="static" className={classes.footer}>
|
||||
<Toolbar className={classes.toolbar}>
|
||||
<div>
|
||||
{geth}
|
||||
{commit}
|
||||
</div>
|
||||
</Toolbar>
|
||||
</AppBar>
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
export default withStyles(styles)(Footer);
|
@ -22,13 +22,13 @@ import withTheme from 'material-ui/styles/withTheme';
|
||||
import {LineChart, AreaChart, Area, YAxis, CartesianGrid, Line} from 'recharts';
|
||||
|
||||
import ChartGrid from './ChartGrid';
|
||||
import type {ChartEntry} from '../types/message';
|
||||
import type {ChartEntry} from '../types/content';
|
||||
|
||||
export type Props = {
|
||||
theme: Object,
|
||||
memory: Array<ChartEntry>,
|
||||
traffic: Array<ChartEntry>,
|
||||
shouldUpdate: Object,
|
||||
shouldUpdate: Object,
|
||||
};
|
||||
// Home renders the home content.
|
||||
class Home extends Component<Props> {
|
||||
@ -40,11 +40,16 @@ class Home extends Component<Props> {
|
||||
}
|
||||
|
||||
shouldComponentUpdate(nextProps) {
|
||||
return nextProps.shouldUpdate.has('memory') || nextProps.shouldUpdate.has('traffic');
|
||||
return typeof nextProps.shouldUpdate.home !== 'undefined';
|
||||
}
|
||||
|
||||
memoryColor: Object;
|
||||
trafficColor: Object;
|
||||
|
||||
render() {
|
||||
const {memory, traffic} = this.props;
|
||||
let {memory, traffic} = this.props;
|
||||
memory = memory.map(({value}) => (value || 0));
|
||||
traffic = traffic.map(({value}) => (value || 0));
|
||||
|
||||
return (
|
||||
<ChartGrid spacing={24}>
|
||||
|
@ -1,7 +1,7 @@
|
||||
{
|
||||
"dependencies": {
|
||||
"babel-core": "^6.26.0",
|
||||
"babel-eslint": "^8.0.3",
|
||||
"babel-eslint": "^8.1.2",
|
||||
"babel-loader": "^7.1.2",
|
||||
"babel-plugin-transform-class-properties": "^6.24.1",
|
||||
"babel-plugin-transform-decorators-legacy": "^1.3.4",
|
||||
@ -12,28 +12,28 @@
|
||||
"babel-preset-stage-0": "^6.24.1",
|
||||
"babel-runtime": "^6.26.0",
|
||||
"classnames": "^2.2.5",
|
||||
"css-loader": "^0.28.7",
|
||||
"eslint": "^4.13.1",
|
||||
"css-loader": "^0.28.8",
|
||||
"eslint": "^4.15.0",
|
||||
"eslint-config-airbnb": "^16.1.0",
|
||||
"eslint-loader": "^1.9.0",
|
||||
"eslint-plugin-import": "^2.8.0",
|
||||
"eslint-plugin-jsx-a11y": "^6.0.3",
|
||||
"eslint-plugin-react": "^7.5.1",
|
||||
"eslint-plugin-flowtype": "^2.40.1",
|
||||
"eslint-plugin-flowtype": "^2.41.0",
|
||||
"file-loader": "^1.1.6",
|
||||
"flow-bin": "^0.61.0",
|
||||
"flow-bin": "^0.63.1",
|
||||
"flow-bin-loader": "^1.0.2",
|
||||
"flow-typed": "^2.2.3",
|
||||
"material-ui": "^1.0.0-beta.24",
|
||||
"material-ui-icons": "^1.0.0-beta.17",
|
||||
"path": "^0.12.7",
|
||||
"ramda": "^0.25.0",
|
||||
"react": "^16.2.0",
|
||||
"react-dom": "^16.2.0",
|
||||
"react-fa": "^5.0.0",
|
||||
"react-transition-group": "^2.2.1",
|
||||
"recharts": "^1.0.0-beta.6",
|
||||
"recharts": "^1.0.0-beta.7",
|
||||
"style-loader": "^0.19.1",
|
||||
"typeface-roboto": "^0.0.50",
|
||||
"url": "^0.11.0",
|
||||
"url-loader": "^0.6.2",
|
||||
"webpack": "^3.10.0"
|
||||
|
@ -16,38 +16,49 @@
|
||||
// You should have received a copy of the GNU Lesser General Public License
|
||||
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
import type {ChartEntry} from './message';
|
||||
|
||||
export type Content = {
|
||||
home: Home,
|
||||
chain: Chain,
|
||||
txpool: TxPool,
|
||||
network: Network,
|
||||
system: System,
|
||||
logs: Logs,
|
||||
general: General,
|
||||
home: Home,
|
||||
chain: Chain,
|
||||
txpool: TxPool,
|
||||
network: Network,
|
||||
system: System,
|
||||
logs: Logs,
|
||||
};
|
||||
|
||||
export type General = {
|
||||
version: ?string,
|
||||
commit: ?string,
|
||||
};
|
||||
|
||||
export type Home = {
|
||||
memory: Array<ChartEntry>,
|
||||
traffic: Array<ChartEntry>,
|
||||
memory: ChartEntries,
|
||||
traffic: ChartEntries,
|
||||
};
|
||||
|
||||
export type ChartEntries = Array<ChartEntry>;
|
||||
|
||||
export type ChartEntry = {
|
||||
time: Date,
|
||||
value: number,
|
||||
};
|
||||
|
||||
export type Chain = {
|
||||
/* TODO (kurkomisi) */
|
||||
/* TODO (kurkomisi) */
|
||||
};
|
||||
|
||||
export type TxPool = {
|
||||
/* TODO (kurkomisi) */
|
||||
/* TODO (kurkomisi) */
|
||||
};
|
||||
|
||||
export type Network = {
|
||||
/* TODO (kurkomisi) */
|
||||
/* TODO (kurkomisi) */
|
||||
};
|
||||
|
||||
export type System = {
|
||||
/* TODO (kurkomisi) */
|
||||
/* TODO (kurkomisi) */
|
||||
};
|
||||
|
||||
export type Logs = {
|
||||
log: Array<string>,
|
||||
log: Array<string>,
|
||||
};
|
||||
|
@ -1,61 +0,0 @@
|
||||
// @flow
|
||||
|
||||
// Copyright 2017 The go-ethereum Authors
|
||||
// This file is part of the go-ethereum library.
|
||||
//
|
||||
// The go-ethereum library is free software: you can redistribute it and/or modify
|
||||
// it under the terms of the GNU Lesser General Public License as published by
|
||||
// the Free Software Foundation, either version 3 of the License, or
|
||||
// (at your option) any later version.
|
||||
//
|
||||
// The go-ethereum library is distributed in the hope that it will be useful,
|
||||
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
// GNU Lesser General Public License for more details.
|
||||
//
|
||||
// You should have received a copy of the GNU Lesser General Public License
|
||||
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
export type Message = {
|
||||
home?: HomeMessage,
|
||||
chain?: ChainMessage,
|
||||
txpool?: TxPoolMessage,
|
||||
network?: NetworkMessage,
|
||||
system?: SystemMessage,
|
||||
logs?: LogsMessage,
|
||||
};
|
||||
|
||||
export type HomeMessage = {
|
||||
memory?: Chart,
|
||||
traffic?: Chart,
|
||||
};
|
||||
|
||||
export type Chart = {
|
||||
history?: Array<ChartEntry>,
|
||||
new?: ChartEntry,
|
||||
};
|
||||
|
||||
export type ChartEntry = {
|
||||
time: Date,
|
||||
value: number,
|
||||
};
|
||||
|
||||
export type ChainMessage = {
|
||||
/* TODO (kurkomisi) */
|
||||
};
|
||||
|
||||
export type TxPoolMessage = {
|
||||
/* TODO (kurkomisi) */
|
||||
};
|
||||
|
||||
export type NetworkMessage = {
|
||||
/* TODO (kurkomisi) */
|
||||
};
|
||||
|
||||
export type SystemMessage = {
|
||||
/* TODO (kurkomisi) */
|
||||
};
|
||||
|
||||
export type LogsMessage = {
|
||||
log: string,
|
||||
};
|
@ -23,7 +23,7 @@ module.exports = {
|
||||
},
|
||||
entry: './index',
|
||||
output: {
|
||||
path: path.resolve(__dirname, 'public'),
|
||||
path: path.resolve(__dirname, ''),
|
||||
filename: 'bundle.js',
|
||||
},
|
||||
plugins: [
|
||||
|
@ -18,8 +18,9 @@ package dashboard
|
||||
|
||||
//go:generate npm --prefix ./assets install
|
||||
//go:generate ./assets/node_modules/.bin/webpack --config ./assets/webpack.config.js --context ./assets
|
||||
//go:generate go-bindata -nometadata -o assets.go -prefix assets -nocompress -pkg dashboard assets/public/...
|
||||
//go:generate sh -c "sed 's#var _public#//nolint:misspell\\\n&#' assets.go > assets.go.tmp && mv assets.go.tmp assets.go"
|
||||
//go:generate go-bindata -nometadata -o assets.go -prefix assets -nocompress -pkg dashboard assets/dashboard.html assets/bundle.js
|
||||
//go:generate sh -c "sed 's#var _bundleJs#//nolint:misspell\\\n&#' assets.go > assets.go.tmp && mv assets.go.tmp assets.go"
|
||||
//go:generate sh -c "sed 's#var _dashboardHtml#//nolint:misspell\\\n&#' assets.go > assets.go.tmp && mv assets.go.tmp assets.go"
|
||||
//go:generate gofmt -w -s assets.go
|
||||
|
||||
import (
|
||||
@ -34,6 +35,7 @@ import (
|
||||
|
||||
"github.com/ethereum/go-ethereum/log"
|
||||
"github.com/ethereum/go-ethereum/p2p"
|
||||
"github.com/ethereum/go-ethereum/params"
|
||||
"github.com/ethereum/go-ethereum/rpc"
|
||||
"github.com/rcrowley/go-metrics"
|
||||
"golang.org/x/net/websocket"
|
||||
@ -53,6 +55,7 @@ type Dashboard struct {
|
||||
listener net.Listener
|
||||
conns map[uint32]*client // Currently live websocket connections
|
||||
charts *HomeMessage
|
||||
commit string
|
||||
lock sync.RWMutex // Lock protecting the dashboard's internals
|
||||
|
||||
quit chan chan error // Channel used for graceful exit
|
||||
@ -67,15 +70,16 @@ type client struct {
|
||||
}
|
||||
|
||||
// New creates a new dashboard instance with the given configuration.
|
||||
func New(config *Config) (*Dashboard, error) {
|
||||
func New(config *Config, commit string) (*Dashboard, error) {
|
||||
return &Dashboard{
|
||||
conns: make(map[uint32]*client),
|
||||
config: config,
|
||||
quit: make(chan chan error),
|
||||
charts: &HomeMessage{
|
||||
Memory: &Chart{},
|
||||
Traffic: &Chart{},
|
||||
Memory: ChartEntries{},
|
||||
Traffic: ChartEntries{},
|
||||
},
|
||||
commit: commit,
|
||||
}, nil
|
||||
}
|
||||
|
||||
@ -87,6 +91,8 @@ func (db *Dashboard) APIs() []rpc.API { return nil }
|
||||
|
||||
// Start implements node.Service, starting the data collection thread and the listening server of the dashboard.
|
||||
func (db *Dashboard) Start(server *p2p.Server) error {
|
||||
log.Info("Starting dashboard")
|
||||
|
||||
db.wg.Add(2)
|
||||
go db.collectData()
|
||||
go db.collectLogs() // In case of removing this line change 2 back to 1 in wg.Add.
|
||||
@ -160,7 +166,7 @@ func (db *Dashboard) webHandler(w http.ResponseWriter, r *http.Request) {
|
||||
w.Write(blob)
|
||||
return
|
||||
}
|
||||
blob, err := Asset(filepath.Join("public", path))
|
||||
blob, err := Asset(path[1:])
|
||||
if err != nil {
|
||||
log.Warn("Failed to load the asset", "path", path, "err", err)
|
||||
http.Error(w, "not found", http.StatusNotFound)
|
||||
@ -197,15 +203,20 @@ func (db *Dashboard) apiHandler(conn *websocket.Conn) {
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
versionMeta := ""
|
||||
if len(params.VersionMeta) > 0 {
|
||||
versionMeta = fmt.Sprintf(" (%s)", params.VersionMeta)
|
||||
}
|
||||
// Send the past data.
|
||||
client.msg <- Message{
|
||||
General: &GeneralMessage{
|
||||
Version: fmt.Sprintf("v%d.%d.%d%s", params.VersionMajor, params.VersionMinor, params.VersionPatch, versionMeta),
|
||||
Commit: db.commit,
|
||||
},
|
||||
Home: &HomeMessage{
|
||||
Memory: &Chart{
|
||||
History: db.charts.Memory.History,
|
||||
},
|
||||
Traffic: &Chart{
|
||||
History: db.charts.Traffic.History,
|
||||
},
|
||||
Memory: db.charts.Memory,
|
||||
Traffic: db.charts.Traffic,
|
||||
},
|
||||
}
|
||||
// Start tracking the connection and drop at connection loss.
|
||||
@ -249,24 +260,20 @@ func (db *Dashboard) collectData() {
|
||||
Value: inboundTraffic,
|
||||
}
|
||||
first := 0
|
||||
if len(db.charts.Memory.History) == memorySampleLimit {
|
||||
if len(db.charts.Memory) == memorySampleLimit {
|
||||
first = 1
|
||||
}
|
||||
db.charts.Memory.History = append(db.charts.Memory.History[first:], memory)
|
||||
db.charts.Memory = append(db.charts.Memory[first:], memory)
|
||||
first = 0
|
||||
if len(db.charts.Traffic.History) == trafficSampleLimit {
|
||||
if len(db.charts.Traffic) == trafficSampleLimit {
|
||||
first = 1
|
||||
}
|
||||
db.charts.Traffic.History = append(db.charts.Traffic.History[first:], traffic)
|
||||
db.charts.Traffic = append(db.charts.Traffic[first:], traffic)
|
||||
|
||||
db.sendToAll(&Message{
|
||||
Home: &HomeMessage{
|
||||
Memory: &Chart{
|
||||
New: memory,
|
||||
},
|
||||
Traffic: &Chart{
|
||||
New: traffic,
|
||||
},
|
||||
Memory: ChartEntries{memory},
|
||||
Traffic: ChartEntries{traffic},
|
||||
},
|
||||
})
|
||||
}
|
||||
@ -287,7 +294,7 @@ func (db *Dashboard) collectLogs() {
|
||||
case <-time.After(db.config.Refresh / 2):
|
||||
db.sendToAll(&Message{
|
||||
Logs: &LogsMessage{
|
||||
Log: fmt.Sprintf("%-4d: This is a fake log.", id),
|
||||
Log: []string{fmt.Sprintf("%-4d: This is a fake log.", id)},
|
||||
},
|
||||
})
|
||||
id++
|
||||
|
@ -19,6 +19,7 @@ package dashboard
|
||||
import "time"
|
||||
|
||||
type Message struct {
|
||||
General *GeneralMessage `json:"general,omitempty"`
|
||||
Home *HomeMessage `json:"home,omitempty"`
|
||||
Chain *ChainMessage `json:"chain,omitempty"`
|
||||
TxPool *TxPoolMessage `json:"txpool,omitempty"`
|
||||
@ -27,16 +28,18 @@ type Message struct {
|
||||
Logs *LogsMessage `json:"logs,omitempty"`
|
||||
}
|
||||
|
||||
type HomeMessage struct {
|
||||
Memory *Chart `json:"memory,omitempty"`
|
||||
Traffic *Chart `json:"traffic,omitempty"`
|
||||
type GeneralMessage struct {
|
||||
Version string `json:"version,omitempty"`
|
||||
Commit string `json:"commit,omitempty"`
|
||||
}
|
||||
|
||||
type Chart struct {
|
||||
History []*ChartEntry `json:"history,omitempty"`
|
||||
New *ChartEntry `json:"new,omitempty"`
|
||||
type HomeMessage struct {
|
||||
Memory ChartEntries `json:"memory,omitempty"`
|
||||
Traffic ChartEntries `json:"traffic,omitempty"`
|
||||
}
|
||||
|
||||
type ChartEntries []*ChartEntry
|
||||
|
||||
type ChartEntry struct {
|
||||
Time time.Time `json:"time,omitempty"`
|
||||
Value float64 `json:"value,omitempty"`
|
||||
@ -59,5 +62,5 @@ type SystemMessage struct {
|
||||
}
|
||||
|
||||
type LogsMessage struct {
|
||||
Log string `json:"log,omitempty"`
|
||||
Log []string `json:"log,omitempty"`
|
||||
}
|
||||
|
@ -613,6 +613,7 @@ func (s *Service) reportHistory(conn *websocket.Conn, list []uint64) error {
|
||||
}
|
||||
// Ran out of blocks, cut the report short and send
|
||||
history = history[len(history)-i:]
|
||||
break
|
||||
}
|
||||
// Assemble the history report and send it to the server
|
||||
if len(history) > 0 {
|
||||
|
@ -512,6 +512,11 @@ func (env *Work) commitTransactions(mux *event.TypeMux, txs *types.TransactionsB
|
||||
var coalescedLogs []*types.Log
|
||||
|
||||
for {
|
||||
// If we don't have enough gas for any further transactions then we're done
|
||||
if gp.Gas() < params.TxGas {
|
||||
log.Trace("Not enough gas for further transactions", "gp", gp)
|
||||
break
|
||||
}
|
||||
// Retrieve the next transaction and abort if all done
|
||||
tx := txs.Peek()
|
||||
if tx == nil {
|
||||
|
111
trie/hasher.go
111
trie/hasher.go
@ -26,46 +26,27 @@ import (
|
||||
"github.com/ethereum/go-ethereum/rlp"
|
||||
)
|
||||
|
||||
// calculator is a utility used by the hasher to calculate the hash value of the tree node.
|
||||
type calculator struct {
|
||||
sha hash.Hash
|
||||
buffer *bytes.Buffer
|
||||
type hasher struct {
|
||||
tmp *bytes.Buffer
|
||||
sha hash.Hash
|
||||
cachegen, cachelimit uint16
|
||||
}
|
||||
|
||||
// calculatorPool is a set of temporary calculators that may be individually saved and retrieved.
|
||||
var calculatorPool = sync.Pool{
|
||||
// hashers live in a global pool.
|
||||
var hasherPool = sync.Pool{
|
||||
New: func() interface{} {
|
||||
return &calculator{buffer: new(bytes.Buffer), sha: sha3.NewKeccak256()}
|
||||
return &hasher{tmp: new(bytes.Buffer), sha: sha3.NewKeccak256()}
|
||||
},
|
||||
}
|
||||
|
||||
// hasher hasher is used to calculate the hash value of the whole tree.
|
||||
type hasher struct {
|
||||
cachegen uint16
|
||||
cachelimit uint16
|
||||
threaded bool
|
||||
mu sync.Mutex
|
||||
}
|
||||
|
||||
func newHasher(cachegen, cachelimit uint16) *hasher {
|
||||
h := &hasher{
|
||||
cachegen: cachegen,
|
||||
cachelimit: cachelimit,
|
||||
}
|
||||
h := hasherPool.Get().(*hasher)
|
||||
h.cachegen, h.cachelimit = cachegen, cachelimit
|
||||
return h
|
||||
}
|
||||
|
||||
// newCalculator retrieves a cleaned calculator from calculator pool.
|
||||
func (h *hasher) newCalculator() *calculator {
|
||||
calculator := calculatorPool.Get().(*calculator)
|
||||
calculator.buffer.Reset()
|
||||
calculator.sha.Reset()
|
||||
return calculator
|
||||
}
|
||||
|
||||
// returnCalculator returns a no longer used calculator to the pool.
|
||||
func (h *hasher) returnCalculator(calculator *calculator) {
|
||||
calculatorPool.Put(calculator)
|
||||
func returnHasherToPool(h *hasher) {
|
||||
hasherPool.Put(h)
|
||||
}
|
||||
|
||||
// hash collapses a node down into a hash node, also returning a copy of the
|
||||
@ -142,48 +123,15 @@ func (h *hasher) hashChildren(original node, db DatabaseWriter) (node, node, err
|
||||
// Hash the full node's children, caching the newly hashed subtrees
|
||||
collapsed, cached := n.copy(), n.copy()
|
||||
|
||||
// hashChild is a helper to hash a single child, which is called either on the
|
||||
// same thread as the caller or in a goroutine for the toplevel branching.
|
||||
hashChild := func(index int, wg *sync.WaitGroup) {
|
||||
if wg != nil {
|
||||
defer wg.Done()
|
||||
for i := 0; i < 16; i++ {
|
||||
if n.Children[i] != nil {
|
||||
collapsed.Children[i], cached.Children[i], err = h.hash(n.Children[i], db, false)
|
||||
if err != nil {
|
||||
return original, original, err
|
||||
}
|
||||
} else {
|
||||
collapsed.Children[i] = valueNode(nil) // Ensure that nil children are encoded as empty strings.
|
||||
}
|
||||
// Ensure that nil children are encoded as empty strings.
|
||||
if collapsed.Children[index] == nil {
|
||||
collapsed.Children[index] = valueNode(nil)
|
||||
return
|
||||
}
|
||||
// Hash all other children properly
|
||||
var herr error
|
||||
collapsed.Children[index], cached.Children[index], herr = h.hash(n.Children[index], db, false)
|
||||
if herr != nil {
|
||||
h.mu.Lock() // rarely if ever locked, no congenstion
|
||||
err = herr
|
||||
h.mu.Unlock()
|
||||
}
|
||||
}
|
||||
// If we're not running in threaded mode yet, span a goroutine for each child
|
||||
if !h.threaded {
|
||||
// Disable further threading
|
||||
h.threaded = true
|
||||
|
||||
// Hash all the children concurrently
|
||||
var wg sync.WaitGroup
|
||||
for i := 0; i < 16; i++ {
|
||||
wg.Add(1)
|
||||
go hashChild(i, &wg)
|
||||
}
|
||||
wg.Wait()
|
||||
|
||||
// Reenable threading for subsequent hash calls
|
||||
h.threaded = false
|
||||
} else {
|
||||
for i := 0; i < 16; i++ {
|
||||
hashChild(i, nil)
|
||||
}
|
||||
}
|
||||
if err != nil {
|
||||
return original, original, err
|
||||
}
|
||||
cached.Children[16] = n.Children[16]
|
||||
if collapsed.Children[16] == nil {
|
||||
@ -202,29 +150,24 @@ func (h *hasher) store(n node, db DatabaseWriter, force bool) (node, error) {
|
||||
if _, isHash := n.(hashNode); n == nil || isHash {
|
||||
return n, nil
|
||||
}
|
||||
calculator := h.newCalculator()
|
||||
defer h.returnCalculator(calculator)
|
||||
|
||||
// Generate the RLP encoding of the node
|
||||
if err := rlp.Encode(calculator.buffer, n); err != nil {
|
||||
h.tmp.Reset()
|
||||
if err := rlp.Encode(h.tmp, n); err != nil {
|
||||
panic("encode error: " + err.Error())
|
||||
}
|
||||
if calculator.buffer.Len() < 32 && !force {
|
||||
|
||||
if h.tmp.Len() < 32 && !force {
|
||||
return n, nil // Nodes smaller than 32 bytes are stored inside their parent
|
||||
}
|
||||
// Larger nodes are replaced by their hash and stored in the database.
|
||||
hash, _ := n.cache()
|
||||
if hash == nil {
|
||||
calculator.sha.Write(calculator.buffer.Bytes())
|
||||
hash = hashNode(calculator.sha.Sum(nil))
|
||||
h.sha.Reset()
|
||||
h.sha.Write(h.tmp.Bytes())
|
||||
hash = hashNode(h.sha.Sum(nil))
|
||||
}
|
||||
if db != nil {
|
||||
// db might be a leveldb batch, which is not safe for concurrent writes
|
||||
h.mu.Lock()
|
||||
err := db.Put(hash, calculator.buffer.Bytes())
|
||||
h.mu.Unlock()
|
||||
|
||||
return hash, err
|
||||
return hash, db.Put(hash, h.tmp.Bytes())
|
||||
}
|
||||
return hash, nil
|
||||
}
|
||||
|
@ -199,10 +199,10 @@ func (t *SecureTrie) secKey(key []byte) []byte {
|
||||
// invalid on the next call to hashKey or secKey.
|
||||
func (t *SecureTrie) hashKey(key []byte) []byte {
|
||||
h := newHasher(0, 0)
|
||||
calculator := h.newCalculator()
|
||||
calculator.sha.Write(key)
|
||||
buf := calculator.sha.Sum(t.hashKeyBuf[:0])
|
||||
h.returnCalculator(calculator)
|
||||
h.sha.Reset()
|
||||
h.sha.Write(key)
|
||||
buf := h.sha.Sum(t.hashKeyBuf[:0])
|
||||
returnHasherToPool(h)
|
||||
return buf
|
||||
}
|
||||
|
||||
|
@ -501,5 +501,6 @@ func (t *Trie) hashRoot(db DatabaseWriter) (node, node, error) {
|
||||
return hashNode(emptyRoot.Bytes()), nil, nil
|
||||
}
|
||||
h := newHasher(t.cachegen, t.cachelimit)
|
||||
defer returnHasherToPool(h)
|
||||
return h.hash(t.root, db, true)
|
||||
}
|
||||
|
@ -116,12 +116,17 @@ func (api *PublicWhisperAPI) SetMaxMessageSize(ctx context.Context, size uint32)
|
||||
return true, api.w.SetMaxMessageSize(size)
|
||||
}
|
||||
|
||||
// SetMinPow sets the minimum PoW for a message before it is accepted.
|
||||
// SetMinPow sets the minimum PoW, and notifies the peers.
|
||||
func (api *PublicWhisperAPI) SetMinPoW(ctx context.Context, pow float64) (bool, error) {
|
||||
return true, api.w.SetMinimumPoW(pow)
|
||||
}
|
||||
|
||||
// MarkTrustedPeer marks a peer trusted. , which will allow it to send historic (expired) messages.
|
||||
// SetBloomFilter sets the new value of bloom filter, and notifies the peers.
|
||||
func (api *PublicWhisperAPI) SetBloomFilter(ctx context.Context, bloom hexutil.Bytes) (bool, error) {
|
||||
return true, api.w.SetBloomFilter(bloom)
|
||||
}
|
||||
|
||||
// MarkTrustedPeer marks a peer trusted, which will allow it to send historic (expired) messages.
|
||||
// Note: This function is not adding new nodes, the node needs to exists as a peer.
|
||||
func (api *PublicWhisperAPI) MarkTrustedPeer(ctx context.Context, enode string) (bool, error) {
|
||||
n, err := discover.ParseNode(enode)
|
||||
|
@ -35,7 +35,6 @@ import (
|
||||
)
|
||||
|
||||
const (
|
||||
EnvelopeVersion = uint64(0)
|
||||
ProtocolVersion = uint64(6)
|
||||
ProtocolVersionStr = "6.0"
|
||||
ProtocolName = "shh"
|
||||
@ -52,11 +51,14 @@ const (
|
||||
paddingMask = byte(3)
|
||||
signatureFlag = byte(4)
|
||||
|
||||
TopicLength = 4
|
||||
signatureLength = 65
|
||||
aesKeyLength = 32
|
||||
AESNonceLength = 12
|
||||
keyIdSize = 32
|
||||
TopicLength = 4 // in bytes
|
||||
signatureLength = 65 // in bytes
|
||||
aesKeyLength = 32 // in bytes
|
||||
AESNonceLength = 12 // in bytes
|
||||
keyIdSize = 32 // in bytes
|
||||
bloomFilterSize = 64 // in bytes
|
||||
|
||||
EnvelopeHeaderLength = 20
|
||||
|
||||
MaxMessageSize = uint32(10 * 1024 * 1024) // maximum accepted size of a message.
|
||||
DefaultMaxMessageSize = uint32(1024 * 1024)
|
||||
@ -68,10 +70,8 @@ const (
|
||||
expirationCycle = time.Second
|
||||
transmissionCycle = 300 * time.Millisecond
|
||||
|
||||
DefaultTTL = 50 // seconds
|
||||
SynchAllowance = 10 // seconds
|
||||
|
||||
EnvelopeHeaderLength = 20
|
||||
DefaultTTL = 50 // seconds
|
||||
DefaultSyncAllowance = 10 // seconds
|
||||
)
|
||||
|
||||
type unknownVersionError uint64
|
||||
|
@ -42,9 +42,11 @@ type Envelope struct {
|
||||
Data []byte
|
||||
Nonce uint64
|
||||
|
||||
pow float64 // Message-specific PoW as described in the Whisper specification.
|
||||
hash common.Hash // Cached hash of the envelope to avoid rehashing every time.
|
||||
// Don't access hash directly, use Hash() function instead.
|
||||
pow float64 // Message-specific PoW as described in the Whisper specification.
|
||||
|
||||
// the following variables should not be accessed directly, use the corresponding function instead: Hash(), Bloom()
|
||||
hash common.Hash // Cached hash of the envelope to avoid rehashing every time.
|
||||
bloom []byte
|
||||
}
|
||||
|
||||
// size returns the size of envelope as it is sent (i.e. public fields only)
|
||||
@ -227,3 +229,30 @@ func (e *Envelope) Open(watcher *Filter) (msg *ReceivedMessage) {
|
||||
}
|
||||
return msg
|
||||
}
|
||||
|
||||
// Bloom maps 4-bytes Topic into 64-byte bloom filter with 3 bits set (at most).
|
||||
func (e *Envelope) Bloom() []byte {
|
||||
if e.bloom == nil {
|
||||
e.bloom = TopicToBloom(e.Topic)
|
||||
}
|
||||
return e.bloom
|
||||
}
|
||||
|
||||
// TopicToBloom converts the topic (4 bytes) to the bloom filter (64 bytes)
|
||||
func TopicToBloom(topic TopicType) []byte {
|
||||
b := make([]byte, bloomFilterSize)
|
||||
var index [3]int
|
||||
for j := 0; j < 3; j++ {
|
||||
index[j] = int(topic[j])
|
||||
if (topic[3] & (1 << uint(j))) != 0 {
|
||||
index[j] += 256
|
||||
}
|
||||
}
|
||||
|
||||
for j := 0; j < 3; j++ {
|
||||
byteIndex := index[j] / 8
|
||||
bitIndex := index[j] % 8
|
||||
b[byteIndex] = (1 << uint(bitIndex))
|
||||
}
|
||||
return b
|
||||
}
|
||||
|
@ -36,6 +36,7 @@ type Peer struct {
|
||||
|
||||
trusted bool
|
||||
powRequirement float64
|
||||
bloomFilter []byte // may contain nil in case of full node
|
||||
|
||||
known *set.Set // Messages already known by the peer to avoid wasting bandwidth
|
||||
|
||||
@ -74,8 +75,12 @@ func (p *Peer) handshake() error {
|
||||
// Send the handshake status message asynchronously
|
||||
errc := make(chan error, 1)
|
||||
go func() {
|
||||
errc <- p2p.Send(p.ws, statusCode, ProtocolVersion)
|
||||
pow := p.host.MinPow()
|
||||
powConverted := math.Float64bits(pow)
|
||||
bloom := p.host.BloomFilter()
|
||||
errc <- p2p.SendItems(p.ws, statusCode, ProtocolVersion, powConverted, bloom)
|
||||
}()
|
||||
|
||||
// Fetch the remote status packet and verify protocol match
|
||||
packet, err := p.ws.ReadMsg()
|
||||
if err != nil {
|
||||
@ -85,14 +90,42 @@ func (p *Peer) handshake() error {
|
||||
return fmt.Errorf("peer [%x] sent packet %x before status packet", p.ID(), packet.Code)
|
||||
}
|
||||
s := rlp.NewStream(packet.Payload, uint64(packet.Size))
|
||||
peerVersion, err := s.Uint()
|
||||
_, err = s.List()
|
||||
if err != nil {
|
||||
return fmt.Errorf("peer [%x] sent bad status message: %v", p.ID(), err)
|
||||
}
|
||||
peerVersion, err := s.Uint()
|
||||
if err != nil {
|
||||
return fmt.Errorf("peer [%x] sent bad status message (unable to decode version): %v", p.ID(), err)
|
||||
}
|
||||
if peerVersion != ProtocolVersion {
|
||||
return fmt.Errorf("peer [%x]: protocol version mismatch %d != %d", p.ID(), peerVersion, ProtocolVersion)
|
||||
}
|
||||
// Wait until out own status is consumed too
|
||||
|
||||
// only version is mandatory, subsequent parameters are optional
|
||||
powRaw, err := s.Uint()
|
||||
if err == nil {
|
||||
pow := math.Float64frombits(powRaw)
|
||||
if math.IsInf(pow, 0) || math.IsNaN(pow) || pow < 0.0 {
|
||||
return fmt.Errorf("peer [%x] sent bad status message: invalid pow", p.ID())
|
||||
}
|
||||
p.powRequirement = pow
|
||||
|
||||
var bloom []byte
|
||||
err = s.Decode(&bloom)
|
||||
if err == nil {
|
||||
sz := len(bloom)
|
||||
if sz != bloomFilterSize && sz != 0 {
|
||||
return fmt.Errorf("peer [%x] sent bad status message: wrong bloom filter size %d", p.ID(), sz)
|
||||
}
|
||||
if isFullNode(bloom) {
|
||||
p.bloomFilter = nil
|
||||
} else {
|
||||
p.bloomFilter = bloom
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if err := <-errc; err != nil {
|
||||
return fmt.Errorf("peer [%x] failed to send status packet: %v", p.ID(), err)
|
||||
}
|
||||
@ -156,7 +189,7 @@ func (p *Peer) broadcast() error {
|
||||
envelopes := p.host.Envelopes()
|
||||
bundle := make([]*Envelope, 0, len(envelopes))
|
||||
for _, envelope := range envelopes {
|
||||
if !p.marked(envelope) && envelope.PoW() >= p.powRequirement {
|
||||
if !p.marked(envelope) && envelope.PoW() >= p.powRequirement && p.bloomMatch(envelope) {
|
||||
bundle = append(bundle, envelope)
|
||||
}
|
||||
}
|
||||
@ -186,3 +219,16 @@ func (p *Peer) notifyAboutPowRequirementChange(pow float64) error {
|
||||
i := math.Float64bits(pow)
|
||||
return p2p.Send(p.ws, powRequirementCode, i)
|
||||
}
|
||||
|
||||
func (p *Peer) notifyAboutBloomFilterChange(bloom []byte) error {
|
||||
return p2p.Send(p.ws, bloomFilterExCode, bloom)
|
||||
}
|
||||
|
||||
func (p *Peer) bloomMatch(env *Envelope) bool {
|
||||
if p.bloomFilter == nil {
|
||||
// no filter - full node, accepts all envelops
|
||||
return true
|
||||
}
|
||||
|
||||
return bloomFilterMatch(p.bloomFilter, env.Bloom())
|
||||
}
|
||||
|
@ -20,6 +20,7 @@ import (
|
||||
"bytes"
|
||||
"crypto/ecdsa"
|
||||
"fmt"
|
||||
mrand "math/rand"
|
||||
"net"
|
||||
"sync"
|
||||
"testing"
|
||||
@ -87,6 +88,9 @@ var nodes [NumNodes]*TestNode
|
||||
var sharedKey []byte = []byte("some arbitrary data here")
|
||||
var sharedTopic TopicType = TopicType{0xF, 0x1, 0x2, 0}
|
||||
var expectedMessage []byte = []byte("per rectum ad astra")
|
||||
var masterBloomFilter []byte
|
||||
var masterPow = 0.00000001
|
||||
var round int = 1
|
||||
|
||||
func TestSimulation(t *testing.T) {
|
||||
// create a chain of whisper nodes,
|
||||
@ -104,8 +108,13 @@ func TestSimulation(t *testing.T) {
|
||||
// check if each node have received and decrypted exactly one message
|
||||
checkPropagation(t, true)
|
||||
|
||||
// send protocol-level messages (powRequirementCode) and check the new PoW requirement values
|
||||
powReqExchange(t)
|
||||
// check if Status message was correctly decoded
|
||||
checkBloomFilterExchange(t)
|
||||
checkPowExchange(t)
|
||||
|
||||
// send new pow and bloom exchange messages
|
||||
resetParams(t)
|
||||
round++
|
||||
|
||||
// node #1 sends one expected (decryptable) message
|
||||
sendMsg(t, true, 1)
|
||||
@ -113,18 +122,65 @@ func TestSimulation(t *testing.T) {
|
||||
// check if each node (except node #0) have received and decrypted exactly one message
|
||||
checkPropagation(t, false)
|
||||
|
||||
for i := 1; i < NumNodes; i++ {
|
||||
time.Sleep(20 * time.Millisecond)
|
||||
sendMsg(t, true, i)
|
||||
}
|
||||
|
||||
// check if corresponding protocol-level messages were correctly decoded
|
||||
checkPowExchangeForNodeZero(t)
|
||||
checkBloomFilterExchange(t)
|
||||
|
||||
stopServers()
|
||||
}
|
||||
|
||||
func resetParams(t *testing.T) {
|
||||
// change pow only for node zero
|
||||
masterPow = 7777777.0
|
||||
nodes[0].shh.SetMinimumPoW(masterPow)
|
||||
|
||||
// change bloom for all nodes
|
||||
masterBloomFilter = TopicToBloom(sharedTopic)
|
||||
for i := 0; i < NumNodes; i++ {
|
||||
nodes[i].shh.SetBloomFilter(masterBloomFilter)
|
||||
}
|
||||
}
|
||||
|
||||
func initBloom(t *testing.T) {
|
||||
masterBloomFilter = make([]byte, bloomFilterSize)
|
||||
_, err := mrand.Read(masterBloomFilter)
|
||||
if err != nil {
|
||||
t.Fatalf("rand failed: %s.", err)
|
||||
}
|
||||
|
||||
msgBloom := TopicToBloom(sharedTopic)
|
||||
masterBloomFilter = addBloom(masterBloomFilter, msgBloom)
|
||||
for i := 0; i < 32; i++ {
|
||||
masterBloomFilter[i] = 0xFF
|
||||
}
|
||||
|
||||
if !bloomFilterMatch(masterBloomFilter, msgBloom) {
|
||||
t.Fatalf("bloom mismatch on initBloom.")
|
||||
}
|
||||
}
|
||||
|
||||
func initialize(t *testing.T) {
|
||||
initBloom(t)
|
||||
|
||||
var err error
|
||||
ip := net.IPv4(127, 0, 0, 1)
|
||||
port0 := 30303
|
||||
|
||||
for i := 0; i < NumNodes; i++ {
|
||||
var node TestNode
|
||||
b := make([]byte, bloomFilterSize)
|
||||
copy(b, masterBloomFilter)
|
||||
node.shh = New(&DefaultConfig)
|
||||
node.shh.SetMinimumPowTest(0.00000001)
|
||||
node.shh.SetMinimumPoW(masterPow)
|
||||
node.shh.SetBloomFilter(b)
|
||||
if !bytes.Equal(node.shh.BloomFilter(), masterBloomFilter) {
|
||||
t.Fatalf("bloom mismatch on init.")
|
||||
}
|
||||
node.shh.Start(nil)
|
||||
topics := make([]TopicType, 0)
|
||||
topics = append(topics, sharedTopic)
|
||||
@ -206,7 +262,7 @@ func checkPropagation(t *testing.T, includingNodeZero bool) {
|
||||
for i := first; i < NumNodes; i++ {
|
||||
f := nodes[i].shh.GetFilter(nodes[i].filerId)
|
||||
if f == nil {
|
||||
t.Fatalf("failed to get filterId %s from node %d.", nodes[i].filerId, i)
|
||||
t.Fatalf("failed to get filterId %s from node %d, round %d.", nodes[i].filerId, i, round)
|
||||
}
|
||||
|
||||
mail := f.Retrieve()
|
||||
@ -332,34 +388,43 @@ func TestPeerBasic(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func powReqExchange(t *testing.T) {
|
||||
for i, node := range nodes {
|
||||
for peer := range node.shh.peers {
|
||||
if peer.powRequirement > 1000.0 {
|
||||
t.Fatalf("node %d: one of the peers' pow requirement is too big (%f).", i, peer.powRequirement)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
const pow float64 = 7777777.0
|
||||
nodes[0].shh.SetMinimumPoW(pow)
|
||||
|
||||
// wait until all the messages are delivered
|
||||
time.Sleep(64 * time.Millisecond)
|
||||
|
||||
func checkPowExchangeForNodeZero(t *testing.T) {
|
||||
cnt := 0
|
||||
for i, node := range nodes {
|
||||
for peer := range node.shh.peers {
|
||||
if peer.peer.ID() == discover.PubkeyID(&nodes[0].id.PublicKey) {
|
||||
cnt++
|
||||
if peer.powRequirement != pow {
|
||||
if peer.powRequirement != masterPow {
|
||||
t.Fatalf("node %d: failed to set the new pow requirement.", i)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if cnt == 0 {
|
||||
t.Fatalf("no matching peers found.")
|
||||
}
|
||||
}
|
||||
|
||||
func checkPowExchange(t *testing.T) {
|
||||
for i, node := range nodes {
|
||||
for peer := range node.shh.peers {
|
||||
if peer.peer.ID() != discover.PubkeyID(&nodes[0].id.PublicKey) {
|
||||
if peer.powRequirement != masterPow {
|
||||
t.Fatalf("node %d: failed to exchange pow requirement in round %d; expected %f, got %f",
|
||||
i, round, masterPow, peer.powRequirement)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func checkBloomFilterExchange(t *testing.T) {
|
||||
for i, node := range nodes {
|
||||
for peer := range node.shh.peers {
|
||||
if !bytes.Equal(peer.bloomFilter, masterBloomFilter) {
|
||||
t.Fatalf("node %d: failed to exchange bloom filter requirement in round %d. \n%x expected \n%x got",
|
||||
i, round, masterBloomFilter, peer.bloomFilter)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -48,9 +48,12 @@ type Statistics struct {
|
||||
}
|
||||
|
||||
const (
|
||||
minPowIdx = iota // Minimal PoW required by the whisper node
|
||||
maxMsgSizeIdx = iota // Maximal message length allowed by the whisper node
|
||||
overflowIdx = iota // Indicator of message queue overflow
|
||||
maxMsgSizeIdx = iota // Maximal message length allowed by the whisper node
|
||||
overflowIdx // Indicator of message queue overflow
|
||||
minPowIdx // Minimal PoW required by the whisper node
|
||||
minPowToleranceIdx // Minimal PoW tolerated by the whisper node for a limited time
|
||||
bloomFilterIdx // Bloom filter for topics of interest for this node
|
||||
bloomFilterToleranceIdx // Bloom filter tolerated by the whisper node for a limited time
|
||||
)
|
||||
|
||||
// Whisper represents a dark communication interface through the Ethereum
|
||||
@ -76,7 +79,7 @@ type Whisper struct {
|
||||
|
||||
settings syncmap.Map // holds configuration settings that can be dynamically changed
|
||||
|
||||
reactionAllowance int // maximum time in seconds allowed to process the whisper-related messages
|
||||
syncAllowance int // maximum time in seconds allowed to process the whisper-related messages
|
||||
|
||||
statsMu sync.Mutex // guard stats
|
||||
stats Statistics // Statistics of whisper node
|
||||
@ -91,15 +94,15 @@ func New(cfg *Config) *Whisper {
|
||||
}
|
||||
|
||||
whisper := &Whisper{
|
||||
privateKeys: make(map[string]*ecdsa.PrivateKey),
|
||||
symKeys: make(map[string][]byte),
|
||||
envelopes: make(map[common.Hash]*Envelope),
|
||||
expirations: make(map[uint32]*set.SetNonTS),
|
||||
peers: make(map[*Peer]struct{}),
|
||||
messageQueue: make(chan *Envelope, messageQueueLimit),
|
||||
p2pMsgQueue: make(chan *Envelope, messageQueueLimit),
|
||||
quit: make(chan struct{}),
|
||||
reactionAllowance: SynchAllowance,
|
||||
privateKeys: make(map[string]*ecdsa.PrivateKey),
|
||||
symKeys: make(map[string][]byte),
|
||||
envelopes: make(map[common.Hash]*Envelope),
|
||||
expirations: make(map[uint32]*set.SetNonTS),
|
||||
peers: make(map[*Peer]struct{}),
|
||||
messageQueue: make(chan *Envelope, messageQueueLimit),
|
||||
p2pMsgQueue: make(chan *Envelope, messageQueueLimit),
|
||||
quit: make(chan struct{}),
|
||||
syncAllowance: DefaultSyncAllowance,
|
||||
}
|
||||
|
||||
whisper.filters = NewFilters(whisper)
|
||||
@ -126,11 +129,55 @@ func New(cfg *Config) *Whisper {
|
||||
return whisper
|
||||
}
|
||||
|
||||
// MinPow returns the PoW value required by this node.
|
||||
func (w *Whisper) MinPow() float64 {
|
||||
val, _ := w.settings.Load(minPowIdx)
|
||||
val, exist := w.settings.Load(minPowIdx)
|
||||
if !exist || val == nil {
|
||||
return DefaultMinimumPoW
|
||||
}
|
||||
v, ok := val.(float64)
|
||||
if !ok {
|
||||
log.Error("Error loading minPowIdx, using default")
|
||||
return DefaultMinimumPoW
|
||||
}
|
||||
return v
|
||||
}
|
||||
|
||||
// MinPowTolerance returns the value of minimum PoW which is tolerated for a limited
|
||||
// time after PoW was changed. If sufficient time have elapsed or no change of PoW
|
||||
// have ever occurred, the return value will be the same as return value of MinPow().
|
||||
func (w *Whisper) MinPowTolerance() float64 {
|
||||
val, exist := w.settings.Load(minPowToleranceIdx)
|
||||
if !exist || val == nil {
|
||||
return DefaultMinimumPoW
|
||||
}
|
||||
return val.(float64)
|
||||
}
|
||||
|
||||
// BloomFilter returns the aggregated bloom filter for all the topics of interest.
|
||||
// The nodes are required to send only messages that match the advertised bloom filter.
|
||||
// If a message does not match the bloom, it will tantamount to spam, and the peer will
|
||||
// be disconnected.
|
||||
func (w *Whisper) BloomFilter() []byte {
|
||||
val, exist := w.settings.Load(bloomFilterIdx)
|
||||
if !exist || val == nil {
|
||||
return nil
|
||||
}
|
||||
return val.([]byte)
|
||||
}
|
||||
|
||||
// BloomFilterTolerance returns the bloom filter which is tolerated for a limited
|
||||
// time after new bloom was advertised to the peers. If sufficient time have elapsed
|
||||
// or no change of bloom filter have ever occurred, the return value will be the same
|
||||
// as return value of BloomFilter().
|
||||
func (w *Whisper) BloomFilterTolerance() []byte {
|
||||
val, exist := w.settings.Load(bloomFilterToleranceIdx)
|
||||
if !exist || val == nil {
|
||||
return nil
|
||||
}
|
||||
return val.([]byte)
|
||||
}
|
||||
|
||||
// MaxMessageSize returns the maximum accepted message size.
|
||||
func (w *Whisper) MaxMessageSize() uint32 {
|
||||
val, _ := w.settings.Load(maxMsgSizeIdx)
|
||||
@ -180,18 +227,40 @@ func (w *Whisper) SetMaxMessageSize(size uint32) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// SetBloomFilter sets the new bloom filter
|
||||
func (w *Whisper) SetBloomFilter(bloom []byte) error {
|
||||
if len(bloom) != bloomFilterSize {
|
||||
return fmt.Errorf("invalid bloom filter size: %d", len(bloom))
|
||||
}
|
||||
|
||||
b := make([]byte, bloomFilterSize)
|
||||
copy(b, bloom)
|
||||
|
||||
w.settings.Store(bloomFilterIdx, b)
|
||||
w.notifyPeersAboutBloomFilterChange(b)
|
||||
|
||||
go func() {
|
||||
// allow some time before all the peers have processed the notification
|
||||
time.Sleep(time.Duration(w.syncAllowance) * time.Second)
|
||||
w.settings.Store(bloomFilterToleranceIdx, b)
|
||||
}()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// SetMinimumPoW sets the minimal PoW required by this node
|
||||
func (w *Whisper) SetMinimumPoW(val float64) error {
|
||||
if val < 0.0 {
|
||||
return fmt.Errorf("invalid PoW: %f", val)
|
||||
}
|
||||
|
||||
w.settings.Store(minPowIdx, val)
|
||||
w.notifyPeersAboutPowRequirementChange(val)
|
||||
|
||||
go func() {
|
||||
// allow some time before all the peers have processed the notification
|
||||
time.Sleep(time.Duration(w.reactionAllowance) * time.Second)
|
||||
w.settings.Store(minPowIdx, val)
|
||||
time.Sleep(time.Duration(w.syncAllowance) * time.Second)
|
||||
w.settings.Store(minPowToleranceIdx, val)
|
||||
}()
|
||||
|
||||
return nil
|
||||
@ -199,21 +268,13 @@ func (w *Whisper) SetMinimumPoW(val float64) error {
|
||||
|
||||
// SetMinimumPoW sets the minimal PoW in test environment
|
||||
func (w *Whisper) SetMinimumPowTest(val float64) {
|
||||
w.notifyPeersAboutPowRequirementChange(val)
|
||||
w.settings.Store(minPowIdx, val)
|
||||
w.notifyPeersAboutPowRequirementChange(val)
|
||||
w.settings.Store(minPowToleranceIdx, val)
|
||||
}
|
||||
|
||||
func (w *Whisper) notifyPeersAboutPowRequirementChange(pow float64) {
|
||||
arr := make([]*Peer, len(w.peers))
|
||||
i := 0
|
||||
|
||||
w.peerMu.Lock()
|
||||
for p := range w.peers {
|
||||
arr[i] = p
|
||||
i++
|
||||
}
|
||||
w.peerMu.Unlock()
|
||||
|
||||
arr := w.getPeers()
|
||||
for _, p := range arr {
|
||||
err := p.notifyAboutPowRequirementChange(pow)
|
||||
if err != nil {
|
||||
@ -221,11 +282,37 @@ func (w *Whisper) notifyPeersAboutPowRequirementChange(pow float64) {
|
||||
err = p.notifyAboutPowRequirementChange(pow)
|
||||
}
|
||||
if err != nil {
|
||||
log.Warn("oversized message received", "peer", p.ID(), "error", err)
|
||||
log.Warn("failed to notify peer about new pow requirement", "peer", p.ID(), "error", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (w *Whisper) notifyPeersAboutBloomFilterChange(bloom []byte) {
|
||||
arr := w.getPeers()
|
||||
for _, p := range arr {
|
||||
err := p.notifyAboutBloomFilterChange(bloom)
|
||||
if err != nil {
|
||||
// allow one retry
|
||||
err = p.notifyAboutBloomFilterChange(bloom)
|
||||
}
|
||||
if err != nil {
|
||||
log.Warn("failed to notify peer about new bloom filter", "peer", p.ID(), "error", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (w *Whisper) getPeers() []*Peer {
|
||||
arr := make([]*Peer, len(w.peers))
|
||||
i := 0
|
||||
w.peerMu.Lock()
|
||||
for p := range w.peers {
|
||||
arr[i] = p
|
||||
i++
|
||||
}
|
||||
w.peerMu.Unlock()
|
||||
return arr
|
||||
}
|
||||
|
||||
// getPeer retrieves peer by ID
|
||||
func (w *Whisper) getPeer(peerID []byte) (*Peer, error) {
|
||||
w.peerMu.Lock()
|
||||
@ -459,7 +546,28 @@ func (w *Whisper) GetSymKey(id string) ([]byte, error) {
|
||||
// Subscribe installs a new message handler used for filtering, decrypting
|
||||
// and subsequent storing of incoming messages.
|
||||
func (w *Whisper) Subscribe(f *Filter) (string, error) {
|
||||
return w.filters.Install(f)
|
||||
s, err := w.filters.Install(f)
|
||||
if err == nil {
|
||||
w.updateBloomFilter(f)
|
||||
}
|
||||
return s, err
|
||||
}
|
||||
|
||||
// updateBloomFilter recalculates the new value of bloom filter,
|
||||
// and informs the peers if necessary.
|
||||
func (w *Whisper) updateBloomFilter(f *Filter) {
|
||||
aggregate := make([]byte, bloomFilterSize)
|
||||
for _, t := range f.Topics {
|
||||
top := BytesToTopic(t)
|
||||
b := TopicToBloom(top)
|
||||
aggregate = addBloom(aggregate, b)
|
||||
}
|
||||
|
||||
if !bloomFilterMatch(w.BloomFilter(), aggregate) {
|
||||
// existing bloom filter must be updated
|
||||
aggregate = addBloom(w.BloomFilter(), aggregate)
|
||||
w.SetBloomFilter(aggregate)
|
||||
}
|
||||
}
|
||||
|
||||
// GetFilter returns the filter by id.
|
||||
@ -592,7 +700,21 @@ func (wh *Whisper) runMessageLoop(p *Peer, rw p2p.MsgReadWriter) error {
|
||||
}
|
||||
p.powRequirement = f
|
||||
case bloomFilterExCode:
|
||||
// to be implemented
|
||||
var bloom []byte
|
||||
err := packet.Decode(&bloom)
|
||||
if err == nil && len(bloom) != bloomFilterSize {
|
||||
err = fmt.Errorf("wrong bloom filter size %d", len(bloom))
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
log.Warn("failed to decode bloom filter exchange message, peer will be disconnected", "peer", p.peer.ID(), "err", err)
|
||||
return errors.New("invalid bloom filter exchange message")
|
||||
}
|
||||
if isFullNode(bloom) {
|
||||
p.bloomFilter = nil
|
||||
} else {
|
||||
p.bloomFilter = bloom
|
||||
}
|
||||
case p2pMessageCode:
|
||||
// peer-to-peer message, sent directly to peer bypassing PoW checks, etc.
|
||||
// this message is not supposed to be forwarded to other peers, and
|
||||
@ -633,7 +755,7 @@ func (wh *Whisper) add(envelope *Envelope) (bool, error) {
|
||||
sent := envelope.Expiry - envelope.TTL
|
||||
|
||||
if sent > now {
|
||||
if sent-SynchAllowance > now {
|
||||
if sent-DefaultSyncAllowance > now {
|
||||
return false, fmt.Errorf("envelope created in the future [%x]", envelope.Hash())
|
||||
} else {
|
||||
// recalculate PoW, adjusted for the time difference, plus one second for latency
|
||||
@ -642,7 +764,7 @@ func (wh *Whisper) add(envelope *Envelope) (bool, error) {
|
||||
}
|
||||
|
||||
if envelope.Expiry < now {
|
||||
if envelope.Expiry+SynchAllowance*2 < now {
|
||||
if envelope.Expiry+DefaultSyncAllowance*2 < now {
|
||||
return false, fmt.Errorf("very old message")
|
||||
} else {
|
||||
log.Debug("expired envelope dropped", "hash", envelope.Hash().Hex())
|
||||
@ -655,11 +777,22 @@ func (wh *Whisper) add(envelope *Envelope) (bool, error) {
|
||||
}
|
||||
|
||||
if envelope.PoW() < wh.MinPow() {
|
||||
log.Debug("envelope with low PoW dropped", "PoW", envelope.PoW(), "hash", envelope.Hash().Hex())
|
||||
return false, nil // drop envelope without error for now
|
||||
// maybe the value was recently changed, and the peers did not adjust yet.
|
||||
// in this case the previous value is retrieved by MinPowTolerance()
|
||||
// for a short period of peer synchronization.
|
||||
if envelope.PoW() < wh.MinPowTolerance() {
|
||||
return false, fmt.Errorf("envelope with low PoW received: PoW=%f, hash=[%v]", envelope.PoW(), envelope.Hash().Hex())
|
||||
}
|
||||
}
|
||||
|
||||
// once the status message includes the PoW requirement, an error should be returned here:
|
||||
//return false, fmt.Errorf("envelope with low PoW dropped: PoW=%f, hash=[%v]", envelope.PoW(), envelope.Hash().Hex())
|
||||
if !bloomFilterMatch(wh.BloomFilter(), envelope.Bloom()) {
|
||||
// maybe the value was recently changed, and the peers did not adjust yet.
|
||||
// in this case the previous value is retrieved by BloomFilterTolerance()
|
||||
// for a short period of peer synchronization.
|
||||
if !bloomFilterMatch(wh.BloomFilterTolerance(), envelope.Bloom()) {
|
||||
return false, fmt.Errorf("envelope does not match bloom filter, hash=[%v], bloom: \n%x \n%x \n%x",
|
||||
envelope.Hash().Hex(), wh.BloomFilter(), envelope.Bloom(), envelope.Topic)
|
||||
}
|
||||
}
|
||||
|
||||
hash := envelope.Hash()
|
||||
@ -897,3 +1030,40 @@ func GenerateRandomID() (id string, err error) {
|
||||
id = common.Bytes2Hex(buf)
|
||||
return id, err
|
||||
}
|
||||
|
||||
func isFullNode(bloom []byte) bool {
|
||||
if bloom == nil {
|
||||
return true
|
||||
}
|
||||
for _, b := range bloom {
|
||||
if b != 255 {
|
||||
return false
|
||||
}
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
func bloomFilterMatch(filter, sample []byte) bool {
|
||||
if filter == nil {
|
||||
// full node, accepts all messages
|
||||
return true
|
||||
}
|
||||
|
||||
for i := 0; i < bloomFilterSize; i++ {
|
||||
f := filter[i]
|
||||
s := sample[i]
|
||||
if (f | s) != f {
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
||||
return true
|
||||
}
|
||||
|
||||
func addBloom(a, b []byte) []byte {
|
||||
c := make([]byte, bloomFilterSize)
|
||||
for i := 0; i < bloomFilterSize; i++ {
|
||||
c[i] = a[i] | b[i]
|
||||
}
|
||||
return c
|
||||
}
|
||||
|
@ -843,3 +843,64 @@ func TestSymmetricSendKeyMismatch(t *testing.T) {
|
||||
t.Fatalf("received a message when keys weren't matching")
|
||||
}
|
||||
}
|
||||
|
||||
func TestBloom(t *testing.T) {
|
||||
topic := TopicType{0, 0, 255, 6}
|
||||
b := TopicToBloom(topic)
|
||||
x := make([]byte, bloomFilterSize)
|
||||
x[0] = byte(1)
|
||||
x[32] = byte(1)
|
||||
x[bloomFilterSize-1] = byte(128)
|
||||
if !bloomFilterMatch(x, b) || !bloomFilterMatch(b, x) {
|
||||
t.Fatalf("bloom filter does not match the mask")
|
||||
}
|
||||
|
||||
_, err := mrand.Read(b)
|
||||
if err != nil {
|
||||
t.Fatalf("math rand error")
|
||||
}
|
||||
_, err = mrand.Read(x)
|
||||
if err != nil {
|
||||
t.Fatalf("math rand error")
|
||||
}
|
||||
if !bloomFilterMatch(b, b) {
|
||||
t.Fatalf("bloom filter does not match self")
|
||||
}
|
||||
x = addBloom(x, b)
|
||||
if !bloomFilterMatch(x, b) {
|
||||
t.Fatalf("bloom filter does not match combined bloom")
|
||||
}
|
||||
if !isFullNode(nil) {
|
||||
t.Fatalf("isFullNode did not recognize nil as full node")
|
||||
}
|
||||
x[17] = 254
|
||||
if isFullNode(x) {
|
||||
t.Fatalf("isFullNode false positive")
|
||||
}
|
||||
for i := 0; i < bloomFilterSize; i++ {
|
||||
b[i] = byte(255)
|
||||
}
|
||||
if !isFullNode(b) {
|
||||
t.Fatalf("isFullNode false negative")
|
||||
}
|
||||
if bloomFilterMatch(x, b) {
|
||||
t.Fatalf("bloomFilterMatch false positive")
|
||||
}
|
||||
if !bloomFilterMatch(b, x) {
|
||||
t.Fatalf("bloomFilterMatch false negative")
|
||||
}
|
||||
|
||||
w := New(&DefaultConfig)
|
||||
f := w.BloomFilter()
|
||||
if f != nil {
|
||||
t.Fatalf("wrong bloom on creation")
|
||||
}
|
||||
err = w.SetBloomFilter(x)
|
||||
if err != nil {
|
||||
t.Fatalf("failed to set bloom filter: %s", err)
|
||||
}
|
||||
f = w.BloomFilter()
|
||||
if !bloomFilterMatch(f, x) || !bloomFilterMatch(x, f) {
|
||||
t.Fatalf("retireved wrong bloom filter")
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user