erigon-pulse/ethdb/remote/remotedbserver/server.go

259 lines
7.2 KiB
Go
Raw Normal View History

2020-07-27 12:15:48 +00:00
package remotedbserver
import (
"fmt"
2020-07-27 12:15:48 +00:00
"io"
"net"
"time"
grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware"
grpc_recovery "github.com/grpc-ecosystem/go-grpc-middleware/recovery"
grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
"github.com/ledgerwatch/turbo-geth/common"
"github.com/ledgerwatch/turbo-geth/core"
2020-07-27 12:15:48 +00:00
"github.com/ledgerwatch/turbo-geth/ethdb"
More updates to downloader, new p2psentry protocol (#1559) * Initial commit * Add sentry gRPC interface * p2psentry directory * Update README.md * Update README.md * Update README.md * Add go package * Correct syntax * add external downloader interface (#2) * Add txpool (#3) * Add private API (#4) * Invert control.proto, add PeerMinBlock, Separare incoming Tx message into a separate stream (#5) Co-authored-by: Alexey Sharp <alexeysharp@Alexeys-iMac.local> * Separate upload messages into its own stream (#6) Co-authored-by: Alexey Sharp <alexeysharp@Alexeys-iMac.local> * Only send changed accounts to listeners (#7) * Txpool interface doc (#9) * More additions * More additions * Fix locking * Intermediate * Fix separation of phases * Intermediate * Fix test * More transformations * New simplified way of downloading headers * Fix hard-coded header sync * Fixed syncing near the tip of the chain * Add architecture diagram source and picture (#10) * More fixes * rename tip to link * Use preverified hashes instead of preverified headers * Fix preverified hashes generation * more parametrisation * Continue parametrisation * Fix grpc data limit, interruption of headers stage * Add ropsten preverified hashes * Typed hashes (#11) * Typed hashes * Fix PeerId * 64-bit tx nonce * Disable penalties * Add goerli settings, bootstrap nodes * Try to fix goerly sync * Remove interfaces * Add proper golang packages, max_block into p2p sentry Status * Prepare for proto overhaul * Squashed 'interfaces/' content from commit ce36053c2 git-subtree-dir: interfaces git-subtree-split: ce36053c24db2f56e48ac752808de60afa1dfb4b * Change EtherReply to address * Adaptations to new types * Switch to new types * Fixes * Fix formatting * Fix lint * Lint fixes, reverse order in types * Fix lint * Fix lint * Fix lint * Fix test * Not supporting eth/66 yet * Fix shutdown * Fix lint * Fix lint * Fix lint * return stopped check Co-authored-by: Artem Vorotnikov <artem@vorotnikov.me> Co-authored-by: b00ris <b00ris@mail.ru> Co-authored-by: Alexey Sharp <alexeysharp@Alexeys-iMac.local> Co-authored-by: lightclient <14004106+lightclient@users.noreply.github.com> Co-authored-by: canepat <16927169+canepat@users.noreply.github.com>
2021-03-19 21:24:49 +00:00
"github.com/ledgerwatch/turbo-geth/gointerfaces/remote"
2020-07-27 12:15:48 +00:00
"github.com/ledgerwatch/turbo-geth/log"
"github.com/ledgerwatch/turbo-geth/metrics"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/keepalive"
2020-07-27 12:15:48 +00:00
)
2020-09-07 05:47:08 +00:00
const MaxTxTTL = 30 * time.Second
2020-07-27 12:15:48 +00:00
type KvServer struct {
remote.UnimplementedKVServer // must be embedded to have forward compatible implementations.
kv ethdb.KV
}
func StartGrpc(kv ethdb.KV, eth core.Backend, addr string, rateLimit uint32, creds *credentials.TransportCredentials, events *Events) (*grpc.Server, error) {
2020-07-27 12:15:48 +00:00
log.Info("Starting private RPC server", "on", addr)
lis, err := net.Listen("tcp", addr)
if err != nil {
return nil, fmt.Errorf("could not create listener: %w, addr=%s", err, addr)
2020-07-27 12:15:48 +00:00
}
kv2Srv := NewKvServer(kv)
2020-07-29 04:31:46 +00:00
dbSrv := NewDBServer(kv)
ethBackendSrv := NewEthBackendServer(eth, events)
2020-07-27 12:15:48 +00:00
var (
streamInterceptors []grpc.StreamServerInterceptor
unaryInterceptors []grpc.UnaryServerInterceptor
)
if metrics.Enabled {
streamInterceptors = append(streamInterceptors, grpc_prometheus.StreamServerInterceptor)
unaryInterceptors = append(unaryInterceptors, grpc_prometheus.UnaryServerInterceptor)
}
streamInterceptors = append(streamInterceptors, grpc_recovery.StreamServerInterceptor())
unaryInterceptors = append(unaryInterceptors, grpc_recovery.UnaryServerInterceptor())
var grpcServer *grpc.Server
//cpus := uint32(runtime.GOMAXPROCS(-1))
opts := []grpc.ServerOption{
//grpc.NumStreamWorkers(cpus), // reduce amount of goroutines
grpc.WriteBufferSize(1024), // reduce buffers to save mem
grpc.ReadBufferSize(1024),
grpc.MaxConcurrentStreams(rateLimit), // to force clients reduce concurrency level
// Don't drop the connection, settings accordign to this comment on GitHub
// https://github.com/grpc/grpc-go/issues/3171#issuecomment-552796779
grpc.KeepaliveEnforcementPolicy(keepalive.EnforcementPolicy{
MinTime: 10 * time.Second,
PermitWithoutStream: true,
}),
grpc.StreamInterceptor(grpc_middleware.ChainStreamServer(streamInterceptors...)),
grpc.UnaryInterceptor(grpc_middleware.ChainUnaryServer(unaryInterceptors...)),
}
if creds == nil {
// no specific opts
} else {
opts = append(opts, grpc.Creds(*creds))
}
grpcServer = grpc.NewServer(opts...)
remote.RegisterDBServer(grpcServer, dbSrv)
remote.RegisterETHBACKENDServer(grpcServer, ethBackendSrv)
remote.RegisterKVServer(grpcServer, kv2Srv)
2020-07-27 12:15:48 +00:00
if metrics.Enabled {
grpc_prometheus.Register(grpcServer)
}
go func() {
if err := grpcServer.Serve(lis); err != nil {
log.Error("private RPC server fail", "err", err)
2020-07-27 12:15:48 +00:00
}
}()
return grpcServer, nil
2020-07-27 12:15:48 +00:00
}
func NewKvServer(kv ethdb.KV) *KvServer {
return &KvServer{kv: kv}
}
func (s *KvServer) Tx(stream remote.KV_TxServer) error {
tx, errBegin := s.kv.Begin(stream.Context())
if errBegin != nil {
return fmt.Errorf("server-side error: %w", errBegin)
}
rollback := func() {
tx.Rollback()
}
defer rollback()
var CursorID uint32
type CursorInfo struct {
bucket string
c ethdb.Cursor
k, v []byte //fields to save current position of cursor - used when Tx reopen
}
cursors := map[uint32]*CursorInfo{}
txTicker := time.NewTicker(MaxTxTTL)
defer txTicker.Stop()
// send all items to client, if k==nil - still send it to client and break loop
for {
in, recvErr := stream.Recv()
if recvErr != nil {
if recvErr == io.EOF { // termination
return nil
}
return fmt.Errorf("server-side error: %w", recvErr)
}
//TODO: protect against client - which doesn't send any requests
select {
default:
case <-txTicker.C:
for _, c := range cursors { // save positions of cursor, will restore after Tx reopening
k, v, err := c.c.Current()
if err != nil {
return err
}
c.k = common.CopyBytes(k)
c.v = common.CopyBytes(v)
}
tx.Rollback()
tx, errBegin = s.kv.Begin(stream.Context())
if errBegin != nil {
return fmt.Errorf("server-side error: %w", errBegin)
}
for _, c := range cursors { // restore all cursors position
c.c = tx.Cursor(c.bucket)
switch casted := c.c.(type) {
case ethdb.CursorDupSort:
v, err := casted.SeekBothRange(c.k, c.v)
if err != nil {
return fmt.Errorf("server-side error: %w", err)
}
if v == nil { // it may happen that key where we stopped disappeared after transaction reopen, then just move to next key
_, _, err = casted.Next()
if err != nil {
return fmt.Errorf("server-side error: %w", err)
}
}
case ethdb.Cursor:
if _, _, err := c.c.Seek(c.k); err != nil {
return fmt.Errorf("server-side error: %w", err)
}
}
}
}
var c ethdb.Cursor
if in.BucketName == "" {
cInfo, ok := cursors[in.Cursor]
if !ok {
return fmt.Errorf("server-side error: unknown Cursor=%d, Op=%s", in.Cursor, in.Op)
}
c = cInfo.c
}
switch in.Op {
case remote.Op_OPEN:
CursorID++
cursors[CursorID] = &CursorInfo{
bucket: in.BucketName,
c: tx.Cursor(in.BucketName),
}
if err := stream.Send(&remote.Pair{CursorID: CursorID}); err != nil {
return fmt.Errorf("server-side error: %w", err)
}
continue
case remote.Op_CLOSE:
2020-12-29 21:34:29 +00:00
cInfo, ok := cursors[in.Cursor]
if !ok {
return fmt.Errorf("server-side error: unknown Cursor=%d, Op=%s", in.Cursor, in.Op)
}
cInfo.c.Close()
delete(cursors, in.Cursor)
if err := stream.Send(&remote.Pair{}); err != nil {
return fmt.Errorf("server-side error: %w", err)
}
continue
default:
}
if err := handleOp(c, stream, in); err != nil {
return fmt.Errorf("server-side error: %w", err)
}
}
}
func handleOp(c ethdb.Cursor, stream remote.KV_TxServer, in *remote.Cursor) error {
var k, v []byte
var err error
switch in.Op {
case remote.Op_FIRST:
k, v, err = c.First()
case remote.Op_FIRST_DUP:
v, err = c.(ethdb.CursorDupSort).FirstDup()
case remote.Op_SEEK:
k, v, err = c.Seek(in.K)
case remote.Op_SEEK_BOTH:
v, err = c.(ethdb.CursorDupSort).SeekBothRange(in.K, in.V)
case remote.Op_CURRENT:
k, v, err = c.Current()
case remote.Op_LAST:
k, v, err = c.Last()
case remote.Op_LAST_DUP:
v, err = c.(ethdb.CursorDupSort).LastDup()
case remote.Op_NEXT:
k, v, err = c.Next()
case remote.Op_NEXT_DUP:
k, v, err = c.(ethdb.CursorDupSort).NextDup()
case remote.Op_NEXT_NO_DUP:
k, v, err = c.(ethdb.CursorDupSort).NextNoDup()
case remote.Op_PREV:
k, v, err = c.Prev()
//case remote.Op_PREV_DUP:
// k, v, err = c.(ethdb.CursorDupSort).Prev()
// if err != nil {
// return err
// }
//case remote.Op_PREV_NO_DUP:
// k, v, err = c.Prev()
// if err != nil {
// return err
// }
case remote.Op_SEEK_EXACT:
ChangeSets dupsort (#1342) * change_set_dup * change_set_dup * change_set_dup * change_set_dup * change_set_dup * change_set_dup * change_set_dup * change_set_dup * change_set_dup * change_set_dup * change_set_dup * change_set_dup * change_set_dup * change_set_dup * change_set_dup * change_set_dup * change_set_dup * change_set_dup * change_set_dup * change_set_dup * change_set_dup * working version * working version * working version * working version * working version * working version * working version * working version * working version * working version * working version * working version * working version * working version * working version * working version * working version * working version * working version * working version * working version * working version * working version * working version * working version * working version * working version * working version * working version * working version * working version * working version * working version * working version * working version * working version * working version * working version * working version * working version * working version * working version * working version * working version * aa * aa * aa * aa * aa * aa * aa * aa * aa * aa * aa * aa * aa * aa * aa * squash * squash * fix * fix * fix * fix * fix * fix * fix * fix * fix * fix * fix * history_early_stop * history_early_stop * vmConfig with ReadOnly false * auto_increment * auto_increment * rebase master Co-authored-by: Alexey Akhunov <akhounov@gmail.com>
2020-11-16 12:08:28 +00:00
k, v, err = c.SeekExact(in.K)
case remote.Op_SEEK_BOTH_EXACT:
k, v, err = c.(ethdb.CursorDupSort).SeekBothExact(in.K, in.V)
default:
return fmt.Errorf("unknown operation: %s", in.Op)
}
if err != nil {
return err
}
if err := stream.Send(&remote.Pair{K: k, V: v}); err != nil {
return err
}
return nil
}