From 6488a392a347d0d47212fdc78386e3e0e5841d7d Mon Sep 17 00:00:00 2001 From: obscuren Date: Thu, 29 Jan 2015 16:52:00 +0100 Subject: [PATCH] Reimplemented message filters for rpc calls --- .../ext/ethereum.js/example/balance.html | 2 +- core/filter.go | 21 +++++ rpc/args.go | 31 ++++++- rpc/message.go | 55 +++++++++++ rpc/packages.go | 93 ++++++++++++++++++- xeth/xeth.go | 21 +++-- 6 files changed, 207 insertions(+), 16 deletions(-) diff --git a/cmd/mist/assets/ext/ethereum.js/example/balance.html b/cmd/mist/assets/ext/ethereum.js/example/balance.html index 88f55315a..4b51c5a7d 100644 --- a/cmd/mist/assets/ext/ethereum.js/example/balance.html +++ b/cmd/mist/assets/ext/ethereum.js/example/balance.html @@ -17,7 +17,7 @@ var originalBalance = web3.toDecimal(balance); document.getElementById('original').innerText = 'original balance: ' + originalBalance + ' watching...'; - web3.eth.watch({altered: coinbase}).changed(function() { + var filter = web3.eth.watch({address: coinbase}).changed(function() { balance = web3.eth.balanceAt(coinbase) var currentBalance = web3.toDecimal(balance); document.getElementById("current").innerText = 'current: ' + currentBalance; diff --git a/core/filter.go b/core/filter.go index efdd819ed..d154e7b7a 100644 --- a/core/filter.go +++ b/core/filter.go @@ -12,6 +12,17 @@ type AccountChange struct { Address, StateAddress []byte } +type FilterOptions struct { + Earliest int64 + Latest int64 + + Address []byte + Topics [][]byte + + Skip int + Max int +} + // Filtering interface type Filter struct { eth EthManager @@ -32,6 +43,16 @@ func NewFilter(eth EthManager) *Filter { return &Filter{eth: eth} } +func (self *Filter) SetOptions(options FilterOptions) { + self.earliest = options.Earliest + self.latest = options.Latest + self.skip = options.Skip + self.max = options.Max + self.address = options.Address + self.topics = options.Topics + +} + // Set the earliest and latest block for filtering. // -1 = latest block (i.e., the current block) // hash = particular hash from-to diff --git a/rpc/args.go b/rpc/args.go index ff4974792..79519e7d2 100644 --- a/rpc/args.go +++ b/rpc/args.go @@ -1,6 +1,7 @@ package rpc import "encoding/json" +import "github.com/ethereum/go-ethereum/core" type GetBlockArgs struct { BlockNumber int32 @@ -36,10 +37,6 @@ type NewTxArgs struct { Data string `json:"data"` } -// type TxResponse struct { -// Hash string -// } - func (a *NewTxArgs) requirements() error { if a.Gas == "" { return NewErrorResponse("Transact requires a 'gas' value as argument") @@ -195,3 +192,29 @@ func (obj *Sha3Args) UnmarshalJSON(b []byte) (err error) { } return } + +type FilterOptions struct { + Earliest int64 + Latest int64 + Address string + Topics []string + Skip int + Max int +} + +func toFilterOptions(options *FilterOptions) core.FilterOptions { + var opts core.FilterOptions + opts.Earliest = options.Earliest + opts.Latest = options.Latest + opts.Address = fromHex(options.Address) + opts.Topics = make([][]byte, len(options.Topics)) + for i, topic := range options.Topics { + opts.Topics[i] = fromHex(topic) + } + + return opts +} + +type FilterChangedArgs struct { + n int +} diff --git a/rpc/message.go b/rpc/message.go index e9f47634f..05f66ee95 100644 --- a/rpc/message.go +++ b/rpc/message.go @@ -21,6 +21,8 @@ import ( "encoding/json" "errors" "fmt" + + "github.com/ethereum/go-ethereum/state" ) const ( @@ -184,3 +186,56 @@ func (req *RpcRequest) ToGetCodeAtArgs() (*GetCodeAtArgs, error) { rpclogger.DebugDetailf("%T %v", args, args) return args, nil } + +func (req *RpcRequest) ToFilterArgs() (*FilterOptions, error) { + if len(req.Params) < 1 { + return nil, NewErrorResponse(ErrorArguments) + } + + args := new(FilterOptions) + r := bytes.NewReader(req.Params[0]) + err := json.NewDecoder(r).Decode(args) + if err != nil { + return nil, NewErrorResponse(ErrorDecodeArgs) + } + rpclogger.DebugDetailf("%T %v", args, args) + return args, nil +} + +func (req *RpcRequest) ToFilterChangedArgs() (int, error) { + if len(req.Params) < 1 { + return 0, NewErrorResponse(ErrorArguments) + } + + var id int + r := bytes.NewReader(req.Params[0]) + err := json.NewDecoder(r).Decode(&id) + if err != nil { + return 0, NewErrorResponse(ErrorDecodeArgs) + } + rpclogger.DebugDetailf("%T %v", id, id) + return id, nil +} + +type Log struct { + Address string `json:"address"` + Topics []string `json:"topics"` + Data string `json:"data"` +} + +func toLogs(logs state.Logs) (ls []Log) { + ls = make([]Log, len(logs)) + + for i, log := range logs { + var l Log + l.Topics = make([]string, len(log.Topics())) + l.Address = toHex(log.Address()) + l.Data = toHex(log.Data()) + for j, topic := range log.Topics() { + l.Topics[j] = toHex(topic) + } + ls[i] = l + } + + return +} diff --git a/rpc/packages.go b/rpc/packages.go index b25660b25..e8dc570fd 100644 --- a/rpc/packages.go +++ b/rpc/packages.go @@ -29,9 +29,13 @@ import ( "fmt" "math/big" "strings" + "sync" + "github.com/ethereum/go-ethereum/core" "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/ethutil" + "github.com/ethereum/go-ethereum/event/filter" + "github.com/ethereum/go-ethereum/state" "github.com/ethereum/go-ethereum/xeth" ) @@ -53,12 +57,79 @@ type RpcServer interface { Stop() } -func NewEthereumApi(xeth *xeth.XEth) *EthereumApi { - return &EthereumApi{xeth: xeth} +type EthereumApi struct { + xeth *xeth.XEth + filterManager *filter.FilterManager + + mut sync.RWMutex + logs map[int]state.Logs } -type EthereumApi struct { - xeth *xeth.XEth +func NewEthereumApi(xeth *xeth.XEth) *EthereumApi { + api := &EthereumApi{ + xeth: xeth, + filterManager: filter.NewFilterManager(xeth.Backend().EventMux()), + logs: make(map[int]state.Logs), + } + go api.filterManager.Start() + + return api +} + +func (self *EthereumApi) NewFilter(args *FilterOptions, reply *interface{}) error { + var id int + filter := core.NewFilter(self.xeth.Backend()) + filter.LogsCallback = func(logs state.Logs) { + self.mut.Lock() + defer self.mut.Unlock() + + self.logs[id] = append(self.logs[id], logs...) + } + id = self.filterManager.InstallFilter(filter) + *reply = id + + return nil +} + +type Log struct { + Address string `json:"address"` + Topics []string `json:"topics"` + Data string `json:"data"` +} + +func toLogs(logs state.Logs) (ls []Log) { + ls = make([]Log, len(logs)) + + for i, log := range logs { + var l Log + l.Topics = make([]string, len(log.Topics())) + l.Address = toHex(log.Address()) + l.Data = toHex(log.Data()) + for j, topic := range log.Topics() { + l.Topics[j] = toHex(topic) + } + ls[i] = l + } + + return +} + +func (self *EthereumApi) FilterChanged(id int, reply *interface{}) error { + self.mut.RLock() + defer self.mut.RUnlock() + + *reply = toLogs(self.logs[id]) + + self.logs[id] = nil // empty the logs + + return nil +} + +func (self *EthereumApi) Logs(id int, reply *interface{}) error { + filter := self.filterManager.GetFilter(id) + *reply = toLogs(filter.Find()) + + return nil } func (p *EthereumApi) GetBlock(args *GetBlockArgs, reply *interface{}) error { @@ -162,7 +233,7 @@ func (p *EthereumApi) GetBalanceAt(args *GetBalanceArgs, reply *interface{}) err return err } state := p.xeth.State().SafeGet(args.Address) - *reply = BalanceRes{Balance: state.Balance().String(), Address: args.Address} + *reply = toHex(state.Balance().Bytes()) return nil } @@ -234,6 +305,18 @@ func (p *EthereumApi) GetRequestReply(req *RpcRequest, reply *interface{}) error return err } return p.Call(args, reply) + case "eth_newFilter": + args, err := req.ToFilterArgs() + if err != nil { + return err + } + return p.NewFilter(args, reply) + case "eth_changed": + args, err := req.ToFilterChangedArgs() + if err != nil { + return err + } + return p.FilterChanged(args, reply) case "web3_sha3": args, err := req.ToSha3Args() if err != nil { diff --git a/xeth/xeth.go b/xeth/xeth.go index 382a1f285..4cfa104d0 100644 --- a/xeth/xeth.go +++ b/xeth/xeth.go @@ -12,7 +12,9 @@ import ( "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/ethutil" + "github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/logger" + "github.com/ethereum/go-ethereum/p2p" "github.com/ethereum/go-ethereum/state" ) @@ -22,19 +24,22 @@ var pipelogger = logger.NewLogger("XETH") type Backend interface { BlockProcessor() *core.BlockProcessor ChainManager() *core.ChainManager - KeyManager() *crypto.KeyManager + TxPool() *core.TxPool + PeerCount() int IsMining() bool IsListening() bool - PeerCount() int + Peers() []*p2p.Peer + KeyManager() *crypto.KeyManager + ClientIdentity() p2p.ClientIdentity Db() ethutil.Database - TxPool() *core.TxPool + EventMux() *event.TypeMux } type XEth struct { eth Backend blockProcessor *core.BlockProcessor chainManager *core.ChainManager - world *State + state *State } func New(eth Backend) *XEth { @@ -43,12 +48,16 @@ func New(eth Backend) *XEth { blockProcessor: eth.BlockProcessor(), chainManager: eth.ChainManager(), } - xeth.world = NewState(xeth) + xeth.state = NewState(xeth) return xeth } -func (self *XEth) State() *State { return self.world } +func (self *XEth) Backend() Backend { + return self.eth +} + +func (self *XEth) State() *State { return self.state } func (self *XEth) BlockByHash(strHash string) *Block { hash := fromHex(strHash)