go-pulse/swarm/storage/ldbstore.go

1085 lines
27 KiB
Go

// Copyright 2016 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
// disk storage layer for the package bzz
// DbStore implements the ChunkStore interface and is used by the FileStore as
// persistent storage of chunks
// it implements purging based on access count allowing for external control of
// max capacity
package storage
import (
"archive/tar"
"bytes"
"context"
"encoding/binary"
"encoding/hex"
"errors"
"fmt"
"io"
"io/ioutil"
"sync"
"github.com/ethereum/go-ethereum/metrics"
"github.com/ethereum/go-ethereum/rlp"
"github.com/ethereum/go-ethereum/swarm/log"
"github.com/ethereum/go-ethereum/swarm/storage/mock"
"github.com/syndtr/goleveldb/leveldb"
)
const (
defaultGCRatio = 10
defaultMaxGCRound = 10000
defaultMaxGCBatch = 5000
wEntryCnt = 1 << 0
wIndexCnt = 1 << 1
wAccessCnt = 1 << 2
)
var (
dbEntryCount = metrics.NewRegisteredCounter("ldbstore.entryCnt", nil)
)
var (
keyIndex = byte(0)
keyAccessCnt = []byte{2}
keyEntryCnt = []byte{3}
keyDataIdx = []byte{4}
keyData = byte(6)
keyDistanceCnt = byte(7)
keySchema = []byte{8}
keyGCIdx = byte(9) // access to chunk data index, used by garbage collection in ascending order from first entry
)
var (
ErrDBClosed = errors.New("LDBStore closed")
)
type LDBStoreParams struct {
*StoreParams
Path string
Po func(Address) uint8
}
// NewLDBStoreParams constructs LDBStoreParams with the specified values.
func NewLDBStoreParams(storeparams *StoreParams, path string) *LDBStoreParams {
return &LDBStoreParams{
StoreParams: storeparams,
Path: path,
Po: func(k Address) (ret uint8) { return uint8(Proximity(storeparams.BaseKey, k[:])) },
}
}
type garbage struct {
maxRound int // maximum number of chunks to delete in one garbage collection round
maxBatch int // maximum number of chunks to delete in one db request batch
ratio int // 1/x ratio to calculate the number of chunks to gc on a low capacity db
count int // number of chunks deleted in running round
target int // number of chunks to delete in running round
batch *dbBatch // the delete batch
runC chan struct{} // struct in chan means gc is NOT running
}
type LDBStore struct {
db *LDBDatabase
// this should be stored in db, accessed transactionally
entryCnt uint64 // number of items in the LevelDB
accessCnt uint64 // ever-accumulating number increased every time we read/access an entry
dataIdx uint64 // similar to entryCnt, but we only increment it
capacity uint64
bucketCnt []uint64
hashfunc SwarmHasher
po func(Address) uint8
batchesC chan struct{}
closed bool
batch *dbBatch
lock sync.RWMutex
quit chan struct{}
gc *garbage
// Functions encodeDataFunc is used to bypass
// the default functionality of DbStore with
// mock.NodeStore for testing purposes.
encodeDataFunc func(chunk Chunk) []byte
// If getDataFunc is defined, it will be used for
// retrieving the chunk data instead from the local
// LevelDB database.
getDataFunc func(key Address) (data []byte, err error)
}
type dbBatch struct {
*leveldb.Batch
err error
c chan struct{}
}
func newBatch() *dbBatch {
return &dbBatch{Batch: new(leveldb.Batch), c: make(chan struct{})}
}
// TODO: Instead of passing the distance function, just pass the address from which distances are calculated
// to avoid the appearance of a pluggable distance metric and opportunities of bugs associated with providing
// a function different from the one that is actually used.
func NewLDBStore(params *LDBStoreParams) (s *LDBStore, err error) {
s = new(LDBStore)
s.hashfunc = params.Hash
s.quit = make(chan struct{})
s.batchesC = make(chan struct{}, 1)
go s.writeBatches()
s.batch = newBatch()
// associate encodeData with default functionality
s.encodeDataFunc = encodeData
s.db, err = NewLDBDatabase(params.Path)
if err != nil {
return nil, err
}
s.po = params.Po
s.setCapacity(params.DbCapacity)
s.bucketCnt = make([]uint64, 0x100)
for i := 0; i < 0x100; i++ {
k := make([]byte, 2)
k[0] = keyDistanceCnt
k[1] = uint8(i)
cnt, _ := s.db.Get(k)
s.bucketCnt[i] = BytesToU64(cnt)
}
data, _ := s.db.Get(keyEntryCnt)
s.entryCnt = BytesToU64(data)
data, _ = s.db.Get(keyAccessCnt)
s.accessCnt = BytesToU64(data)
data, _ = s.db.Get(keyDataIdx)
s.dataIdx = BytesToU64(data)
// set up garbage collection
s.gc = &garbage{
maxBatch: defaultMaxGCBatch,
maxRound: defaultMaxGCRound,
ratio: defaultGCRatio,
}
s.gc.runC = make(chan struct{}, 1)
s.gc.runC <- struct{}{}
return s, nil
}
// MarkAccessed increments the access counter as a best effort for a chunk, so
// the chunk won't get garbage collected.
func (s *LDBStore) MarkAccessed(addr Address) {
s.lock.Lock()
defer s.lock.Unlock()
if s.closed {
return
}
proximity := s.po(addr)
s.tryAccessIdx(addr, proximity)
}
// initialize and set values for processing of gc round
func (s *LDBStore) startGC(c int) {
s.gc.count = 0
// calculate the target number of deletions
if c >= s.gc.maxRound {
s.gc.target = s.gc.maxRound
} else {
s.gc.target = c / s.gc.ratio
}
s.gc.batch = newBatch()
log.Debug("startgc", "requested", c, "target", s.gc.target)
}
// NewMockDbStore creates a new instance of DbStore with
// mockStore set to a provided value. If mockStore argument is nil,
// this function behaves exactly as NewDbStore.
func NewMockDbStore(params *LDBStoreParams, mockStore *mock.NodeStore) (s *LDBStore, err error) {
s, err = NewLDBStore(params)
if err != nil {
return nil, err
}
// replace put and get with mock store functionality
if mockStore != nil {
s.encodeDataFunc = newMockEncodeDataFunc(mockStore)
s.getDataFunc = newMockGetDataFunc(mockStore)
}
return
}
type dpaDBIndex struct {
Idx uint64
Access uint64
}
func BytesToU64(data []byte) uint64 {
if len(data) < 8 {
return 0
}
return binary.BigEndian.Uint64(data)
}
func U64ToBytes(val uint64) []byte {
data := make([]byte, 8)
binary.BigEndian.PutUint64(data, val)
return data
}
func (s *LDBStore) updateIndexAccess(index *dpaDBIndex) {
index.Access = s.accessCnt
}
func getIndexKey(hash Address) []byte {
hashSize := len(hash)
key := make([]byte, hashSize+1)
key[0] = keyIndex
copy(key[1:], hash[:])
return key
}
func getDataKey(idx uint64, po uint8) []byte {
key := make([]byte, 10)
key[0] = keyData
key[1] = po
binary.BigEndian.PutUint64(key[2:], idx)
return key
}
func getGCIdxKey(index *dpaDBIndex) []byte {
key := make([]byte, 9)
key[0] = keyGCIdx
binary.BigEndian.PutUint64(key[1:], index.Access)
return key
}
func getGCIdxValue(index *dpaDBIndex, po uint8, addr Address) []byte {
val := make([]byte, 41) // po = 1, index.Index = 8, Address = 32
val[0] = po
binary.BigEndian.PutUint64(val[1:], index.Idx)
copy(val[9:], addr)
return val
}
func parseIdxKey(key []byte) (byte, []byte) {
return key[0], key[1:]
}
func parseGCIdxEntry(accessCnt []byte, val []byte) (index *dpaDBIndex, po uint8, addr Address) {
index = &dpaDBIndex{
Idx: binary.BigEndian.Uint64(val[1:]),
Access: binary.BigEndian.Uint64(accessCnt),
}
po = val[0]
addr = val[9:]
return
}
func encodeIndex(index *dpaDBIndex) []byte {
data, _ := rlp.EncodeToBytes(index)
return data
}
func encodeData(chunk Chunk) []byte {
// Always create a new underlying array for the returned byte slice.
// The chunk.Address array may be used in the returned slice which
// may be changed later in the code or by the LevelDB, resulting
// that the Address is changed as well.
return append(append([]byte{}, chunk.Address()[:]...), chunk.Data()...)
}
func decodeIndex(data []byte, index *dpaDBIndex) error {
dec := rlp.NewStream(bytes.NewReader(data), 0)
return dec.Decode(index)
}
func decodeData(addr Address, data []byte) (*chunk, error) {
return NewChunk(addr, data[32:]), nil
}
func (s *LDBStore) collectGarbage() error {
// prevent duplicate gc from starting when one is already running
select {
case <-s.gc.runC:
default:
return nil
}
s.lock.Lock()
entryCnt := s.entryCnt
s.lock.Unlock()
metrics.GetOrRegisterCounter("ldbstore.collectgarbage", nil).Inc(1)
// calculate the amount of chunks to collect and reset counter
s.startGC(int(entryCnt))
log.Debug("collectGarbage", "target", s.gc.target, "entryCnt", entryCnt)
var totalDeleted int
for s.gc.count < s.gc.target {
it := s.db.NewIterator()
ok := it.Seek([]byte{keyGCIdx})
var singleIterationCount int
// every batch needs a lock so we avoid entries changing accessidx in the meantime
s.lock.Lock()
for ; ok && (singleIterationCount < s.gc.maxBatch); ok = it.Next() {
// quit if no more access index keys
itkey := it.Key()
if (itkey == nil) || (itkey[0] != keyGCIdx) {
break
}
// get chunk data entry from access index
val := it.Value()
index, po, hash := parseGCIdxEntry(itkey[1:], val)
keyIdx := make([]byte, 33)
keyIdx[0] = keyIndex
copy(keyIdx[1:], hash)
// add delete operation to batch
s.delete(s.gc.batch.Batch, index, keyIdx, po)
singleIterationCount++
s.gc.count++
log.Trace("garbage collect enqueued chunk for deletion", "key", hash)
// break if target is not on max garbage batch boundary
if s.gc.count >= s.gc.target {
break
}
}
s.writeBatch(s.gc.batch, wEntryCnt)
s.lock.Unlock()
it.Release()
log.Trace("garbage collect batch done", "batch", singleIterationCount, "total", s.gc.count)
}
s.gc.runC <- struct{}{}
log.Debug("garbage collect done", "c", s.gc.count)
metrics.GetOrRegisterCounter("ldbstore.collectgarbage.delete", nil).Inc(int64(totalDeleted))
return nil
}
// Export writes all chunks from the store to a tar archive, returning the
// number of chunks written.
func (s *LDBStore) Export(out io.Writer) (int64, error) {
tw := tar.NewWriter(out)
defer tw.Close()
it := s.db.NewIterator()
defer it.Release()
var count int64
for ok := it.Seek([]byte{keyIndex}); ok; ok = it.Next() {
key := it.Key()
if (key == nil) || (key[0] != keyIndex) {
break
}
var index dpaDBIndex
hash := key[1:]
decodeIndex(it.Value(), &index)
po := s.po(hash)
datakey := getDataKey(index.Idx, po)
log.Trace("store.export", "dkey", fmt.Sprintf("%x", datakey), "dataidx", index.Idx, "po", po)
data, err := s.db.Get(datakey)
if err != nil {
log.Warn(fmt.Sprintf("Chunk %x found but could not be accessed: %v", key, err))
continue
}
hdr := &tar.Header{
Name: hex.EncodeToString(hash),
Mode: 0644,
Size: int64(len(data)),
}
if err := tw.WriteHeader(hdr); err != nil {
return count, err
}
if _, err := tw.Write(data); err != nil {
return count, err
}
count++
}
return count, nil
}
// of chunks read.
func (s *LDBStore) Import(in io.Reader) (int64, error) {
tr := tar.NewReader(in)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
countC := make(chan int64)
errC := make(chan error)
var count int64
go func() {
for {
hdr, err := tr.Next()
if err == io.EOF {
break
} else if err != nil {
select {
case errC <- err:
case <-ctx.Done():
}
}
if len(hdr.Name) != 64 {
log.Warn("ignoring non-chunk file", "name", hdr.Name)
continue
}
keybytes, err := hex.DecodeString(hdr.Name)
if err != nil {
log.Warn("ignoring invalid chunk file", "name", hdr.Name, "err", err)
continue
}
data, err := ioutil.ReadAll(tr)
if err != nil {
select {
case errC <- err:
case <-ctx.Done():
}
}
key := Address(keybytes)
chunk := NewChunk(key, data[32:])
go func() {
select {
case errC <- s.Put(ctx, chunk):
case <-ctx.Done():
}
}()
count++
}
countC <- count
}()
// wait for all chunks to be stored
i := int64(0)
var total int64
for {
select {
case err := <-errC:
if err != nil {
return count, err
}
i++
case total = <-countC:
case <-ctx.Done():
return i, ctx.Err()
}
if total > 0 && i == total {
return total, nil
}
}
}
// Cleanup iterates over the database and deletes chunks if they pass the `f` condition
func (s *LDBStore) Cleanup(f func(*chunk) bool) {
var errorsFound, removed, total int
it := s.db.NewIterator()
defer it.Release()
for ok := it.Seek([]byte{keyIndex}); ok; ok = it.Next() {
key := it.Key()
if (key == nil) || (key[0] != keyIndex) {
break
}
total++
var index dpaDBIndex
err := decodeIndex(it.Value(), &index)
if err != nil {
log.Warn("Cannot decode")
errorsFound++
continue
}
hash := key[1:]
po := s.po(hash)
datakey := getDataKey(index.Idx, po)
data, err := s.db.Get(datakey)
if err != nil {
found := false
// highest possible proximity is 255
for po = 1; po <= 255; po++ {
datakey = getDataKey(index.Idx, po)
data, err = s.db.Get(datakey)
if err == nil {
found = true
break
}
}
if !found {
log.Warn(fmt.Sprintf("Chunk %x found but count not be accessed with any po", key))
errorsFound++
continue
}
}
ck := data[:32]
c, err := decodeData(ck, data)
if err != nil {
log.Error("decodeData error", "err", err)
continue
}
cs := int64(binary.LittleEndian.Uint64(c.sdata[:8]))
log.Trace("chunk", "key", fmt.Sprintf("%x", key), "ck", fmt.Sprintf("%x", ck), "dkey", fmt.Sprintf("%x", datakey), "dataidx", index.Idx, "po", po, "len data", len(data), "len sdata", len(c.sdata), "size", cs)
// if chunk is to be removed
if f(c) {
log.Warn("chunk for cleanup", "key", fmt.Sprintf("%x", key), "ck", fmt.Sprintf("%x", ck), "dkey", fmt.Sprintf("%x", datakey), "dataidx", index.Idx, "po", po, "len data", len(data), "len sdata", len(c.sdata), "size", cs)
s.deleteNow(&index, getIndexKey(key[1:]), po)
removed++
errorsFound++
}
}
log.Warn(fmt.Sprintf("Found %v errors out of %v entries. Removed %v chunks.", errorsFound, total, removed))
}
// CleanGCIndex rebuilds the garbage collector index from scratch, while
// removing inconsistent elements, e.g., indices with missing data chunks.
// WARN: it's a pretty heavy, long running function.
func (s *LDBStore) CleanGCIndex() error {
s.lock.Lock()
defer s.lock.Unlock()
batch := leveldb.Batch{}
var okEntryCount uint64
var totalEntryCount uint64
// throw out all gc indices, we will rebuild from cleaned index
it := s.db.NewIterator()
it.Seek([]byte{keyGCIdx})
var gcDeletes int
for it.Valid() {
rowType, _ := parseIdxKey(it.Key())
if rowType != keyGCIdx {
break
}
batch.Delete(it.Key())
gcDeletes++
it.Next()
}
log.Debug("gc", "deletes", gcDeletes)
if err := s.db.Write(&batch); err != nil {
return err
}
batch.Reset()
it.Release()
// corrected po index pointer values
var poPtrs [256]uint64
// set to true if chunk count not on 4096 iteration boundary
var doneIterating bool
// last key index in previous iteration
lastIdxKey := []byte{keyIndex}
// counter for debug output
var cleanBatchCount int
// go through all key index entries
for !doneIterating {
cleanBatchCount++
var idxs []dpaDBIndex
var chunkHashes [][]byte
var pos []uint8
it := s.db.NewIterator()
it.Seek(lastIdxKey)
// 4096 is just a nice number, don't look for any hidden meaning here...
var i int
for i = 0; i < 4096; i++ {
// this really shouldn't happen unless database is empty
// but let's keep it to be safe
if !it.Valid() {
doneIterating = true
break
}
// if it's not keyindex anymore we're done iterating
rowType, chunkHash := parseIdxKey(it.Key())
if rowType != keyIndex {
doneIterating = true
break
}
// decode the retrieved index
var idx dpaDBIndex
err := decodeIndex(it.Value(), &idx)
if err != nil {
return fmt.Errorf("corrupt index: %v", err)
}
po := s.po(chunkHash)
lastIdxKey = it.Key()
// if we don't find the data key, remove the entry
// if we find it, add to the array of new gc indices to create
dataKey := getDataKey(idx.Idx, po)
_, err = s.db.Get(dataKey)
if err != nil {
log.Warn("deleting inconsistent index (missing data)", "key", chunkHash)
batch.Delete(it.Key())
} else {
idxs = append(idxs, idx)
chunkHashes = append(chunkHashes, chunkHash)
pos = append(pos, po)
okEntryCount++
if idx.Idx > poPtrs[po] {
poPtrs[po] = idx.Idx
}
}
totalEntryCount++
it.Next()
}
it.Release()
// flush the key index corrections
err := s.db.Write(&batch)
if err != nil {
return err
}
batch.Reset()
// add correct gc indices
for i, okIdx := range idxs {
gcIdxKey := getGCIdxKey(&okIdx)
gcIdxData := getGCIdxValue(&okIdx, pos[i], chunkHashes[i])
batch.Put(gcIdxKey, gcIdxData)
log.Trace("clean ok", "key", chunkHashes[i], "gcKey", gcIdxKey, "gcData", gcIdxData)
}
// flush them
err = s.db.Write(&batch)
if err != nil {
return err
}
batch.Reset()
log.Debug("clean gc index pass", "batch", cleanBatchCount, "checked", i, "kept", len(idxs))
}
log.Debug("gc cleanup entries", "ok", okEntryCount, "total", totalEntryCount, "batchlen", batch.Len())
// lastly add updated entry count
var entryCount [8]byte
binary.BigEndian.PutUint64(entryCount[:], okEntryCount)
batch.Put(keyEntryCnt, entryCount[:])
// and add the new po index pointers
var poKey [2]byte
poKey[0] = keyDistanceCnt
for i, poPtr := range poPtrs {
poKey[1] = uint8(i)
if poPtr == 0 {
batch.Delete(poKey[:])
} else {
var idxCount [8]byte
binary.BigEndian.PutUint64(idxCount[:], poPtr)
batch.Put(poKey[:], idxCount[:])
}
}
// if you made it this far your harddisk has survived. Congratulations
return s.db.Write(&batch)
}
// Delete is removes a chunk and updates indices.
// Is thread safe
func (s *LDBStore) Delete(addr Address) error {
s.lock.Lock()
defer s.lock.Unlock()
ikey := getIndexKey(addr)
idata, err := s.db.Get(ikey)
if err != nil {
return err
}
var idx dpaDBIndex
decodeIndex(idata, &idx)
proximity := s.po(addr)
return s.deleteNow(&idx, ikey, proximity)
}
// executes one delete operation immediately
// see *LDBStore.delete
func (s *LDBStore) deleteNow(idx *dpaDBIndex, idxKey []byte, po uint8) error {
batch := new(leveldb.Batch)
s.delete(batch, idx, idxKey, po)
return s.db.Write(batch)
}
// adds a delete chunk operation to the provided batch
// if called directly, decrements entrycount regardless if the chunk exists upon deletion. Risk of wrap to max uint64
func (s *LDBStore) delete(batch *leveldb.Batch, idx *dpaDBIndex, idxKey []byte, po uint8) {
metrics.GetOrRegisterCounter("ldbstore.delete", nil).Inc(1)
gcIdxKey := getGCIdxKey(idx)
batch.Delete(gcIdxKey)
dataKey := getDataKey(idx.Idx, po)
batch.Delete(dataKey)
batch.Delete(idxKey)
s.entryCnt--
dbEntryCount.Dec(1)
cntKey := make([]byte, 2)
cntKey[0] = keyDistanceCnt
cntKey[1] = po
batch.Put(keyEntryCnt, U64ToBytes(s.entryCnt))
batch.Put(cntKey, U64ToBytes(s.bucketCnt[po]))
}
func (s *LDBStore) BinIndex(po uint8) uint64 {
s.lock.RLock()
defer s.lock.RUnlock()
return s.bucketCnt[po]
}
func (s *LDBStore) Size() uint64 {
s.lock.RLock()
defer s.lock.RUnlock()
return s.entryCnt
}
func (s *LDBStore) CurrentStorageIndex() uint64 {
s.lock.RLock()
defer s.lock.RUnlock()
return s.dataIdx
}
// Put adds a chunk to the database, adding indices and incrementing global counters.
// If it already exists, it merely increments the access count of the existing entry.
// Is thread safe
func (s *LDBStore) Put(ctx context.Context, chunk Chunk) error {
metrics.GetOrRegisterCounter("ldbstore.put", nil).Inc(1)
log.Trace("ldbstore.put", "key", chunk.Address())
ikey := getIndexKey(chunk.Address())
var index dpaDBIndex
po := s.po(chunk.Address())
s.lock.Lock()
if s.closed {
s.lock.Unlock()
return ErrDBClosed
}
batch := s.batch
log.Trace("ldbstore.put: s.db.Get", "key", chunk.Address(), "ikey", fmt.Sprintf("%x", ikey))
idata, err := s.db.Get(ikey)
if err != nil {
s.doPut(chunk, &index, po)
}
idata = encodeIndex(&index)
s.batch.Put(ikey, idata)
// add the access-chunkindex index for garbage collection
gcIdxKey := getGCIdxKey(&index)
gcIdxData := getGCIdxValue(&index, po, chunk.Address())
s.batch.Put(gcIdxKey, gcIdxData)
s.lock.Unlock()
select {
case s.batchesC <- struct{}{}:
default:
}
select {
case <-batch.c:
return batch.err
case <-ctx.Done():
return ctx.Err()
}
}
// force putting into db, does not check or update necessary indices
func (s *LDBStore) doPut(chunk Chunk, index *dpaDBIndex, po uint8) {
data := s.encodeDataFunc(chunk)
dkey := getDataKey(s.dataIdx, po)
s.batch.Put(dkey, data)
index.Idx = s.dataIdx
s.bucketCnt[po] = s.dataIdx
s.entryCnt++
dbEntryCount.Inc(1)
s.dataIdx++
index.Access = s.accessCnt
s.accessCnt++
cntKey := make([]byte, 2)
cntKey[0] = keyDistanceCnt
cntKey[1] = po
s.batch.Put(cntKey, U64ToBytes(s.bucketCnt[po]))
}
func (s *LDBStore) writeBatches() {
for {
select {
case <-s.quit:
log.Debug("DbStore: quit batch write loop")
return
case <-s.batchesC:
err := s.writeCurrentBatch()
if err != nil {
log.Debug("DbStore: quit batch write loop", "err", err.Error())
return
}
}
}
}
func (s *LDBStore) writeCurrentBatch() error {
s.lock.Lock()
defer s.lock.Unlock()
b := s.batch
l := b.Len()
if l == 0 {
return nil
}
s.batch = newBatch()
b.err = s.writeBatch(b, wEntryCnt|wAccessCnt|wIndexCnt)
close(b.c)
if s.entryCnt >= s.capacity {
go s.collectGarbage()
}
return nil
}
// must be called non concurrently
func (s *LDBStore) writeBatch(b *dbBatch, wFlag uint8) error {
if wFlag&wEntryCnt > 0 {
b.Put(keyEntryCnt, U64ToBytes(s.entryCnt))
}
if wFlag&wIndexCnt > 0 {
b.Put(keyDataIdx, U64ToBytes(s.dataIdx))
}
if wFlag&wAccessCnt > 0 {
b.Put(keyAccessCnt, U64ToBytes(s.accessCnt))
}
l := b.Len()
if err := s.db.Write(b.Batch); err != nil {
return fmt.Errorf("unable to write batch: %v", err)
}
log.Trace(fmt.Sprintf("batch write (%d entries)", l))
return nil
}
// newMockEncodeDataFunc returns a function that stores the chunk data
// to a mock store to bypass the default functionality encodeData.
// The constructed function always returns the nil data, as DbStore does
// not need to store the data, but still need to create the index.
func newMockEncodeDataFunc(mockStore *mock.NodeStore) func(chunk Chunk) []byte {
return func(chunk Chunk) []byte {
if err := mockStore.Put(chunk.Address(), encodeData(chunk)); err != nil {
log.Error(fmt.Sprintf("%T: Chunk %v put: %v", mockStore, chunk.Address().Log(), err))
}
return chunk.Address()[:]
}
}
// tryAccessIdx tries to find index entry. If found then increments the access
// count for garbage collection and returns the index entry and true for found,
// otherwise returns nil and false.
func (s *LDBStore) tryAccessIdx(addr Address, po uint8) (*dpaDBIndex, bool) {
ikey := getIndexKey(addr)
idata, err := s.db.Get(ikey)
if err != nil {
return nil, false
}
index := new(dpaDBIndex)
decodeIndex(idata, index)
oldGCIdxKey := getGCIdxKey(index)
s.batch.Put(keyAccessCnt, U64ToBytes(s.accessCnt))
index.Access = s.accessCnt
idata = encodeIndex(index)
s.accessCnt++
s.batch.Put(ikey, idata)
newGCIdxKey := getGCIdxKey(index)
newGCIdxData := getGCIdxValue(index, po, ikey[1:])
s.batch.Delete(oldGCIdxKey)
s.batch.Put(newGCIdxKey, newGCIdxData)
select {
case s.batchesC <- struct{}{}:
default:
}
return index, true
}
// GetSchema is returning the current named schema of the datastore as read from LevelDB
func (s *LDBStore) GetSchema() (string, error) {
s.lock.Lock()
defer s.lock.Unlock()
data, err := s.db.Get(keySchema)
if err != nil {
if err == leveldb.ErrNotFound {
return DbSchemaNone, nil
}
return "", err
}
return string(data), nil
}
// PutSchema is saving a named schema to the LevelDB datastore
func (s *LDBStore) PutSchema(schema string) error {
s.lock.Lock()
defer s.lock.Unlock()
return s.db.Put(keySchema, []byte(schema))
}
// Get retrieves the chunk matching the provided key from the database.
// If the chunk entry does not exist, it returns an error
// Updates access count and is thread safe
func (s *LDBStore) Get(_ context.Context, addr Address) (chunk Chunk, err error) {
metrics.GetOrRegisterCounter("ldbstore.get", nil).Inc(1)
log.Trace("ldbstore.get", "key", addr)
s.lock.Lock()
defer s.lock.Unlock()
return s.get(addr)
}
// TODO: To conform with other private methods of this object indices should not be updated
func (s *LDBStore) get(addr Address) (chunk *chunk, err error) {
if s.closed {
return nil, ErrDBClosed
}
proximity := s.po(addr)
index, found := s.tryAccessIdx(addr, proximity)
if found {
var data []byte
if s.getDataFunc != nil {
// if getDataFunc is defined, use it to retrieve the chunk data
log.Trace("ldbstore.get retrieve with getDataFunc", "key", addr)
data, err = s.getDataFunc(addr)
if err != nil {
return
}
} else {
// default DbStore functionality to retrieve chunk data
datakey := getDataKey(index.Idx, proximity)
data, err = s.db.Get(datakey)
log.Trace("ldbstore.get retrieve", "key", addr, "indexkey", index.Idx, "datakey", fmt.Sprintf("%x", datakey), "proximity", proximity)
if err != nil {
log.Trace("ldbstore.get chunk found but could not be accessed", "key", addr, "err", err)
s.deleteNow(index, getIndexKey(addr), s.po(addr))
return
}
}
return decodeData(addr, data)
} else {
err = ErrChunkNotFound
}
return
}
// newMockGetFunc returns a function that reads chunk data from
// the mock database, which is used as the value for DbStore.getFunc
// to bypass the default functionality of DbStore with a mock store.
func newMockGetDataFunc(mockStore *mock.NodeStore) func(addr Address) (data []byte, err error) {
return func(addr Address) (data []byte, err error) {
data, err = mockStore.Get(addr)
if err == mock.ErrNotFound {
// preserve ErrChunkNotFound error
err = ErrChunkNotFound
}
return data, err
}
}
func (s *LDBStore) setCapacity(c uint64) {
s.lock.Lock()
defer s.lock.Unlock()
s.capacity = c
for s.entryCnt > c {
s.collectGarbage()
}
}
func (s *LDBStore) Close() {
close(s.quit)
s.lock.Lock()
s.closed = true
s.lock.Unlock()
// force writing out current batch
s.writeCurrentBatch()
close(s.batchesC)
s.db.Close()
}
// SyncIterator(start, stop, po, f) calls f on each hash of a bin po from start to stop
func (s *LDBStore) SyncIterator(since uint64, until uint64, po uint8, f func(Address, uint64) bool) error {
metrics.GetOrRegisterCounter("ldbstore.synciterator", nil).Inc(1)
sincekey := getDataKey(since, po)
untilkey := getDataKey(until, po)
it := s.db.NewIterator()
defer it.Release()
for ok := it.Seek(sincekey); ok; ok = it.Next() {
metrics.GetOrRegisterCounter("ldbstore.synciterator.seek", nil).Inc(1)
dbkey := it.Key()
if dbkey[0] != keyData || dbkey[1] != po || bytes.Compare(untilkey, dbkey) < 0 {
break
}
key := make([]byte, 32)
val := it.Value()
copy(key, val[:32])
if !f(Address(key), binary.BigEndian.Uint64(dbkey[2:])) {
break
}
}
return it.Error()
}