Fix for RPC daemon leak (#1059)

* Start memory prof

* Fix rpctest

* Fix rpctest

* Attempt to fix the leak

* Remove http pprof
This commit is contained in:
ledgerwatch 2020-09-05 21:58:51 +01:00 committed by GitHub
parent 787bba4f48
commit 3a92b2b39d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 46 additions and 37 deletions

View File

@ -1,9 +1,10 @@
package main package main
import ( import (
"github.com/ledgerwatch/turbo-geth/cmd/utils"
"os" "os"
"github.com/ledgerwatch/turbo-geth/cmd/utils"
"github.com/ledgerwatch/turbo-geth/cmd/rpcdaemon/cli" "github.com/ledgerwatch/turbo-geth/cmd/rpcdaemon/cli"
"github.com/ledgerwatch/turbo-geth/cmd/rpcdaemon/commands" "github.com/ledgerwatch/turbo-geth/cmd/rpcdaemon/commands"
"github.com/ledgerwatch/turbo-geth/log" "github.com/ledgerwatch/turbo-geth/log"

View File

@ -4,12 +4,13 @@ import (
"bytes" "bytes"
"encoding/base64" "encoding/base64"
"fmt" "fmt"
"github.com/ledgerwatch/turbo-geth/common"
"github.com/ledgerwatch/turbo-geth/core/state"
"net/http" "net/http"
"os" "os"
"path" "path"
"time" "time"
"github.com/ledgerwatch/turbo-geth/common"
"github.com/ledgerwatch/turbo-geth/core/state"
) )
var routes map[string]string var routes map[string]string
@ -127,34 +128,36 @@ func Bench1(tgURL, gethURL string, needCompare bool, fullTest bool) {
} }
for nextKeyG != nil { if needCompare {
var srGeth DebugStorageRange for nextKeyG != nil {
res = reqGen.Geth("debug_storageRangeAt", reqGen.storageRangeAt(b.Result.Hash, i, tx.To, *nextKeyG), &srGeth) var srGeth DebugStorageRange
resultsCh <- res res = reqGen.Geth("debug_storageRangeAt", reqGen.storageRangeAt(b.Result.Hash, i, tx.To, *nextKeyG), &srGeth)
if res.Err != nil { resultsCh <- res
fmt.Printf("Could not get storageRange (geth): %s: %v\n", tx.Hash, res.Err) if res.Err != nil {
fmt.Printf("Could not get storageRange (geth): %s: %v\n", tx.Hash, res.Err)
return
}
if srGeth.Error != nil {
fmt.Printf("Error getting storageRange (geth): %d %s\n", srGeth.Error.Code, srGeth.Error.Message)
break
} else {
for k, v := range srGeth.Result.Storage {
smg[k] = v
if v.Key == nil {
fmt.Printf("%x: %x", k, v)
}
}
nextKeyG = srGeth.Result.NextKey
}
}
if !compareStorageRanges(sm, smg) {
fmt.Printf("len(sm) %d, len(smg) %d\n", len(sm), len(smg))
fmt.Printf("================sm\n")
printStorageRange(sm)
fmt.Printf("================smg\n")
printStorageRange(smg)
return return
} }
if srGeth.Error != nil {
fmt.Printf("Error getting storageRange (geth): %d %s\n", srGeth.Error.Code, srGeth.Error.Message)
break
} else {
for k, v := range srGeth.Result.Storage {
smg[k] = v
if v.Key == nil {
fmt.Printf("%x: %x", k, v)
}
}
nextKeyG = srGeth.Result.NextKey
}
}
if !compareStorageRanges(sm, smg) {
fmt.Printf("len(sm) %d, len(smg) %d\n", len(sm), len(smg))
fmt.Printf("================sm\n")
printStorageRange(sm)
fmt.Printf("================smg\n")
printStorageRange(smg)
return
} }
} }
} }

View File

@ -4,14 +4,15 @@ import (
"bytes" "bytes"
"encoding/json" "encoding/json"
"fmt" "fmt"
"github.com/ledgerwatch/turbo-geth/common"
"github.com/ledgerwatch/turbo-geth/core/state"
"github.com/ledgerwatch/turbo-geth/crypto"
"github.com/ledgerwatch/turbo-geth/log"
"io" "io"
"net/http" "net/http"
"strings" "strings"
"time" "time"
"github.com/ledgerwatch/turbo-geth/common"
"github.com/ledgerwatch/turbo-geth/core/state"
"github.com/ledgerwatch/turbo-geth/crypto"
"github.com/ledgerwatch/turbo-geth/log"
) )
func compareBlocks(b, bg *EthBlockByNumber) bool { func compareBlocks(b, bg *EthBlockByNumber) bool {
@ -425,6 +426,7 @@ func print(client *http.Client, url, request string) {
} }
func setRoutes(tgUrl, gethURL string) { func setRoutes(tgUrl, gethURL string) {
routes = make(map[string]string)
routes[TurboGeth] = tgUrl routes[TurboGeth] = tgUrl
routes[Geth] = gethURL routes[Geth] = gethURL
} }

View File

@ -54,6 +54,7 @@ type remoteCursor struct {
ctx context.Context ctx context.Context
prefix []byte prefix []byte
stream remote.KV_SeekClient stream remote.KV_SeekClient
streamCancelFn context.CancelFunc // this function needs to be called to close the stream
tx *remoteTx tx *remoteTx
bucketName string bucketName string
} }
@ -186,7 +187,7 @@ func (tx *remoteTx) Commit(ctx context.Context) error {
func (tx *remoteTx) Rollback() { func (tx *remoteTx) Rollback() {
for _, c := range tx.cursors { for _, c := range tx.cursors {
if c.stream != nil { if c.stream != nil {
_ = c.stream.CloseSend() c.streamCancelFn()
c.stream = nil c.stream = nil
} }
} }
@ -217,7 +218,7 @@ func (tx *remoteTx) Get(bucket string, key []byte) (val []byte, err error) {
if v.stream == nil { if v.stream == nil {
return return
} }
_ = v.stream.CloseSend() v.streamCancelFn()
} }
}() }()
@ -269,13 +270,15 @@ func (c *remoteCursor) First() ([]byte, []byte, error) {
// .Next() - does request streaming (if configured by user) // .Next() - does request streaming (if configured by user)
func (c *remoteCursor) Seek(seek []byte) ([]byte, []byte, error) { func (c *remoteCursor) Seek(seek []byte) ([]byte, []byte, error) {
if c.stream != nil { if c.stream != nil {
_ = c.stream.CloseSend() c.streamCancelFn() // This will close the stream and free resources
c.stream = nil c.stream = nil
} }
c.initialized = true c.initialized = true
var err error var err error
c.stream, err = c.tx.db.remoteKV.Seek(c.ctx) var streamCtx context.Context
streamCtx, c.streamCancelFn = context.WithCancel(c.ctx) // We create child context for the stream so we can cancel it to prevent leak
c.stream, err = c.tx.db.remoteKV.Seek(streamCtx)
if err != nil { if err != nil {
return []byte{}, nil, err return []byte{}, nil, err
} }