diff --git a/ethdb/remote/bolt_remote.go b/ethdb/remote/bolt_remote.go new file mode 100644 index 000000000..ec5a2fcc9 --- /dev/null +++ b/ethdb/remote/bolt_remote.go @@ -0,0 +1,309 @@ +// Copyright 2019 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +package remote + +import ( + "fmt" + "io" + + "github.com/ledgerwatch/bolt" + "github.com/ledgerwatch/turbo-geth/log" + "github.com/ugorji/go/codec" +) + +// Version is the current version of the remote db protocol. If the protocol changes in a non backwards compatible way, +// this constant needs to be increased +const Version uint64 = 1 + +// Command is the type of command in the boltdb remote protocol +type Command uint8 + +const ( + // CmdVersion : version + // is sent from client to server to ask about the version of protocol the server supports + // it is also to be used to be sent periodically to make sure the connection stays open + CmdVersion Command = iota + // CmdLastError request the last error generated by any command + CmdLastError + // CmdBeginTx : txHandle + // request starting a new transaction (read-only). It returns transaction's handle (uint64), or 0 + // if there was an error. If 0 is returned, the corresponding error can be queried by CmdLastError command + CmdBeginTx + // CmdEndTx (txHandle) + // request the end of the transaction (rollback) + CmdEndTx + // CmdBucket (txHandle, name): bucketHandle + // requests opening a bucket with given name. It returns bucket's handle (uint64) + CmdBucket + // CmdGet (bucketHandle, key): value + // requests a value for a key from given bucket. + CmdGet + // CmdCursor (bucketHandle): cursorHandle + // request creating a cursor for the given bucket. It returns cursor's handle (uint64) + CmdCursor + // CmdSeek (cursorHandle, seekKey): (key, value) + // Moves given cursor to the seekKey, or to the next key after seekKey + CmdSeek +) + +// Pool of decoders +var decoderPool = make(chan *codec.Decoder, 128) + +func newDecoder(r io.Reader) *codec.Decoder { + var d *codec.Decoder + select { + case d = <-decoderPool: + d.Reset(r) + default: + { + var handle codec.CborHandle + d = codec.NewDecoder(r, &handle) + } + } + return d +} + +func returnDecoderToPool(d *codec.Decoder) { + select { + case decoderPool <- d: + default: + log.Warn("Allowing decoder to be garbage collected, pool is full") + } +} + +// Pool of encoders +var encoderPool = make(chan *codec.Encoder, 128) + +func newEncoder(w io.Writer) *codec.Encoder { + var e *codec.Encoder + select { + case e = <-encoderPool: + e.Reset(w) + default: + { + var handle codec.CborHandle + e = codec.NewEncoder(w, &handle) + } + } + return e +} + +func returnEncoderToPool(e *codec.Encoder) { + select { + case encoderPool <- e: + default: + log.Warn("Allowing encoder to be garbage collected, pool is full") + } +} + +// Server is to be called as a go-routine, one per every client connection. +// It runs while the connection is active and keep the entire connection's context +// in the local variables +// For tests, bytes.Buffer can be used for both `in` and `out` +func Server(db *bolt.DB, in io.Reader, out io.Writer) error { + decoder := newDecoder(in) + defer returnDecoderToPool(decoder) + encoder := newEncoder(out) + defer returnEncoderToPool(encoder) + // Server is passive - it runs a loop what reads commands (and their arguments) and attempts to respond + var lastError error + var lastHandle uint64 + // Read-only transactions opened by the client + transactions := make(map[uint64]*bolt.Tx) + // Buckets opened by the client + buckets := make(map[uint64]*bolt.Bucket) + // List of buckets opened in each transaction + bucketsByTx := make(map[uint64][]uint64) + // Cursors opened by the client + cursors := make(map[uint64]*bolt.Cursor) + // List of cursors opened in each bucket + cursorsByBucket := make(map[uint64][]uint64) + var c Command + for { + if err := decoder.Decode(&c); err != nil { + if err == io.EOF { + // Graceful termination when the end of the input is reached + break + } + log.Error("could not decode command", "error", err) + return err + } + switch c { + case CmdVersion: + var version = Version + if err := encoder.Encode(&version); err != nil { + log.Error("could not encode response to CmdVersion", "error", err) + return err + } + case CmdLastError: + var errorString = fmt.Sprintf("%v", lastError) + if err := encoder.Encode(&errorString); err != nil { + log.Error("could not encode response to CmdLastError", "error", err) + return err + } + case CmdBeginTx: + var txHandle uint64 + var tx *bolt.Tx + tx, lastError = db.Begin(false) + if lastError == nil { + defer func() { + if err := tx.Rollback(); err != nil { + log.Error("could not rollback transaction", "error", err) + } + }() + lastHandle++ + txHandle = lastHandle + transactions[txHandle] = tx + } + if err := encoder.Encode(&txHandle); err != nil { + log.Error("could not encode txHandle in response to CmdBeginTx", "error", err) + return err + } + case CmdEndTx: + var txHandle uint64 + if err := decoder.Decode(&txHandle); err != nil { + log.Error("could not decode txHandle for CmdEndTx") + return err + } + if tx, ok := transactions[txHandle]; ok { + // Remove all the buckets + if bucketHandles, ok1 := bucketsByTx[txHandle]; ok1 { + for _, bucketHandle := range bucketHandles { + if cursorHandles, ok2 := cursorsByBucket[bucketHandle]; ok2 { + for _, cursorHandle := range cursorHandles { + delete(cursors, cursorHandle) + } + delete(cursorsByBucket, bucketHandle) + } + delete(buckets, bucketHandle) + } + delete(bucketsByTx, txHandle) + } + if err := tx.Rollback(); err != nil { + log.Error("could not end transaction", "handle", txHandle, "error", err) + return err + } + delete(transactions, txHandle) + lastError = nil + } else { + lastError = fmt.Errorf("transaction not found") + } + case CmdBucket: + // Read the txHandle + var txHandle uint64 + if err := decoder.Decode(&txHandle); err != nil { + log.Error("could not decode txHandle for CmdBucket") + return err + } + // Read the name of the bucket + var name []byte + if err := decoder.Decode(&name); err != nil { + log.Error("could not decode name for CmdBucket", "error", err) + return err + } + var bucketHandle uint64 + if tx, ok := transactions[txHandle]; ok { + // Open the bucket + var bucket *bolt.Bucket + bucket = tx.Bucket(name) + if bucket == nil { + lastError = fmt.Errorf("bucket not found") + } else { + lastHandle++ + bucketHandle = lastHandle + buckets[bucketHandle] = bucket + if bucketHandles, ok1 := bucketsByTx[txHandle]; ok1 { + bucketHandles = append(bucketHandles, bucketHandle) + bucketsByTx[txHandle] = bucketHandles + } else { + bucketsByTx[txHandle] = []uint64{bucketHandle} + } + lastError = nil + } + } else { + lastError = fmt.Errorf("transaction not found") + } + if err := encoder.Encode(&bucketHandle); err != nil { + log.Error("could not encode bucketHandle in response to CmdBucket", "error", err) + return err + } + case CmdGet: + var bucketHandle uint64 + if err := decoder.Decode(&bucketHandle); err != nil { + log.Error("could not decode bucketHandle for CmdGet") + return err + } + var key []byte + if err := decoder.Decode(&key); err != nil { + log.Error("could not decode key for CmdGet") + return err + } + var value []byte + if bucket, ok := buckets[bucketHandle]; ok { + value, _ = bucket.Get(key) + lastError = nil + } else { + lastError = fmt.Errorf("bucket not found") + } + if err := encoder.Encode(&value); err != nil { + log.Error("could not encode value in response to CmdGet", "error", err) + return err + } + case CmdCursor: + var bucketHandle uint64 + if err := decoder.Decode(&bucketHandle); err != nil { + log.Error("could not decode bucketHandle for CmdCursor") + return err + } + var cursorHandle uint64 + if bucket, ok := buckets[bucketHandle]; ok { + cursor := bucket.Cursor() + lastHandle++ + cursorHandle = lastHandle + cursors[cursorHandle] = cursor + if cursorHandles, ok1 := cursorsByBucket[bucketHandle]; ok1 { + cursorHandles = append(cursorHandles, cursorHandle) + cursorsByBucket[bucketHandle] = cursorHandles + } else { + cursorsByBucket[bucketHandle] = []uint64{cursorHandle} + } + lastError = nil + } else { + lastError = fmt.Errorf("bucket not found") + } + if err := encoder.Encode(&cursorHandle); err != nil { + log.Error("could not encode value in response to CmdCursor", "error", err) + return err + } + case CmdSeek: + var cursorHandle uint64 + if err := decoder.Decode(&cursorHandle); err != nil { + log.Error("could not decode cursorHandle for CmdSeek") + return err + } + var seekKey []byte + if err := decoder.Decode(&seekKey); err != nil { + log.Error("could not decode seekKey for CmdSeek") + return err + } + default: + log.Error("unknown", "command", c) + return fmt.Errorf("unknown command %d", c) + } + } + return nil +} diff --git a/ethdb/remote/bolt_remote_test.go b/ethdb/remote/bolt_remote_test.go new file mode 100644 index 000000000..0e683a5f4 --- /dev/null +++ b/ethdb/remote/bolt_remote_test.go @@ -0,0 +1,297 @@ +// Copyright 2019 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +package remote + +import ( + "bytes" + "testing" + + "github.com/ledgerwatch/bolt" +) + +func TestCmdVersion(t *testing.T) { + // ---------- Start of boilerplate code + db, err := bolt.Open("in-memory", 0600, &bolt.Options{MemOnly: true}) + if err != nil { + t.Errorf("Could not create database: %v", err) + } + // Prepare input buffer with one command CmdVersion + var inBuf bytes.Buffer + encoder := newEncoder(&inBuf) + defer returnEncoderToPool(encoder) + // output buffer to receive the result of the command + var outBuf bytes.Buffer + decoder := newDecoder(&outBuf) + defer returnDecoderToPool(decoder) + // ---------- End of boilerplate code + var c = CmdVersion + if err = encoder.Encode(&c); err != nil { + t.Errorf("Could not encode CmdVersion: %v", err) + } + if err = Server(db, &inBuf, &outBuf); err != nil { + t.Errorf("Error while calling Server: %v", err) + } + var v uint64 + if err = decoder.Decode(&v); err != nil { + t.Errorf("Could not decode version returned by CmdVersion: %v", err) + } + if v != Version { + t.Errorf("Returned version %d, expected %d", v, Version) + } +} + +func TestCmdBeginEndLastError(t *testing.T) { + // ---------- Start of boilerplate code + db, err := bolt.Open("in-memory", 0600, &bolt.Options{MemOnly: true}) + if err != nil { + t.Errorf("Could not create database: %v", err) + } + // Prepare input buffer with one command CmdVersion + var inBuf bytes.Buffer + encoder := newEncoder(&inBuf) + defer returnEncoderToPool(encoder) + // output buffer to receive the result of the command + var outBuf bytes.Buffer + decoder := newDecoder(&outBuf) + defer returnDecoderToPool(decoder) + // ---------- End of boilerplate code + // Send CmdBeginTx, followed by CmdEndTx with the wrong txHandle, followed by CmdLastError, followed by CmdEndTx with the correct handle + // followed by the CmdLastError + var c = CmdBeginTx + if err = encoder.Encode(&c); err != nil { + t.Errorf("Could not encode CmdBeginTx: %v", err) + } + // CmdEnd with the wrong handle + var txHandle uint64 = 156 + c = CmdEndTx + if err = encoder.Encode(&c); err != nil { + t.Errorf("Could not encode CmdEndTx: %v", err) + } + if err = encoder.Encode(&txHandle); err != nil { + t.Errorf("Could not encode txHandle: %v", err) + } + // CmdLastError to retrive the error related to the CmdEndTx with the wrong handle + c = CmdLastError + if err = encoder.Encode(&c); err != nil { + t.Errorf("Could not encode CmdLastError: %v", err) + } + // Now we issue CmdEndTx with the correct tx handle + c = CmdEndTx + if err = encoder.Encode(&c); err != nil { + t.Errorf("Could not encode CmdEndTx: %v", err) + } + txHandle = 1 + if err = encoder.Encode(&txHandle); err != nil { + t.Errorf("Could not encode txHandle: %v", err) + } + // Check that CmdLastError now returns empty string + c = CmdLastError + if err = encoder.Encode(&c); err != nil { + t.Errorf("Could not encode CmdLastError: %v", err) + } + // By now we constructed all input requests, now we call the + // Server to process them all + if err = Server(db, &inBuf, &outBuf); err != nil { + t.Errorf("Error while calling Server: %v", err) + } + // And then we interpret the results + if err = decoder.Decode(&txHandle); err != nil { + t.Errorf("Could not decode response from CmdBeginTx") + } + var lastErrorStr string + if err = decoder.Decode(&lastErrorStr); err != nil { + t.Errorf("Could not decode response from CmdLastError") + } + if lastErrorStr != "transaction not found" { + t.Errorf("Wrong error message from CmdLastError: %s", lastErrorStr) + } + if err = decoder.Decode(&lastErrorStr); err != nil { + t.Errorf("Could not decode response from CmdLastError") + } + if lastErrorStr != "" { + t.Errorf("Wrong error message from CmdLastError: %s", lastErrorStr) + } +} + +func TestCmdBucket(t *testing.T) { + // ---------- Start of boilerplate code + db, err := bolt.Open("in-memory", 0600, &bolt.Options{MemOnly: true}) + if err != nil { + t.Errorf("Could not create database: %v", err) + } + // Prepare input buffer with one command CmdVersion + var inBuf bytes.Buffer + encoder := newEncoder(&inBuf) + defer returnEncoderToPool(encoder) + // output buffer to receive the result of the command + var outBuf bytes.Buffer + decoder := newDecoder(&outBuf) + defer returnDecoderToPool(decoder) + // ---------- End of boilerplate code + // Create a bucket + var name = []byte("testbucket") + if err = db.Update(func(tx *bolt.Tx) error { + _, err1 := tx.CreateBucket(name, false) + return err1 + }); err != nil { + t.Errorf("Could not create and populate a bucket: %v", err) + } + var c = CmdBeginTx + if err = encoder.Encode(&c); err != nil { + t.Errorf("Could not encode CmdBegin: %v", err) + } + c = CmdBucket + if err = encoder.Encode(&c); err != nil { + t.Errorf("Could not encode CmdBucket: %v", err) + } + var txHandle uint64 = 1 + if err = encoder.Encode(&txHandle); err != nil { + t.Errorf("Could not encode txHandle for CmdBucket: %v", err) + } + if err = encoder.Encode(&name); err != nil { + t.Errorf("Could not encode name for CmdBucket: %v", err) + } + // By now we constructed all input requests, now we call the + // Server to process them all + if err = Server(db, &inBuf, &outBuf); err != nil { + t.Errorf("Error while calling Server: %v", err) + } + // And then we interpret the results + if err = decoder.Decode(&txHandle); err != nil { + t.Errorf("Could not decode response from CmdBegin") + } + if txHandle != 1 { + t.Errorf("Unexpected txHandle: %d", txHandle) + } + var bucketHandle uint64 + if err = decoder.Decode(&bucketHandle); err != nil { + t.Errorf("Could not decode response from CmdBucket") + } + if bucketHandle != 2 { + t.Errorf("Unexpected bucketHandle: %d", txHandle) + } +} + +func TestCmdGet(t *testing.T) { + // ---------- Start of boilerplate code + db, err := bolt.Open("in-memory", 0600, &bolt.Options{MemOnly: true}) + if err != nil { + t.Errorf("Could not create database: %v", err) + } + // Prepare input buffer with one command CmdVersion + var inBuf bytes.Buffer + encoder := newEncoder(&inBuf) + defer returnEncoderToPool(encoder) + // output buffer to receive the result of the command + var outBuf bytes.Buffer + decoder := newDecoder(&outBuf) + defer returnDecoderToPool(decoder) + // ---------- End of boilerplate code + // Create a bucket and populate some values + var name = []byte("testbucket") + if err = db.Update(func(tx *bolt.Tx) error { + b, err1 := tx.CreateBucket(name, false) + if err1 != nil { + return err1 + } + if err1 = b.Put([]byte("key1"), []byte("value1")); err1 != nil { + return err1 + } + if err1 = b.Put([]byte("key2"), []byte("value2")); err1 != nil { + return err1 + } + return nil + }); err != nil { + t.Errorf("Could not create and populate a bucket: %v", err) + } + var c = CmdBeginTx + if err = encoder.Encode(&c); err != nil { + t.Errorf("Could not encode CmdBeginTx: %v", err) + } + c = CmdBucket + if err = encoder.Encode(&c); err != nil { + t.Errorf("Could not encode CmdBucket: %v", err) + } + var txHandle uint64 = 1 + if err = encoder.Encode(&txHandle); err != nil { + t.Errorf("Could not encode txHandle for CmdBucket: %v", err) + } + if err = encoder.Encode(&name); err != nil { + t.Errorf("Could not encode name for CmdBucket: %v", err) + } + // Issue CmdGet with existing key + c = CmdGet + if err = encoder.Encode(&c); err != nil { + t.Errorf("Could not encode CmdGet: %v", err) + } + var bucketHandle uint64 = 2 + var key = []byte("key1") + if err = encoder.Encode(&bucketHandle); err != nil { + t.Errorf("Could not encode bucketHandle for CmdGet: %v", err) + } + if err = encoder.Encode(&key); err != nil { + t.Errorf("Could not encode key for CmdGet: %v", err) + } + // Issue CmdGet with non-existing key + c = CmdGet + if err = encoder.Encode(&c); err != nil { + t.Errorf("Could not encode CmdGet: %v", err) + } + bucketHandle = 2 + key = []byte("key3") + if err = encoder.Encode(&bucketHandle); err != nil { + t.Errorf("Could not encode bucketHandle for CmdGet: %v", err) + } + if err = encoder.Encode(&key); err != nil { + t.Errorf("Could not encode key for CmdGet: %v", err) + } + // By now we constructed all input requests, now we call the + // Server to process them all + if err = Server(db, &inBuf, &outBuf); err != nil { + t.Errorf("Error while calling Server: %v", err) + } + // And then we interpret the results + // Results of CmdBeginTx + if err = decoder.Decode(&txHandle); err != nil { + t.Errorf("Could not decode response from CmdBegin") + } + if txHandle != 1 { + t.Errorf("Unexpected txHandle: %d", txHandle) + } + // Results of CmdBucket + if err = decoder.Decode(&bucketHandle); err != nil { + t.Errorf("Could not decode response from CmdBucket") + } + if bucketHandle != 2 { + t.Errorf("Unexpected bucketHandle: %d", txHandle) + } + // Results of CmdGet (for key1) + var value []byte + if err = decoder.Decode(&value); err != nil { + t.Errorf("Could not decode value from CmdGet: %v", err) + } + if string(value) != "value1" { + t.Errorf("Wrong value from CmdGet, expected: %x, got %x", "value1", value) + } + // Results of CmdGet (for key3) + if err = decoder.Decode(&value); err != nil { + t.Errorf("Could not decode value from CmdGet: %v", err) + } + if value != nil { + t.Errorf("Wrong value from CmdGet, expected: %x, got %x", "value1", value) + } +}