123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651 |
- /*
- Copyright 2014 The Kubernetes Authors.
- Licensed under the Apache License, Version 2.0 (the "License");
- you may not use this file except in compliance with the License.
- You may obtain a copy of the License at
- http://www.apache.org/licenses/LICENSE-2.0
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
- */
- package cache
- import (
- "errors"
- "fmt"
- "sync"
- "k8s.io/apimachinery/pkg/util/sets"
- "k8s.io/klog"
- )
- // NewDeltaFIFO returns a Store which can be used process changes to items.
- //
- // keyFunc is used to figure out what key an object should have. (It's
- // exposed in the returned DeltaFIFO's KeyOf() method, with bonus features.)
- //
- // 'keyLister' is expected to return a list of keys that the consumer of
- // this queue "knows about". It is used to decide which items are missing
- // when Replace() is called; 'Deleted' deltas are produced for these items.
- // It may be nil if you don't need to detect all deletions.
- // TODO: consider merging keyLister with this object, tracking a list of
- // "known" keys when Pop() is called. Have to think about how that
- // affects error retrying.
- // NOTE: It is possible to misuse this and cause a race when using an
- // external known object source.
- // Whether there is a potential race depends on how the comsumer
- // modifies knownObjects. In Pop(), process function is called under
- // lock, so it is safe to update data structures in it that need to be
- // in sync with the queue (e.g. knownObjects).
- //
- // Example:
- // In case of sharedIndexInformer being a consumer
- // (https://github.com/kubernetes/kubernetes/blob/0cdd940f/staging/
- // src/k8s.io/client-go/tools/cache/shared_informer.go#L192),
- // there is no race as knownObjects (s.indexer) is modified safely
- // under DeltaFIFO's lock. The only exceptions are GetStore() and
- // GetIndexer() methods, which expose ways to modify the underlying
- // storage. Currently these two methods are used for creating Lister
- // and internal tests.
- //
- // Also see the comment on DeltaFIFO.
- func NewDeltaFIFO(keyFunc KeyFunc, knownObjects KeyListerGetter) *DeltaFIFO {
- f := &DeltaFIFO{
- items: map[string]Deltas{},
- queue: []string{},
- keyFunc: keyFunc,
- knownObjects: knownObjects,
- }
- f.cond.L = &f.lock
- return f
- }
- // DeltaFIFO is like FIFO, but allows you to process deletes.
- //
- // DeltaFIFO is a producer-consumer queue, where a Reflector is
- // intended to be the producer, and the consumer is whatever calls
- // the Pop() method.
- //
- // DeltaFIFO solves this use case:
- // * You want to process every object change (delta) at most once.
- // * When you process an object, you want to see everything
- // that's happened to it since you last processed it.
- // * You want to process the deletion of objects.
- // * You might want to periodically reprocess objects.
- //
- // DeltaFIFO's Pop(), Get(), and GetByKey() methods return
- // interface{} to satisfy the Store/Queue interfaces, but it
- // will always return an object of type Deltas.
- //
- // A note on threading: If you call Pop() in parallel from multiple
- // threads, you could end up with multiple threads processing slightly
- // different versions of the same object.
- //
- // A note on the KeyLister used by the DeltaFIFO: It's main purpose is
- // to list keys that are "known", for the purpose of figuring out which
- // items have been deleted when Replace() or Delete() are called. The deleted
- // object will be included in the DeleteFinalStateUnknown markers. These objects
- // could be stale.
- type DeltaFIFO struct {
- // lock/cond protects access to 'items' and 'queue'.
- lock sync.RWMutex
- cond sync.Cond
- // We depend on the property that items in the set are in
- // the queue and vice versa, and that all Deltas in this
- // map have at least one Delta.
- items map[string]Deltas
- queue []string
- // populated is true if the first batch of items inserted by Replace() has been populated
- // or Delete/Add/Update was called first.
- populated bool
- // initialPopulationCount is the number of items inserted by the first call of Replace()
- initialPopulationCount int
- // keyFunc is used to make the key used for queued item
- // insertion and retrieval, and should be deterministic.
- keyFunc KeyFunc
- // knownObjects list keys that are "known", for the
- // purpose of figuring out which items have been deleted
- // when Replace() or Delete() is called.
- knownObjects KeyListerGetter
- // Indication the queue is closed.
- // Used to indicate a queue is closed so a control loop can exit when a queue is empty.
- // Currently, not used to gate any of CRED operations.
- closed bool
- closedLock sync.Mutex
- }
- var (
- _ = Queue(&DeltaFIFO{}) // DeltaFIFO is a Queue
- )
- var (
- // ErrZeroLengthDeltasObject is returned in a KeyError if a Deltas
- // object with zero length is encountered (should be impossible,
- // but included for completeness).
- ErrZeroLengthDeltasObject = errors.New("0 length Deltas object; can't get key")
- )
- // Close the queue.
- func (f *DeltaFIFO) Close() {
- f.closedLock.Lock()
- defer f.closedLock.Unlock()
- f.closed = true
- f.cond.Broadcast()
- }
- // KeyOf exposes f's keyFunc, but also detects the key of a Deltas object or
- // DeletedFinalStateUnknown objects.
- func (f *DeltaFIFO) KeyOf(obj interface{}) (string, error) {
- if d, ok := obj.(Deltas); ok {
- if len(d) == 0 {
- return "", KeyError{obj, ErrZeroLengthDeltasObject}
- }
- obj = d.Newest().Object
- }
- if d, ok := obj.(DeletedFinalStateUnknown); ok {
- return d.Key, nil
- }
- return f.keyFunc(obj)
- }
- // Return true if an Add/Update/Delete/AddIfNotPresent are called first,
- // or an Update called first but the first batch of items inserted by Replace() has been popped
- func (f *DeltaFIFO) HasSynced() bool {
- f.lock.Lock()
- defer f.lock.Unlock()
- return f.populated && f.initialPopulationCount == 0
- }
- // Add inserts an item, and puts it in the queue. The item is only enqueued
- // if it doesn't already exist in the set.
- func (f *DeltaFIFO) Add(obj interface{}) error {
- f.lock.Lock()
- defer f.lock.Unlock()
- f.populated = true
- return f.queueActionLocked(Added, obj)
- }
- // Update is just like Add, but makes an Updated Delta.
- func (f *DeltaFIFO) Update(obj interface{}) error {
- f.lock.Lock()
- defer f.lock.Unlock()
- f.populated = true
- return f.queueActionLocked(Updated, obj)
- }
- // Delete is just like Add, but makes an Deleted Delta. If the item does not
- // already exist, it will be ignored. (It may have already been deleted by a
- // Replace (re-list), for example.
- func (f *DeltaFIFO) Delete(obj interface{}) error {
- id, err := f.KeyOf(obj)
- if err != nil {
- return KeyError{obj, err}
- }
- f.lock.Lock()
- defer f.lock.Unlock()
- f.populated = true
- if f.knownObjects == nil {
- if _, exists := f.items[id]; !exists {
- // Presumably, this was deleted when a relist happened.
- // Don't provide a second report of the same deletion.
- return nil
- }
- } else {
- // We only want to skip the "deletion" action if the object doesn't
- // exist in knownObjects and it doesn't have corresponding item in items.
- // Note that even if there is a "deletion" action in items, we can ignore it,
- // because it will be deduped automatically in "queueActionLocked"
- _, exists, err := f.knownObjects.GetByKey(id)
- _, itemsExist := f.items[id]
- if err == nil && !exists && !itemsExist {
- // Presumably, this was deleted when a relist happened.
- // Don't provide a second report of the same deletion.
- return nil
- }
- }
- return f.queueActionLocked(Deleted, obj)
- }
- // AddIfNotPresent inserts an item, and puts it in the queue. If the item is already
- // present in the set, it is neither enqueued nor added to the set.
- //
- // This is useful in a single producer/consumer scenario so that the consumer can
- // safely retry items without contending with the producer and potentially enqueueing
- // stale items.
- //
- // Important: obj must be a Deltas (the output of the Pop() function). Yes, this is
- // different from the Add/Update/Delete functions.
- func (f *DeltaFIFO) AddIfNotPresent(obj interface{}) error {
- deltas, ok := obj.(Deltas)
- if !ok {
- return fmt.Errorf("object must be of type deltas, but got: %#v", obj)
- }
- id, err := f.KeyOf(deltas.Newest().Object)
- if err != nil {
- return KeyError{obj, err}
- }
- f.lock.Lock()
- defer f.lock.Unlock()
- f.addIfNotPresent(id, deltas)
- return nil
- }
- // addIfNotPresent inserts deltas under id if it does not exist, and assumes the caller
- // already holds the fifo lock.
- func (f *DeltaFIFO) addIfNotPresent(id string, deltas Deltas) {
- f.populated = true
- if _, exists := f.items[id]; exists {
- return
- }
- f.queue = append(f.queue, id)
- f.items[id] = deltas
- f.cond.Broadcast()
- }
- // re-listing and watching can deliver the same update multiple times in any
- // order. This will combine the most recent two deltas if they are the same.
- func dedupDeltas(deltas Deltas) Deltas {
- n := len(deltas)
- if n < 2 {
- return deltas
- }
- a := &deltas[n-1]
- b := &deltas[n-2]
- if out := isDup(a, b); out != nil {
- d := append(Deltas{}, deltas[:n-2]...)
- return append(d, *out)
- }
- return deltas
- }
- // If a & b represent the same event, returns the delta that ought to be kept.
- // Otherwise, returns nil.
- // TODO: is there anything other than deletions that need deduping?
- func isDup(a, b *Delta) *Delta {
- if out := isDeletionDup(a, b); out != nil {
- return out
- }
- // TODO: Detect other duplicate situations? Are there any?
- return nil
- }
- // keep the one with the most information if both are deletions.
- func isDeletionDup(a, b *Delta) *Delta {
- if b.Type != Deleted || a.Type != Deleted {
- return nil
- }
- // Do more sophisticated checks, or is this sufficient?
- if _, ok := b.Object.(DeletedFinalStateUnknown); ok {
- return a
- }
- return b
- }
- // willObjectBeDeletedLocked returns true only if the last delta for the
- // given object is Delete. Caller must lock first.
- func (f *DeltaFIFO) willObjectBeDeletedLocked(id string) bool {
- deltas := f.items[id]
- return len(deltas) > 0 && deltas[len(deltas)-1].Type == Deleted
- }
- // queueActionLocked appends to the delta list for the object.
- // Caller must lock first.
- func (f *DeltaFIFO) queueActionLocked(actionType DeltaType, obj interface{}) error {
- id, err := f.KeyOf(obj)
- if err != nil {
- return KeyError{obj, err}
- }
- // If object is supposed to be deleted (last event is Deleted),
- // then we should ignore Sync events, because it would result in
- // recreation of this object.
- if actionType == Sync && f.willObjectBeDeletedLocked(id) {
- return nil
- }
- newDeltas := append(f.items[id], Delta{actionType, obj})
- newDeltas = dedupDeltas(newDeltas)
- if len(newDeltas) > 0 {
- if _, exists := f.items[id]; !exists {
- f.queue = append(f.queue, id)
- }
- f.items[id] = newDeltas
- f.cond.Broadcast()
- } else {
- // We need to remove this from our map (extra items in the queue are
- // ignored if they are not in the map).
- delete(f.items, id)
- }
- return nil
- }
- // List returns a list of all the items; it returns the object
- // from the most recent Delta.
- // You should treat the items returned inside the deltas as immutable.
- func (f *DeltaFIFO) List() []interface{} {
- f.lock.RLock()
- defer f.lock.RUnlock()
- return f.listLocked()
- }
- func (f *DeltaFIFO) listLocked() []interface{} {
- list := make([]interface{}, 0, len(f.items))
- for _, item := range f.items {
- list = append(list, item.Newest().Object)
- }
- return list
- }
- // ListKeys returns a list of all the keys of the objects currently
- // in the FIFO.
- func (f *DeltaFIFO) ListKeys() []string {
- f.lock.RLock()
- defer f.lock.RUnlock()
- list := make([]string, 0, len(f.items))
- for key := range f.items {
- list = append(list, key)
- }
- return list
- }
- // Get returns the complete list of deltas for the requested item,
- // or sets exists=false.
- // You should treat the items returned inside the deltas as immutable.
- func (f *DeltaFIFO) Get(obj interface{}) (item interface{}, exists bool, err error) {
- key, err := f.KeyOf(obj)
- if err != nil {
- return nil, false, KeyError{obj, err}
- }
- return f.GetByKey(key)
- }
- // GetByKey returns the complete list of deltas for the requested item,
- // setting exists=false if that list is empty.
- // You should treat the items returned inside the deltas as immutable.
- func (f *DeltaFIFO) GetByKey(key string) (item interface{}, exists bool, err error) {
- f.lock.RLock()
- defer f.lock.RUnlock()
- d, exists := f.items[key]
- if exists {
- // Copy item's slice so operations on this slice
- // won't interfere with the object we return.
- d = copyDeltas(d)
- }
- return d, exists, nil
- }
- // Checks if the queue is closed
- func (f *DeltaFIFO) IsClosed() bool {
- f.closedLock.Lock()
- defer f.closedLock.Unlock()
- return f.closed
- }
- // Pop blocks until an item is added to the queue, and then returns it. If
- // multiple items are ready, they are returned in the order in which they were
- // added/updated. The item is removed from the queue (and the store) before it
- // is returned, so if you don't successfully process it, you need to add it back
- // with AddIfNotPresent().
- // process function is called under lock, so it is safe update data structures
- // in it that need to be in sync with the queue (e.g. knownKeys). The PopProcessFunc
- // may return an instance of ErrRequeue with a nested error to indicate the current
- // item should be requeued (equivalent to calling AddIfNotPresent under the lock).
- //
- // Pop returns a 'Deltas', which has a complete list of all the things
- // that happened to the object (deltas) while it was sitting in the queue.
- func (f *DeltaFIFO) Pop(process PopProcessFunc) (interface{}, error) {
- f.lock.Lock()
- defer f.lock.Unlock()
- for {
- for len(f.queue) == 0 {
- // When the queue is empty, invocation of Pop() is blocked until new item is enqueued.
- // When Close() is called, the f.closed is set and the condition is broadcasted.
- // Which causes this loop to continue and return from the Pop().
- if f.IsClosed() {
- return nil, FIFOClosedError
- }
- f.cond.Wait()
- }
- id := f.queue[0]
- f.queue = f.queue[1:]
- if f.initialPopulationCount > 0 {
- f.initialPopulationCount--
- }
- item, ok := f.items[id]
- if !ok {
- // Item may have been deleted subsequently.
- continue
- }
- delete(f.items, id)
- err := process(item)
- if e, ok := err.(ErrRequeue); ok {
- f.addIfNotPresent(id, item)
- err = e.Err
- }
- // Don't need to copyDeltas here, because we're transferring
- // ownership to the caller.
- return item, err
- }
- }
- // Replace will delete the contents of 'f', using instead the given map.
- // 'f' takes ownership of the map, you should not reference the map again
- // after calling this function. f's queue is reset, too; upon return, it
- // will contain the items in the map, in no particular order.
- func (f *DeltaFIFO) Replace(list []interface{}, resourceVersion string) error {
- f.lock.Lock()
- defer f.lock.Unlock()
- keys := make(sets.String, len(list))
- for _, item := range list {
- key, err := f.KeyOf(item)
- if err != nil {
- return KeyError{item, err}
- }
- keys.Insert(key)
- if err := f.queueActionLocked(Sync, item); err != nil {
- return fmt.Errorf("couldn't enqueue object: %v", err)
- }
- }
- if f.knownObjects == nil {
- // Do deletion detection against our own list.
- for k, oldItem := range f.items {
- if keys.Has(k) {
- continue
- }
- var deletedObj interface{}
- if n := oldItem.Newest(); n != nil {
- deletedObj = n.Object
- }
- if err := f.queueActionLocked(Deleted, DeletedFinalStateUnknown{k, deletedObj}); err != nil {
- return err
- }
- }
- if !f.populated {
- f.populated = true
- f.initialPopulationCount = len(list)
- }
- return nil
- }
- // Detect deletions not already in the queue.
- knownKeys := f.knownObjects.ListKeys()
- queuedDeletions := 0
- for _, k := range knownKeys {
- if keys.Has(k) {
- continue
- }
- deletedObj, exists, err := f.knownObjects.GetByKey(k)
- if err != nil {
- deletedObj = nil
- klog.Errorf("Unexpected error %v during lookup of key %v, placing DeleteFinalStateUnknown marker without object", err, k)
- } else if !exists {
- deletedObj = nil
- klog.Infof("Key %v does not exist in known objects store, placing DeleteFinalStateUnknown marker without object", k)
- }
- queuedDeletions++
- if err := f.queueActionLocked(Deleted, DeletedFinalStateUnknown{k, deletedObj}); err != nil {
- return err
- }
- }
- if !f.populated {
- f.populated = true
- f.initialPopulationCount = len(list) + queuedDeletions
- }
- return nil
- }
- // Resync will send a sync event for each item
- func (f *DeltaFIFO) Resync() error {
- f.lock.Lock()
- defer f.lock.Unlock()
- if f.knownObjects == nil {
- return nil
- }
- keys := f.knownObjects.ListKeys()
- for _, k := range keys {
- if err := f.syncKeyLocked(k); err != nil {
- return err
- }
- }
- return nil
- }
- func (f *DeltaFIFO) syncKey(key string) error {
- f.lock.Lock()
- defer f.lock.Unlock()
- return f.syncKeyLocked(key)
- }
- func (f *DeltaFIFO) syncKeyLocked(key string) error {
- obj, exists, err := f.knownObjects.GetByKey(key)
- if err != nil {
- klog.Errorf("Unexpected error %v during lookup of key %v, unable to queue object for sync", err, key)
- return nil
- } else if !exists {
- klog.Infof("Key %v does not exist in known objects store, unable to queue object for sync", key)
- return nil
- }
- // If we are doing Resync() and there is already an event queued for that object,
- // we ignore the Resync for it. This is to avoid the race, in which the resync
- // comes with the previous value of object (since queueing an event for the object
- // doesn't trigger changing the underlying store <knownObjects>.
- id, err := f.KeyOf(obj)
- if err != nil {
- return KeyError{obj, err}
- }
- if len(f.items[id]) > 0 {
- return nil
- }
- if err := f.queueActionLocked(Sync, obj); err != nil {
- return fmt.Errorf("couldn't queue object: %v", err)
- }
- return nil
- }
- // A KeyListerGetter is anything that knows how to list its keys and look up by key.
- type KeyListerGetter interface {
- KeyLister
- KeyGetter
- }
- // A KeyLister is anything that knows how to list its keys.
- type KeyLister interface {
- ListKeys() []string
- }
- // A KeyGetter is anything that knows how to get the value stored under a given key.
- type KeyGetter interface {
- GetByKey(key string) (interface{}, bool, error)
- }
- // DeltaType is the type of a change (addition, deletion, etc)
- type DeltaType string
- const (
- Added DeltaType = "Added"
- Updated DeltaType = "Updated"
- Deleted DeltaType = "Deleted"
- // The other types are obvious. You'll get Sync deltas when:
- // * A watch expires/errors out and a new list/watch cycle is started.
- // * You've turned on periodic syncs.
- // (Anything that trigger's DeltaFIFO's Replace() method.)
- Sync DeltaType = "Sync"
- )
- // Delta is the type stored by a DeltaFIFO. It tells you what change
- // happened, and the object's state after* that change.
- //
- // [*] Unless the change is a deletion, and then you'll get the final
- // state of the object before it was deleted.
- type Delta struct {
- Type DeltaType
- Object interface{}
- }
- // Deltas is a list of one or more 'Delta's to an individual object.
- // The oldest delta is at index 0, the newest delta is the last one.
- type Deltas []Delta
- // Oldest is a convenience function that returns the oldest delta, or
- // nil if there are no deltas.
- func (d Deltas) Oldest() *Delta {
- if len(d) > 0 {
- return &d[0]
- }
- return nil
- }
- // Newest is a convenience function that returns the newest delta, or
- // nil if there are no deltas.
- func (d Deltas) Newest() *Delta {
- if n := len(d); n > 0 {
- return &d[n-1]
- }
- return nil
- }
- // copyDeltas returns a shallow copy of d; that is, it copies the slice but not
- // the objects in the slice. This allows Get/List to return an object that we
- // know won't be clobbered by a subsequent modifications.
- func copyDeltas(d Deltas) Deltas {
- d2 := make(Deltas, len(d))
- copy(d2, d)
- return d2
- }
- // DeletedFinalStateUnknown is placed into a DeltaFIFO in the case where
- // an object was deleted but the watch deletion event was missed. In this
- // case we don't know the final "resting" state of the object, so there's
- // a chance the included `Obj` is stale.
- type DeletedFinalStateUnknown struct {
- Key string
- Obj interface{}
- }
|