erigon-pulse/vendor/github.com/syndtr/goleveldb/leveldb/cache/cache.go
Péter Szilágyi 289b30715d Godeps, vendor: convert dependency management to trash (#3198)
This commit converts the dependency management from Godeps to the vendor
folder, also switching the tool from godep to trash. Since the upstream tool
lacks a few features proposed via a few PRs, until those PRs are merged in
(if), use github.com/karalabe/trash.

You can update dependencies via trash --update.

All dependencies have been updated to their latest version.

Parts of the build system are reworked to drop old notions of Godeps and
invocation of the go vet command so that it doesn't run against the vendor
folder, as that will just blow up during vetting.

The conversion drops OpenCL (and hence GPU mining support) from ethash and our
codebase. The short reasoning is that there's noone to maintain and having
opencl libs in our deps messes up builds as go install ./... tries to build
them, failing with unsatisfied link errors for the C OpenCL deps.

golang.org/x/net/context is not vendored in. We expect it to be fetched by the
user (i.e. using go get). To keep ci.go builds reproducible the package is
"vendored" in build/_vendor.
2016-10-28 19:05:01 +02:00

706 lines
14 KiB
Go

// Copyright (c) 2012, Suryandaru Triandana <syndtr@gmail.com>
// All rights reserved.
//
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
// Package cache provides interface and implementation of a cache algorithms.
package cache
import (
"sync"
"sync/atomic"
"unsafe"
"github.com/syndtr/goleveldb/leveldb/util"
)
// Cacher provides interface to implements a caching functionality.
// An implementation must be safe for concurrent use.
type Cacher interface {
// Capacity returns cache capacity.
Capacity() int
// SetCapacity sets cache capacity.
SetCapacity(capacity int)
// Promote promotes the 'cache node'.
Promote(n *Node)
// Ban evicts the 'cache node' and prevent subsequent 'promote'.
Ban(n *Node)
// Evict evicts the 'cache node'.
Evict(n *Node)
// EvictNS evicts 'cache node' with the given namespace.
EvictNS(ns uint64)
// EvictAll evicts all 'cache node'.
EvictAll()
// Close closes the 'cache tree'
Close() error
}
// Value is a 'cacheable object'. It may implements util.Releaser, if
// so the the Release method will be called once object is released.
type Value interface{}
// NamespaceGetter provides convenient wrapper for namespace.
type NamespaceGetter struct {
Cache *Cache
NS uint64
}
// Get simply calls Cache.Get() method.
func (g *NamespaceGetter) Get(key uint64, setFunc func() (size int, value Value)) *Handle {
return g.Cache.Get(g.NS, key, setFunc)
}
// The hash tables implementation is based on:
// "Dynamic-Sized Nonblocking Hash Tables", by Yujie Liu,
// Kunlong Zhang, and Michael Spear.
// ACM Symposium on Principles of Distributed Computing, Jul 2014.
const (
mInitialSize = 1 << 4
mOverflowThreshold = 1 << 5
mOverflowGrowThreshold = 1 << 7
)
type mBucket struct {
mu sync.Mutex
node []*Node
frozen bool
}
func (b *mBucket) freeze() []*Node {
b.mu.Lock()
defer b.mu.Unlock()
if !b.frozen {
b.frozen = true
}
return b.node
}
func (b *mBucket) get(r *Cache, h *mNode, hash uint32, ns, key uint64, noset bool) (done, added bool, n *Node) {
b.mu.Lock()
if b.frozen {
b.mu.Unlock()
return
}
// Scan the node.
for _, n := range b.node {
if n.hash == hash && n.ns == ns && n.key == key {
atomic.AddInt32(&n.ref, 1)
b.mu.Unlock()
return true, false, n
}
}
// Get only.
if noset {
b.mu.Unlock()
return true, false, nil
}
// Create node.
n = &Node{
r: r,
hash: hash,
ns: ns,
key: key,
ref: 1,
}
// Add node to bucket.
b.node = append(b.node, n)
bLen := len(b.node)
b.mu.Unlock()
// Update counter.
grow := atomic.AddInt32(&r.nodes, 1) >= h.growThreshold
if bLen > mOverflowThreshold {
grow = grow || atomic.AddInt32(&h.overflow, 1) >= mOverflowGrowThreshold
}
// Grow.
if grow && atomic.CompareAndSwapInt32(&h.resizeInProgess, 0, 1) {
nhLen := len(h.buckets) << 1
nh := &mNode{
buckets: make([]unsafe.Pointer, nhLen),
mask: uint32(nhLen) - 1,
pred: unsafe.Pointer(h),
growThreshold: int32(nhLen * mOverflowThreshold),
shrinkThreshold: int32(nhLen >> 1),
}
ok := atomic.CompareAndSwapPointer(&r.mHead, unsafe.Pointer(h), unsafe.Pointer(nh))
if !ok {
panic("BUG: failed swapping head")
}
go nh.initBuckets()
}
return true, true, n
}
func (b *mBucket) delete(r *Cache, h *mNode, hash uint32, ns, key uint64) (done, deleted bool) {
b.mu.Lock()
if b.frozen {
b.mu.Unlock()
return
}
// Scan the node.
var (
n *Node
bLen int
)
for i := range b.node {
n = b.node[i]
if n.ns == ns && n.key == key {
if atomic.LoadInt32(&n.ref) == 0 {
deleted = true
// Call releaser.
if n.value != nil {
if r, ok := n.value.(util.Releaser); ok {
r.Release()
}
n.value = nil
}
// Remove node from bucket.
b.node = append(b.node[:i], b.node[i+1:]...)
bLen = len(b.node)
}
break
}
}
b.mu.Unlock()
if deleted {
// Call OnDel.
for _, f := range n.onDel {
f()
}
// Update counter.
atomic.AddInt32(&r.size, int32(n.size)*-1)
shrink := atomic.AddInt32(&r.nodes, -1) < h.shrinkThreshold
if bLen >= mOverflowThreshold {
atomic.AddInt32(&h.overflow, -1)
}
// Shrink.
if shrink && len(h.buckets) > mInitialSize && atomic.CompareAndSwapInt32(&h.resizeInProgess, 0, 1) {
nhLen := len(h.buckets) >> 1
nh := &mNode{
buckets: make([]unsafe.Pointer, nhLen),
mask: uint32(nhLen) - 1,
pred: unsafe.Pointer(h),
growThreshold: int32(nhLen * mOverflowThreshold),
shrinkThreshold: int32(nhLen >> 1),
}
ok := atomic.CompareAndSwapPointer(&r.mHead, unsafe.Pointer(h), unsafe.Pointer(nh))
if !ok {
panic("BUG: failed swapping head")
}
go nh.initBuckets()
}
}
return true, deleted
}
type mNode struct {
buckets []unsafe.Pointer // []*mBucket
mask uint32
pred unsafe.Pointer // *mNode
resizeInProgess int32
overflow int32
growThreshold int32
shrinkThreshold int32
}
func (n *mNode) initBucket(i uint32) *mBucket {
if b := (*mBucket)(atomic.LoadPointer(&n.buckets[i])); b != nil {
return b
}
p := (*mNode)(atomic.LoadPointer(&n.pred))
if p != nil {
var node []*Node
if n.mask > p.mask {
// Grow.
pb := (*mBucket)(atomic.LoadPointer(&p.buckets[i&p.mask]))
if pb == nil {
pb = p.initBucket(i & p.mask)
}
m := pb.freeze()
// Split nodes.
for _, x := range m {
if x.hash&n.mask == i {
node = append(node, x)
}
}
} else {
// Shrink.
pb0 := (*mBucket)(atomic.LoadPointer(&p.buckets[i]))
if pb0 == nil {
pb0 = p.initBucket(i)
}
pb1 := (*mBucket)(atomic.LoadPointer(&p.buckets[i+uint32(len(n.buckets))]))
if pb1 == nil {
pb1 = p.initBucket(i + uint32(len(n.buckets)))
}
m0 := pb0.freeze()
m1 := pb1.freeze()
// Merge nodes.
node = make([]*Node, 0, len(m0)+len(m1))
node = append(node, m0...)
node = append(node, m1...)
}
b := &mBucket{node: node}
if atomic.CompareAndSwapPointer(&n.buckets[i], nil, unsafe.Pointer(b)) {
if len(node) > mOverflowThreshold {
atomic.AddInt32(&n.overflow, int32(len(node)-mOverflowThreshold))
}
return b
}
}
return (*mBucket)(atomic.LoadPointer(&n.buckets[i]))
}
func (n *mNode) initBuckets() {
for i := range n.buckets {
n.initBucket(uint32(i))
}
atomic.StorePointer(&n.pred, nil)
}
// Cache is a 'cache map'.
type Cache struct {
mu sync.RWMutex
mHead unsafe.Pointer // *mNode
nodes int32
size int32
cacher Cacher
closed bool
}
// NewCache creates a new 'cache map'. The cacher is optional and
// may be nil.
func NewCache(cacher Cacher) *Cache {
h := &mNode{
buckets: make([]unsafe.Pointer, mInitialSize),
mask: mInitialSize - 1,
growThreshold: int32(mInitialSize * mOverflowThreshold),
shrinkThreshold: 0,
}
for i := range h.buckets {
h.buckets[i] = unsafe.Pointer(&mBucket{})
}
r := &Cache{
mHead: unsafe.Pointer(h),
cacher: cacher,
}
return r
}
func (r *Cache) getBucket(hash uint32) (*mNode, *mBucket) {
h := (*mNode)(atomic.LoadPointer(&r.mHead))
i := hash & h.mask
b := (*mBucket)(atomic.LoadPointer(&h.buckets[i]))
if b == nil {
b = h.initBucket(i)
}
return h, b
}
func (r *Cache) delete(n *Node) bool {
for {
h, b := r.getBucket(n.hash)
done, deleted := b.delete(r, h, n.hash, n.ns, n.key)
if done {
return deleted
}
}
return false
}
// Nodes returns number of 'cache node' in the map.
func (r *Cache) Nodes() int {
return int(atomic.LoadInt32(&r.nodes))
}
// Size returns sums of 'cache node' size in the map.
func (r *Cache) Size() int {
return int(atomic.LoadInt32(&r.size))
}
// Capacity returns cache capacity.
func (r *Cache) Capacity() int {
if r.cacher == nil {
return 0
}
return r.cacher.Capacity()
}
// SetCapacity sets cache capacity.
func (r *Cache) SetCapacity(capacity int) {
if r.cacher != nil {
r.cacher.SetCapacity(capacity)
}
}
// Get gets 'cache node' with the given namespace and key.
// If cache node is not found and setFunc is not nil, Get will atomically creates
// the 'cache node' by calling setFunc. Otherwise Get will returns nil.
//
// The returned 'cache handle' should be released after use by calling Release
// method.
func (r *Cache) Get(ns, key uint64, setFunc func() (size int, value Value)) *Handle {
r.mu.RLock()
defer r.mu.RUnlock()
if r.closed {
return nil
}
hash := murmur32(ns, key, 0xf00)
for {
h, b := r.getBucket(hash)
done, _, n := b.get(r, h, hash, ns, key, setFunc == nil)
if done {
if n != nil {
n.mu.Lock()
if n.value == nil {
if setFunc == nil {
n.mu.Unlock()
n.unref()
return nil
}
n.size, n.value = setFunc()
if n.value == nil {
n.size = 0
n.mu.Unlock()
n.unref()
return nil
}
atomic.AddInt32(&r.size, int32(n.size))
}
n.mu.Unlock()
if r.cacher != nil {
r.cacher.Promote(n)
}
return &Handle{unsafe.Pointer(n)}
}
break
}
}
return nil
}
// Delete removes and ban 'cache node' with the given namespace and key.
// A banned 'cache node' will never inserted into the 'cache tree'. Ban
// only attributed to the particular 'cache node', so when a 'cache node'
// is recreated it will not be banned.
//
// If onDel is not nil, then it will be executed if such 'cache node'
// doesn't exist or once the 'cache node' is released.
//
// Delete return true is such 'cache node' exist.
func (r *Cache) Delete(ns, key uint64, onDel func()) bool {
r.mu.RLock()
defer r.mu.RUnlock()
if r.closed {
return false
}
hash := murmur32(ns, key, 0xf00)
for {
h, b := r.getBucket(hash)
done, _, n := b.get(r, h, hash, ns, key, true)
if done {
if n != nil {
if onDel != nil {
n.mu.Lock()
n.onDel = append(n.onDel, onDel)
n.mu.Unlock()
}
if r.cacher != nil {
r.cacher.Ban(n)
}
n.unref()
return true
}
break
}
}
if onDel != nil {
onDel()
}
return false
}
// Evict evicts 'cache node' with the given namespace and key. This will
// simply call Cacher.Evict.
//
// Evict return true is such 'cache node' exist.
func (r *Cache) Evict(ns, key uint64) bool {
r.mu.RLock()
defer r.mu.RUnlock()
if r.closed {
return false
}
hash := murmur32(ns, key, 0xf00)
for {
h, b := r.getBucket(hash)
done, _, n := b.get(r, h, hash, ns, key, true)
if done {
if n != nil {
if r.cacher != nil {
r.cacher.Evict(n)
}
n.unref()
return true
}
break
}
}
return false
}
// EvictNS evicts 'cache node' with the given namespace. This will
// simply call Cacher.EvictNS.
func (r *Cache) EvictNS(ns uint64) {
r.mu.RLock()
defer r.mu.RUnlock()
if r.closed {
return
}
if r.cacher != nil {
r.cacher.EvictNS(ns)
}
}
// EvictAll evicts all 'cache node'. This will simply call Cacher.EvictAll.
func (r *Cache) EvictAll() {
r.mu.RLock()
defer r.mu.RUnlock()
if r.closed {
return
}
if r.cacher != nil {
r.cacher.EvictAll()
}
}
// Close closes the 'cache map' and forcefully releases all 'cache node'.
func (r *Cache) Close() error {
r.mu.Lock()
if !r.closed {
r.closed = true
h := (*mNode)(r.mHead)
h.initBuckets()
for i := range h.buckets {
b := (*mBucket)(h.buckets[i])
for _, n := range b.node {
// Call releaser.
if n.value != nil {
if r, ok := n.value.(util.Releaser); ok {
r.Release()
}
n.value = nil
}
// Call OnDel.
for _, f := range n.onDel {
f()
}
n.onDel = nil
}
}
}
r.mu.Unlock()
// Avoid deadlock.
if r.cacher != nil {
if err := r.cacher.Close(); err != nil {
return err
}
}
return nil
}
// CloseWeak closes the 'cache map' and evict all 'cache node' from cacher, but
// unlike Close it doesn't forcefully releases 'cache node'.
func (r *Cache) CloseWeak() error {
r.mu.Lock()
if !r.closed {
r.closed = true
}
r.mu.Unlock()
// Avoid deadlock.
if r.cacher != nil {
r.cacher.EvictAll()
if err := r.cacher.Close(); err != nil {
return err
}
}
return nil
}
// Node is a 'cache node'.
type Node struct {
r *Cache
hash uint32
ns, key uint64
mu sync.Mutex
size int
value Value
ref int32
onDel []func()
CacheData unsafe.Pointer
}
// NS returns this 'cache node' namespace.
func (n *Node) NS() uint64 {
return n.ns
}
// Key returns this 'cache node' key.
func (n *Node) Key() uint64 {
return n.key
}
// Size returns this 'cache node' size.
func (n *Node) Size() int {
return n.size
}
// Value returns this 'cache node' value.
func (n *Node) Value() Value {
return n.value
}
// Ref returns this 'cache node' ref counter.
func (n *Node) Ref() int32 {
return atomic.LoadInt32(&n.ref)
}
// GetHandle returns an handle for this 'cache node'.
func (n *Node) GetHandle() *Handle {
if atomic.AddInt32(&n.ref, 1) <= 1 {
panic("BUG: Node.GetHandle on zero ref")
}
return &Handle{unsafe.Pointer(n)}
}
func (n *Node) unref() {
if atomic.AddInt32(&n.ref, -1) == 0 {
n.r.delete(n)
}
}
func (n *Node) unrefLocked() {
if atomic.AddInt32(&n.ref, -1) == 0 {
n.r.mu.RLock()
if !n.r.closed {
n.r.delete(n)
}
n.r.mu.RUnlock()
}
}
// Handle is a 'cache handle' of a 'cache node'.
type Handle struct {
n unsafe.Pointer // *Node
}
// Value returns the value of the 'cache node'.
func (h *Handle) Value() Value {
n := (*Node)(atomic.LoadPointer(&h.n))
if n != nil {
return n.value
}
return nil
}
// Release releases this 'cache handle'.
// It is safe to call release multiple times.
func (h *Handle) Release() {
nPtr := atomic.LoadPointer(&h.n)
if nPtr != nil && atomic.CompareAndSwapPointer(&h.n, nPtr, nil) {
n := (*Node)(nPtr)
n.unrefLocked()
}
}
func murmur32(ns, key uint64, seed uint32) uint32 {
const (
m = uint32(0x5bd1e995)
r = 24
)
k1 := uint32(ns >> 32)
k2 := uint32(ns)
k3 := uint32(key >> 32)
k4 := uint32(key)
k1 *= m
k1 ^= k1 >> r
k1 *= m
k2 *= m
k2 ^= k2 >> r
k2 *= m
k3 *= m
k3 ^= k3 >> r
k3 *= m
k4 *= m
k4 ^= k4 >> r
k4 *= m
h := seed
h *= m
h ^= k1
h *= m
h ^= k2
h *= m
h ^= k3
h *= m
h ^= k4
h ^= h >> 13
h *= m
h ^= h >> 15
return h
}