Use KV Abstraction in RestAPI (#400)

* Introduce NoValuesCursor. From() method is useless because can be replaced by Seek().`
* implement NoValueCursor interface
* use abstract db in restapi
* cleanup .md
This commit is contained in:
Alex Sharov 2020-03-24 09:12:55 +07:00 committed by GitHub
parent 1fb8749638
commit b490192e67
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
25 changed files with 1257 additions and 952 deletions

View File

@ -8,7 +8,7 @@ import (
"github.com/ledgerwatch/turbo-geth/common"
"github.com/ledgerwatch/turbo-geth/common/dbutils"
"github.com/ledgerwatch/turbo-geth/core/types/accounts"
"github.com/ledgerwatch/turbo-geth/ethdb/remote"
"github.com/ledgerwatch/turbo-geth/ethdb"
)
func RegisterAccountAPI(router *gin.RouterGroup, e *Env) error {
@ -42,13 +42,13 @@ func jsonifyAccount(account *accounts.Account) map[string]interface{} {
return result
}
func findAccountByID(accountID string, remoteDB *remote.DB) (*accounts.Account, error) {
func findAccountByID(accountID string, remoteDB ethdb.KV) (*accounts.Account, error) {
possibleKeys := getPossibleKeys(accountID)
var account *accounts.Account
err := remoteDB.View(context.TODO(), func(tx *remote.Tx) error {
err := remoteDB.View(context.TODO(), func(tx ethdb.Tx) error {
bucket := tx.Bucket(dbutils.AccountsBucket)
for _, key := range possibleKeys {

View File

@ -3,11 +3,11 @@ package apis
import (
"errors"
"github.com/ledgerwatch/turbo-geth/ethdb/remote"
"github.com/ledgerwatch/turbo-geth/ethdb"
)
var ErrEntityNotFound = errors.New("entity not found")
type Env struct {
DB *remote.DB
DB ethdb.KV
}

View File

@ -6,7 +6,7 @@ import (
"strings"
"github.com/gin-gonic/gin"
"github.com/ledgerwatch/turbo-geth/ethdb/remote"
"github.com/ledgerwatch/turbo-geth/ethdb"
)
func RegisterRemoteDBAPI(router *gin.RouterGroup, e *Env) error {
@ -17,7 +17,8 @@ func RegisterRemoteDBAPI(router *gin.RouterGroup, e *Env) error {
func (e *Env) GetDB(c *gin.Context) {
var host, port string
split := strings.Split(e.DB.GetDialAddr(), ":")
split := strings.Split(e.DB.Options().Remote.DialAddress, ":")
if len(split) == 2 {
host, port = split[0], split[1]
}
@ -26,7 +27,7 @@ func (e *Env) GetDB(c *gin.Context) {
func (e *Env) PostDB(c *gin.Context) {
newAddr := c.Query("host") + ":" + c.Query("port")
remoteDB, err := remote.Open(context.Background(), remote.DefaultOpts.Addr(newAddr))
remoteDB, err := ethdb.NewRemote().Path(newAddr).Open(context.TODO())
if err != nil {
c.Error(err) //nolint:errcheck
return

View File

@ -1,7 +1,6 @@
package apis
import (
"bytes"
"context"
"fmt"
"net/http"
@ -9,7 +8,7 @@ import (
"github.com/gin-gonic/gin"
"github.com/ledgerwatch/turbo-geth/common"
"github.com/ledgerwatch/turbo-geth/common/dbutils"
"github.com/ledgerwatch/turbo-geth/ethdb/remote"
"github.com/ledgerwatch/turbo-geth/ethdb"
)
func RegisterStorageAPI(router *gin.RouterGroup, e *Env) error {
@ -32,19 +31,16 @@ type StorageResponse struct {
Value string `json:"value"`
}
func findStorageByPrefix(prefixS string, remoteDB *remote.DB) ([]*StorageResponse, error) {
func findStorageByPrefix(prefixS string, remoteDB ethdb.KV) ([]*StorageResponse, error) {
var results []*StorageResponse
prefix := common.FromHex(prefixS)
if err := remoteDB.View(context.TODO(), func(tx *remote.Tx) error {
c := tx.Bucket(dbutils.StorageBucket).Cursor(remote.DefaultCursorOpts)
if err := remoteDB.View(context.TODO(), func(tx ethdb.Tx) error {
c := tx.Bucket(dbutils.StorageBucket).Cursor().Prefix(prefix).Prefetch(200)
for k, v, err := c.Seek(prefix); k != nil; k, v, err = c.Next() {
for k, v, err := c.First(); k != nil || err != nil; k, v, err = c.Next() {
if err != nil {
return err
}
if !bytes.HasPrefix(k, prefix) {
return nil
}
results = append(results, &StorageResponse{
Prefix: fmt.Sprintf("%x\n", k),

View File

@ -9,7 +9,7 @@ import (
"github.com/gin-gonic/gin"
"github.com/ledgerwatch/turbo-geth/common"
"github.com/ledgerwatch/turbo-geth/common/dbutils"
"github.com/ledgerwatch/turbo-geth/ethdb/remote"
"github.com/ledgerwatch/turbo-geth/ethdb"
)
func RegisterStorageTombstonesAPI(router *gin.RouterGroup, e *Env) error {
@ -39,23 +39,20 @@ type StorageTombsResponse struct {
HideStorage bool `json:"hideStorage"`
}
func findStorageTombstoneByPrefix(prefixS string, remoteDB *remote.DB) ([]*StorageTombsResponse, error) {
func findStorageTombstoneByPrefix(prefixS string, remoteDB ethdb.KV) ([]*StorageTombsResponse, error) {
var results []*StorageTombsResponse
prefix := common.FromHex(prefixS)
if err := remoteDB.View(context.TODO(), func(tx *remote.Tx) error {
if err := remoteDB.View(context.TODO(), func(tx ethdb.Tx) error {
interBucket := tx.Bucket(dbutils.IntermediateTrieHashBucket)
c := interBucket.Cursor(remote.DefaultCursorOpts.PrefetchValues(true))
storage := tx.Bucket(dbutils.StorageBucket).Cursor(remote.DefaultCursorOpts.PrefetchValues(false).PrefetchSize(1))
c := interBucket.Cursor().Prefix(prefix).NoValues()
storage := tx.Bucket(dbutils.StorageBucket).Cursor().Prefetch(1).NoValues()
for k, v, err := c.Seek(prefix); k != nil; k, v, err = c.Next() {
for k, vSize, err := c.First(); k != nil || err != nil; k, vSize, err = c.Next() {
if err != nil {
return err
}
if !bytes.HasPrefix(k, prefix) {
return nil
}
if len(v) > 0 {
if vSize > 0 {
continue
}
@ -113,9 +110,9 @@ type IntegrityCheck struct {
Value string `json:"value"`
}
func storageTombstonesIntegrityDBCheck(remoteDB *remote.DB) ([]*IntegrityCheck, error) {
func storageTombstonesIntegrityDBCheck(remoteDB ethdb.KV) ([]*IntegrityCheck, error) {
var results []*IntegrityCheck
return results, remoteDB.View(context.TODO(), func(tx *remote.Tx) error {
return results, remoteDB.View(context.TODO(), func(tx ethdb.Tx) error {
res, err := storageTombstonesIntegrityDBCheckTx(tx)
if err != nil {
return err
@ -125,50 +122,25 @@ func storageTombstonesIntegrityDBCheck(remoteDB *remote.DB) ([]*IntegrityCheck,
})
}
func storageTombstonesIntegrityDBCheckTx(tx *remote.Tx) ([]*IntegrityCheck, error) {
func storageTombstonesIntegrityDBCheckTx(tx ethdb.Tx) ([]*IntegrityCheck, error) {
var res []*IntegrityCheck
var check1 = &IntegrityCheck{
Name: "1 trie prefix must be covered only by 1 tombstone",
Value: "ok",
}
res = append(res, check1)
check2 := &IntegrityCheck{
check1 := &IntegrityCheck{
Name: "tombstone must hide at least 1 storage",
Value: "ok",
}
res = append(res, check2)
res = append(res, check1)
inter := tx.Bucket(dbutils.IntermediateTrieHashBucket).Cursor(remote.DefaultCursorOpts.PrefetchValues(true).PrefetchSize(1000))
cOverlap := tx.Bucket(dbutils.IntermediateTrieHashBucket).Cursor(remote.DefaultCursorOpts.PrefetchValues(true).PrefetchSize(10))
storage := tx.Bucket(dbutils.StorageBucket).Cursor(remote.DefaultCursorOpts.PrefetchValues(false).PrefetchSize(10))
inter := tx.Bucket(dbutils.IntermediateTrieHashBucket).Cursor().Prefetch(1000).NoValues()
storage := tx.Bucket(dbutils.StorageBucket).Cursor().Prefetch(10).NoValues()
for k, v, err := inter.First(); k != nil; k, v, err = inter.Next() {
for k, vSize, err := inter.First(); k != nil || err != nil; k, vSize, err = inter.Next() {
if err != nil {
return nil, err
}
if len(v) > 0 {
if vSize > 0 {
continue
}
// 1 prefix must be covered only by 1 tombstone
from := append(k, []byte{0, 0}...)
for overlapK, overlapV, err := cOverlap.Seek(from); overlapK != nil; overlapK, overlapV, err = cOverlap.Next() {
if err != nil {
return nil, err
}
if !bytes.HasPrefix(overlapK, from) {
overlapK = nil
}
if len(overlapV) > 0 {
continue
}
if bytes.HasPrefix(overlapK, k) {
check1.Value = fmt.Sprintf("%x is prefix of %x\n", overlapK, k)
break
}
}
// each tombstone must hide at least 1 storage
addrHash := common.CopyBytes(k[:common.HashLength])
storageK, _, err := storage.Seek(addrHash)
@ -193,7 +165,7 @@ func storageTombstonesIntegrityDBCheckTx(tx *remote.Tx) ([]*IntegrityCheck, erro
}
if !hideStorage {
check2.Value = fmt.Sprintf("tombstone %x has no storage to hide\n", k)
check1.Value = fmt.Sprintf("tombstone %x has no storage to hide\n", k)
break
}
}

View File

@ -7,7 +7,7 @@ import (
"github.com/gin-gonic/gin"
"github.com/ledgerwatch/turbo-geth/cmd/restapi/apis"
"github.com/ledgerwatch/turbo-geth/ethdb/remote"
"github.com/ledgerwatch/turbo-geth/ethdb"
)
func printError(name string, err error) {
@ -29,7 +29,7 @@ func ServeREST(localAddress, remoteDbAddress string) error {
}
})
db, err := remote.Open(context.Background(), remote.DefaultOpts.Addr(remoteDbAddress))
db, err := ethdb.NewRemote().Path(remoteDbAddress).Open(context.TODO())
if err != nil {
return err
}

View File

@ -271,7 +271,7 @@ func ClearTombstonesForReCreatedAccount(db ethdb.MinDatabase, addrHash common.Ha
}
var boltDb *bolt.DB
if hasBolt, ok := db.(ethdb.KV); ok {
if hasBolt, ok := db.(ethdb.HasKV); ok {
boltDb = hasBolt.KV()
} else {
return fmt.Errorf("only Bolt supported yet, given: %T", db)
@ -333,7 +333,7 @@ func PutTombstoneForDeletedAccount(db ethdb.MinDatabase, addrHash []byte) error
}
var boltDb *bolt.DB
if hasKV, ok := db.(ethdb.KV); ok {
if hasKV, ok := db.(ethdb.HasKV); ok {
boltDb = hasKV.KV()
} else {
return fmt.Errorf("only Bolt supported yet, given: %T", db)
@ -377,7 +377,7 @@ func PutTombstoneForDeletedAccount(db ethdb.MinDatabase, addrHash []byte) error
func ClearTombstonesForNewStorage(db ethdb.MinDatabase, storageKeyNoInc []byte) error {
var boltDb *bolt.DB
if hasKV, ok := db.(ethdb.KV); ok {
if hasKV, ok := db.(ethdb.HasKV); ok {
boltDb = hasKV.KV()
} else {
return fmt.Errorf("only Bolt supported yet, given: %T", db)

View File

@ -1215,7 +1215,7 @@ func TestClearTombstonesForReCreatedAccount(t *testing.T) {
//printBucket := func() {
// fmt.Printf("IH bucket print\n")
// _ = db.KV().View(func(tx *bolt.Tx) error {
// _ = db.HasKV().View(func(tx *bolt.Tx) error {
// tx.Bucket(dbutils.IntermediateTrieHashBucket).ForEach(func(k, v []byte) error {
// if len(v) == 0 {
// fmt.Printf("IH: %x\n", k)

View File

@ -1,4 +1,4 @@
import React from 'react';
import React, {useState} from 'react';
import {Col, Container, Nav, Row} from 'react-bootstrap';
import API from './utils/API.js'
@ -9,9 +9,8 @@ import StorageTombstonesPage from './page/StorageTombstonesPage';
import {ReactComponent as Logo} from './logo.svg';
import './App.css';
import StoragePage from './page/Storage';
import RemoteDBForm from './components/RemoteDBForm';
import RemoteSidebar from './components/RemoteSidebar';
const api = new API('http://localhost:8080')
const sidebar = [
{
url: '/accounts',
@ -28,6 +27,15 @@ const sidebar = [
];
function App() {
const [host, setHost] = useState('localhost');
const [port, setPort] = useState('8080');
const onApiChange = (data) => {
setHost(data.host)
setPort(data.port)
}
const api = new API('http://' + host + ':' + port)
return (
<ErrorCatcher>
<Router>
@ -47,7 +55,8 @@ function App() {
<div className="active-pointer"/>
</NavLink>
)}
<RemoteDBForm api={api}/>
<div className="mt-5 border-secondary border-top"/>
<RemoteSidebar api={api} restHost={host} restPort={port} onApiChange={onApiChange}/>
</Nav>
</Col>
<Col xs={9} md={10} lg={11}>

View File

@ -1,111 +0,0 @@
import React, {useState} from 'react';
import Form from 'react-bootstrap/Form';
import Button from 'react-bootstrap/Button'
import {InputGroup} from 'react-bootstrap';
import Modal from 'react-bootstrap/Modal';
const get = (api, setState) => {
setState({host: '', port: '', loading: true});
const lookupSuccess = (response) => setState({host: response.data.host, port: response.data.port, loading: false});
const lookupFail = (error) => {
setState({host: '', port: '', loading: false})
setState(() => {
throw error
})
}
return api.getRemoteDB().then(lookupSuccess).catch(lookupFail);
}
const set = (host, port, api, setState) => {
setState({host: host, port: port, loading: true});
const lookupSuccess = () => setState({host: host, port: port, loading: false});
const lookupFail = (error) => {
setState({host: host, port: port, loading: true});
setState(() => {
throw error
})
}
return api.setRemoteDB(host, port).then(lookupSuccess).catch(lookupFail);
}
const RemoteDBForm = ({api}) => {
const [state, setState] = useState({host: '', port: ''});
const [show, setShow] = useState(false);
const handleHostChange = (event) => {
const host = event.target.value;
setState((prev) => {
return {host: host, port: prev.port};
});
}
const handlePortChange = (event) => {
const port = event.target.value;
setState((prev) => {
return {host: prev.host, port: port};
});
}
const handleSubmit = (event) => {
event.preventDefault();
set(state.host, state.port, api, setState)
setShow(false)
}
const handleClick = () => {
setShow(true)
get(api, setState)
}
return (
<div className="mt-3">
<Button variant="outline-secondary" size="sm"
className="w-100 rounded-0 text-break"
onClick={handleClick}>
Remote DB<br/>
{state.host && state.host + ':' + state.port}
</Button>
<Modal show={show} onHide={() => setShow(false)}>
<Modal.Header>
<Modal.Title>Remote DB</Modal.Title>
</Modal.Header>
<Modal.Body>
<Form onSubmit={handleSubmit}>
<InputGroup className="mb-1" size="sm">
<Input label="Host" value={state.host} onChange={handleHostChange}/>
</InputGroup>
<InputGroup className="mb-1" size="sm">
<Input label="Port" value={state.port} onChange={handlePortChange}/>
<InputGroup.Append>
<Button variant="outline-primary" type="submit">Button</Button>
</InputGroup.Append>
</InputGroup>
</Form>
</Modal.Body>
</Modal>
</div>
);
}
const Input = ({label, value, onChange,}) => (
<React.Fragment>
<InputGroup.Prepend>
<InputGroup.Text id="addon{label}">{label}</InputGroup.Text>
</InputGroup.Prepend>
<Form.Control
type="text"
placeholder="{Label}"
aria-describedby="addon{label}"
name="{label}"
value={value || ''}
onChange={onChange}/>
</React.Fragment>
)
export default RemoteDBForm;

View File

@ -0,0 +1,132 @@
import React, {useState} from 'react';
import {Button, Form, Modal} from 'react-bootstrap';
const RemoteSidebar = ({api, restHost, restPort, onApiChange}) => {
return (
<React.Fragment>
<RestApiForm host={restHost} port={restPort} onApiChange={onApiChange}/>
<RemoteDBForm api={api}/>
</React.Fragment>
)
}
const RestApiForm = ({host, port, onApiChange}) => {
const [show, setShow] = useState(false);
const handleSubmit = (e) => {
e.preventDefault();
setShow(false);
let form = e.target;
onApiChange({host: form.elements.host.value, port: form.elements.port.value});
}
const handleClick = (e) => {
e.preventDefault();
setShow(true)
}
return (
<div className="mb-2 font-weight-light text-break">
<a href="/rest-api" className="nav-link px-2" onClick={handleClick}>
Rest API<br/>
{host && host + ':' + port}
</a>
<ModalWindow title="Rest API" show={show} onHide={() => setShow(false)}>
<Form onSubmit={handleSubmit}>
<Input label="Host" defaultValue={host}/>
<Input label="Port" defaultValue={port}/>
<Button type="submit">Submit</Button>
</Form>
</ModalWindow>
</div>
);
}
const get = (api, setHost, setPort) => {
const lookupSuccess = (response) => {
setHost(response.data.host);
setPort(response.data.port);
}
const lookupFail = (error) => {
setHost(() => {
throw error
})
}
return api.getRemoteDB().then(lookupSuccess).catch(lookupFail);
}
const set = (host, port, api, setHost, setPort) => {
const lookupSuccess = () => {
setHost(host);
setPort(port);
};
const lookupFail = (error) => {
setHost(() => {
throw error
})
}
return api.setRemoteDB(host, port).then(lookupSuccess).catch(lookupFail);
}
const RemoteDBForm = ({api}) => {
const [host, setHost] = useState('');
const [port, setPort] = useState('');
const [show, setShow] = useState(false);
const handleSubmit = (e) => {
e.preventDefault();
e.stopPropagation();
const form = e.target;
set(form.elements.host.value, form.elements.port.value, api, setHost, setPort)
setShow(false)
}
const handleClick = (e) => {
e.preventDefault();
setShow(true)
get(api, setHost, setPort)
}
return (
<div className="pl-2 mb-2 font-weight-light text-break">
<a href="/remote-db" onClick={handleClick}>
Remote DB<br/>
{host && host + ':' + port}
</a>
<ModalWindow title="Remote DB" show={show} onHide={() => setShow(false)}>
<Form onSubmit={handleSubmit}>
<Input label="Host" defaultValue={host}/>
<Input label="Port" defaultValue={port}/>
<Button type="submit">Submit</Button>
</Form>
</ModalWindow>
</div>
);
}
const Input = ({label, ...props}) => (
<Form.Group controlId={label.toLowerCase()}>
<Form.Label>{label}</Form.Label>
<Form.Control
type="text"
placeholder={label}
aria-describedby={"addon" + label.toLowerCase()}
name={label.toLowerCase()}
{...props}
/>
</Form.Group>
)
const ModalWindow = ({children, title, ...props}) => (
<Modal {...props}>
<Modal.Header>
<Modal.Title>{title}</Modal.Title>
</Modal.Header>
<Modal.Body>
{children}
</Modal.Body>
</Modal>
)
export default RemoteSidebar;

View File

@ -142,7 +142,7 @@ func New(ctx *node.ServiceContext, config *Config) (*Ethereum, error) {
return nil, err
}
if ctx.Config.RemoteDbListenAddress != "" {
if casted, ok := chainDb.(ethdb.KV); ok {
if casted, ok := chainDb.(ethdb.HasKV); ok {
remotedbserver.StartDeprecated(casted, ctx.Config.RemoteDbListenAddress)
}
}

View File

@ -2,22 +2,6 @@
To build 1 key-value abstraction on top of Bolt, Badger and RemoteDB (our own read-only TCP protocol for key-value databases).
## Vision:
Ethereum gives users a powerful resource (which is hard to give) which is not explicitely priced -
transaction atomicity and "serialisable" isolation (the highest level of isolation you can get in the databases).
Which means that transaction does not even need to declare in advance what it wants to lock, the entire
state is deemed "locked" for its execution. I wonder if the weaker isolation models would make sense.
For example, ["read committed"](https://en.wikipedia.org/wiki/Isolation_(database_systems)#Read_committed)
with the weaker isolation, you might be able to split any transaction into smaller parts, each of which
does not perform any Dynamic State Access (I have no proof of that though).
It is similar to Tendermint strategy, but even more granular. You can view it as a support for "continuations".
Transactions starts, and whenever it hits dynamic access, its execution stops, gas is charged, and the continuation
is added to the state. Then, transaction can be resumed, because by committing to some continuation, it makes its
next dynamic state access static.
## Design principles:
- No internal copies/allocations - all must be delegated to user.
Make it part of contract - written clearly in docs, because it's unsafe (unsafe to put slice to DB and then change it).
@ -25,7 +9,47 @@ Known problems: mutation.Put does copy internally.
- Low-level API: as close to original Bolt/Badger as possible.
- Expose concept of transaction - app-level code can .Rollback() or .Commit() at once.
## Abstraction to support:
## Result interface:
```
type DB interface {
View(ctx context.Context, f func(tx Tx) error) (err error)
Update(ctx context.Context, f func(tx Tx) error) (err error)
Close() error
}
type Tx interface {
Bucket(name []byte) Bucket
}
type Bucket interface {
Get(key []byte) (val []byte, err error)
Put(key []byte, value []byte) error
Delete(key []byte) error
Cursor() Cursor
}
type Cursor interface {
Prefix(v []byte) Cursor
MatchBits(uint) Cursor
Prefetch(v uint) Cursor
NoValues() NoValuesCursor
First() ([]byte, []byte, error)
Seek(seek []byte) ([]byte, []byte, error)
Next() ([]byte, []byte, error)
Walk(walker func(k, v []byte) (bool, error)) error
}
type NoValuesCursor interface {
First() ([]byte, uint64, error)
Seek(seek []byte) ([]byte, uint64, error)
Next() ([]byte, uint64, error)
Walk(walker func(k []byte, vSize uint64) (bool, error)) error
}
```
## Rationale and Features list:
#### Buckets concept:
- Bucket is an interface, cant be nil, can't return error
@ -78,48 +102,3 @@ Known problems: mutation.Put does copy internally.
- Monotonic int DB.GetSequence
- Nested Buckets
- Backups, tx.WriteTo
## Result interface:
```
type DB interface {
View(ctx context.Context, f func(tx Tx) error) (err error)
Update(ctx context.Context, f func(tx Tx) error) (err error)
Close() error
}
type Tx interface {
Bucket(name []byte) Bucket
}
type Bucket interface {
Get(key []byte) (val []byte, err error)
Put(key []byte, value []byte) error
Delete(key []byte) error
Cursor() Cursor
}
type Cursor interface {
Prefix(v []byte) Cursor
From(v []byte) Cursor
MatchBits(uint) Cursor
Prefetch(v uint) Cursor
NoValues() Cursor
First() ([]byte, []byte, error)
Seek(seek []byte) ([]byte, []byte, error)
Next() ([]byte, []byte, error)
FirstKey() ([]byte, uint64, error)
SeekKey(seek []byte) ([]byte, uint64, error)
NextKey() ([]byte, uint64, error)
Walk(walker func(k, v []byte) (bool, error)) error
WalkKeys(walker func(k []byte, vSize uint64) (bool, error)) error
}
```
## Naming:
- `Iter` shorter `Cursor` shorter `Iterator`
- `Opts` shorter `Options`
- `Walk` shorter `ForEach`

View File

@ -1,678 +0,0 @@
package ethdb
import (
"bytes"
"context"
"fmt"
"strconv"
"github.com/dgraph-io/badger/v2"
"github.com/ledgerwatch/bolt"
"github.com/ledgerwatch/turbo-geth/common/dbutils"
"github.com/ledgerwatch/turbo-geth/ethdb/remote"
)
type DB interface {
View(ctx context.Context, f func(tx Tx) error) (err error)
Update(ctx context.Context, f func(tx Tx) error) (err error)
Close() error
}
type Tx interface {
Bucket(name []byte) Bucket
}
type Bucket interface {
Get(key []byte) (val []byte, err error)
Put(key []byte, value []byte) error
Delete(key []byte) error
Cursor() Cursor
}
type Cursor interface {
Prefix(v []byte) Cursor
From(v []byte) Cursor
MatchBits(uint) Cursor
Prefetch(v uint) Cursor
NoValues() Cursor
First() ([]byte, []byte, error)
Seek(seek []byte) ([]byte, []byte, error)
Next() ([]byte, []byte, error)
FirstKey() ([]byte, uint64, error)
SeekKey(seek []byte) ([]byte, uint64, error)
NextKey() ([]byte, uint64, error)
Walk(walker func(k, v []byte) (bool, error)) error
WalkKeys(walker func(k []byte, vSize uint64) (bool, error)) error
}
type DbProvider uint8
const (
Bolt DbProvider = iota
Badger
Remote
)
const DefaultProvider = Bolt
type Options struct {
provider DbProvider
Remote remote.DbOpts
Bolt *bolt.Options
Badger badger.Options
path string
}
func NewBolt() Options {
opts := Options{provider: Bolt, Bolt: bolt.DefaultOptions}
return opts
}
func NewBadger() Options {
opts := Options{provider: Badger, Badger: badger.DefaultOptions("")}
return opts
}
func NewRemote() Options {
opts := Options{provider: Badger, Remote: remote.DefaultOpts}
return opts
}
func NewMemDb() Options {
return Opts().InMem(true)
}
func ProviderOpts(provider DbProvider) Options {
switch provider {
case Bolt:
return NewBolt()
case Badger:
return NewBadger()
case Remote:
return NewRemote()
default:
panic("unknown db provider: " + strconv.Itoa(int(provider)))
}
}
func Opts() Options {
return ProviderOpts(DefaultProvider)
}
func (opts Options) Path(path string) Options {
opts.path = path
switch opts.provider {
case Bolt:
// nothing to do
case Badger:
opts.Badger = opts.Badger.WithDir(path).WithValueDir(path)
case Remote:
opts.Remote = opts.Remote.Addr(path)
}
return opts
}
func (opts Options) InMem(val bool) Options {
switch opts.provider {
case Bolt:
opts.Bolt.MemOnly = val
case Badger:
opts.Badger = opts.Badger.WithInMemory(val)
case Remote:
panic("not supported")
}
return opts
}
type allDB struct {
opts Options
bolt *bolt.DB
badger *badger.DB
remote *remote.DB
}
var buckets = [][]byte{
dbutils.IntermediateTrieHashBucket,
dbutils.AccountsBucket,
}
func (opts Options) Open(ctx context.Context) (db *allDB, err error) {
return Open(ctx, opts)
}
func Open(ctx context.Context, opts Options) (db *allDB, err error) {
db = &allDB{opts: opts}
switch db.opts.provider {
case Bolt:
db.bolt, err = bolt.Open(opts.path, 0600, opts.Bolt)
if err != nil {
return nil, err
}
err = db.bolt.Update(func(tx *bolt.Tx) error {
for _, name := range buckets {
_, createErr := tx.CreateBucketIfNotExists(name, false)
if createErr != nil {
return createErr
}
}
return nil
})
case Badger:
db.badger, err = badger.Open(opts.Badger)
case Remote:
db.remote, err = remote.Open(ctx, opts.Remote)
}
if err != nil {
return nil, err
}
return db, nil
}
// Close closes DB
// All transactions must be closed before closing the database.
func (db *allDB) Close() error {
switch db.opts.provider {
case Bolt:
return db.bolt.Close()
case Badger:
return db.badger.Close()
case Remote:
return db.remote.Close()
}
return nil
}
type tx struct {
ctx context.Context
db *allDB
bolt *bolt.Tx
badger *badger.Txn
remote *remote.Tx
badgerIterators []*badger.Iterator
}
type bucket struct {
tx *tx
bolt *bolt.Bucket
badgerPrefix []byte
nameLen uint
remote *remote.Bucket
}
type cursor struct {
ctx context.Context
bucket bucket
provider DbProvider
prefix []byte
remoteOpts remote.CursorOpts
badgerOpts badger.IteratorOptions
bolt *bolt.Cursor
badger *badger.Iterator
remote *remote.Cursor
k []byte
v []byte
err error
}
func (db *allDB) View(ctx context.Context, f func(tx Tx) error) (err error) {
t := &tx{db: db, ctx: ctx}
switch db.opts.provider {
case Bolt:
return db.bolt.View(func(tx *bolt.Tx) error {
defer t.cleanup()
t.bolt = tx
return f(t)
})
case Badger:
return db.badger.View(func(tx *badger.Txn) error {
defer t.cleanup()
t.badger = tx
return f(t)
})
case Remote:
return db.remote.View(ctx, func(tx *remote.Tx) error {
t.remote = tx
return f(t)
})
}
return err
}
func (db *allDB) Update(ctx context.Context, f func(tx Tx) error) (err error) {
t := &tx{db: db, ctx: ctx}
switch db.opts.provider {
case Bolt:
return db.bolt.Update(func(tx *bolt.Tx) error {
defer t.cleanup()
t.bolt = tx
return f(t)
})
case Badger:
return db.badger.Update(func(tx *badger.Txn) error {
defer t.cleanup()
t.badger = tx
return f(t)
})
case Remote:
return fmt.Errorf("remote db provider doesn't support .Update method")
}
return err
}
func (tx *tx) Bucket(name []byte) Bucket {
b := bucket{tx: tx, nameLen: uint(len(name))}
switch tx.db.opts.provider {
case Bolt:
b.bolt = tx.bolt.Bucket(name)
case Badger:
b.badgerPrefix = name
case Remote:
b.remote = tx.remote.Bucket(name)
}
return b
}
func (tx *tx) cleanup() {
switch tx.db.opts.provider {
case Bolt:
// nothing to cleanup
case Badger:
for _, it := range tx.badgerIterators {
it.Close()
}
case Remote:
// nothing to cleanup
}
}
func (c *cursor) Prefix(v []byte) Cursor {
c.prefix = v
return c
}
func (c *cursor) From(v []byte) Cursor {
panic("not implemented yet")
}
func (c *cursor) MatchBits(n uint) Cursor {
panic("not implemented yet")
}
func (c *cursor) Prefetch(v uint) Cursor {
switch c.provider {
case Bolt:
// nothing to do
case Badger:
c.badgerOpts.PrefetchSize = int(v)
case Remote:
c.remoteOpts.PrefetchSize(uint64(v))
}
return c
}
func (c *cursor) NoValues() Cursor {
switch c.provider {
case Bolt:
// nothing to do
case Badger:
c.badgerOpts.PrefetchValues = false
case Remote:
c.remoteOpts.PrefetchValues(false)
}
return c
}
func (b bucket) Get(key []byte) (val []byte, err error) {
select {
case <-b.tx.ctx.Done():
return nil, b.tx.ctx.Err()
default:
}
switch b.tx.db.opts.provider {
case Bolt:
val, _ = b.bolt.Get(key)
case Badger:
var item *badger.Item
b.badgerPrefix = append(b.badgerPrefix[:b.nameLen], key...)
item, err = b.tx.badger.Get(b.badgerPrefix)
if item != nil {
val, err = item.ValueCopy(nil) // can improve this by using pool
}
case Remote:
val, err = b.remote.Get(key)
}
return val, err
}
func (b bucket) Put(key []byte, value []byte) error {
select {
case <-b.tx.ctx.Done():
return b.tx.ctx.Err()
default:
}
switch b.tx.db.opts.provider {
case Bolt:
return b.bolt.Put(key, value)
case Badger:
b.badgerPrefix = append(b.badgerPrefix[:b.nameLen], key...)
return b.tx.badger.Set(b.badgerPrefix, value)
case Remote:
panic("not supported")
}
return nil
}
func (b bucket) Delete(key []byte) error {
select {
case <-b.tx.ctx.Done():
return b.tx.ctx.Err()
default:
}
switch b.tx.db.opts.provider {
case Bolt:
return b.bolt.Delete(key)
case Badger:
b.badgerPrefix = append(b.badgerPrefix[:b.nameLen], key...)
return b.tx.badger.Delete(b.badgerPrefix)
case Remote:
panic("not supported")
}
return nil
}
func (b bucket) Cursor() Cursor {
c := &cursor{bucket: b, ctx: b.tx.ctx, provider: b.tx.db.opts.provider}
switch c.provider {
case Bolt:
// nothing to do
case Badger:
c.badgerOpts = badger.DefaultIteratorOptions
b.badgerPrefix = append(b.badgerPrefix[:b.nameLen], c.prefix...) // set bucket
c.badgerOpts.Prefix = b.badgerPrefix // set bucket
case Remote:
c.remoteOpts = remote.DefaultCursorOpts
}
return c
}
func (c *cursor) initCursor() {
switch c.provider {
case Bolt:
if c.bolt != nil {
return
}
c.bolt = c.bucket.bolt.Cursor()
case Badger:
if c.badger != nil {
return
}
c.badger = c.bucket.tx.badger.NewIterator(c.badgerOpts)
// add to auto-cleanup on end of transactions
if c.bucket.tx.badgerIterators == nil {
c.bucket.tx.badgerIterators = make([]*badger.Iterator, 0, 1)
}
c.bucket.tx.badgerIterators = append(c.bucket.tx.badgerIterators, c.badger)
case Remote:
if c.remote != nil {
return
}
c.remote = c.bucket.remote.Cursor(c.remoteOpts)
}
}
func (c *cursor) First() ([]byte, []byte, error) {
c.initCursor()
switch c.provider {
case Bolt:
if c.prefix != nil {
c.k, c.v = c.bolt.Seek(c.prefix)
} else {
c.k, c.v = c.bolt.First()
}
case Badger:
c.badger.Rewind()
if !c.badger.Valid() {
c.k = nil
break
}
item := c.badger.Item()
c.k = item.Key()[c.bucket.nameLen:]
if c.badgerOpts.PrefetchValues {
c.v, c.err = item.ValueCopy(c.v) // bech show: using .ValueCopy on same buffer has same speed as item.Value()
}
case Remote:
if c.prefix != nil {
c.k, c.v, c.err = c.remote.Seek(c.prefix)
} else {
c.k, c.v, c.err = c.remote.First()
}
}
return c.k, c.v, c.err
}
func (c *cursor) Seek(seek []byte) ([]byte, []byte, error) {
select {
case <-c.ctx.Done():
return nil, nil, c.ctx.Err()
default:
}
c.initCursor()
switch c.provider {
case Bolt:
c.k, c.v = c.bolt.Seek(seek)
case Badger:
c.bucket.badgerPrefix = append(c.bucket.badgerPrefix[:c.bucket.nameLen], seek...)
c.badger.Seek(c.bucket.badgerPrefix)
if !c.badger.Valid() {
c.k = nil
break
}
item := c.badger.Item()
c.k = item.Key()[c.bucket.nameLen:]
if c.badgerOpts.PrefetchValues {
c.v, c.err = item.ValueCopy(c.v)
}
case Remote:
c.k, c.v, c.err = c.remote.Seek(seek)
}
return c.k, c.v, c.err
}
func (c *cursor) Next() ([]byte, []byte, error) {
select {
case <-c.ctx.Done():
return nil, nil, c.ctx.Err()
default:
}
switch c.provider {
case Bolt:
c.k, c.v = c.bolt.Next()
if c.prefix != nil && !bytes.HasPrefix(c.k, c.prefix) {
return nil, nil, nil
}
case Badger:
c.badger.Next()
if !c.badger.Valid() {
c.k = nil
break
}
item := c.badger.Item()
c.k = item.Key()[c.bucket.nameLen:]
if c.badgerOpts.PrefetchValues {
c.v, c.err = item.ValueCopy(c.v)
}
case Remote:
c.k, c.v, c.err = c.remote.Next()
if c.err != nil {
return nil, nil, c.err
}
if c.prefix != nil && !bytes.HasPrefix(c.k, c.prefix) {
return nil, nil, nil
}
}
return c.k, c.v, c.err
}
func (c *cursor) FirstKey() ([]byte, uint64, error) {
c.initCursor()
var vSize uint64
switch c.provider {
case Bolt:
var v []byte
if c.prefix != nil {
c.k, v = c.bolt.Seek(c.prefix)
} else {
c.k, v = c.bolt.First()
}
vSize = uint64(len(v))
case Badger:
c.badger.Rewind()
if !c.badger.Valid() {
c.k = nil
break
}
item := c.badger.Item()
c.k = item.Key()[c.bucket.nameLen:]
vSize = uint64(item.ValueSize())
case Remote:
var vIsEmpty bool
if c.prefix != nil {
c.k, vIsEmpty, c.err = c.remote.SeekKey(c.prefix)
} else {
c.k, vIsEmpty, c.err = c.remote.FirstKey()
}
if !vIsEmpty {
vSize = 1
}
}
return c.k, vSize, c.err
}
func (c *cursor) SeekKey(seek []byte) ([]byte, uint64, error) {
select {
case <-c.ctx.Done():
return nil, 0, c.ctx.Err()
default:
}
c.initCursor()
var vSize uint64
switch c.provider {
case Bolt:
var v []byte
c.k, v = c.bolt.Seek(seek)
vSize = uint64(len(v))
case Badger:
c.bucket.badgerPrefix = append(c.bucket.badgerPrefix[:c.bucket.nameLen], seek...)
c.badger.Seek(c.bucket.badgerPrefix)
if !c.badger.Valid() {
c.k = nil
break
}
item := c.badger.Item()
c.k = item.Key()[c.bucket.nameLen:]
vSize = uint64(item.ValueSize())
case Remote:
var vIsEmpty bool
c.k, vIsEmpty, c.err = c.remote.SeekKey(seek)
if !vIsEmpty {
vSize = 1
}
}
return c.k, vSize, c.err
}
func (c *cursor) NextKey() ([]byte, uint64, error) {
select {
case <-c.ctx.Done():
return nil, 0, c.ctx.Err()
default:
}
var vSize uint64
switch c.provider {
case Bolt:
var v []byte
c.k, v = c.bolt.Next()
vSize = uint64(len(v))
if c.prefix != nil && !bytes.HasPrefix(c.k, c.prefix) {
return nil, 0, nil
}
case Badger:
c.badger.Next()
if !c.badger.Valid() {
c.k = nil
break
}
item := c.badger.Item()
c.k = item.Key()[c.bucket.nameLen:]
vSize = uint64(item.ValueSize())
case Remote:
var vIsEmpty bool
c.k, vIsEmpty, c.err = c.remote.NextKey()
if !vIsEmpty {
vSize = 1
}
if c.err != nil {
return nil, 0, c.err
}
if c.prefix != nil && !bytes.HasPrefix(c.k, c.prefix) {
return nil, 0, nil
}
}
return c.k, vSize, c.err
}
func (c *cursor) Walk(walker func(k, v []byte) (bool, error)) error {
for k, v, err := c.First(); k != nil || err != nil; k, v, err = c.Next() {
if err != nil {
return err
}
ok, err := walker(k, v)
if err != nil {
return err
}
if !ok {
return nil
}
}
return nil
}
func (c *cursor) WalkKeys(walker func(k []byte, vSize uint64) (bool, error)) error {
for k, vSize, err := c.FirstKey(); k != nil || err != nil; k, vSize, err = c.NextKey() {
if err != nil {
return err
}
ok, err := walker(k, vSize)
if err != nil {
return err
}
if !ok {
return nil
}
}
return nil
}

View File

@ -16,8 +16,8 @@ import (
var boltOriginDb *bolt.DB
var badgerOriginDb *badger.DB
var boltDb ethdb.DB
var badgerDb ethdb.DB
var boltDb ethdb.KV
var badgerDb ethdb.KV
func TestMain(m *testing.M) {
setupDatabases()
@ -134,9 +134,7 @@ func BenchmarkCursor(b *testing.B) {
b.Run("abstract bolt", func(b *testing.B) {
for i := 0; i < b.N; i++ {
if err := boltDb.View(ctx, func(tx ethdb.Tx) error {
bucket := tx.Bucket(dbutils.AccountsBucket)
c := bucket.Cursor()
c := tx.Bucket(dbutils.AccountsBucket).Cursor()
for k, v, err := c.First(); k != nil || err != nil; k, v, err = c.Next() {
if err != nil {
return err

View File

@ -123,7 +123,7 @@ type DbWithPendingMutations interface {
BatchSize() int
}
type KV interface {
type HasKV interface {
KV() *bolt.DB
}

175
ethdb/kv_abstract.go Normal file
View File

@ -0,0 +1,175 @@
package ethdb
import (
"context"
"strconv"
"github.com/dgraph-io/badger/v2"
"github.com/ledgerwatch/bolt"
"github.com/ledgerwatch/turbo-geth/common/dbutils"
"github.com/ledgerwatch/turbo-geth/ethdb/remote"
)
type KV interface {
Options() Options
View(ctx context.Context, f func(tx Tx) error) (err error)
Update(ctx context.Context, f func(tx Tx) error) (err error)
Close() error
}
type Tx interface {
Bucket(name []byte) Bucket
}
type Bucket interface {
Get(key []byte) (val []byte, err error)
Put(key []byte, value []byte) error
Delete(key []byte) error
Cursor() Cursor
}
type Cursor interface {
Prefix(v []byte) Cursor
MatchBits(uint) Cursor
Prefetch(v uint) Cursor
NoValues() NoValuesCursor
First() ([]byte, []byte, error)
Seek(seek []byte) ([]byte, []byte, error)
Next() ([]byte, []byte, error)
Walk(walker func(k, v []byte) (bool, error)) error
}
type NoValuesCursor interface {
First() ([]byte, uint64, error)
Seek(seek []byte) ([]byte, uint64, error)
Next() ([]byte, uint64, error)
Walk(walker func(k []byte, vSize uint64) (bool, error)) error
}
type DbProvider uint8
const (
Bolt DbProvider = iota
Badger
Remote
)
const DefaultProvider = Bolt
type Options struct {
provider DbProvider
Remote remote.DbOpts
Bolt *bolt.Options
Badger badger.Options
path string
}
func NewBolt() Options {
opts := Options{provider: Bolt, Bolt: bolt.DefaultOptions}
return opts
}
func NewBadger() Options {
opts := Options{provider: Badger, Badger: badger.DefaultOptions("")}
return opts
}
func NewRemote() Options {
opts := Options{provider: Remote, Remote: remote.DefaultOpts}
return opts
}
func NewMemDb() Options {
return Opts().InMem(true)
}
func ProviderOpts(provider DbProvider) Options {
switch provider {
case Bolt:
return NewBolt()
case Badger:
return NewBadger()
case Remote:
return NewRemote()
default:
panic("unknown db provider: " + strconv.Itoa(int(provider)))
}
}
func Opts() Options {
return ProviderOpts(DefaultProvider)
}
func (opts Options) Path(path string) Options {
opts.path = path
switch opts.provider {
case Bolt:
// nothing to do
case Badger:
opts.Badger = opts.Badger.WithDir(path).WithValueDir(path)
case Remote:
opts.Remote = opts.Remote.Addr(path)
}
return opts
}
func (opts Options) InMem(val bool) Options {
switch opts.provider {
case Bolt:
opts.Bolt.MemOnly = val
case Badger:
opts.Badger = opts.Badger.WithInMemory(val)
case Remote:
panic("not supported")
}
return opts
}
func (opts Options) Open(ctx context.Context) (db KV, err error) {
return Open(ctx, opts)
}
func Open(ctx context.Context, opts Options) (KV, error) {
switch opts.provider {
case Bolt:
db := &BoltKV{opts: opts}
boltDB, err := bolt.Open(opts.path, 0600, opts.Bolt)
if err != nil {
return nil, err
}
db.bolt = boltDB
err = db.bolt.Update(func(tx *bolt.Tx) error {
for _, name := range dbutils.Buckets {
_, createErr := tx.CreateBucketIfNotExists(name, false)
if createErr != nil {
return createErr
}
}
return nil
})
if err != nil {
return nil, err
}
return db, nil
case Badger:
db := &badgerDB{opts: opts}
badgerDB, err := badger.Open(opts.Badger)
if err != nil {
return nil, err
}
db.badger = badgerDB
return db, nil
case Remote:
db := &remoteDB{opts: opts}
remoteDb, err := remote.Open(ctx, opts.Remote)
if err != nil {
return nil, err
}
db.remote = remoteDb
return db, nil
}
panic("unknown db provider")
}

View File

@ -17,7 +17,7 @@ func TestManagedTx(t *testing.T) {
ctx := context.Background()
t.Run("Bolt", func(t *testing.T) {
var db ethdb.DB
var db ethdb.KV
var errOpen error
db, errOpen = ethdb.NewBolt().InMem(true).Open(ctx)
assert.NoError(t, errOpen)
@ -52,7 +52,8 @@ func TestManagedTx(t *testing.T) {
_ = v
}
for k, vSize, err := c.FirstKey(); k != nil || err != nil; k, vSize, err = c.NextKey() {
c2 := c.NoValues()
for k, vSize, err := c2.First(); k != nil || err != nil; k, vSize, err = c2.Next() {
if err != nil {
return err
}
@ -73,7 +74,7 @@ func TestManagedTx(t *testing.T) {
})
t.Run("Badger", func(t *testing.T) {
var db ethdb.DB
var db ethdb.KV
var errOpen error
db, errOpen = ethdb.NewBadger().InMem(true).Open(ctx)
assert.NoError(t, errOpen)
@ -113,7 +114,7 @@ func TestManagedTx(t *testing.T) {
//c, err := tx.Bucket(dbutil.AccountBucket).CursorOpts().From(key).Cursor()
//
//c, err := b.Cursor(b.CursorOpts().From(key).MatchBits(common.HashLength * 8))
c := b.Cursor()
c := b.Cursor().NoValues()
for k, v, err := c.First(); k != nil || err != nil; k, v, err = c.Next() {
if err != nil {
@ -122,7 +123,7 @@ func TestManagedTx(t *testing.T) {
_ = v
}
for k, vSize, err := c.FirstKey(); k != nil || err != nil; k, vSize, err = c.NextKey() {
for k, vSize, err := c.First(); k != nil || err != nil; k, vSize, err = c.Next() {
if err != nil {
return err
}
@ -160,7 +161,7 @@ func TestCancelTest(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), time.Microsecond)
defer cancel()
var db ethdb.DB
var db ethdb.KV
var errOpen error
db, errOpen = ethdb.NewBolt().InMem(true).Open(ctx)
assert.NoError(t, errOpen)
@ -186,7 +187,7 @@ func TestCancelTest(t *testing.T) {
func TestFilterTest(t *testing.T) {
ctx := context.Background()
var db ethdb.DB
var db ethdb.KV
var errOpen error
db, errOpen = ethdb.NewBolt().InMem(true).Open(ctx)
assert.NoError(t, errOpen)
@ -247,7 +248,7 @@ func TestUnmanagedTx(t *testing.T) {
ctx := context.Background()
t.Run("Bolt", func(t *testing.T) {
var db ethdb.DB
var db ethdb.KV
var errOpen error
db, errOpen = ethdb.NewBolt().InMem(true).Open(ctx)
assert.NoError(t, errOpen)

316
ethdb/kv_badger.go Normal file
View File

@ -0,0 +1,316 @@
package ethdb
import (
"context"
"github.com/dgraph-io/badger/v2"
)
type badgerDB struct {
opts Options
badger *badger.DB
}
// Close closes BoltKV
// All transactions must be closed before closing the database.
func (db *badgerDB) Close() error {
return db.badger.Close()
}
type badgerTx struct {
ctx context.Context
db *badgerDB
badger *badger.Txn
badgerIterators []*badger.Iterator
}
type badgerBucket struct {
tx *badgerTx
badgerPrefix []byte
nameLen uint
}
type badgerCursor struct {
ctx context.Context
bucket badgerBucket
prefix []byte
badgerOpts badger.IteratorOptions
badger *badger.Iterator
k []byte
v []byte
err error
}
func (db *badgerDB) Options() Options {
return db.opts
}
func (db *badgerDB) View(ctx context.Context, f func(tx Tx) error) (err error) {
t := &badgerTx{db: db, ctx: ctx}
return db.badger.View(func(tx *badger.Txn) error {
defer t.cleanup()
t.badger = tx
return f(t)
})
}
func (db *badgerDB) Update(ctx context.Context, f func(tx Tx) error) (err error) {
t := &badgerTx{db: db, ctx: ctx}
return db.badger.Update(func(tx *badger.Txn) error {
defer t.cleanup()
t.badger = tx
return f(t)
})
}
func (tx *badgerTx) Bucket(name []byte) Bucket {
b := badgerBucket{tx: tx, nameLen: uint(len(name))}
b.badgerPrefix = name
return b
}
func (tx *badgerTx) cleanup() {
for _, it := range tx.badgerIterators {
it.Close()
}
}
func (c *badgerCursor) Prefix(v []byte) Cursor {
c.prefix = v
return c
}
func (c *badgerCursor) MatchBits(n uint) Cursor {
panic("not implemented yet")
}
func (c *badgerCursor) Prefetch(v uint) Cursor {
c.badgerOpts.PrefetchSize = int(v)
return c
}
func (c *badgerCursor) NoValues() NoValuesCursor {
c.badgerOpts.PrefetchValues = false
return &badgerNoValuesCursor{badgerCursor: *c}
}
func (b badgerBucket) Get(key []byte) (val []byte, err error) {
select {
case <-b.tx.ctx.Done():
return nil, b.tx.ctx.Err()
default:
}
var item *badger.Item
b.badgerPrefix = append(b.badgerPrefix[:b.nameLen], key...)
item, err = b.tx.badger.Get(b.badgerPrefix)
if item != nil {
val, err = item.ValueCopy(nil) // can improve this by using pool
}
return val, err
}
func (b badgerBucket) Put(key []byte, value []byte) error {
select {
case <-b.tx.ctx.Done():
return b.tx.ctx.Err()
default:
}
b.badgerPrefix = append(b.badgerPrefix[:b.nameLen], key...)
return b.tx.badger.Set(b.badgerPrefix, value)
}
func (b badgerBucket) Delete(key []byte) error {
select {
case <-b.tx.ctx.Done():
return b.tx.ctx.Err()
default:
}
b.badgerPrefix = append(b.badgerPrefix[:b.nameLen], key...)
return b.tx.badger.Delete(b.badgerPrefix)
return nil
}
func (b badgerBucket) Cursor() Cursor {
c := &badgerCursor{bucket: b, ctx: b.tx.ctx}
// nothing to do
c.badgerOpts = badger.DefaultIteratorOptions
b.badgerPrefix = append(b.badgerPrefix[:b.nameLen], c.prefix...) // set bucket
c.badgerOpts.Prefix = b.badgerPrefix // set bucket
return c
}
func (c *badgerCursor) initCursor() {
if c.badger != nil {
return
}
c.badger = c.bucket.tx.badger.NewIterator(c.badgerOpts)
// add to auto-cleanup on end of transactions
if c.bucket.tx.badgerIterators == nil {
c.bucket.tx.badgerIterators = make([]*badger.Iterator, 0, 1)
}
c.bucket.tx.badgerIterators = append(c.bucket.tx.badgerIterators, c.badger)
}
func (c *badgerCursor) First() ([]byte, []byte, error) {
c.initCursor()
c.badger.Rewind()
if !c.badger.Valid() {
c.k = nil
return c.k, c.v, c.err
}
item := c.badger.Item()
c.k = item.Key()[c.bucket.nameLen:]
if c.badgerOpts.PrefetchValues {
c.v, c.err = item.ValueCopy(c.v) // bech show: using .ValueCopy on same buffer has same speed as item.Value()
}
return c.k, c.v, c.err
}
func (c *badgerCursor) Seek(seek []byte) ([]byte, []byte, error) {
select {
case <-c.ctx.Done():
return nil, nil, c.ctx.Err()
default:
}
c.initCursor()
c.bucket.badgerPrefix = append(c.bucket.badgerPrefix[:c.bucket.nameLen], seek...)
c.badger.Seek(c.bucket.badgerPrefix)
if !c.badger.Valid() {
c.k = nil
return c.k, c.v, c.err
}
item := c.badger.Item()
c.k = item.Key()[c.bucket.nameLen:]
if c.badgerOpts.PrefetchValues {
c.v, c.err = item.ValueCopy(c.v)
}
return c.k, c.v, c.err
}
func (c *badgerCursor) Next() ([]byte, []byte, error) {
select {
case <-c.ctx.Done():
return nil, nil, c.ctx.Err()
default:
}
c.badger.Next()
if !c.badger.Valid() {
c.k = nil
return c.k, c.v, c.err
}
item := c.badger.Item()
c.k = item.Key()[c.bucket.nameLen:]
if c.badgerOpts.PrefetchValues {
c.v, c.err = item.ValueCopy(c.v)
}
return c.k, c.v, c.err
}
func (c *badgerCursor) Walk(walker func(k, v []byte) (bool, error)) error {
for k, v, err := c.First(); k != nil || err != nil; k, v, err = c.Next() {
if err != nil {
return err
}
ok, err := walker(k, v)
if err != nil {
return err
}
if !ok {
return nil
}
}
return nil
}
type badgerNoValuesCursor struct {
badgerCursor
}
func (c *badgerNoValuesCursor) Walk(walker func(k []byte, vSize uint64) (bool, error)) error {
for k, vSize, err := c.First(); k != nil || err != nil; k, vSize, err = c.Next() {
if err != nil {
return err
}
ok, err := walker(k, vSize)
if err != nil {
return err
}
if !ok {
return nil
}
}
return nil
}
func (c *badgerNoValuesCursor) First() ([]byte, uint64, error) {
c.initCursor()
var vSize uint64
c.badger.Rewind()
if !c.badger.Valid() {
c.k = nil
return c.k, vSize, c.err
}
item := c.badger.Item()
c.k = item.Key()[c.bucket.nameLen:]
vSize = uint64(item.ValueSize())
return c.k, vSize, c.err
}
func (c *badgerNoValuesCursor) Seek(seek []byte) ([]byte, uint64, error) {
select {
case <-c.ctx.Done():
return nil, 0, c.ctx.Err()
default:
}
c.initCursor()
var vSize uint64
c.bucket.badgerPrefix = append(c.bucket.badgerPrefix[:c.bucket.nameLen], seek...)
c.badger.Seek(c.bucket.badgerPrefix)
if !c.badger.Valid() {
c.k = nil
return c.k, vSize, c.err
}
item := c.badger.Item()
c.k = item.Key()[c.bucket.nameLen:]
vSize = uint64(item.ValueSize())
return c.k, vSize, c.err
}
func (c *badgerNoValuesCursor) Next() ([]byte, uint64, error) {
select {
case <-c.ctx.Done():
return nil, 0, c.ctx.Err()
default:
}
var vSize uint64
c.badger.Next()
if !c.badger.Valid() {
c.k = nil
return c.k, vSize, c.err
}
item := c.badger.Item()
c.k = item.Key()[c.bucket.nameLen:]
vSize = uint64(item.ValueSize())
return c.k, vSize, c.err
}

260
ethdb/kv_bolt.go Normal file
View File

@ -0,0 +1,260 @@
package ethdb
import (
"bytes"
"context"
"github.com/ledgerwatch/bolt"
)
type BoltKV struct {
opts Options
bolt *bolt.DB
}
// Close closes BoltKV
// All transactions must be closed before closing the database.
func (db *BoltKV) Close() error {
return db.bolt.Close()
}
func (db *BoltKV) Options() Options {
return db.opts
}
type boltTx struct {
ctx context.Context
db *BoltKV
bolt *bolt.Tx
}
type boltBucket struct {
tx *boltTx
bolt *bolt.Bucket
nameLen uint
}
type boltCursor struct {
ctx context.Context
bucket boltBucket
prefix []byte
bolt *bolt.Cursor
k []byte
v []byte
err error
}
func (db *BoltKV) View(ctx context.Context, f func(tx Tx) error) (err error) {
t := &boltTx{db: db, ctx: ctx}
return db.bolt.View(func(tx *bolt.Tx) error {
defer t.cleanup()
t.bolt = tx
return f(t)
})
}
func (db *BoltKV) Update(ctx context.Context, f func(tx Tx) error) (err error) {
t := &boltTx{db: db, ctx: ctx}
return db.bolt.Update(func(tx *bolt.Tx) error {
defer t.cleanup()
t.bolt = tx
return f(t)
})
}
func (tx *boltTx) Bucket(name []byte) Bucket {
b := boltBucket{tx: tx, nameLen: uint(len(name))}
b.bolt = tx.bolt.Bucket(name)
return b
}
func (tx *boltTx) cleanup() {
// nothing to cleanup
}
func (c *boltCursor) Prefix(v []byte) Cursor {
c.prefix = v
return c
}
func (c *boltCursor) MatchBits(n uint) Cursor {
panic("not implemented yet")
}
func (c *boltCursor) Prefetch(v uint) Cursor {
// nothing to do
return c
}
func (c *boltCursor) NoValues() NoValuesCursor {
return &noValuesBoltCursor{boltCursor: *c}
}
func (b boltBucket) Get(key []byte) (val []byte, err error) {
select {
case <-b.tx.ctx.Done():
return nil, b.tx.ctx.Err()
default:
}
val, _ = b.bolt.Get(key)
return val, err
}
func (b boltBucket) Put(key []byte, value []byte) error {
select {
case <-b.tx.ctx.Done():
return b.tx.ctx.Err()
default:
}
return b.bolt.Put(key, value)
}
func (b boltBucket) Delete(key []byte) error {
select {
case <-b.tx.ctx.Done():
return b.tx.ctx.Err()
default:
}
return b.bolt.Delete(key)
}
func (b boltBucket) Cursor() Cursor {
c := &boltCursor{bucket: b, ctx: b.tx.ctx}
// nothing to do
return c
}
func (c *boltCursor) initCursor() {
if c.bolt != nil {
return
}
c.bolt = c.bucket.bolt.Cursor()
}
func (c *boltCursor) First() ([]byte, []byte, error) {
c.initCursor()
if c.prefix != nil {
c.k, c.v = c.bolt.Seek(c.prefix)
} else {
c.k, c.v = c.bolt.First()
}
return c.k, c.v, c.err
}
func (c *boltCursor) Seek(seek []byte) ([]byte, []byte, error) {
select {
case <-c.ctx.Done():
return nil, nil, c.ctx.Err()
default:
}
c.initCursor()
c.k, c.v = c.bolt.Seek(seek)
return c.k, c.v, c.err
}
func (c *boltCursor) Next() ([]byte, []byte, error) {
select {
case <-c.ctx.Done():
return nil, nil, c.ctx.Err()
default:
}
c.k, c.v = c.bolt.Next()
if c.prefix != nil && !bytes.HasPrefix(c.k, c.prefix) {
return nil, nil, nil
}
return c.k, c.v, c.err
}
func (c *boltCursor) Walk(walker func(k, v []byte) (bool, error)) error {
for k, v, err := c.First(); k != nil || err != nil; k, v, err = c.Next() {
if err != nil {
return err
}
ok, err := walker(k, v)
if err != nil {
return err
}
if !ok {
return nil
}
}
return nil
}
type noValuesBoltCursor struct {
boltCursor
}
func (c *noValuesBoltCursor) Walk(walker func(k []byte, vSize uint64) (bool, error)) error {
for k, vSize, err := c.First(); k != nil || err != nil; k, vSize, err = c.Next() {
if err != nil {
return err
}
ok, err := walker(k, vSize)
if err != nil {
return err
}
if !ok {
return nil
}
}
return nil
}
func (c *noValuesBoltCursor) First() ([]byte, uint64, error) {
c.initCursor()
var vSize uint64
var v []byte
if c.prefix != nil {
c.k, v = c.bolt.Seek(c.prefix)
} else {
c.k, v = c.bolt.First()
}
vSize = uint64(len(v))
return c.k, vSize, c.err
}
func (c *noValuesBoltCursor) Seek(seek []byte) ([]byte, uint64, error) {
select {
case <-c.ctx.Done():
return nil, 0, c.ctx.Err()
default:
}
c.initCursor()
var vSize uint64
var v []byte
c.k, v = c.bolt.Seek(seek)
vSize = uint64(len(v))
return c.k, vSize, c.err
}
func (c *noValuesBoltCursor) Next() ([]byte, uint64, error) {
select {
case <-c.ctx.Done():
return nil, 0, c.ctx.Err()
default:
}
var vSize uint64
var v []byte
c.k, v = c.bolt.Next()
vSize = uint64(len(v))
if c.prefix != nil && !bytes.HasPrefix(c.k, c.prefix) {
return nil, 0, nil
}
return c.k, vSize, c.err
}

259
ethdb/kv_remote.go Normal file
View File

@ -0,0 +1,259 @@
package ethdb
import (
"bytes"
"context"
"fmt"
"github.com/ledgerwatch/turbo-geth/ethdb/remote"
)
type remoteDB struct {
opts Options
remote *remote.DB
}
func (db *remoteDB) Options() Options {
return db.opts
}
// Close closes BoltKV
// All transactions must be closed before closing the database.
func (db *remoteDB) Close() error {
return db.remote.Close()
}
type remoteTx struct {
ctx context.Context
db *remoteDB
remote *remote.Tx
}
type remoteBucket struct {
tx *remoteTx
nameLen uint
remote *remote.Bucket
}
type remoteCursor struct {
ctx context.Context
bucket remoteBucket
prefix []byte
remoteOpts remote.CursorOpts
remote *remote.Cursor
k []byte
v []byte
err error
}
func (db *remoteDB) View(ctx context.Context, f func(tx Tx) error) (err error) {
t := &remoteTx{db: db, ctx: ctx}
return db.remote.View(ctx, func(tx *remote.Tx) error {
t.remote = tx
return f(t)
})
}
func (db *remoteDB) Update(ctx context.Context, f func(tx Tx) error) (err error) {
return fmt.Errorf("remote db provider doesn't support .Update method")
}
func (tx *remoteTx) Bucket(name []byte) Bucket {
b := remoteBucket{tx: tx, nameLen: uint(len(name))}
b.remote = tx.remote.Bucket(name)
return b
}
func (tx *remoteTx) cleanup() {
// nothing to cleanup
}
func (c *remoteCursor) Prefix(v []byte) Cursor {
c.prefix = v
return c
}
func (c *remoteCursor) MatchBits(n uint) Cursor {
panic("not implemented yet")
}
func (c *remoteCursor) Prefetch(v uint) Cursor {
c.remoteOpts.PrefetchSize(uint64(v))
return c
}
func (c *remoteCursor) NoValues() NoValuesCursor {
c.remoteOpts.PrefetchValues(false)
return &remoteNoValuesCursor{remoteCursor: *c}
}
func (b remoteBucket) Get(key []byte) (val []byte, err error) {
select {
case <-b.tx.ctx.Done():
return nil, b.tx.ctx.Err()
default:
}
val, err = b.remote.Get(key)
return val, err
}
func (b remoteBucket) Put(key []byte, value []byte) error {
panic("not supported")
}
func (b remoteBucket) Delete(key []byte) error {
panic("not supported")
}
func (b remoteBucket) Cursor() Cursor {
c := &remoteCursor{bucket: b, ctx: b.tx.ctx}
c.remoteOpts = remote.DefaultCursorOpts
return c
}
func (c *remoteCursor) initCursor() {
if c.remote != nil {
return
}
c.remote = c.bucket.remote.Cursor(c.remoteOpts)
}
func (c *remoteCursor) First() ([]byte, []byte, error) {
c.initCursor()
if c.prefix != nil {
c.k, c.v, c.err = c.remote.Seek(c.prefix)
} else {
c.k, c.v, c.err = c.remote.First()
}
return c.k, c.v, c.err
}
func (c *remoteCursor) Seek(seek []byte) ([]byte, []byte, error) {
select {
case <-c.ctx.Done():
return nil, nil, c.ctx.Err()
default:
}
c.initCursor()
c.k, c.v, c.err = c.remote.Seek(seek)
return c.k, c.v, c.err
}
func (c *remoteCursor) Next() ([]byte, []byte, error) {
select {
case <-c.ctx.Done():
return nil, nil, c.ctx.Err()
default:
}
c.k, c.v, c.err = c.remote.Next()
if c.err != nil {
return nil, nil, c.err
}
if c.prefix != nil && !bytes.HasPrefix(c.k, c.prefix) {
return nil, nil, nil
}
return c.k, c.v, c.err
}
func (c *remoteCursor) Walk(walker func(k, v []byte) (bool, error)) error {
for k, v, err := c.First(); k != nil || err != nil; k, v, err = c.Next() {
if err != nil {
return err
}
ok, err := walker(k, v)
if err != nil {
return err
}
if !ok {
return nil
}
}
return nil
}
type remoteNoValuesCursor struct {
remoteCursor
}
func (c *remoteNoValuesCursor) Walk(walker func(k []byte, vSize uint64) (bool, error)) error {
for k, vSize, err := c.First(); k != nil || err != nil; k, vSize, err = c.Next() {
if err != nil {
return err
}
ok, err := walker(k, vSize)
if err != nil {
return err
}
if !ok {
return nil
}
}
return nil
}
func (c *remoteNoValuesCursor) First() ([]byte, uint64, error) {
c.initCursor()
var vSize uint64
var vIsEmpty bool
if c.prefix != nil {
c.k, vIsEmpty, c.err = c.remote.SeekKey(c.prefix)
} else {
c.k, vIsEmpty, c.err = c.remote.FirstKey()
}
if !vIsEmpty {
vSize = 1
}
return c.k, vSize, c.err
}
func (c *remoteNoValuesCursor) Seek(seek []byte) ([]byte, uint64, error) {
select {
case <-c.ctx.Done():
return nil, 0, c.ctx.Err()
default:
}
c.initCursor()
var vSize uint64
var vIsEmpty bool
c.k, vIsEmpty, c.err = c.remote.SeekKey(seek)
if !vIsEmpty {
vSize = 1
}
return c.k, vSize, c.err
}
func (c *remoteNoValuesCursor) Next() ([]byte, uint64, error) {
select {
case <-c.ctx.Done():
return nil, 0, c.ctx.Err()
default:
}
var vSize uint64
var vIsEmpty bool
c.k, vIsEmpty, c.err = c.remote.NextKey()
if !vIsEmpty {
vSize = 1
}
if c.err != nil {
return nil, 0, c.err
}
if c.prefix != nil && !bytes.HasPrefix(c.k, c.prefix) {
return nil, 0, nil
}
return c.k, vSize, c.err
}

View File

@ -94,7 +94,7 @@ type mutation struct {
}
func (m *mutation) KV() *bolt.DB {
if casted, ok := m.db.(KV); !ok {
if casted, ok := m.db.(HasKV); !ok {
return nil
} else {
return casted.KV()

View File

@ -139,7 +139,7 @@ type conn struct {
}
type DbOpts struct {
dialAddress string
DialAddress string
DialFunc DialFunc
DialTimeout time.Duration
PingTimeout time.Duration
@ -157,7 +157,7 @@ var DefaultOpts = DbOpts{
}
func (opts DbOpts) Addr(v string) DbOpts {
opts.dialAddress = v
opts.DialAddress = v
return opts
}
@ -261,10 +261,10 @@ func (closer notifyOnClose) Close() error {
func Open(parentCtx context.Context, opts DbOpts) (*DB, error) {
if opts.DialFunc == nil {
opts.DialFunc = func(ctx context.Context) (in io.Reader, out io.Writer, closer io.Closer, err error) {
if opts.dialAddress == "" {
return nil, nil, nil, fmt.Errorf("please set opts.dialAddress or opts.DialFunc")
if opts.DialAddress == "" {
return nil, nil, nil, fmt.Errorf("please set opts.DialAddress or opts.DialFunc")
}
return defaultDialFunc(ctx, opts.dialAddress)
return defaultDialFunc(ctx, opts.DialAddress)
}
}
@ -343,10 +343,6 @@ func (db *DB) autoReconnect(ctx context.Context) {
}
}
func (db *DB) GetDialAddr() string {
return db.opts.dialAddress
}
// Close closes DB by using the closer field
func (db *DB) Close() error {
db.cancelConnections()

View File

@ -26,7 +26,7 @@ const Version uint64 = 2
// It runs while the connection is active and keep the entire connection's context
// in the local variables
// For tests, bytes.Buffer can be used for both `in` and `out`
func Server(ctx context.Context, db ethdb.KV, in io.Reader, out io.Writer, closer io.Closer) error {
func Server(ctx context.Context, db ethdb.HasKV, in io.Reader, out io.Writer, closer io.Closer) error {
defer func() {
if err1 := closer.Close(); err1 != nil {
logger.Error("Could not close connection", "err", err1)
@ -475,7 +475,7 @@ func encodeErr(encoder *codec.Encoder, mainError error) {
var netAddr string
var stopNetInterface context.CancelFunc
func StartDeprecated(db ethdb.KV, addr string) {
func StartDeprecated(db ethdb.HasKV, addr string) {
if stopNetInterface != nil {
stopNetInterface()
}
@ -515,7 +515,7 @@ func StartDeprecated(db ethdb.KV, addr string) {
// Listener starts listener that for each incoming connection
// spawn a go-routine invoking Server
func Listen(ctx context.Context, ln net.Listener, db ethdb.KV) {
func Listen(ctx context.Context, ln net.Listener, db ethdb.HasKV) {
defer func() {
if err := ln.Close(); err != nil {
logger.Error("Could not close listener", "err", err)

View File

@ -184,7 +184,7 @@ func (tr *ResolverStatefulCached) RebuildTrie(
}
var boltDb *bolt.DB
if hasBolt, ok := db.(ethdb.KV); ok {
if hasBolt, ok := db.(ethdb.HasKV); ok {
boltDb = hasBolt.KV()
}