First steps for RPC deamon (remote DB access) (#199)

* Remote DB initial commit

* Fix lint

* Fix lint

* Fix lint
This commit is contained in:
ledgerwatch 2019-11-25 13:39:32 +00:00 committed by GitHub
parent 5f667d3225
commit b9b4904e8d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 606 additions and 0 deletions

309
ethdb/remote/bolt_remote.go Normal file
View File

@ -0,0 +1,309 @@
// Copyright 2019 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package remote
import (
"fmt"
"io"
"github.com/ledgerwatch/bolt"
"github.com/ledgerwatch/turbo-geth/log"
"github.com/ugorji/go/codec"
)
// Version is the current version of the remote db protocol. If the protocol changes in a non backwards compatible way,
// this constant needs to be increased
const Version uint64 = 1
// Command is the type of command in the boltdb remote protocol
type Command uint8
const (
// CmdVersion : version
// is sent from client to server to ask about the version of protocol the server supports
// it is also to be used to be sent periodically to make sure the connection stays open
CmdVersion Command = iota
// CmdLastError request the last error generated by any command
CmdLastError
// CmdBeginTx : txHandle
// request starting a new transaction (read-only). It returns transaction's handle (uint64), or 0
// if there was an error. If 0 is returned, the corresponding error can be queried by CmdLastError command
CmdBeginTx
// CmdEndTx (txHandle)
// request the end of the transaction (rollback)
CmdEndTx
// CmdBucket (txHandle, name): bucketHandle
// requests opening a bucket with given name. It returns bucket's handle (uint64)
CmdBucket
// CmdGet (bucketHandle, key): value
// requests a value for a key from given bucket.
CmdGet
// CmdCursor (bucketHandle): cursorHandle
// request creating a cursor for the given bucket. It returns cursor's handle (uint64)
CmdCursor
// CmdSeek (cursorHandle, seekKey): (key, value)
// Moves given cursor to the seekKey, or to the next key after seekKey
CmdSeek
)
// Pool of decoders
var decoderPool = make(chan *codec.Decoder, 128)
func newDecoder(r io.Reader) *codec.Decoder {
var d *codec.Decoder
select {
case d = <-decoderPool:
d.Reset(r)
default:
{
var handle codec.CborHandle
d = codec.NewDecoder(r, &handle)
}
}
return d
}
func returnDecoderToPool(d *codec.Decoder) {
select {
case decoderPool <- d:
default:
log.Warn("Allowing decoder to be garbage collected, pool is full")
}
}
// Pool of encoders
var encoderPool = make(chan *codec.Encoder, 128)
func newEncoder(w io.Writer) *codec.Encoder {
var e *codec.Encoder
select {
case e = <-encoderPool:
e.Reset(w)
default:
{
var handle codec.CborHandle
e = codec.NewEncoder(w, &handle)
}
}
return e
}
func returnEncoderToPool(e *codec.Encoder) {
select {
case encoderPool <- e:
default:
log.Warn("Allowing encoder to be garbage collected, pool is full")
}
}
// Server is to be called as a go-routine, one per every client connection.
// It runs while the connection is active and keep the entire connection's context
// in the local variables
// For tests, bytes.Buffer can be used for both `in` and `out`
func Server(db *bolt.DB, in io.Reader, out io.Writer) error {
decoder := newDecoder(in)
defer returnDecoderToPool(decoder)
encoder := newEncoder(out)
defer returnEncoderToPool(encoder)
// Server is passive - it runs a loop what reads commands (and their arguments) and attempts to respond
var lastError error
var lastHandle uint64
// Read-only transactions opened by the client
transactions := make(map[uint64]*bolt.Tx)
// Buckets opened by the client
buckets := make(map[uint64]*bolt.Bucket)
// List of buckets opened in each transaction
bucketsByTx := make(map[uint64][]uint64)
// Cursors opened by the client
cursors := make(map[uint64]*bolt.Cursor)
// List of cursors opened in each bucket
cursorsByBucket := make(map[uint64][]uint64)
var c Command
for {
if err := decoder.Decode(&c); err != nil {
if err == io.EOF {
// Graceful termination when the end of the input is reached
break
}
log.Error("could not decode command", "error", err)
return err
}
switch c {
case CmdVersion:
var version = Version
if err := encoder.Encode(&version); err != nil {
log.Error("could not encode response to CmdVersion", "error", err)
return err
}
case CmdLastError:
var errorString = fmt.Sprintf("%v", lastError)
if err := encoder.Encode(&errorString); err != nil {
log.Error("could not encode response to CmdLastError", "error", err)
return err
}
case CmdBeginTx:
var txHandle uint64
var tx *bolt.Tx
tx, lastError = db.Begin(false)
if lastError == nil {
defer func() {
if err := tx.Rollback(); err != nil {
log.Error("could not rollback transaction", "error", err)
}
}()
lastHandle++
txHandle = lastHandle
transactions[txHandle] = tx
}
if err := encoder.Encode(&txHandle); err != nil {
log.Error("could not encode txHandle in response to CmdBeginTx", "error", err)
return err
}
case CmdEndTx:
var txHandle uint64
if err := decoder.Decode(&txHandle); err != nil {
log.Error("could not decode txHandle for CmdEndTx")
return err
}
if tx, ok := transactions[txHandle]; ok {
// Remove all the buckets
if bucketHandles, ok1 := bucketsByTx[txHandle]; ok1 {
for _, bucketHandle := range bucketHandles {
if cursorHandles, ok2 := cursorsByBucket[bucketHandle]; ok2 {
for _, cursorHandle := range cursorHandles {
delete(cursors, cursorHandle)
}
delete(cursorsByBucket, bucketHandle)
}
delete(buckets, bucketHandle)
}
delete(bucketsByTx, txHandle)
}
if err := tx.Rollback(); err != nil {
log.Error("could not end transaction", "handle", txHandle, "error", err)
return err
}
delete(transactions, txHandle)
lastError = nil
} else {
lastError = fmt.Errorf("transaction not found")
}
case CmdBucket:
// Read the txHandle
var txHandle uint64
if err := decoder.Decode(&txHandle); err != nil {
log.Error("could not decode txHandle for CmdBucket")
return err
}
// Read the name of the bucket
var name []byte
if err := decoder.Decode(&name); err != nil {
log.Error("could not decode name for CmdBucket", "error", err)
return err
}
var bucketHandle uint64
if tx, ok := transactions[txHandle]; ok {
// Open the bucket
var bucket *bolt.Bucket
bucket = tx.Bucket(name)
if bucket == nil {
lastError = fmt.Errorf("bucket not found")
} else {
lastHandle++
bucketHandle = lastHandle
buckets[bucketHandle] = bucket
if bucketHandles, ok1 := bucketsByTx[txHandle]; ok1 {
bucketHandles = append(bucketHandles, bucketHandle)
bucketsByTx[txHandle] = bucketHandles
} else {
bucketsByTx[txHandle] = []uint64{bucketHandle}
}
lastError = nil
}
} else {
lastError = fmt.Errorf("transaction not found")
}
if err := encoder.Encode(&bucketHandle); err != nil {
log.Error("could not encode bucketHandle in response to CmdBucket", "error", err)
return err
}
case CmdGet:
var bucketHandle uint64
if err := decoder.Decode(&bucketHandle); err != nil {
log.Error("could not decode bucketHandle for CmdGet")
return err
}
var key []byte
if err := decoder.Decode(&key); err != nil {
log.Error("could not decode key for CmdGet")
return err
}
var value []byte
if bucket, ok := buckets[bucketHandle]; ok {
value, _ = bucket.Get(key)
lastError = nil
} else {
lastError = fmt.Errorf("bucket not found")
}
if err := encoder.Encode(&value); err != nil {
log.Error("could not encode value in response to CmdGet", "error", err)
return err
}
case CmdCursor:
var bucketHandle uint64
if err := decoder.Decode(&bucketHandle); err != nil {
log.Error("could not decode bucketHandle for CmdCursor")
return err
}
var cursorHandle uint64
if bucket, ok := buckets[bucketHandle]; ok {
cursor := bucket.Cursor()
lastHandle++
cursorHandle = lastHandle
cursors[cursorHandle] = cursor
if cursorHandles, ok1 := cursorsByBucket[bucketHandle]; ok1 {
cursorHandles = append(cursorHandles, cursorHandle)
cursorsByBucket[bucketHandle] = cursorHandles
} else {
cursorsByBucket[bucketHandle] = []uint64{cursorHandle}
}
lastError = nil
} else {
lastError = fmt.Errorf("bucket not found")
}
if err := encoder.Encode(&cursorHandle); err != nil {
log.Error("could not encode value in response to CmdCursor", "error", err)
return err
}
case CmdSeek:
var cursorHandle uint64
if err := decoder.Decode(&cursorHandle); err != nil {
log.Error("could not decode cursorHandle for CmdSeek")
return err
}
var seekKey []byte
if err := decoder.Decode(&seekKey); err != nil {
log.Error("could not decode seekKey for CmdSeek")
return err
}
default:
log.Error("unknown", "command", c)
return fmt.Errorf("unknown command %d", c)
}
}
return nil
}

View File

@ -0,0 +1,297 @@
// Copyright 2019 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package remote
import (
"bytes"
"testing"
"github.com/ledgerwatch/bolt"
)
func TestCmdVersion(t *testing.T) {
// ---------- Start of boilerplate code
db, err := bolt.Open("in-memory", 0600, &bolt.Options{MemOnly: true})
if err != nil {
t.Errorf("Could not create database: %v", err)
}
// Prepare input buffer with one command CmdVersion
var inBuf bytes.Buffer
encoder := newEncoder(&inBuf)
defer returnEncoderToPool(encoder)
// output buffer to receive the result of the command
var outBuf bytes.Buffer
decoder := newDecoder(&outBuf)
defer returnDecoderToPool(decoder)
// ---------- End of boilerplate code
var c = CmdVersion
if err = encoder.Encode(&c); err != nil {
t.Errorf("Could not encode CmdVersion: %v", err)
}
if err = Server(db, &inBuf, &outBuf); err != nil {
t.Errorf("Error while calling Server: %v", err)
}
var v uint64
if err = decoder.Decode(&v); err != nil {
t.Errorf("Could not decode version returned by CmdVersion: %v", err)
}
if v != Version {
t.Errorf("Returned version %d, expected %d", v, Version)
}
}
func TestCmdBeginEndLastError(t *testing.T) {
// ---------- Start of boilerplate code
db, err := bolt.Open("in-memory", 0600, &bolt.Options{MemOnly: true})
if err != nil {
t.Errorf("Could not create database: %v", err)
}
// Prepare input buffer with one command CmdVersion
var inBuf bytes.Buffer
encoder := newEncoder(&inBuf)
defer returnEncoderToPool(encoder)
// output buffer to receive the result of the command
var outBuf bytes.Buffer
decoder := newDecoder(&outBuf)
defer returnDecoderToPool(decoder)
// ---------- End of boilerplate code
// Send CmdBeginTx, followed by CmdEndTx with the wrong txHandle, followed by CmdLastError, followed by CmdEndTx with the correct handle
// followed by the CmdLastError
var c = CmdBeginTx
if err = encoder.Encode(&c); err != nil {
t.Errorf("Could not encode CmdBeginTx: %v", err)
}
// CmdEnd with the wrong handle
var txHandle uint64 = 156
c = CmdEndTx
if err = encoder.Encode(&c); err != nil {
t.Errorf("Could not encode CmdEndTx: %v", err)
}
if err = encoder.Encode(&txHandle); err != nil {
t.Errorf("Could not encode txHandle: %v", err)
}
// CmdLastError to retrive the error related to the CmdEndTx with the wrong handle
c = CmdLastError
if err = encoder.Encode(&c); err != nil {
t.Errorf("Could not encode CmdLastError: %v", err)
}
// Now we issue CmdEndTx with the correct tx handle
c = CmdEndTx
if err = encoder.Encode(&c); err != nil {
t.Errorf("Could not encode CmdEndTx: %v", err)
}
txHandle = 1
if err = encoder.Encode(&txHandle); err != nil {
t.Errorf("Could not encode txHandle: %v", err)
}
// Check that CmdLastError now returns empty string
c = CmdLastError
if err = encoder.Encode(&c); err != nil {
t.Errorf("Could not encode CmdLastError: %v", err)
}
// By now we constructed all input requests, now we call the
// Server to process them all
if err = Server(db, &inBuf, &outBuf); err != nil {
t.Errorf("Error while calling Server: %v", err)
}
// And then we interpret the results
if err = decoder.Decode(&txHandle); err != nil {
t.Errorf("Could not decode response from CmdBeginTx")
}
var lastErrorStr string
if err = decoder.Decode(&lastErrorStr); err != nil {
t.Errorf("Could not decode response from CmdLastError")
}
if lastErrorStr != "transaction not found" {
t.Errorf("Wrong error message from CmdLastError: %s", lastErrorStr)
}
if err = decoder.Decode(&lastErrorStr); err != nil {
t.Errorf("Could not decode response from CmdLastError")
}
if lastErrorStr != "<nil>" {
t.Errorf("Wrong error message from CmdLastError: %s", lastErrorStr)
}
}
func TestCmdBucket(t *testing.T) {
// ---------- Start of boilerplate code
db, err := bolt.Open("in-memory", 0600, &bolt.Options{MemOnly: true})
if err != nil {
t.Errorf("Could not create database: %v", err)
}
// Prepare input buffer with one command CmdVersion
var inBuf bytes.Buffer
encoder := newEncoder(&inBuf)
defer returnEncoderToPool(encoder)
// output buffer to receive the result of the command
var outBuf bytes.Buffer
decoder := newDecoder(&outBuf)
defer returnDecoderToPool(decoder)
// ---------- End of boilerplate code
// Create a bucket
var name = []byte("testbucket")
if err = db.Update(func(tx *bolt.Tx) error {
_, err1 := tx.CreateBucket(name, false)
return err1
}); err != nil {
t.Errorf("Could not create and populate a bucket: %v", err)
}
var c = CmdBeginTx
if err = encoder.Encode(&c); err != nil {
t.Errorf("Could not encode CmdBegin: %v", err)
}
c = CmdBucket
if err = encoder.Encode(&c); err != nil {
t.Errorf("Could not encode CmdBucket: %v", err)
}
var txHandle uint64 = 1
if err = encoder.Encode(&txHandle); err != nil {
t.Errorf("Could not encode txHandle for CmdBucket: %v", err)
}
if err = encoder.Encode(&name); err != nil {
t.Errorf("Could not encode name for CmdBucket: %v", err)
}
// By now we constructed all input requests, now we call the
// Server to process them all
if err = Server(db, &inBuf, &outBuf); err != nil {
t.Errorf("Error while calling Server: %v", err)
}
// And then we interpret the results
if err = decoder.Decode(&txHandle); err != nil {
t.Errorf("Could not decode response from CmdBegin")
}
if txHandle != 1 {
t.Errorf("Unexpected txHandle: %d", txHandle)
}
var bucketHandle uint64
if err = decoder.Decode(&bucketHandle); err != nil {
t.Errorf("Could not decode response from CmdBucket")
}
if bucketHandle != 2 {
t.Errorf("Unexpected bucketHandle: %d", txHandle)
}
}
func TestCmdGet(t *testing.T) {
// ---------- Start of boilerplate code
db, err := bolt.Open("in-memory", 0600, &bolt.Options{MemOnly: true})
if err != nil {
t.Errorf("Could not create database: %v", err)
}
// Prepare input buffer with one command CmdVersion
var inBuf bytes.Buffer
encoder := newEncoder(&inBuf)
defer returnEncoderToPool(encoder)
// output buffer to receive the result of the command
var outBuf bytes.Buffer
decoder := newDecoder(&outBuf)
defer returnDecoderToPool(decoder)
// ---------- End of boilerplate code
// Create a bucket and populate some values
var name = []byte("testbucket")
if err = db.Update(func(tx *bolt.Tx) error {
b, err1 := tx.CreateBucket(name, false)
if err1 != nil {
return err1
}
if err1 = b.Put([]byte("key1"), []byte("value1")); err1 != nil {
return err1
}
if err1 = b.Put([]byte("key2"), []byte("value2")); err1 != nil {
return err1
}
return nil
}); err != nil {
t.Errorf("Could not create and populate a bucket: %v", err)
}
var c = CmdBeginTx
if err = encoder.Encode(&c); err != nil {
t.Errorf("Could not encode CmdBeginTx: %v", err)
}
c = CmdBucket
if err = encoder.Encode(&c); err != nil {
t.Errorf("Could not encode CmdBucket: %v", err)
}
var txHandle uint64 = 1
if err = encoder.Encode(&txHandle); err != nil {
t.Errorf("Could not encode txHandle for CmdBucket: %v", err)
}
if err = encoder.Encode(&name); err != nil {
t.Errorf("Could not encode name for CmdBucket: %v", err)
}
// Issue CmdGet with existing key
c = CmdGet
if err = encoder.Encode(&c); err != nil {
t.Errorf("Could not encode CmdGet: %v", err)
}
var bucketHandle uint64 = 2
var key = []byte("key1")
if err = encoder.Encode(&bucketHandle); err != nil {
t.Errorf("Could not encode bucketHandle for CmdGet: %v", err)
}
if err = encoder.Encode(&key); err != nil {
t.Errorf("Could not encode key for CmdGet: %v", err)
}
// Issue CmdGet with non-existing key
c = CmdGet
if err = encoder.Encode(&c); err != nil {
t.Errorf("Could not encode CmdGet: %v", err)
}
bucketHandle = 2
key = []byte("key3")
if err = encoder.Encode(&bucketHandle); err != nil {
t.Errorf("Could not encode bucketHandle for CmdGet: %v", err)
}
if err = encoder.Encode(&key); err != nil {
t.Errorf("Could not encode key for CmdGet: %v", err)
}
// By now we constructed all input requests, now we call the
// Server to process them all
if err = Server(db, &inBuf, &outBuf); err != nil {
t.Errorf("Error while calling Server: %v", err)
}
// And then we interpret the results
// Results of CmdBeginTx
if err = decoder.Decode(&txHandle); err != nil {
t.Errorf("Could not decode response from CmdBegin")
}
if txHandle != 1 {
t.Errorf("Unexpected txHandle: %d", txHandle)
}
// Results of CmdBucket
if err = decoder.Decode(&bucketHandle); err != nil {
t.Errorf("Could not decode response from CmdBucket")
}
if bucketHandle != 2 {
t.Errorf("Unexpected bucketHandle: %d", txHandle)
}
// Results of CmdGet (for key1)
var value []byte
if err = decoder.Decode(&value); err != nil {
t.Errorf("Could not decode value from CmdGet: %v", err)
}
if string(value) != "value1" {
t.Errorf("Wrong value from CmdGet, expected: %x, got %x", "value1", value)
}
// Results of CmdGet (for key3)
if err = decoder.Decode(&value); err != nil {
t.Errorf("Could not decode value from CmdGet: %v", err)
}
if value != nil {
t.Errorf("Wrong value from CmdGet, expected: %x, got %x", "value1", value)
}
}