123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323 |
- /*
- Copyright 2017 The Kubernetes Authors.
- Licensed under the Apache License, Version 2.0 (the "License");
- you may not use this file except in compliance with the License.
- You may obtain a copy of the License at
- http://www.apache.org/licenses/LICENSE-2.0
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
- */
- // This file implements a heap data structure.
- package cache
- import (
- "container/heap"
- "fmt"
- "sync"
- )
// closedMsg is the text of the error returned when an insertion is attempted
// on a Heap that has already been closed.
const closedMsg = "heap is closed"
// LessFunc compares two objects stored in the heap. It returns true when a
// should be placed in front of b.
type LessFunc func(a, b interface{}) bool
// heapItem pairs an object held in the heap with the position of its key in
// the ordering queue.
type heapItem struct {
	// obj is the object stored in the heap.
	obj interface{}
	// index is the position of the object's key in heapData.queue.
	index int
}
// itemKeyValue bundles an object with its key. heapData.Push expects a pointer
// to one of these as its argument.
type itemKeyValue struct {
	// key is the key under which obj is stored.
	key string
	// obj is the object to store.
	obj interface{}
}
- // heapData is an internal struct that implements the standard heap interface
- // and keeps the data stored in the heap.
- type heapData struct {
- // items is a map from key of the objects to the objects and their index.
- // We depend on the property that items in the map are in the queue and vice versa.
- items map[string]*heapItem
- // queue implements a heap data structure and keeps the order of elements
- // according to the heap invariant. The queue keeps the keys of objects stored
- // in "items".
- queue []string
- // keyFunc is used to make the key used for queued item insertion and retrieval, and
- // should be deterministic.
- keyFunc KeyFunc
- // lessFunc is used to compare two objects in the heap.
- lessFunc LessFunc
- }
- var (
- _ = heap.Interface(&heapData{}) // heapData is a standard heap
- )
- // Less compares two objects and returns true if the first one should go
- // in front of the second one in the heap.
- func (h *heapData) Less(i, j int) bool {
- if i > len(h.queue) || j > len(h.queue) {
- return false
- }
- itemi, ok := h.items[h.queue[i]]
- if !ok {
- return false
- }
- itemj, ok := h.items[h.queue[j]]
- if !ok {
- return false
- }
- return h.lessFunc(itemi.obj, itemj.obj)
- }
- // Len returns the number of items in the Heap.
- func (h *heapData) Len() int { return len(h.queue) }
- // Swap implements swapping of two elements in the heap. This is a part of standard
- // heap interface and should never be called directly.
- func (h *heapData) Swap(i, j int) {
- h.queue[i], h.queue[j] = h.queue[j], h.queue[i]
- item := h.items[h.queue[i]]
- item.index = i
- item = h.items[h.queue[j]]
- item.index = j
- }
- // Push is supposed to be called by heap.Push only.
- func (h *heapData) Push(kv interface{}) {
- keyValue := kv.(*itemKeyValue)
- n := len(h.queue)
- h.items[keyValue.key] = &heapItem{keyValue.obj, n}
- h.queue = append(h.queue, keyValue.key)
- }
- // Pop is supposed to be called by heap.Pop only.
- func (h *heapData) Pop() interface{} {
- key := h.queue[len(h.queue)-1]
- h.queue = h.queue[0 : len(h.queue)-1]
- item, ok := h.items[key]
- if !ok {
- // This is an error
- return nil
- }
- delete(h.items, key)
- return item.obj
- }
- // Heap is a thread-safe producer/consumer queue that implements a heap data structure.
- // It can be used to implement priority queues and similar data structures.
- type Heap struct {
- lock sync.RWMutex
- cond sync.Cond
- // data stores objects and has a queue that keeps their ordering according
- // to the heap invariant.
- data *heapData
- // closed indicates that the queue is closed.
- // It is mainly used to let Pop() exit its control loop while waiting for an item.
- closed bool
- }
- // Close the Heap and signals condition variables that may be waiting to pop
- // items from the heap.
- func (h *Heap) Close() {
- h.lock.Lock()
- defer h.lock.Unlock()
- h.closed = true
- h.cond.Broadcast()
- }
- // Add inserts an item, and puts it in the queue. The item is updated if it
- // already exists.
- func (h *Heap) Add(obj interface{}) error {
- key, err := h.data.keyFunc(obj)
- if err != nil {
- return KeyError{obj, err}
- }
- h.lock.Lock()
- defer h.lock.Unlock()
- if h.closed {
- return fmt.Errorf(closedMsg)
- }
- if _, exists := h.data.items[key]; exists {
- h.data.items[key].obj = obj
- heap.Fix(h.data, h.data.items[key].index)
- } else {
- h.addIfNotPresentLocked(key, obj)
- }
- h.cond.Broadcast()
- return nil
- }
- // Adds all the items in the list to the queue and then signals the condition
- // variable. It is useful when the caller would like to add all of the items
- // to the queue before consumer starts processing them.
- func (h *Heap) BulkAdd(list []interface{}) error {
- h.lock.Lock()
- defer h.lock.Unlock()
- if h.closed {
- return fmt.Errorf(closedMsg)
- }
- for _, obj := range list {
- key, err := h.data.keyFunc(obj)
- if err != nil {
- return KeyError{obj, err}
- }
- if _, exists := h.data.items[key]; exists {
- h.data.items[key].obj = obj
- heap.Fix(h.data, h.data.items[key].index)
- } else {
- h.addIfNotPresentLocked(key, obj)
- }
- }
- h.cond.Broadcast()
- return nil
- }
- // AddIfNotPresent inserts an item, and puts it in the queue. If an item with
- // the key is present in the map, no changes is made to the item.
- //
- // This is useful in a single producer/consumer scenario so that the consumer can
- // safely retry items without contending with the producer and potentially enqueueing
- // stale items.
- func (h *Heap) AddIfNotPresent(obj interface{}) error {
- id, err := h.data.keyFunc(obj)
- if err != nil {
- return KeyError{obj, err}
- }
- h.lock.Lock()
- defer h.lock.Unlock()
- if h.closed {
- return fmt.Errorf(closedMsg)
- }
- h.addIfNotPresentLocked(id, obj)
- h.cond.Broadcast()
- return nil
- }
- // addIfNotPresentLocked assumes the lock is already held and adds the provided
- // item to the queue if it does not already exist.
- func (h *Heap) addIfNotPresentLocked(key string, obj interface{}) {
- if _, exists := h.data.items[key]; exists {
- return
- }
- heap.Push(h.data, &itemKeyValue{key, obj})
- }
- // Update is the same as Add in this implementation. When the item does not
- // exist, it is added.
- func (h *Heap) Update(obj interface{}) error {
- return h.Add(obj)
- }
- // Delete removes an item.
- func (h *Heap) Delete(obj interface{}) error {
- key, err := h.data.keyFunc(obj)
- if err != nil {
- return KeyError{obj, err}
- }
- h.lock.Lock()
- defer h.lock.Unlock()
- if item, ok := h.data.items[key]; ok {
- heap.Remove(h.data, item.index)
- return nil
- }
- return fmt.Errorf("object not found")
- }
- // Pop waits until an item is ready. If multiple items are
- // ready, they are returned in the order given by Heap.data.lessFunc.
- func (h *Heap) Pop() (interface{}, error) {
- h.lock.Lock()
- defer h.lock.Unlock()
- for len(h.data.queue) == 0 {
- // When the queue is empty, invocation of Pop() is blocked until new item is enqueued.
- // When Close() is called, the h.closed is set and the condition is broadcast,
- // which causes this loop to continue and return from the Pop().
- if h.closed {
- return nil, fmt.Errorf("heap is closed")
- }
- h.cond.Wait()
- }
- obj := heap.Pop(h.data)
- if obj != nil {
- return obj, nil
- } else {
- return nil, fmt.Errorf("object was removed from heap data")
- }
- }
- // List returns a list of all the items.
- func (h *Heap) List() []interface{} {
- h.lock.RLock()
- defer h.lock.RUnlock()
- list := make([]interface{}, 0, len(h.data.items))
- for _, item := range h.data.items {
- list = append(list, item.obj)
- }
- return list
- }
- // ListKeys returns a list of all the keys of the objects currently in the Heap.
- func (h *Heap) ListKeys() []string {
- h.lock.RLock()
- defer h.lock.RUnlock()
- list := make([]string, 0, len(h.data.items))
- for key := range h.data.items {
- list = append(list, key)
- }
- return list
- }
- // Get returns the requested item, or sets exists=false.
- func (h *Heap) Get(obj interface{}) (interface{}, bool, error) {
- key, err := h.data.keyFunc(obj)
- if err != nil {
- return nil, false, KeyError{obj, err}
- }
- return h.GetByKey(key)
- }
- // GetByKey returns the requested item, or sets exists=false.
- func (h *Heap) GetByKey(key string) (interface{}, bool, error) {
- h.lock.RLock()
- defer h.lock.RUnlock()
- item, exists := h.data.items[key]
- if !exists {
- return nil, false, nil
- }
- return item.obj, true, nil
- }
- // IsClosed returns true if the queue is closed.
- func (h *Heap) IsClosed() bool {
- h.lock.RLock()
- defer h.lock.RUnlock()
- if h.closed {
- return true
- }
- return false
- }
- // NewHeap returns a Heap which can be used to queue up items to process.
- func NewHeap(keyFn KeyFunc, lessFn LessFunc) *Heap {
- h := &Heap{
- data: &heapData{
- items: map[string]*heapItem{},
- queue: []string{},
- keyFunc: keyFn,
- lessFunc: lessFn,
- },
- }
- h.cond.L = &h.lock
- return h
- }
|