turbo-api: Add docs to some public structs, methods and fields (#1127)

This commit is contained in:
Igor Mandrigin 2020-09-21 15:10:25 +01:00 committed by GitHub
parent 5aab794c9b
commit 65d8a12bb1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 160 additions and 49 deletions

View File

@ -14,10 +14,12 @@ import (
)
var (
// gitCommit is injected through the build flags (see Makefile)
gitCommit string
)
func main() {
// creating a turbo-api app with all defaults
app := turbocli.MakeApp(runTurboGeth, turbocli.DefaultFlags)
if err := app.Run(os.Args); err != nil {
fmt.Fprintln(os.Stderr, err)
@ -25,12 +27,15 @@ func main() {
}
}
func runTurboGeth(ctx *cli.Context) {
// creating staged sync with all default parameters
sync := stagedsync.New(
stagedsync.DefaultStages(),
stagedsync.DefaultUnwindOrder(),
)
// initializing the node and providing the current git commit there
tg := node.New(ctx, sync, node.Params{GitCommit: gitCommit})
// running the node
err := tg.Serve()
if err != nil {

View File

@ -15,17 +15,23 @@ import (
"github.com/urfave/cli"
)
// defining a custom command-line flag, a string
var flag = cli.StringFlag{
Name: "custom-stage-greeting",
Value: "default-value",
}
// defining a custom bucket name
const (
customBucketName = "ZZZ_0x0F_CUSTOM_BUCKET"
customBucketName = "ch.torquem.demo.tgcustom.CUSTOM_BUCKET"
)
// the regular main function
func main() {
app := turbocli.MakeApp(runTurboGeth, append(turbocli.DefaultFlags, flag))
// initializing turbo-geth application here and providing our custom flag
app := turbocli.MakeApp(runTurboGeth,
append(turbocli.DefaultFlags, flag), // always use DefaultFlags, but add a new one in the end.
)
if err := app.Run(os.Args); err != nil {
fmt.Fprintln(os.Stderr, err)
os.Exit(1)
@ -34,12 +40,12 @@ func main() {
func syncStages(ctx *cli.Context) stagedsync.StageBuilders {
return append(
stagedsync.DefaultStages(),
stagedsync.StageBuilder{
ID: stages.SyncStage("0x0F_CUSTOM"),
stagedsync.DefaultStages(), // adding all default stages
stagedsync.StageBuilder{ // adding our custom stage
ID: stages.SyncStage("ch.torquem.demo.tgcustom.CUSTOM_STAGE"),
Build: func(world stagedsync.StageParameters) *stagedsync.Stage {
return &stagedsync.Stage{
ID: stages.SyncStage("0x0F_CUSTOM"),
ID: stages.SyncStage("ch.torquem.demo.tgcustom.CUSTOM_STAGE"),
Description: "Custom Stage",
ExecFunc: func(s *stagedsync.StageState, _ stagedsync.Unwinder) error {
fmt.Println("hello from the custom stage", ctx.String(flag.Name))
@ -60,12 +66,15 @@ func syncStages(ctx *cli.Context) stagedsync.StageBuilders {
)
}
// turbo-geth main function
func runTurboGeth(ctx *cli.Context) {
// creating a staged sync with our new stage
sync := stagedsync.New(
syncStages(ctx),
stagedsync.DefaultUnwindOrder(),
)
// running a node and initializing a custom bucket with all default settings
tg := node.New(ctx, sync, node.Params{
CustomBuckets: map[string]dbutils.BucketConfigItem{
customBucketName: {},

View File

@ -5,42 +5,67 @@ import (
"github.com/ledgerwatch/turbo-geth/ethdb"
)
type ExecFunc func(*StageState, Unwinder) error
type UnwindFunc func(*UnwindState, *StageState) error
// ExecFunc is the execution function for the stage to move forward.
// * state - is the current state of the stage and contains stage data.
// * unwinder - if the stage needs to cause unwinding, `unwinder` methods can be used.
type ExecFunc func(state *StageState, unwinder Unwinder) error
// UnwindFunc is the unwinding logic of the stage.
// * unwindState - contains information about the unwind itself.
// * stageState - represents the state of this stage at the beginning of unwind.
type UnwindFunc func(unwindState *UnwindState, state *StageState) error
// Stage is a single sync stage in staged sync.
type Stage struct {
ID stages.SyncStage
Disabled bool
Description string
ExecFunc ExecFunc
// ID of the sync stage. Should not be empty and should be unique. It is recommended to prefix it with reverse domain to avoid clashes (`com.example.my-stage`).
ID stages.SyncStage
// Description is a string that is shown in the logs.
Description string
// Disabled defines if the stage is disabled. It sets up when the stage is build by its `StageBuilder`.
Disabled bool
// DisabledDescription shows in the log with a message if the stage is disabled. Here, you can show which command line flags should be provided to enable the page.
DisabledDescription string
UnwindFunc UnwindFunc
// ExecFunc is called when the stage is executed. The main logic of the stage should be here. Should always end with `s.Done()` to allow going to the next stage. MUST NOT be nil!
ExecFunc ExecFunc
// UnwindFunc is called when the stage should be unwound. The unwind logic should be there. MUST NOT be nil!
UnwindFunc UnwindFunc
}
// StageState is the state of the stage.
type StageState struct {
state *State
Stage stages.SyncStage
state *State
// Stage is the ID of this stage.
Stage stages.SyncStage
// BlockNumber is the current block number of the stage at the beginning of the state execution.
BlockNumber uint64
StageData []byte
// StageData (optional) is the additional data for the stage execution at the beginning.
StageData []byte
}
// Update updates the stage state (current block number) in the database. Can be called multiple times during stage execution.
func (s *StageState) Update(db ethdb.Putter, newBlockNum uint64) error {
return stages.SaveStageProgress(db, s.Stage, newBlockNum, nil)
}
// UpdateWithStageData updates both the current block number for that stage, as well as some additional information as array of bytes: stageData.
func (s *StageState) UpdateWithStageData(db ethdb.Putter, newBlockNum uint64, stageData []byte) error {
return stages.SaveStageProgress(db, s.Stage, newBlockNum, stageData)
}
// Done makes sure that the stage execution is complete and proceeds to the next state.
// If Done() is not called and the stage `ExecFunc` exits, then the same stage will be called again.
// This side effect is useful for something like block body download.
func (s *StageState) Done() {
s.state.NextStage()
}
// ExecutionAt gets the current state of the "Execution" stage, which block is currently executed.
func (s *StageState) ExecutionAt(db ethdb.Getter) (uint64, error) {
execution, _, err := stages.GetStageProgress(db, stages.Execution)
return execution, err
}
// DoneAndUpdate a convenience method combining both `Done()` and `Update()` calls together.
func (s *StageState) DoneAndUpdate(db ethdb.Putter, newBlockNum uint64) error {
err := stages.SaveStageProgress(db, s.Stage, newBlockNum, nil)
s.state.NextStage()

View File

@ -11,18 +11,24 @@ import (
"github.com/ledgerwatch/turbo-geth/params"
)
// StageParameters contains the stage that stages receives at runtime when initializes.
// Then the stage can use it to receive different useful functions.
type StageParameters struct {
d DownloaderGlue
chainConfig *params.ChainConfig
chainContext core.ChainContext
vmConfig *vm.Config
db ethdb.Database
TX ethdb.Database
pid string
hdd bool
storageMode ethdb.StorageMode
datadir string
quitCh <-chan struct{}
d DownloaderGlue
chainConfig *params.ChainConfig
chainContext core.ChainContext
vmConfig *vm.Config
db ethdb.Database
// TX is a current transaction that staged sync runs in. It contains all the latest changes that DB has.
// It can be used for both reading and writing.
TX ethdb.Database
pid string
hdd bool
storageMode ethdb.StorageMode
datadir string
// QuitCh is a channel that is closed. This channel is useful to listen to when
// the stage can take significant time and gracefully shutdown at Ctrl+C.
QuitCh <-chan struct{}
headersFetchers []func() error
txPool *core.TxPool
poolStart func() error
@ -30,13 +36,18 @@ type StageParameters struct {
prefetchedBlocks *PrefetchedBlocks
}
// StageBuilder represent an object to create a single stage for staged sync
type StageBuilder struct {
ID stages.SyncStage
// ID is the stage identifier. Should be unique. It is recommended to prefix it with reverse domain `com.example.my-stage` to avoid conflicts.
ID stages.SyncStage
// Build is a factory function that initializes the sync stage based on the `StageParameters` provided.
Build func(StageParameters) *Stage
}
// StageBuilders represents an ordered list of builders to build different stages. It also contains helper methods to change the list of stages.
type StageBuilders []StageBuilder
// Build creates sync states out of builders
func (bb StageBuilders) Build(world StageParameters) []*Stage {
stages := make([]*Stage, len(bb))
for i, builder := range bb {
@ -45,6 +56,7 @@ func (bb StageBuilders) Build(world StageParameters) []*Stage {
return stages
}
// DefaultStages contains the list of default stage builders that are used by turbo-geth.
func DefaultStages() StageBuilders {
return []StageBuilder{
{
@ -69,7 +81,7 @@ func DefaultStages() StageBuilders {
ID: stages.BlockHashes,
Description: "Write block hashes",
ExecFunc: func(s *StageState, u Unwinder) error {
return SpawnBlockHashStage(s, world.db, world.datadir, world.quitCh)
return SpawnBlockHashStage(s, world.db, world.datadir, world.QuitCh)
},
UnwindFunc: func(u *UnwindState, s *StageState) error {
return u.Done(world.db)
@ -113,7 +125,7 @@ func DefaultStages() StageBuilders {
ReadChLen: 4,
Now: time.Now(),
}
return SpawnRecoverSendersStage(cfg, s, world.TX, world.chainConfig, 0, world.datadir, world.quitCh)
return SpawnRecoverSendersStage(cfg, s, world.TX, world.chainConfig, 0, world.datadir, world.QuitCh)
},
UnwindFunc: func(u *UnwindState, s *StageState) error {
return UnwindSendersStage(u, world.TX)
@ -128,7 +140,7 @@ func DefaultStages() StageBuilders {
ID: stages.Execution,
Description: "Execute blocks w/o hash checks",
ExecFunc: func(s *StageState, u Unwinder) error {
return SpawnExecuteBlocksStage(s, world.TX, world.chainConfig, world.chainContext, world.vmConfig, 0 /* limit (meaning no limit) */, world.quitCh, world.storageMode.Receipts, world.hdd, world.changeSetHook)
return SpawnExecuteBlocksStage(s, world.TX, world.chainConfig, world.chainContext, world.vmConfig, 0 /* limit (meaning no limit) */, world.QuitCh, world.storageMode.Receipts, world.hdd, world.changeSetHook)
},
UnwindFunc: func(u *UnwindState, s *StageState) error {
return UnwindExecutionStage(u, s, world.TX, world.storageMode.Receipts)
@ -143,10 +155,10 @@ func DefaultStages() StageBuilders {
ID: stages.HashState,
Description: "Hash the key in the state",
ExecFunc: func(s *StageState, u Unwinder) error {
return SpawnHashStateStage(s, world.TX, world.datadir, world.quitCh)
return SpawnHashStateStage(s, world.TX, world.datadir, world.QuitCh)
},
UnwindFunc: func(u *UnwindState, s *StageState) error {
return UnwindHashStateStage(u, s, world.TX, world.datadir, world.quitCh)
return UnwindHashStateStage(u, s, world.TX, world.datadir, world.QuitCh)
},
}
},
@ -158,10 +170,10 @@ func DefaultStages() StageBuilders {
ID: stages.IntermediateHashes,
Description: "Generate intermediate hashes and computing state root",
ExecFunc: func(s *StageState, u Unwinder) error {
return SpawnIntermediateHashesStage(s, world.TX, world.datadir, world.quitCh)
return SpawnIntermediateHashesStage(s, world.TX, world.datadir, world.QuitCh)
},
UnwindFunc: func(u *UnwindState, s *StageState) error {
return UnwindIntermediateHashesStage(u, s, world.TX, world.datadir, world.quitCh)
return UnwindIntermediateHashesStage(u, s, world.TX, world.datadir, world.QuitCh)
},
}
},
@ -175,10 +187,10 @@ func DefaultStages() StageBuilders {
Disabled: !world.storageMode.History,
DisabledDescription: "Enable by adding `h` to --storage-mode",
ExecFunc: func(s *StageState, u Unwinder) error {
return SpawnAccountHistoryIndex(s, world.TX, world.datadir, world.quitCh)
return SpawnAccountHistoryIndex(s, world.TX, world.datadir, world.QuitCh)
},
UnwindFunc: func(u *UnwindState, s *StageState) error {
return UnwindAccountHistoryIndex(u, world.TX, world.quitCh)
return UnwindAccountHistoryIndex(u, world.TX, world.QuitCh)
},
}
},
@ -192,10 +204,10 @@ func DefaultStages() StageBuilders {
Disabled: !world.storageMode.History,
DisabledDescription: "Enable by adding `h` to --storage-mode",
ExecFunc: func(s *StageState, u Unwinder) error {
return SpawnStorageHistoryIndex(s, world.TX, world.datadir, world.quitCh)
return SpawnStorageHistoryIndex(s, world.TX, world.datadir, world.QuitCh)
},
UnwindFunc: func(u *UnwindState, s *StageState) error {
return UnwindStorageHistoryIndex(u, world.TX, world.quitCh)
return UnwindStorageHistoryIndex(u, world.TX, world.QuitCh)
},
}
},
@ -209,10 +221,10 @@ func DefaultStages() StageBuilders {
Disabled: !world.storageMode.TxIndex,
DisabledDescription: "Enable by adding `t` to --storage-mode",
ExecFunc: func(s *StageState, u Unwinder) error {
return SpawnTxLookup(s, world.TX, world.datadir, world.quitCh)
return SpawnTxLookup(s, world.TX, world.datadir, world.QuitCh)
},
UnwindFunc: func(u *UnwindState, s *StageState) error {
return UnwindTxLookup(u, s, world.TX, world.datadir, world.quitCh)
return UnwindTxLookup(u, s, world.TX, world.datadir, world.QuitCh)
},
}
},
@ -224,10 +236,10 @@ func DefaultStages() StageBuilders {
ID: stages.TxPool,
Description: "Update transaction pool",
ExecFunc: func(s *StageState, _ Unwinder) error {
return spawnTxPool(s, world.TX, world.txPool, world.poolStart, world.quitCh)
return spawnTxPool(s, world.TX, world.txPool, world.poolStart, world.QuitCh)
},
UnwindFunc: func(u *UnwindState, s *StageState) error {
return unwindTxPool(u, s, world.TX, world.txPool, world.quitCh)
return unwindTxPool(u, s, world.TX, world.txPool, world.QuitCh)
},
}
},
@ -260,8 +272,15 @@ func DefaultStages() StageBuilders {
}
}
// UnwindOrder represents the order in which the stages needs to be unwound.
// Currently it is using indexes of stages, 0-based.
// The unwind order is important and not always just stages going backwards.
// Let's say, there is tx pool (state 10) can be unwound only after execution
// is fully unwound (stages 9...3).
type UnwindOrder []int
// DefaultUnwindOrder contains the default unwind order for `DefaultStages()`.
// Just adding stages that don't do unwinding, don't require altering the default order.
func DefaultUnwindOrder() UnwindOrder {
return []int{
0, 1, 2,

View File

@ -51,7 +51,7 @@ func (stagedSync *StagedSync) Prepare(
pid: pid,
storageMode: storageMode,
datadir: datadir,
quitCh: quitCh,
QuitCh: quitCh,
headersFetchers: headersFetchers,
txPool: txPool,
poolStart: poolStart,

View File

@ -28,6 +28,8 @@ import (
)
// SyncStage represents the stages of syncronisation in the SyncMode.StagedSync mode
// It is used to persist the information about the stage state into the database.
// It should not be empty and should be unique.
type SyncStage []byte
var (

View File

@ -5,16 +5,23 @@ import (
"github.com/ledgerwatch/turbo-geth/ethdb"
)
// Unwinder allows the stage to cause an unwind.
type Unwinder interface {
// UnwindTo begins staged sync unwind to the specified block.
UnwindTo(uint64, ethdb.Database) error
}
// UnwindState contains the information about unwind.
type UnwindState struct {
Stage stages.SyncStage
// Stage is the ID of the stage
Stage stages.SyncStage
// UnwindPoint is the block to unwind to.
UnwindPoint uint64
StageData []byte
// StageData is additional data for unwind (useful for long unwinds with ETL that can be interrupted by a user).
StageData []byte
}
// Done() updates the DB state of the stage.
func (u *UnwindState) Done(db ethdb.Putter) error {
err := stages.SaveStageProgress(db, u.Stage, u.UnwindPoint, nil)
if err != nil {
@ -23,10 +30,12 @@ func (u *UnwindState) Done(db ethdb.Putter) error {
return stages.SaveStageUnwind(db, u.Stage, 0, nil)
}
// UpdateWithStageData() sets data for stage unwind (that can later be retrieved by `UnwindState.StageData`).
func (u *UnwindState) UpdateWithStageData(db ethdb.Putter, stageData []byte) error {
return stages.SaveStageUnwind(db, u.Stage, u.UnwindPoint, stageData)
}
// Skip() ignores the unwind
func (u *UnwindState) Skip(db ethdb.Putter) error {
return stages.SaveStageUnwind(db, u.Stage, 0, nil)
}

22
turbo/README.md Normal file
View File

@ -0,0 +1,22 @@
Turbo-API
---
Turbo-API is a set of tools for building applications containing turbo-geth node.
Our own binary [`tg`](../cmd/tg) is built using it.
## Modules
* [`cli`](./cli) - turbo-cli, methods & helpers to run a CLI app with turbo-geth node.
* [`node`](./node) - represents an Ethereum node, running devp2p and sync and writing state to the database.
* [`stagedsync`](../eth/stagedsync) - staged sync algorithm.
## Examples
* [`tg`](../cmd/tg/main.go) - our binary is using turbo-api with all defaults
* [`tgcustom`](../cmd/tgcustom/main.go) - a very simple example of adding a custom stage, a custom bucket and a custom command-line parameter
* [turbo-api-examples](https://github.com/mandrigin/turbo-api-examples) - a series of examples for turbo-geth api

View File

@ -6,6 +6,7 @@ import (
"github.com/urfave/cli"
)
// DefaultFlags contains all flags that are used and supported by turbo-geth binary.
var DefaultFlags = []cli.Flag{
utils.DataDirFlag,
utils.KeyStoreDirFlag,

View File

@ -1,3 +1,4 @@
// Package cli contains framework for building a command-line based turbo-geth node.
package cli
import (
@ -8,6 +9,11 @@ import (
"github.com/urfave/cli"
)
// MakeApp creates a cli application (based on `github.com/urlfave/cli` package).
// The application exits when `action` returns.
// Parameters:
// * action: the main function for the application. receives `*cli.Context` with parsed command-line flags. Returns no error, if some error could not be recovered from write to the log or panic.
// * cliFlags: the list of flags `cli.Flag` that the app should set and parse. By default, use `DefaultFlags()`. If you want to specify your own flag, use `append(DefaultFlags(), myFlag)` for this parameter.
func MakeApp(action func(*cli.Context), cliFlags []cli.Flag) *cli.App {
app := flags.NewApp("", "", "turbo-geth experimental cli")
app.Action = action

View File

@ -1,3 +1,4 @@
// Package node contains classes for running a turbo-geth node.
package node
import (
@ -20,11 +21,14 @@ import (
gopsutil "github.com/shirou/gopsutil/mem"
)
// TurboGethNode represents a single node, that runs sync and p2p network.
// it also can export the private endpoint for RPC daemon, etc.
type TurboGethNode struct {
stack *node.Node
backend *eth.Ethereum
}
// Serve runs the node and blocks the execution. It returns when the node is existed.
func (tg *TurboGethNode) Serve() error {
defer tg.stack.Close()
@ -42,19 +46,28 @@ func (tg *TurboGethNode) run() {
// see cmd/geth/main.go#startNode for full implementation
}
// Params contains optional parameters for creating a node.
// * GitCommit is a commit from which then node was built.
// * CustomBuckets is a `map[string]dbutils.BucketConfigItem`, that contains bucket name and its properties.
//
// NB: You have to declare your custom buckets here to be able to use them in the app.
type Params struct {
GitCommit string
CustomBuckets dbutils.BucketsCfg
}
// New creates a new `TurboGethNode`.
// * ctx - `*cli.Context` from the main function. Necessary to be able to configure the node based on the command-line flags
// * sync - `stagedsync.StagedSync`, an instance of staged sync, setup just as needed.
// * optionalParams - additional parameters for running a node.
func New(
ctx *cli.Context,
sync *stagedsync.StagedSync,
p Params,
optionalParams Params,
) *TurboGethNode {
prepareBuckets(p.CustomBuckets)
prepareBuckets(optionalParams.CustomBuckets)
prepare(ctx)
nodeConfig := makeNodeConfig(ctx, p)
nodeConfig := makeNodeConfig(ctx, optionalParams)
node := makeConfigNode(nodeConfig)
ethConfig := makeEthConfig(ctx, node)