mirror of
https://gitlab.com/pulsechaincom/erigon-pulse.git
synced 2025-01-04 01:54:28 +00:00
84ae2eb1f8
* implement NoValueCursor interface * cleanup * fix tests * add more stats data to ui * can't display error * re-open DB low-level net interface when changing db * re-open DB low-level net interface when changing db * fix problem with displaying errors * run ci * improve prop check logic * storage tombstones integrity checks UI * storage page * make DB configurable Co-authored-by: alex.sharov <alex.sharov@lazada.com>
565 lines
16 KiB
Go
565 lines
16 KiB
Go
package remotedbserver
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"io"
|
|
"net"
|
|
"os"
|
|
"os/signal"
|
|
"syscall"
|
|
"time"
|
|
|
|
"github.com/ledgerwatch/bolt"
|
|
"github.com/ledgerwatch/turbo-geth/ethdb"
|
|
"github.com/ledgerwatch/turbo-geth/ethdb/codecpool"
|
|
"github.com/ledgerwatch/turbo-geth/ethdb/remote"
|
|
"github.com/ledgerwatch/turbo-geth/log"
|
|
"github.com/ugorji/go/codec"
|
|
)
|
|
|
|
// Version is the current version of the remote db protocol. If the protocol changes in a non backwards compatible way,
|
|
// this constant needs to be increased
|
|
const Version uint64 = 2
|
|
|
|
// Server is to be called as a go-routine, one per every client connection.
|
|
// It runs while the connection is active and keep the entire connection's context
|
|
// in the local variables
|
|
// For tests, bytes.Buffer can be used for both `in` and `out`
|
|
func Server(ctx context.Context, db ethdb.HasBolt, in io.Reader, out io.Writer, closer io.Closer) error {
|
|
defer func() {
|
|
if err1 := closer.Close(); err1 != nil {
|
|
logger.Error("Could not close connection", "err", err1)
|
|
}
|
|
}()
|
|
|
|
decoder := codecpool.Decoder(in)
|
|
defer codecpool.Return(decoder)
|
|
encoder := codecpool.Encoder(out)
|
|
defer codecpool.Return(encoder)
|
|
|
|
// Server is passive - it runs a loop what reads remote.Commands (and their arguments) and attempts to respond
|
|
var lastHandle uint64
|
|
// Read-only transactions opened by the client
|
|
var tx *bolt.Tx
|
|
|
|
// We do Rollback and never Commit, because the remote transactions are always read-only, and must never change
|
|
// anything
|
|
defer func() {
|
|
if tx != nil {
|
|
if rollbackErr := tx.Rollback(); rollbackErr != nil {
|
|
logger.Error("could not roll back", "err", rollbackErr)
|
|
}
|
|
tx = nil
|
|
}
|
|
}()
|
|
|
|
// Buckets opened by the client
|
|
buckets := make(map[uint64]*bolt.Bucket, 2)
|
|
// List of buckets opened in each transaction
|
|
//bucketsByTx := make(map[uint64][]uint64, 10)
|
|
// Cursors opened by the client
|
|
cursors := make(map[uint64]*bolt.Cursor, 2)
|
|
// List of cursors opened in each bucket
|
|
cursorsByBucket := make(map[uint64][]uint64, 2)
|
|
|
|
var c remote.Command
|
|
var bucketHandle uint64
|
|
var cursorHandle uint64
|
|
|
|
var name []byte
|
|
var seekKey []byte
|
|
|
|
for {
|
|
// Make sure we are not blocking the resizing of the memory map
|
|
if tx != nil {
|
|
tx.Yield()
|
|
}
|
|
|
|
if err := decoder.Decode(&c); err != nil {
|
|
if err == io.EOF {
|
|
// Graceful termination when the end of the input is reached
|
|
break
|
|
}
|
|
return fmt.Errorf("could not decode remote.Command: %w", err)
|
|
}
|
|
switch c {
|
|
case remote.CmdVersion:
|
|
if err := encoder.Encode(remote.ResponseOk); err != nil {
|
|
return fmt.Errorf("could not encode response code to remote.CmdVersion: %w", err)
|
|
}
|
|
if err := encoder.Encode(Version); err != nil {
|
|
return fmt.Errorf("could not encode response to remote.CmdVersion: %w", err)
|
|
}
|
|
case remote.CmdBeginTx:
|
|
var err error
|
|
tx, err = db.DB().Begin(false)
|
|
if err != nil {
|
|
err2 := fmt.Errorf("could not start transaction for remote.CmdBeginTx: %w", err)
|
|
encodeErr(encoder, err2)
|
|
return err2
|
|
}
|
|
|
|
if err := encoder.Encode(remote.ResponseOk); err != nil {
|
|
return fmt.Errorf("could not encode response to remote.CmdBeginTx: %w", err)
|
|
}
|
|
case remote.CmdEndTx:
|
|
// Remove all the buckets
|
|
for bucketHandle := range buckets {
|
|
if cursorHandles, ok2 := cursorsByBucket[bucketHandle]; ok2 {
|
|
for _, cursorHandle := range cursorHandles {
|
|
delete(cursors, cursorHandle)
|
|
}
|
|
delete(cursorsByBucket, bucketHandle)
|
|
}
|
|
delete(buckets, bucketHandle)
|
|
}
|
|
|
|
if tx != nil {
|
|
if err := tx.Rollback(); err != nil {
|
|
return fmt.Errorf("could not end transaction: %w", err)
|
|
}
|
|
tx = nil
|
|
}
|
|
|
|
if err := encoder.Encode(remote.ResponseOk); err != nil {
|
|
return fmt.Errorf("could not encode response to remote.CmdEndTx: %w", err)
|
|
}
|
|
case remote.CmdBucket:
|
|
// Read the name of the bucket
|
|
if err := decoder.Decode(&name); err != nil {
|
|
return fmt.Errorf("could not decode name for remote.CmdBucket: %w", err)
|
|
}
|
|
|
|
// Open the bucket
|
|
if tx == nil {
|
|
err := fmt.Errorf("send remote.CmdBucket before remote.CmdBeginTx")
|
|
encodeErr(encoder, err)
|
|
return err
|
|
}
|
|
|
|
bucket := tx.Bucket(name)
|
|
if bucket == nil {
|
|
err := fmt.Errorf("bucket not found: %s", name)
|
|
encodeErr(encoder, err)
|
|
continue
|
|
}
|
|
|
|
lastHandle++
|
|
buckets[lastHandle] = bucket
|
|
if err := encoder.Encode(remote.ResponseOk); err != nil {
|
|
return fmt.Errorf("could not encode response to remote.CmdBucket: %w", err)
|
|
}
|
|
|
|
if err := encoder.Encode(lastHandle); err != nil {
|
|
return fmt.Errorf("could not encode bucketHandle in response to remote.CmdBucket: %w", err)
|
|
}
|
|
|
|
case remote.CmdGet:
|
|
var k []byte
|
|
if err := decoder.Decode(&bucketHandle); err != nil {
|
|
return fmt.Errorf("could not decode bucketHandle for remote.CmdGet: %w", err)
|
|
}
|
|
if err := decoder.Decode(&k); err != nil {
|
|
return fmt.Errorf("could not decode key for remote.CmdGet: %w", err)
|
|
}
|
|
bucket, ok := buckets[bucketHandle]
|
|
if !ok {
|
|
encodeErr(encoder, fmt.Errorf("bucket not found for remote.CmdGet: %d", bucketHandle))
|
|
continue
|
|
}
|
|
v, _ := bucket.Get(k)
|
|
|
|
if err := encoder.Encode(remote.ResponseOk); err != nil {
|
|
err = fmt.Errorf("could not encode response code for remote.CmdGet: %w", err)
|
|
return err
|
|
}
|
|
|
|
if err := encoder.Encode(&v); err != nil {
|
|
return fmt.Errorf("could not encode value in response for remote.CmdGet: %w", err)
|
|
}
|
|
|
|
case remote.CmdCursor:
|
|
if err := decoder.Decode(&bucketHandle); err != nil {
|
|
return fmt.Errorf("could not decode bucketHandle for remote.CmdCursor: %w", err)
|
|
}
|
|
bucket, ok := buckets[bucketHandle]
|
|
if !ok {
|
|
encodeErr(encoder, fmt.Errorf("bucket not found for remote.CmdCursor: %d", bucketHandle))
|
|
continue
|
|
}
|
|
|
|
cursor := bucket.Cursor()
|
|
lastHandle++
|
|
cursorHandle = lastHandle
|
|
cursors[cursorHandle] = cursor
|
|
if cursorHandles, ok1 := cursorsByBucket[bucketHandle]; ok1 {
|
|
cursorHandles = append(cursorHandles, cursorHandle)
|
|
cursorsByBucket[bucketHandle] = cursorHandles
|
|
} else {
|
|
cursorsByBucket[bucketHandle] = []uint64{cursorHandle}
|
|
}
|
|
|
|
if err := encoder.Encode(remote.ResponseOk); err != nil {
|
|
return fmt.Errorf("could not encode response for remote.CmdCursor: %w", err)
|
|
}
|
|
|
|
if err := encoder.Encode(cursorHandle); err != nil {
|
|
return fmt.Errorf("could not cursor handle in response to remote.CmdCursor: %w", err)
|
|
}
|
|
case remote.CmdCursorSeek:
|
|
if err := decoder.Decode(&cursorHandle); err != nil {
|
|
return fmt.Errorf("could not encode (key,value) for remote.CmdCursorSeek: %w", err)
|
|
}
|
|
if err := decoder.Decode(&seekKey); err != nil {
|
|
return fmt.Errorf("could not encode (key,value) for remote.CmdCursorSeek: %w", err)
|
|
}
|
|
cursor, ok := cursors[cursorHandle]
|
|
if !ok {
|
|
encodeErr(encoder, fmt.Errorf("cursor not found: %d", cursorHandle))
|
|
continue
|
|
}
|
|
k, v := cursor.Seek(seekKey)
|
|
if err := encoder.Encode(remote.ResponseOk); err != nil {
|
|
return fmt.Errorf("could not encode (key,value) for remote.CmdCursorSeek: %w", err)
|
|
}
|
|
if err := encodeKeyValue(encoder, k, v); err != nil {
|
|
return fmt.Errorf("could not encode (key,value) for remote.CmdCursorSeek: %w", err)
|
|
}
|
|
case remote.CmdCursorSeekTo:
|
|
if err := decoder.Decode(&cursorHandle); err != nil {
|
|
return fmt.Errorf("could not decode seekKey for remote.CmdCursorSeekTo: %w", err)
|
|
}
|
|
if err := decoder.Decode(&seekKey); err != nil {
|
|
return fmt.Errorf("could not decode seekKey for remote.CmdCursorSeekTo: %w", err)
|
|
}
|
|
cursor, ok := cursors[cursorHandle]
|
|
if !ok {
|
|
encodeErr(encoder, fmt.Errorf("cursor not found: %d", cursorHandle))
|
|
continue
|
|
}
|
|
|
|
k, v := cursor.SeekTo(seekKey)
|
|
|
|
if err := encoder.Encode(remote.ResponseOk); err != nil {
|
|
return fmt.Errorf("could not encode response to remote.CmdCursorSeek: %w", err)
|
|
}
|
|
|
|
if err := encodeKeyValue(encoder, k, v); err != nil {
|
|
return fmt.Errorf("could not encode (key,value) in response to remote.CmdCursorSeekTo: %w", err)
|
|
}
|
|
case remote.CmdCursorNext:
|
|
if err := decoder.Decode(&cursorHandle); err != nil {
|
|
return fmt.Errorf("could not decode cursorHandle for remote.CmdCursorNext: %w", err)
|
|
}
|
|
var numberOfKeys uint64
|
|
if err := decoder.Decode(&numberOfKeys); err != nil {
|
|
return fmt.Errorf("could not decode numberOfKeys for remote.CmdCursorNext: %w", err)
|
|
}
|
|
|
|
if numberOfKeys > remote.CursorMaxBatchSize {
|
|
encodeErr(encoder, fmt.Errorf("requested numberOfKeys is too large: %d", numberOfKeys))
|
|
continue
|
|
}
|
|
|
|
cursor, ok := cursors[cursorHandle]
|
|
if !ok {
|
|
encodeErr(encoder, fmt.Errorf("cursor not found: %d", cursorHandle))
|
|
continue
|
|
}
|
|
|
|
if err := encoder.Encode(remote.ResponseOk); err != nil {
|
|
return fmt.Errorf("could not encode response to remote.CmdCursorNext: %w", err)
|
|
}
|
|
|
|
for k, v := cursor.Next(); numberOfKeys > 0; k, v = cursor.Next() {
|
|
select {
|
|
default:
|
|
case <-ctx.Done():
|
|
return ctx.Err()
|
|
}
|
|
|
|
if err := encodeKeyValue(encoder, k, v); err != nil {
|
|
return fmt.Errorf("could not encode (key,value) in response to remote.CmdCursorNext: %w", err)
|
|
}
|
|
|
|
numberOfKeys--
|
|
if k == nil {
|
|
break
|
|
}
|
|
}
|
|
|
|
case remote.CmdCursorFirst:
|
|
if err := decoder.Decode(&cursorHandle); err != nil {
|
|
return fmt.Errorf("could not decode cursorHandle for remote.CmdCursorFirst: %w", err)
|
|
}
|
|
var numberOfKeys uint64
|
|
if err := decoder.Decode(&numberOfKeys); err != nil {
|
|
return fmt.Errorf("could not decode numberOfKeys for remote.CmdCursorFirst: %w", err)
|
|
}
|
|
cursor, ok := cursors[cursorHandle]
|
|
if !ok {
|
|
encodeErr(encoder, fmt.Errorf("cursor not found: %d", cursorHandle))
|
|
continue
|
|
}
|
|
|
|
if err := encoder.Encode(remote.ResponseOk); err != nil {
|
|
return fmt.Errorf("could not encode response code for remote.CmdCursorFirst: %w", err)
|
|
}
|
|
|
|
for k, v := cursor.First(); numberOfKeys > 0; k, v = cursor.Next() {
|
|
select {
|
|
default:
|
|
case <-ctx.Done():
|
|
return ctx.Err()
|
|
}
|
|
|
|
if err := encodeKeyValue(encoder, k, v); err != nil {
|
|
return fmt.Errorf("could not encode (key,value) for remote.CmdCursorFirst: %w", err)
|
|
}
|
|
|
|
numberOfKeys--
|
|
if k == nil {
|
|
break
|
|
}
|
|
}
|
|
case remote.CmdCursorNextKey:
|
|
if err := decoder.Decode(&cursorHandle); err != nil {
|
|
return fmt.Errorf("could not decode cursorHandle for remote.CmdCursorNextKey: %w", err)
|
|
}
|
|
var numberOfKeys uint64
|
|
if err := decoder.Decode(&numberOfKeys); err != nil {
|
|
return fmt.Errorf("could not decode numberOfKeys for remote.CmdCursorNextKey: %w", err)
|
|
}
|
|
|
|
if numberOfKeys > remote.CursorMaxBatchSize {
|
|
encodeErr(encoder, fmt.Errorf("requested numberOfKeys is too large: %d", numberOfKeys))
|
|
continue
|
|
}
|
|
|
|
cursor, ok := cursors[cursorHandle]
|
|
if !ok {
|
|
encodeErr(encoder, fmt.Errorf("cursor not found: %d", cursorHandle))
|
|
continue
|
|
}
|
|
|
|
if err := encoder.Encode(remote.ResponseOk); err != nil {
|
|
return fmt.Errorf("could not encode response to remote.CmdCursorNextKey: %w", err)
|
|
}
|
|
|
|
for k, v := cursor.Next(); numberOfKeys > 0; k, v = cursor.Next() {
|
|
select {
|
|
default:
|
|
case <-ctx.Done():
|
|
return ctx.Err()
|
|
}
|
|
|
|
if err := encodeKey(encoder, k, len(v) == 0); err != nil {
|
|
return fmt.Errorf("could not encode (key,valueIsEmpty) in response to remote.CmdCursorNextKey: %w", err)
|
|
}
|
|
|
|
numberOfKeys--
|
|
if k == nil {
|
|
break
|
|
}
|
|
}
|
|
case remote.CmdCursorFirstKey:
|
|
if err := decoder.Decode(&cursorHandle); err != nil {
|
|
return fmt.Errorf("could not decode cursorHandle for remote.CmdCursorFirstKey: %w", err)
|
|
}
|
|
var numberOfKeys uint64
|
|
if err := decoder.Decode(&numberOfKeys); err != nil {
|
|
return fmt.Errorf("could not decode numberOfKeys for remote.CmdCursorFirstKey: %w", err)
|
|
}
|
|
cursor, ok := cursors[cursorHandle]
|
|
if !ok {
|
|
encodeErr(encoder, fmt.Errorf("cursor not found: %d", cursorHandle))
|
|
continue
|
|
}
|
|
|
|
if err := encoder.Encode(remote.ResponseOk); err != nil {
|
|
return fmt.Errorf("could not encode response code for remote.CmdCursorFirstKey: %w", err)
|
|
}
|
|
|
|
for k, v := cursor.First(); numberOfKeys > 0; k, v = cursor.Next() {
|
|
select {
|
|
default:
|
|
case <-ctx.Done():
|
|
return ctx.Err()
|
|
}
|
|
|
|
if err := encodeKey(encoder, k, len(v) == 0); err != nil {
|
|
return fmt.Errorf("could not encode (key,valueIsEmpty) for remote.CmdCursorFirstKey: %w", err)
|
|
}
|
|
|
|
numberOfKeys--
|
|
if k == nil {
|
|
break
|
|
}
|
|
}
|
|
case remote.CmdGetAsOf:
|
|
var bucket, hBucket, key, v []byte
|
|
var timestamp uint64
|
|
if err := decoder.Decode(&bucket); err != nil {
|
|
return fmt.Errorf("could not decode seekKey for remote.CmdGetAsOf: %w", err)
|
|
}
|
|
if err := decoder.Decode(&hBucket); err != nil {
|
|
return fmt.Errorf("could not decode seekKey for remote.CmdGetAsOf: %w", err)
|
|
}
|
|
if err := decoder.Decode(&key); err != nil {
|
|
return fmt.Errorf("could not decode seekKey for remote.CmdGetAsOf: %w", err)
|
|
}
|
|
if err := decoder.Decode(×tamp); err != nil {
|
|
return fmt.Errorf("could not decode seekKey for remote.CmdGetAsOf: %w", err)
|
|
}
|
|
|
|
d := ethdb.NewWrapperBoltDatabase(db.DB())
|
|
|
|
var err error
|
|
v, err = d.GetAsOf(bucket, hBucket, key, timestamp)
|
|
if err != nil {
|
|
encodeErr(encoder, err)
|
|
}
|
|
|
|
if err := encoder.Encode(remote.ResponseOk); err != nil {
|
|
return fmt.Errorf("could not encode response to remote.CmdGetAsOf: %w", err)
|
|
}
|
|
|
|
if err := encoder.Encode(&v); err != nil {
|
|
return fmt.Errorf("could not encode response to remote.CmdGetAsOf: %w", err)
|
|
}
|
|
default:
|
|
logger.Error("unknown", "remote.Command", c)
|
|
return fmt.Errorf("unknown remote.Command %d", c)
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
const ServerMaxConnections uint64 = 2048
|
|
|
|
var logger = log.New("database", "remote")
|
|
|
|
func encodeKeyValue(encoder *codec.Encoder, key []byte, value []byte) error {
|
|
if err := encoder.Encode(&key); err != nil {
|
|
return err
|
|
}
|
|
if err := encoder.Encode(&value); err != nil {
|
|
return err
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func encodeKey(encoder *codec.Encoder, key []byte, valueIsEmpty bool) error {
|
|
if err := encoder.Encode(&key); err != nil {
|
|
return err
|
|
}
|
|
if err := encoder.Encode(valueIsEmpty); err != nil {
|
|
return err
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func encodeErr(encoder *codec.Encoder, mainError error) {
|
|
if err := encoder.Encode(remote.ResponseErr); err != nil {
|
|
logger.Error("could not encode remote.ResponseErr", "err", err)
|
|
return
|
|
}
|
|
if err := encoder.Encode(mainError.Error()); err != nil {
|
|
logger.Error("could not encode errCode", "err", err)
|
|
return
|
|
}
|
|
}
|
|
|
|
var netAddr string
|
|
var stopNetInterface context.CancelFunc
|
|
|
|
func StartDeprecated(db ethdb.HasBolt, addr string) {
|
|
if stopNetInterface != nil {
|
|
stopNetInterface()
|
|
}
|
|
|
|
// TODO: implement node.Service, then Stop() will called on SIGINT | SIGTERM and we can call cancel() there
|
|
tcpCtx, cancel := context.WithCancel(context.Background())
|
|
if addr != "" {
|
|
netAddr = addr
|
|
}
|
|
go func() {
|
|
ch := make(chan os.Signal, 1)
|
|
signal.Notify(ch, os.Interrupt, syscall.SIGTERM)
|
|
defer signal.Stop(ch)
|
|
|
|
select {
|
|
case <-ch:
|
|
log.Info("Got interrupt, shutting down...")
|
|
cancel()
|
|
case <-tcpCtx.Done():
|
|
}
|
|
}()
|
|
|
|
var lc net.ListenConfig
|
|
ln, err := lc.Listen(tcpCtx, "tcp", netAddr)
|
|
if err != nil {
|
|
logger.Error("Could not create listener", "address", netAddr, "err", err)
|
|
return
|
|
}
|
|
stopNetInterface = func() {
|
|
cancel()
|
|
ln.Close()
|
|
}
|
|
|
|
logger.Info("Listening on", "address", netAddr)
|
|
go Listen(tcpCtx, ln, db)
|
|
}
|
|
|
|
// Listener starts listener that for each incoming connection
|
|
// spawn a go-routine invoking Server
|
|
func Listen(ctx context.Context, ln net.Listener, db ethdb.HasBolt) {
|
|
defer func() {
|
|
if err := ln.Close(); err != nil {
|
|
logger.Error("Could not close listener", "err", err)
|
|
}
|
|
}()
|
|
|
|
ch := make(chan bool, ServerMaxConnections)
|
|
defer close(ch)
|
|
|
|
go func() {
|
|
ticker := time.NewTicker(3 * time.Second)
|
|
defer ticker.Stop()
|
|
|
|
for {
|
|
select {
|
|
case <-ticker.C:
|
|
logger.Debug("connections", "amount", len(ch))
|
|
case <-ctx.Done():
|
|
return
|
|
}
|
|
}
|
|
}()
|
|
|
|
for {
|
|
conn, err1 := ln.Accept()
|
|
if err1 != nil {
|
|
if netErr, ok := err1.(*net.OpError); ok && !netErr.Temporary() {
|
|
return
|
|
}
|
|
logger.Error("Could not accept connection", "err", err1)
|
|
continue
|
|
}
|
|
|
|
go func() {
|
|
ch <- true
|
|
defer func() {
|
|
<-ch
|
|
}()
|
|
|
|
err := Server(ctx, db, conn, conn, conn)
|
|
if err != nil {
|
|
logger.Warn("server error", "err", err)
|
|
}
|
|
}()
|
|
}
|
|
}
|