Fix parallel recovery senders (#962)

* fix parallel context creation

* fix a typo
This commit is contained in:
Igor Mandrigin 2020-08-22 12:08:47 +02:00 committed by GitHub
parent 7c16160087
commit 18df6cd182
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 24 additions and 9 deletions

View File

@ -47,6 +47,7 @@ import "C"
import (
"errors"
"math/big"
"runtime"
"unsafe"
)
@ -54,12 +55,18 @@ type Context struct {
context *C.secp256k1_context
}
var context *C.secp256k1_context
var DefaultContext *Context // to avoid allocating structures every time on `RecoverPubkey` w/o context
var (
context *C.secp256k1_context
contextsForThreads []*C.secp256k1_context
DefaultContext *Context // to avoid allocating structures every time on `RecoverPubkey` w/o context
)
func init() {
context = initContext()
DefaultContext = &Context{context}
for i := 0; i < runtime.NumCPU(); i++ {
contextsForThreads = append(contextsForThreads, initContext())
}
}
func initContext() *C.secp256k1_context {
@ -70,6 +77,14 @@ func initContext() *C.secp256k1_context {
return ctx
}
func ContextForThread(threadNo int) *Context {
return &Context{contextsForThreads[threadNo]}
}
func NumOfContexts() int {
return len(contextsForThreads)
}
func NewContext() *Context {
ctx := initContext()
return &Context{ctx}

View File

@ -5,7 +5,6 @@ import (
"encoding/binary"
"errors"
"fmt"
"github.com/ledgerwatch/turbo-geth/core/rawdb"
"math/big"
"os"
"runtime/pprof"
@ -13,6 +12,8 @@ import (
"sync"
"time"
"github.com/ledgerwatch/turbo-geth/core/rawdb"
"github.com/ledgerwatch/turbo-geth/common"
"github.com/ledgerwatch/turbo-geth/common/dbutils"
"github.com/ledgerwatch/turbo-geth/common/etl"
@ -153,12 +154,11 @@ func SpawnRecoverSendersStage(cfg Stage3Config, s *StageState, db ethdb.Database
wg := new(sync.WaitGroup)
wg.Add(cfg.NumOfGoroutines)
for i := 0; i < cfg.NumOfGoroutines; i++ {
go func() {
go func(threadNo int) {
defer wg.Done()
// each goroutine gets it's own crypto context to make sure they are really parallel
recoverSenders(secp256k1.NewContext(), config, jobs, out, quitCh)
}()
recoverSenders(secp256k1.ContextForThread(threadNo), config, jobs, out, quitCh)
}(i)
}
log.Info("Sync (Senders): Started recoverer goroutines", "numOfGoroutines", cfg.NumOfGoroutines)
go func() {

View File

@ -1,11 +1,11 @@
package stagedsync
import (
"runtime"
"time"
"github.com/ledgerwatch/turbo-geth/core"
"github.com/ledgerwatch/turbo-geth/core/vm"
"github.com/ledgerwatch/turbo-geth/crypto/secp256k1"
"github.com/ledgerwatch/turbo-geth/eth/stagedsync/stages"
"github.com/ledgerwatch/turbo-geth/ethdb"
"github.com/ledgerwatch/turbo-geth/log"
@ -68,7 +68,7 @@ func PrepareStagedSync(
ExecFunc: func(s *StageState, u Unwinder) error {
const batchSize = 10000
const blockSize = 4096
n := runtime.NumCPU()
n := secp256k1.NumOfContexts() // we can only be as parallels as our crypto library supports
cfg := Stage3Config{
BatchSize: batchSize,