Add sync test to E2E (#4654)

* Complete evaluator for chain consensus

* Add sync e2e test

* Cleanup

* Rename

* Add tad more offset for correct head

* Change offset to middle of slot

* Change head block root to head epoch

* comment

* Fix eth1

* Address comments

* Gazelle

* Change to use file

* Change to use reader

* Use fil
This commit is contained in:
Ivan Martinez 2020-01-28 14:16:00 -05:00 committed by Raul Jordan
parent 439a84fcb9
commit ad01bfbcde
10 changed files with 329 additions and 157 deletions

View File

@ -8,6 +8,7 @@ go_test(
"endtoend_test.go",
"minimal_e2e_test.go",
],
args = ["-test.v"],
data = [
"//beacon-chain",
"//validator",
@ -28,7 +29,6 @@ go_test(
"//shared/params:go_default_library",
"//shared/testutil:go_default_library",
"@com_github_gogo_protobuf//types:go_default_library",
"@com_github_pkg_errors//:go_default_library",
"@com_github_prysmaticlabs_ethereumapis//eth/v1alpha1:go_default_library",
"@io_bazel_rules_go//go/tools/bazel:go_default_library",
"@org_golang_google_grpc//:go_default_library",
@ -42,6 +42,7 @@ go_library(
"beacon_node.go",
"epochTimer.go",
"eth1.go",
"helpers.go",
"validator.go",
],
importpath = "github.com/prysmaticlabs/prysm/endtoend",
@ -58,7 +59,6 @@ go_library(
"@com_github_ethereum_go_ethereum//core/types:go_default_library",
"@com_github_ethereum_go_ethereum//ethclient:go_default_library",
"@com_github_ethereum_go_ethereum//rpc:go_default_library",
"@com_github_pkg_errors//:go_default_library",
"@io_bazel_rules_go//go/tools/bazel:go_default_library",
],
)

View File

@ -1,33 +1,20 @@
package endtoend
import (
"bufio"
"fmt"
"io"
"io/ioutil"
"os"
"os/exec"
"path"
"strings"
"testing"
"time"
"github.com/bazelbuild/rules_go/go/tools/bazel"
"github.com/ethereum/go-ethereum/common"
"github.com/pkg/errors"
ev "github.com/prysmaticlabs/prysm/endtoend/evaluators"
"github.com/prysmaticlabs/prysm/shared/params"
)
type beaconNodeInfo struct {
processID int
datadir string
rpcPort uint64
monitorPort uint64
grpcPort uint64
multiAddr string
}
type end2EndConfig struct {
beaconFlags []string
validatorFlags []string
@ -42,10 +29,10 @@ type end2EndConfig struct {
var beaconNodeLogFileName = "beacon-%d.log"
// startBeaconNodes starts the requested amount of beacon nodes, passing in the deposit contract given.
func startBeaconNodes(t *testing.T, config *end2EndConfig) []*beaconNodeInfo {
func startBeaconNodes(t *testing.T, config *end2EndConfig) []*ev.BeaconNodeInfo {
numNodes := config.numBeaconNodes
nodeInfo := []*beaconNodeInfo{}
nodeInfo := []*ev.BeaconNodeInfo{}
for i := uint64(0); i < numNodes; i++ {
newNode := startNewBeaconNode(t, config, nodeInfo)
nodeInfo = append(nodeInfo, newNode)
@ -54,7 +41,7 @@ func startBeaconNodes(t *testing.T, config *end2EndConfig) []*beaconNodeInfo {
return nodeInfo
}
func startNewBeaconNode(t *testing.T, config *end2EndConfig, beaconNodes []*beaconNodeInfo) *beaconNodeInfo {
func startNewBeaconNode(t *testing.T, config *end2EndConfig, beaconNodes []*ev.BeaconNodeInfo) *ev.BeaconNodeInfo {
tmpPath := config.tmpPath
index := len(beaconNodes)
binaryPath, found := bazel.FindBinary("beacon-chain", "beacon-chain")
@ -89,7 +76,7 @@ func startNewBeaconNode(t *testing.T, config *end2EndConfig, beaconNodes []*beac
// After the first node is made, have all following nodes connect to all previously made nodes.
if index >= 1 {
for p := 0; p < index; p++ {
args = append(args, fmt.Sprintf("--peer=%s", beaconNodes[p].multiAddr))
args = append(args, fmt.Sprintf("--peer=%s", beaconNodes[p].MultiAddr))
}
}
@ -110,13 +97,13 @@ func startNewBeaconNode(t *testing.T, config *end2EndConfig, beaconNodes []*beac
t.Fatalf("could not get multiaddr for node %d: %v", index, err)
}
return &beaconNodeInfo{
processID: cmd.Process.Pid,
datadir: fmt.Sprintf("%s/eth2-beacon-node-%d", tmpPath, index),
rpcPort: 4200 + uint64(index),
monitorPort: 8280 + uint64(index),
grpcPort: 3400 + uint64(index),
multiAddr: multiAddr,
return &ev.BeaconNodeInfo{
ProcessID: cmd.Process.Pid,
DataDir: fmt.Sprintf("%s/eth2-beacon-node-%d", tmpPath, index),
RPCPort: 4200 + uint64(index),
MonitorPort: 8280 + uint64(index),
GRPCPort: 3400 + uint64(index),
MultiAddr: multiAddr,
}
}
@ -139,33 +126,3 @@ func getMultiAddrFromLogFile(name string) (string, error) {
}
return contents[startIdx : startIdx+endIdx], nil
}
func waitForTextInFile(file *os.File, text string) error {
wait := 0
// Cap the wait in case there are issues starting.
maxWait := 36
for wait < maxWait {
time.Sleep(2 * time.Second)
// Rewind the file pointer to the start of the file so we can read it again.
_, err := file.Seek(0, io.SeekStart)
if err != nil {
return errors.Wrap(err, "could not rewind file to start")
}
scanner := bufio.NewScanner(file)
for scanner.Scan() {
if strings.Contains(scanner.Text(), text) {
return nil
}
}
if err := scanner.Err(); err != nil {
return err
}
wait += 2
}
contents, err := ioutil.ReadFile(file.Name())
if err != nil {
return err
}
return fmt.Errorf("could not find requested text \"%s\" in logs:\n%s", text, string(contents))
}

View File

@ -1,22 +1,17 @@
package endtoend
import (
"bufio"
"context"
"fmt"
"io/ioutil"
"net/http"
"os"
"path"
"strconv"
"strings"
"testing"
"time"
"github.com/bazelbuild/rules_go/go/tools/bazel"
ptypes "github.com/gogo/protobuf/types"
"github.com/pkg/errors"
eth "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1"
ev "github.com/prysmaticlabs/prysm/endtoend/evaluators"
"github.com/prysmaticlabs/prysm/shared/params"
"google.golang.org/grpc"
)
@ -36,17 +31,15 @@ func runEndToEndTest(t *testing.T, config *end2EndConfig) {
processIDs = append(processIDs, vv.processID)
}
for _, bb := range beaconNodes {
processIDs = append(processIDs, bb.processID)
processIDs = append(processIDs, bb.ProcessID)
}
defer logOutput(t, tmpPath, config)
defer killProcesses(t, processIDs)
if config.numBeaconNodes > 1 {
t.Run("all_peers_connect", func(t *testing.T) {
for _, bNode := range beaconNodes {
if err := peersConnect(bNode.monitorPort, config.numBeaconNodes-1); err != nil {
t.Fatalf("Failed to connect to peers: %v", err)
}
if err := ev.PeersConnect(beaconNodes); err != nil {
t.Fatalf("Failed to connect to peers: %v", err)
}
})
}
@ -78,6 +71,7 @@ func runEndToEndTest(t *testing.T, config *end2EndConfig) {
// Small offset so evaluators perform in the middle of an epoch.
epochSeconds := params.BeaconConfig().SecondsPerSlot * params.BeaconConfig().SlotsPerEpoch
genesisTime := time.Unix(genesis.GenesisTime.Seconds+int64(epochSeconds/2), 0)
ticker := GetEpochTicker(genesisTime, epochSeconds)
for currentEpoch := range ticker.C() {
for _, evaluator := range config.evaluators {
@ -93,98 +87,43 @@ func runEndToEndTest(t *testing.T, config *end2EndConfig) {
}
if t.Failed() || currentEpoch >= config.epochsToRun {
if err := conn.Close(); err != nil {
t.Fatal(err)
}
ticker.Done()
break
}
}
}
func peersConnect(port uint64, expectedPeers uint64) error {
response, err := http.Get(fmt.Sprintf("http://127.0.0.1:%d/p2p", port))
syncNodeInfo := startNewBeaconNode(t, config, beaconNodes)
beaconNodes = append(beaconNodes, syncNodeInfo)
index := len(beaconNodes) - 1
// Sleep until the next epoch to give time for the newly started node to sync.
nextEpochSeconds := (config.epochsToRun+2)*epochSeconds + epochSeconds/2
genesisTime.Add(time.Duration(nextEpochSeconds) * time.Second)
// Wait until middle of epoch to request to prevent conflicts.
time.Sleep(time.Until(genesisTime))
syncLogFile, err := os.Open(path.Join(tmpPath, fmt.Sprintf(beaconNodeLogFileName, index)))
if err != nil {
return errors.Wrap(err, "failed to reach p2p metrics page")
t.Fatal(err)
}
dataInBytes, err := ioutil.ReadAll(response.Body)
if err != nil {
return err
if err := waitForTextInFile(syncLogFile, "Synced up to"); err != nil {
t.Fatalf("Failed to sync: %v", err)
}
pageContent := string(dataInBytes)
if err := response.Body.Close(); err != nil {
return err
}
// Subtracting by 2 here since the libp2p page has "3 peers" as text.
// With a starting index before the "p", going two characters back should give us
// the number we need.
startIdx := strings.Index(pageContent, "peers") - 2
if startIdx == -3 {
return fmt.Errorf("could not find needed text in %s", pageContent)
}
peerCount, err := strconv.Atoi(pageContent[startIdx : startIdx+1])
if err != nil {
return err
}
if expectedPeers != uint64(peerCount) {
return fmt.Errorf("unexpected amount of peers, expected %d, received %d", expectedPeers, peerCount)
}
return nil
}
func killProcesses(t *testing.T, pIDs []int) {
for _, id := range pIDs {
process, err := os.FindProcess(id)
if err != nil {
t.Fatalf("Could not find process %d: %v", id, err)
}
if err := process.Kill(); err != nil {
t.Run("node_finishes_sync", func(t *testing.T) {
if err := ev.FinishedSyncing(syncNodeInfo.RPCPort); err != nil {
t.Fatal(err)
}
if err := process.Release(); err != nil {
})
t.Run("all_nodes_have_correct_head", func(t *testing.T) {
if err := ev.AllChainsHaveSameHead(beaconNodes); err != nil {
t.Fatal(err)
}
}
}
func logOutput(t *testing.T, tmpPath string, config *end2EndConfig) {
// Log out errors from beacon chain nodes.
for i := uint64(0); i < config.numBeaconNodes; i++ {
beaconLogFile, err := os.Open(path.Join(tmpPath, fmt.Sprintf(beaconNodeLogFileName, i)))
if err != nil {
t.Fatal(err)
}
logErrorOutput(t, beaconLogFile, "beacon chain node", i)
validatorLogFile, err := os.Open(path.Join(tmpPath, fmt.Sprintf(validatorLogFileName, i)))
if err != nil {
t.Fatal(err)
}
logErrorOutput(t, validatorLogFile, "validator client", i)
}
t.Logf("Ending time: %s\n", time.Now().String())
}
func logErrorOutput(t *testing.T, file *os.File, title string, index uint64) {
var errorLines []string
scanner := bufio.NewScanner(file)
for scanner.Scan() {
currentLine := scanner.Text()
if strings.Contains(currentLine, "level=error") {
errorLines = append(errorLines, currentLine)
}
}
if len(errorLines) < 1 {
t.Logf("No error logs detected for %s %d", title, index)
return
}
t.Log("===================================================================")
t.Logf("Start of %s %d error output:\n", title, index)
for _, err := range errorLines {
t.Log(err)
}
t.Logf("\nEnd of %s %d error output:", title, index)
t.Log("===================================================================")
})
defer killProcesses(t, []int{syncNodeInfo.ProcessID})
}

View File

@ -118,6 +118,11 @@ func startEth1(t *testing.T, tmpPath string) (common.Address, string, int) {
time.Sleep(100 * time.Millisecond)
}
// Advancing the blocks another eth1follow distance to prevent issues reading the chain.
if err := mineBlocks(web3, keystore, params.BeaconConfig().Eth1FollowDistance); err != nil {
t.Fatalf("Unable to advance chain: %v", err)
}
return contractAddr, keystorePath, cmd.Process.Pid
}

View File

@ -5,6 +5,8 @@ go_library(
testonly = True,
srcs = [
"finality.go",
"node.go",
"types.go",
"validator.go",
],
importpath = "github.com/prysmaticlabs/prysm/endtoend/evaluators",
@ -14,5 +16,6 @@ go_library(
"@com_github_gogo_protobuf//types:go_default_library",
"@com_github_pkg_errors//:go_default_library",
"@com_github_prysmaticlabs_ethereumapis//eth/v1alpha1:go_default_library",
"@org_golang_google_grpc//:go_default_library",
],
)

138
endtoend/evaluators/node.go Normal file
View File

@ -0,0 +1,138 @@
package evaluators
import (
"bytes"
"context"
"fmt"
"io/ioutil"
"net/http"
"strconv"
"strings"
ptypes "github.com/gogo/protobuf/types"
"github.com/pkg/errors"
eth "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1"
"google.golang.org/grpc"
)
// PeersConnect checks all beacon nodes and returns whether they are connected to each other as peers.
func PeersConnect(beaconNodes []*BeaconNodeInfo) error {
for _, bNode := range beaconNodes {
response, err := http.Get(fmt.Sprintf("http://127.0.0.1:%d/p2p", bNode.MonitorPort))
if err != nil {
return errors.Wrap(err, "failed to reach p2p metrics page")
}
dataInBytes, err := ioutil.ReadAll(response.Body)
if err != nil {
return err
}
if err := response.Body.Close(); err != nil {
return err
}
// Subtracting by 2 here since the libp2p page has "3 peers" as text.
// With a starting index before the "p", going two characters back should give us
// the number we need.
startIdx := strings.Index(string(dataInBytes), "peers") - 2
if startIdx == -3 {
return fmt.Errorf("could not find needed text in %s", dataInBytes)
}
peerCount, err := strconv.Atoi(string(dataInBytes)[startIdx : startIdx+1])
if err != nil {
return err
}
expectedPeers := uint64(len(beaconNodes) - 1)
if expectedPeers != uint64(peerCount) {
return fmt.Errorf("unexpected amount of peers, expected %d, received %d", expectedPeers, peerCount)
}
}
return nil
}
// FinishedSyncing returns whether the beacon node with the given rpc port has finished syncing.
func FinishedSyncing(rpcPort uint64) error {
syncConn, err := grpc.Dial(fmt.Sprintf("127.0.0.1:%d", rpcPort), grpc.WithInsecure())
if err != nil {
return errors.Wrap(err, "failed to dial: %v")
}
syncNodeClient := eth.NewNodeClient(syncConn)
syncStatus, err := syncNodeClient.GetSyncStatus(context.Background(), &ptypes.Empty{})
if err != nil {
return err
}
if syncStatus.Syncing {
return errors.New("expected node to have completed sync")
}
return nil
}
// AllChainsHaveSameHead connects to all RPC ports in the passed in array and ensures they have the same head epoch.
// Checks finality and justification as well.
// Not checking head block root as it may change irregularly for the validator connected nodes.
func AllChainsHaveSameHead(beaconNodes []*BeaconNodeInfo) error {
headEpochs := make([]uint64, len(beaconNodes))
justifiedRoots := make([][]byte, len(beaconNodes))
prevJustifiedRoots := make([][]byte, len(beaconNodes))
finalizedRoots := make([][]byte, len(beaconNodes))
for i, bNode := range beaconNodes {
conn, err := grpc.Dial(fmt.Sprintf("127.0.0.1:%d", bNode.RPCPort), grpc.WithInsecure())
if err != nil {
return errors.Wrap(err, "Failed to dial")
}
beaconClient := eth.NewBeaconChainClient(conn)
chainHead, err := beaconClient.GetChainHead(context.Background(), &ptypes.Empty{})
if err != nil {
return err
}
headEpochs[i] = chainHead.HeadEpoch
justifiedRoots[i] = chainHead.JustifiedBlockRoot
prevJustifiedRoots[i] = chainHead.PreviousJustifiedBlockRoot
finalizedRoots[i] = chainHead.FinalizedBlockRoot
if err := conn.Close(); err != nil {
return err
}
}
for i, epoch := range headEpochs {
if headEpochs[0] != epoch {
return fmt.Errorf(
"received conflicting head epochs on node %d, expected %d, received %d",
i,
headEpochs[0],
epoch,
)
}
}
for i, root := range justifiedRoots {
if !bytes.Equal(justifiedRoots[0], root) {
return fmt.Errorf(
"received conflicting justified block roots on node %d, expected %#x, received %#x",
i,
justifiedRoots[0],
root,
)
}
}
for i, root := range prevJustifiedRoots {
if !bytes.Equal(prevJustifiedRoots[0], root) {
return fmt.Errorf(
"received conflicting previous justified block roots on node %d, expected %#x, received %#x",
i,
prevJustifiedRoots[0],
root,
)
}
}
for i, root := range finalizedRoots {
if !bytes.Equal(finalizedRoots[0], root) {
return fmt.Errorf(
"received conflicting finalized epoch roots on node %d, expected %#x, received %#x",
i,
finalizedRoots[0],
root,
)
}
}
return nil
}

View File

@ -0,0 +1,24 @@
package evaluators
import (
eth "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1"
)
// Evaluator defines the structure of the evaluators used to
// conduct the current beacon state during the E2E.
type Evaluator struct {
Name string
Policy func(currentEpoch uint64) bool
Evaluation func(client eth.BeaconChainClient) error
}
// BeaconNodeInfo contains the info of ports and other required information
// needed to communicate with the beacon node it represents.
type BeaconNodeInfo struct {
ProcessID int
DataDir string
RPCPort uint64
MonitorPort uint64
GRPCPort uint64
MultiAddr string
}

View File

@ -9,14 +9,6 @@ import (
"github.com/prysmaticlabs/prysm/shared/params"
)
// Evaluator defines the structure of the evaluators used to
// conduct the current beacon state during the E2E.
type Evaluator struct {
Name string
Policy func(currentEpoch uint64) bool
Evaluation func(client eth.BeaconChainClient) error
}
// ValidatorsAreActive ensures the expected amount of validators are active.
var ValidatorsAreActive = Evaluator{
Name: "validators_active_epoch_%d",

114
endtoend/helpers.go Normal file
View File

@ -0,0 +1,114 @@
package endtoend
import (
"bufio"
"context"
"fmt"
"io"
"io/ioutil"
"os"
"path"
"strings"
"testing"
"time"
)
const (
maxPollingWaitTime = 36 * time.Second
filePollingInterval = 1 * time.Second
)
func killProcesses(t *testing.T, pIDs []int) {
for _, id := range pIDs {
process, err := os.FindProcess(id)
if err != nil {
t.Fatalf("Could not find process %d: %v", id, err)
}
if err := process.Kill(); err != nil {
t.Fatal(err)
}
if err := process.Release(); err != nil {
t.Fatal(err)
}
}
}
func waitForTextInFile(file *os.File, text string) error {
d := time.Now().Add(maxPollingWaitTime)
ctx, cancel := context.WithDeadline(context.Background(), d)
defer cancel()
// Use a ticker with a deadline to poll a given file.
ticker := time.NewTicker(filePollingInterval)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
contents, err := ioutil.ReadAll(file)
if err != nil {
return err
}
return fmt.Errorf("could not find requested text \"%s\" in logs:\n%s", text, string(contents))
case <-ticker.C:
_, err := file.Seek(0,io.SeekStart)
if err != nil {
return err
}
fileScanner := bufio.NewScanner(file)
for fileScanner.Scan() {
scanned := fileScanner.Text()
if strings.Contains(scanned, text) {
return nil
}
}
if err := fileScanner.Err(); err != nil {
return err
}
}
}
}
func logOutput(t *testing.T, tmpPath string, config *end2EndConfig) {
// Log out errors from beacon chain nodes.
for i := uint64(0); i < config.numBeaconNodes; i++ {
beaconLogFile, err := os.Open(path.Join(tmpPath, fmt.Sprintf(beaconNodeLogFileName, i)))
if err != nil {
t.Fatal(err)
}
logErrorOutput(t, beaconLogFile, "beacon chain node", i)
validatorLogFile, err := os.Open(path.Join(tmpPath, fmt.Sprintf(validatorLogFileName, i)))
if err != nil {
t.Fatal(err)
}
logErrorOutput(t, validatorLogFile, "validator client", i)
}
t.Logf("Ending time: %s\n", time.Now().String())
}
func logErrorOutput(t *testing.T, file io.Reader, title string, index uint64) {
var errorLines []string
scanner := bufio.NewScanner(file)
for scanner.Scan() {
currentLine := scanner.Text()
if strings.Contains(currentLine, "level=error") {
errorLines = append(errorLines, currentLine)
}
}
if len(errorLines) < 1 {
t.Logf("No error logs detected for %s %d", title, index)
return
}
t.Log("===================================================================")
t.Logf("Start of %s %d error output:\n", title, index)
for _, err := range errorLines {
t.Log(err)
}
t.Logf("\nEnd of %s %d error output:", title, index)
t.Log("===================================================================")
}

View File

@ -16,7 +16,7 @@ func TestEndToEnd_MinimalConfig(t *testing.T) {
minimalConfig := &end2EndConfig{
beaconFlags: append(featureconfig.E2EBeaconChainFlags, "--minimal-config"),
validatorFlags: append(featureconfig.E2EValidatorFlags, "--minimal-config"),
epochsToRun: 5,
epochsToRun: 6,
numBeaconNodes: 4,
numValidators: params.BeaconConfig().MinGenesisActiveValidatorCount,
evaluators: []ev.Evaluator{