Merge pull request #15 from rauljordan/txpool

Subscribe to block headers from running geth node to propose collations

Former-commit-id: d7dc186afaab44e43436a4bfbdce9c681a147a1e [formerly a038be9a6572a3797ee2fd41f034d605ad18b9bb]
Former-commit-id: 76113ca9c383cbc115e009a3f5f9871b6d90f0b3
This commit is contained in:
Raul Jordan 2018-02-07 00:07:02 -06:00 committed by GitHub
commit 887a4865d2
8 changed files with 330 additions and 666 deletions

View File

@ -17,7 +17,7 @@ var (
Flags: []cli.Flag{utils.DataDirFlag, utils.PasswordFileFlag, utils.NetworkIdFlag},
Category: "SHARDING COMMANDS",
Description: `
The Geth sharding client connects to a running geth node in sharding mode. This feature is a work in progress.
Launches a sharding client that connects to a running geth node and proposes collations to a Validator Manager Contract. This feature is a work in progress.
`,
}
)

View File

@ -1,13 +1,18 @@
package sharding
import (
"context"
"fmt"
"io/ioutil"
"math/big"
"strings"
"github.com/ethereum/go-ethereum/accounts"
"github.com/ethereum/go-ethereum/accounts/abi/bind"
"github.com/ethereum/go-ethereum/accounts/keystore"
"github.com/ethereum/go-ethereum/cmd/utils"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/ethclient"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/node"
@ -72,22 +77,34 @@ func (c *Client) Start() error {
return err
}
// TODO: Wait to be selected as collator in goroutine?
// Deposit 100ETH into the validator set in the VMC. Checks if account
// is already a validator in the VMC (in the case the client restarted).
// Once that's done we can subscribe to block headers.
//
// TODO: this function should store the validator's VMC index as a property
// in the client's struct
if err := joinValidatorSet(c); err != nil {
return err
}
// Listens to block headers from the geth node and if we are an eligible
// proposer, we fetch pending transactions and propose a collation
if err := subscribeBlockHeaders(c); err != nil {
return err
}
return nil
}
// Wait until sharding client is shutdown.
func (c *Client) Wait() {
// TODO: Blocking lock.
log.Info("Sharding client has been shutdown...")
}
// dialRPC endpoint to node.
func dialRPC(endpoint string) (*rpc.Client, error) {
if endpoint == "" {
endpoint = node.DefaultIPCEndpoint(clientIdentifier)
}
return rpc.Dial(endpoint)
// WatchCollationHeaders checks the logs for add_header func calls
// and updates the head collation of the client. We can probably store
// this as a property of the client struct
func (c *Client) WatchCollationHeaders() {
}
// UnlockAccount will unlock the specified account using utils.PasswordFileFlag or empty string if unset.
@ -105,3 +122,49 @@ func (c *Client) unlockAccount(account accounts.Account) error {
return c.keystore.Unlock(account, pass)
}
func (c *Client) createTXOps(value *big.Int) (bind.TransactOpts, error) {
accounts := c.keystore.Accounts()
if len(accounts) == 0 {
return bind.TransactOpts{}, fmt.Errorf("no accounts found")
}
if err := c.unlockAccount(accounts[0]); err != nil {
return bind.TransactOpts{}, fmt.Errorf("unable to unlock account 0: %v", err)
}
if value.Cmp(big.NewInt(0)) == 0 {
return bind.TransactOpts{
From: accounts[0].Address,
Signer: func(signer types.Signer, addr common.Address, tx *types.Transaction) (*types.Transaction, error) {
networkID, err := c.client.NetworkID(context.Background())
if err != nil {
return nil, fmt.Errorf("unable to fetch networkID: %v", err)
}
return c.keystore.SignTx(accounts[0], tx, networkID /* chainID */)
},
}, nil
}
return bind.TransactOpts{
From: accounts[0].Address,
Value: value,
Signer: func(signer types.Signer, addr common.Address, tx *types.Transaction) (*types.Transaction, error) {
networkID, err := c.client.NetworkID(context.Background())
if err != nil {
return nil, fmt.Errorf("unable to fetch networkID: %v", err)
}
return c.keystore.SignTx(accounts[0], tx, networkID /* chainID */)
},
}, nil
}
// dialRPC endpoint to node.
func dialRPC(endpoint string) (*rpc.Client, error) {
if endpoint == "" {
endpoint = node.DefaultIPCEndpoint(clientIdentifier)
}
return rpc.Dial(endpoint)
}

105
sharding/collator.go Normal file
View File

@ -0,0 +1,105 @@
package sharding
import (
"context"
"fmt"
"github.com/ethereum/go-ethereum/accounts/abi/bind"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log"
"math/big"
)
// SubscribeBlockHeaders checks incoming block headers and determines if
// we are an eligible proposer for collations. Then, it finds the pending tx's
// from the running geth node and sorts them by descending order of gas price,
// eliminates those that ask for too much gas, and routes them over
// to the VMC to create a collation
func subscribeBlockHeaders(c *Client) error {
headerChan := make(chan *types.Header, 16)
_, err := c.client.SubscribeNewHead(context.Background(), headerChan)
if err != nil {
return fmt.Errorf("unable to subscribe to incoming headers. %v", err)
}
log.Info("Listening for new headers...")
for {
// TODO: Error handling for getting disconnected from the client
select {
case head := <-headerChan:
// Query the current state to see if we are an eligible proposer
log.Info(fmt.Sprintf("Received new header: %v", head.Number.String()))
// TODO: Only run this code on certain periods?
if err := checkShardsForProposal(c, head); err != nil {
return fmt.Errorf("unable to watch shards. %v", err)
}
}
}
}
// checkShardsForProposal checks if we are an eligible proposer for
// collation for the available shards in the VMC. The function calls
// getEligibleProposer from the VMC and proposes a collation if
// conditions are met
func checkShardsForProposal(c *Client, head *types.Header) error {
accounts := c.keystore.Accounts()
if len(accounts) == 0 {
return fmt.Errorf("no accounts found")
}
if err := c.unlockAccount(accounts[0]); err != nil {
return fmt.Errorf("cannot unlock account. %v", err)
}
log.Info("Checking if we are an eligible collation proposer for a shard...")
for s := int64(0); s < shardCount; s++ {
// Checks if we are an eligible proposer according to the VMC
period := head.Number.Div(head.Number, big.NewInt(periodLength))
addr, err := c.vmc.VMCCaller.GetEligibleProposer(&bind.CallOpts{}, big.NewInt(s), period)
// TODO: When we are not a proposer, we get the error of being unable to
// unmarshal empty output. Open issue to deal with this.
// If output is non-empty and the addr == coinbase
if err == nil && addr == accounts[0].Address {
log.Info(fmt.Sprintf("Selected as collator on shard: %d", s))
err := proposeCollation(s)
if err != nil {
return fmt.Errorf("could not propose collation. %v", err)
}
}
}
return nil
}
// proposeCollation interacts with the VMC directly to add a collation header
func proposeCollation(shardID int64) error {
// TODO: Adds a collation header to the VMC with the following fields:
// [
// shard_id: uint256,
// expected_period_number: uint256,
// period_start_prevhash: bytes32,
// parent_hash: bytes32,
// transactions_root: bytes32,
// coinbase: address,
// state_root: bytes32,
// receipts_root: bytes32,
// number: uint256,
// sig: bytes
// ]
//
// Before calling this, we would need to have access to the state of
// the period_start_prevhash. Refer to the comments in:
// https://github.com/ethereum/py-evm/issues/258#issuecomment-359879350
//
// This function will call FetchCandidateHead() of the VMC to obtain
// more necessary information.
//
// This functions will fetch the transactions in the txpool and and apply
// them to finish up the collation. It will then need to broadcast the
// collation to the main chain using JSON-RPC.
log.Info("Propose collation function called")
return nil
}

29
sharding/collator_test.go Normal file
View File

@ -0,0 +1,29 @@
package sharding
import (
"context"
"fmt"
"testing"
"github.com/ethereum/go-ethereum/core/types"
)
type FakeClient struct {
client *FakeEthClient
}
type FakeEthClient struct{}
type FakeSubscription struct{}
func (ec *FakeEthClient) SubscribeNewHead(ctx context.Context, ch chan<- *types.Header) (FakeSubscription, error) {
return FakeSubscription{}, fmt.Errorf("error, network disconnected!")
}
func TestSubscribeHeaders(t *testing.T) {
client := &FakeClient{client: &FakeEthClient{}}
err := subscribeBlockHeaders(client)
if err != nil {
t.Errorf("subscribe new headers should work", "no error", err)
}
}

View File

@ -8,13 +8,13 @@ import (
var (
// Number of network shards
shardCount = 100
shardCount = int64(100)
// Address of the validator management contract
validatorManagerAddress = common.HexToAddress("0x0") // TODO
// Gas limit for verifying signatures
sigGasLimit = 40000
// Number of blocks in a period
periodLength = 5
periodLength = int64(5)
// Number of periods ahead of current period which the contract is able to return the collator of that period.
lookaheadPeriods = 4
// Required deposit size in wei

File diff suppressed because one or more lines are too long

View File

@ -44,7 +44,7 @@ contract VMC {
mapping (int => Receipt) receipts;
// shardId => headerHash
mapping (int => bytes32) shardHead;
// Number of validators
int numValidators;
// Number of receipts
@ -109,19 +109,19 @@ contract VMC {
index = stackPop();
else
index = int(numValidators);
validators[index] = Validator({
deposit: msg.value,
addr: msg.sender
});
++numValidators;
isValidatorDeposited[msg.sender] = true;
Deposit(msg.sender, index);
return index;
}
// Removes the validator from the validator set and refunds the deposited ether
// Removes the validator from the validator set and refunds the deposited ether
function withdraw(int _validatorIndex) public {
require(msg.sender == validators[_validatorIndex].addr);
// [FIXME] Should consider calling the validator's contract, might be useful
@ -193,7 +193,7 @@ contract VMC {
// during a future collation. Saves a `receipt ID` for this request,
// also saving `msg.sender`, `msg.value`, `to`, `shard_id`, `startgas`,
// `gasprice`, and `data`.
function txToShard(address _to, int _shardId, uint _txStartgas, uint _txGasprice,
function txToShard(address _to, int _shardId, uint _txStartgas, uint _txGasprice,
bytes12 _data) public payable returns(int) {
receipts[numReceipts] = Receipt({
shardId: _shardId,
@ -206,11 +206,11 @@ contract VMC {
});
var receiptId = numReceipts;
++numReceipts;
TxToShard(_to, _shardId, receiptId);
return receiptId;
}
function updateGasPrice(int _receiptId, uint _txGasprice) public payable returns(bool) {
require(receipts[_receiptId].sender == msg.sender);
receipts[_receiptId].txGasprice = _txGasprice;
@ -225,7 +225,7 @@ contract VMC {
emptySlotsStack[emptySlotsStackTop] = index;
++emptySlotsStackTop;
}
function stackPop() internal returns(int) {
if (isStackEmpty())
return -1;

View File

@ -3,11 +3,9 @@ package sharding
import (
"context"
"fmt"
"math/big"
"time"
"github.com/ethereum/go-ethereum/accounts/abi/bind"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/sharding/contracts"
)
@ -17,32 +15,18 @@ import (
func initVMC(c *Client) error {
b, err := c.client.CodeAt(context.Background(), validatorManagerAddress, nil)
if err != nil {
return fmt.Errorf("unable to get contract code at %s. %v", validatorManagerAddress, err)
return fmt.Errorf("unable to get contract code at %s: %v", validatorManagerAddress, err)
}
if len(b) == 0 {
log.Info(fmt.Sprintf("No validator management contract found at %s. Deploying new contract.", validatorManagerAddress.String()))
accounts := c.keystore.Accounts()
if len(accounts) == 0 {
return fmt.Errorf("no accounts found")
txOps, err := c.createTXOps(big.NewInt(0))
if err != nil {
return fmt.Errorf("unable to intiate the transaction: %v", err)
}
if err := c.unlockAccount(accounts[0]); err != nil {
return fmt.Errorf("unable to unlock account 0: %v", err)
}
ops := bind.TransactOpts{
From: accounts[0].Address,
Signer: func(signer types.Signer, addr common.Address, tx *types.Transaction) (*types.Transaction, error) {
networkID, err := c.client.NetworkID(context.Background())
if err != nil {
return nil, fmt.Errorf("unable to fetch networkID: %v", err)
}
return c.keystore.SignTx(accounts[0], tx, networkID /* chainID */)
},
}
addr, tx, contract, err := contracts.DeployVMC(&ops, c.client)
addr, tx, contract, err := contracts.DeployVMC(&txOps, c.client)
if err != nil {
return fmt.Errorf("unable to deploy validator management contract: %v", err)
}
@ -66,3 +50,23 @@ func initVMC(c *Client) error {
return nil
}
// joinValidatorSet checks if the account is a validator in the VMC. If
// the account is not in the set, it will deposit 100ETH into contract.
func joinValidatorSet(c *Client) error {
// TODO: Check if account is already in validator set. Fetch this From
// the VMC contract's validator set
txOps, err := c.createTXOps(depositSize)
if err != nil {
return fmt.Errorf("unable to intiate the deposit transaction: %v", err)
}
tx, err := c.vmc.VMCTransactor.Deposit(&txOps)
if err != nil {
return fmt.Errorf("unable to deposit eth and become a validator: %v", err)
}
log.Info(fmt.Sprintf("Deposited 100ETH into contract with transaction hash: %s", tx.Hash().String()))
return nil
}