mirror of
https://gitlab.com/pulsechaincom/erigon-pulse.git
synced 2025-01-09 12:31:21 +00:00
289b30715d
This commit converts the dependency management from Godeps to the vendor folder, also switching the tool from godep to trash. Since the upstream tool lacks a few features proposed via a few PRs, until those PRs are merged in (if ever), use github.com/karalabe/trash. You can update dependencies via trash --update. All dependencies have been updated to their latest version. Parts of the build system are reworked to drop old notions of Godeps and invocation of the go vet command so that it doesn't run against the vendor folder, as that will just blow up during vetting. The conversion drops OpenCL (and hence GPU mining support) from ethash and our codebase. The short reasoning is that there's no one to maintain it, and having opencl libs in our deps messes up builds as go install ./... tries to build them, failing with unsatisfied link errors for the C OpenCL deps. golang.org/x/net/context is not vendored in. We expect it to be fetched by the user (i.e. using go get). To keep ci.go builds reproducible the package is "vendored" in build/_vendor.
444 lines
10 KiB
Go
444 lines
10 KiB
Go
// Copyright (c) 2012, Suryandaru Triandana <syndtr@gmail.com>
|
|
// All rights reserved.
|
|
//
|
|
// Use of this source code is governed by a BSD-style license that can be
|
|
// found in the LICENSE file.
|
|
|
|
package leveldb
|
|
|
|
import (
|
|
"time"
|
|
|
|
"github.com/syndtr/goleveldb/leveldb/memdb"
|
|
"github.com/syndtr/goleveldb/leveldb/opt"
|
|
"github.com/syndtr/goleveldb/leveldb/util"
|
|
)
|
|
|
|
func (db *DB) writeJournal(batches []*Batch, seq uint64, sync bool) error {
|
|
wr, err := db.journal.Next()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if err := writeBatchesWithHeader(wr, batches, seq); err != nil {
|
|
return err
|
|
}
|
|
if err := db.journal.Flush(); err != nil {
|
|
return err
|
|
}
|
|
if sync {
|
|
return db.journalWriter.Sync()
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (db *DB) rotateMem(n int, wait bool) (mem *memDB, err error) {
|
|
// Wait for pending memdb compaction.
|
|
err = db.compTriggerWait(db.mcompCmdC)
|
|
if err != nil {
|
|
return
|
|
}
|
|
|
|
// Create new memdb and journal.
|
|
mem, err = db.newMem(n)
|
|
if err != nil {
|
|
return
|
|
}
|
|
|
|
// Schedule memdb compaction.
|
|
if wait {
|
|
err = db.compTriggerWait(db.mcompCmdC)
|
|
} else {
|
|
db.compTrigger(db.mcompCmdC)
|
|
}
|
|
return
|
|
}
|
|
|
|
func (db *DB) flush(n int) (mdb *memDB, mdbFree int, err error) {
|
|
delayed := false
|
|
slowdownTrigger := db.s.o.GetWriteL0SlowdownTrigger()
|
|
pauseTrigger := db.s.o.GetWriteL0PauseTrigger()
|
|
flush := func() (retry bool) {
|
|
mdb = db.getEffectiveMem()
|
|
if mdb == nil {
|
|
err = ErrClosed
|
|
return false
|
|
}
|
|
defer func() {
|
|
if retry {
|
|
mdb.decref()
|
|
mdb = nil
|
|
}
|
|
}()
|
|
tLen := db.s.tLen(0)
|
|
mdbFree = mdb.Free()
|
|
switch {
|
|
case tLen >= slowdownTrigger && !delayed:
|
|
delayed = true
|
|
time.Sleep(time.Millisecond)
|
|
case mdbFree >= n:
|
|
return false
|
|
case tLen >= pauseTrigger:
|
|
delayed = true
|
|
err = db.compTriggerWait(db.tcompCmdC)
|
|
if err != nil {
|
|
return false
|
|
}
|
|
default:
|
|
// Allow memdb to grow if it has no entry.
|
|
if mdb.Len() == 0 {
|
|
mdbFree = n
|
|
} else {
|
|
mdb.decref()
|
|
mdb, err = db.rotateMem(n, false)
|
|
if err == nil {
|
|
mdbFree = mdb.Free()
|
|
} else {
|
|
mdbFree = 0
|
|
}
|
|
}
|
|
return false
|
|
}
|
|
return true
|
|
}
|
|
start := time.Now()
|
|
for flush() {
|
|
}
|
|
if delayed {
|
|
db.writeDelay += time.Since(start)
|
|
db.writeDelayN++
|
|
} else if db.writeDelayN > 0 {
|
|
db.logf("db@write was delayed N·%d T·%v", db.writeDelayN, db.writeDelay)
|
|
db.writeDelay = 0
|
|
db.writeDelayN = 0
|
|
}
|
|
return
|
|
}
|
|
|
|
type writeMerge struct {
|
|
sync bool
|
|
batch *Batch
|
|
keyType keyType
|
|
key, value []byte
|
|
}
|
|
|
|
func (db *DB) unlockWrite(overflow bool, merged int, err error) {
|
|
for i := 0; i < merged; i++ {
|
|
db.writeAckC <- err
|
|
}
|
|
if overflow {
|
|
// Pass lock to the next write (that failed to merge).
|
|
db.writeMergedC <- false
|
|
} else {
|
|
// Release lock.
|
|
<-db.writeLockC
|
|
}
|
|
}
|
|
|
|
// ourBatch if defined should equal with batch.
|
|
func (db *DB) writeLocked(batch, ourBatch *Batch, merge, sync bool) error {
|
|
// Try to flush memdb. This method would also trying to throttle writes
|
|
// if it is too fast and compaction cannot catch-up.
|
|
mdb, mdbFree, err := db.flush(batch.internalLen)
|
|
if err != nil {
|
|
db.unlockWrite(false, 0, err)
|
|
return err
|
|
}
|
|
defer mdb.decref()
|
|
|
|
var (
|
|
overflow bool
|
|
merged int
|
|
batches = []*Batch{batch}
|
|
)
|
|
|
|
if merge {
|
|
// Merge limit.
|
|
var mergeLimit int
|
|
if batch.internalLen > 128<<10 {
|
|
mergeLimit = (1 << 20) - batch.internalLen
|
|
} else {
|
|
mergeLimit = 128 << 10
|
|
}
|
|
mergeCap := mdbFree - batch.internalLen
|
|
if mergeLimit > mergeCap {
|
|
mergeLimit = mergeCap
|
|
}
|
|
|
|
merge:
|
|
for mergeLimit > 0 {
|
|
select {
|
|
case incoming := <-db.writeMergeC:
|
|
if incoming.batch != nil {
|
|
// Merge batch.
|
|
if incoming.batch.internalLen > mergeLimit {
|
|
overflow = true
|
|
break merge
|
|
}
|
|
batches = append(batches, incoming.batch)
|
|
mergeLimit -= incoming.batch.internalLen
|
|
} else {
|
|
// Merge put.
|
|
internalLen := len(incoming.key) + len(incoming.value) + 8
|
|
if internalLen > mergeLimit {
|
|
overflow = true
|
|
break merge
|
|
}
|
|
if ourBatch == nil {
|
|
ourBatch = db.batchPool.Get().(*Batch)
|
|
ourBatch.Reset()
|
|
batches = append(batches, ourBatch)
|
|
}
|
|
// We can use same batch since concurrent write doesn't
|
|
// guarantee write order.
|
|
ourBatch.appendRec(incoming.keyType, incoming.key, incoming.value)
|
|
mergeLimit -= internalLen
|
|
}
|
|
sync = sync || incoming.sync
|
|
merged++
|
|
db.writeMergedC <- true
|
|
|
|
default:
|
|
break merge
|
|
}
|
|
}
|
|
}
|
|
|
|
// Seq number.
|
|
seq := db.seq + 1
|
|
|
|
// Write journal.
|
|
if err := db.writeJournal(batches, seq, sync); err != nil {
|
|
db.unlockWrite(overflow, merged, err)
|
|
return err
|
|
}
|
|
|
|
// Put batches.
|
|
for _, batch := range batches {
|
|
if err := batch.putMem(seq, mdb.DB); err != nil {
|
|
panic(err)
|
|
}
|
|
seq += uint64(batch.Len())
|
|
}
|
|
|
|
// Incr seq number.
|
|
db.addSeq(uint64(batchesLen(batches)))
|
|
|
|
// Rotate memdb if it's reach the threshold.
|
|
if batch.internalLen >= mdbFree {
|
|
db.rotateMem(0, false)
|
|
}
|
|
|
|
db.unlockWrite(overflow, merged, nil)
|
|
return nil
|
|
}
|
|
|
|
// Write apply the given batch to the DB. The batch records will be applied
|
|
// sequentially. Write might be used concurrently, when used concurrently and
|
|
// batch is small enough, write will try to merge the batches. Set NoWriteMerge
|
|
// option to true to disable write merge.
|
|
//
|
|
// It is safe to modify the contents of the arguments after Write returns but
|
|
// not before. Write will not modify content of the batch.
|
|
func (db *DB) Write(batch *Batch, wo *opt.WriteOptions) error {
|
|
if err := db.ok(); err != nil || batch == nil || batch.Len() == 0 {
|
|
return err
|
|
}
|
|
|
|
// If the batch size is larger than write buffer, it may justified to write
|
|
// using transaction instead. Using transaction the batch will be written
|
|
// into tables directly, skipping the journaling.
|
|
if batch.internalLen > db.s.o.GetWriteBuffer() && !db.s.o.GetDisableLargeBatchTransaction() {
|
|
tr, err := db.OpenTransaction()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if err := tr.Write(batch, wo); err != nil {
|
|
tr.Discard()
|
|
return err
|
|
}
|
|
return tr.Commit()
|
|
}
|
|
|
|
merge := !wo.GetNoWriteMerge() && !db.s.o.GetNoWriteMerge()
|
|
sync := wo.GetSync() && !db.s.o.GetNoSync()
|
|
|
|
// Acquire write lock.
|
|
if merge {
|
|
select {
|
|
case db.writeMergeC <- writeMerge{sync: sync, batch: batch}:
|
|
if <-db.writeMergedC {
|
|
// Write is merged.
|
|
return <-db.writeAckC
|
|
}
|
|
// Write is not merged, the write lock is handed to us. Continue.
|
|
case db.writeLockC <- struct{}{}:
|
|
// Write lock acquired.
|
|
case err := <-db.compPerErrC:
|
|
// Compaction error.
|
|
return err
|
|
case <-db.closeC:
|
|
// Closed
|
|
return ErrClosed
|
|
}
|
|
} else {
|
|
select {
|
|
case db.writeLockC <- struct{}{}:
|
|
// Write lock acquired.
|
|
case err := <-db.compPerErrC:
|
|
// Compaction error.
|
|
return err
|
|
case <-db.closeC:
|
|
// Closed
|
|
return ErrClosed
|
|
}
|
|
}
|
|
|
|
return db.writeLocked(batch, nil, merge, sync)
|
|
}
|
|
|
|
func (db *DB) putRec(kt keyType, key, value []byte, wo *opt.WriteOptions) error {
|
|
if err := db.ok(); err != nil {
|
|
return err
|
|
}
|
|
|
|
merge := !wo.GetNoWriteMerge() && !db.s.o.GetNoWriteMerge()
|
|
sync := wo.GetSync() && !db.s.o.GetNoSync()
|
|
|
|
// Acquire write lock.
|
|
if merge {
|
|
select {
|
|
case db.writeMergeC <- writeMerge{sync: sync, keyType: kt, key: key, value: value}:
|
|
if <-db.writeMergedC {
|
|
// Write is merged.
|
|
return <-db.writeAckC
|
|
}
|
|
// Write is not merged, the write lock is handed to us. Continue.
|
|
case db.writeLockC <- struct{}{}:
|
|
// Write lock acquired.
|
|
case err := <-db.compPerErrC:
|
|
// Compaction error.
|
|
return err
|
|
case <-db.closeC:
|
|
// Closed
|
|
return ErrClosed
|
|
}
|
|
} else {
|
|
select {
|
|
case db.writeLockC <- struct{}{}:
|
|
// Write lock acquired.
|
|
case err := <-db.compPerErrC:
|
|
// Compaction error.
|
|
return err
|
|
case <-db.closeC:
|
|
// Closed
|
|
return ErrClosed
|
|
}
|
|
}
|
|
|
|
batch := db.batchPool.Get().(*Batch)
|
|
batch.Reset()
|
|
batch.appendRec(kt, key, value)
|
|
return db.writeLocked(batch, batch, merge, sync)
|
|
}
|
|
|
|
// Put sets the value for the given key. It overwrites any previous value
|
|
// for that key; a DB is not a multi-map. Write merge also applies for Put, see
|
|
// Write.
|
|
//
|
|
// It is safe to modify the contents of the arguments after Put returns but not
|
|
// before.
|
|
func (db *DB) Put(key, value []byte, wo *opt.WriteOptions) error {
|
|
return db.putRec(keyTypeVal, key, value, wo)
|
|
}
|
|
|
|
// Delete deletes the value for the given key. Delete will not returns error if
|
|
// key doesn't exist. Write merge also applies for Delete, see Write.
|
|
//
|
|
// It is safe to modify the contents of the arguments after Delete returns but
|
|
// not before.
|
|
func (db *DB) Delete(key []byte, wo *opt.WriteOptions) error {
|
|
return db.putRec(keyTypeDel, key, nil, wo)
|
|
}
|
|
|
|
func isMemOverlaps(icmp *iComparer, mem *memdb.DB, min, max []byte) bool {
|
|
iter := mem.NewIterator(nil)
|
|
defer iter.Release()
|
|
return (max == nil || (iter.First() && icmp.uCompare(max, internalKey(iter.Key()).ukey()) >= 0)) &&
|
|
(min == nil || (iter.Last() && icmp.uCompare(min, internalKey(iter.Key()).ukey()) <= 0))
|
|
}
|
|
|
|
// CompactRange compacts the underlying DB for the given key range.
|
|
// In particular, deleted and overwritten versions are discarded,
|
|
// and the data is rearranged to reduce the cost of operations
|
|
// needed to access the data. This operation should typically only
|
|
// be invoked by users who understand the underlying implementation.
|
|
//
|
|
// A nil Range.Start is treated as a key before all keys in the DB.
|
|
// And a nil Range.Limit is treated as a key after all keys in the DB.
|
|
// Therefore if both is nil then it will compact entire DB.
|
|
func (db *DB) CompactRange(r util.Range) error {
|
|
if err := db.ok(); err != nil {
|
|
return err
|
|
}
|
|
|
|
// Lock writer.
|
|
select {
|
|
case db.writeLockC <- struct{}{}:
|
|
case err := <-db.compPerErrC:
|
|
return err
|
|
case <-db.closeC:
|
|
return ErrClosed
|
|
}
|
|
|
|
// Check for overlaps in memdb.
|
|
mdb := db.getEffectiveMem()
|
|
if mdb == nil {
|
|
return ErrClosed
|
|
}
|
|
defer mdb.decref()
|
|
if isMemOverlaps(db.s.icmp, mdb.DB, r.Start, r.Limit) {
|
|
// Memdb compaction.
|
|
if _, err := db.rotateMem(0, false); err != nil {
|
|
<-db.writeLockC
|
|
return err
|
|
}
|
|
<-db.writeLockC
|
|
if err := db.compTriggerWait(db.mcompCmdC); err != nil {
|
|
return err
|
|
}
|
|
} else {
|
|
<-db.writeLockC
|
|
}
|
|
|
|
// Table compaction.
|
|
return db.compTriggerRange(db.tcompCmdC, -1, r.Start, r.Limit)
|
|
}
|
|
|
|
// SetReadOnly makes DB read-only. It will stay read-only until reopened.
|
|
func (db *DB) SetReadOnly() error {
|
|
if err := db.ok(); err != nil {
|
|
return err
|
|
}
|
|
|
|
// Lock writer.
|
|
select {
|
|
case db.writeLockC <- struct{}{}:
|
|
db.compWriteLocking = true
|
|
case err := <-db.compPerErrC:
|
|
return err
|
|
case <-db.closeC:
|
|
return ErrClosed
|
|
}
|
|
|
|
// Set compaction read-only.
|
|
select {
|
|
case db.compErrSetC <- ErrReadOnly:
|
|
case perr := <-db.compPerErrC:
|
|
return perr
|
|
case <-db.closeC:
|
|
return ErrClosed
|
|
}
|
|
|
|
return nil
|
|
}
|