delta_fifo.go 20 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651
  1. /*
  2. Copyright 2014 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package cache
  14. import (
  15. "errors"
  16. "fmt"
  17. "sync"
  18. "k8s.io/apimachinery/pkg/util/sets"
  19. "k8s.io/klog"
  20. )
  21. // NewDeltaFIFO returns a Store which can be used process changes to items.
  22. //
  23. // keyFunc is used to figure out what key an object should have. (It's
  24. // exposed in the returned DeltaFIFO's KeyOf() method, with bonus features.)
  25. //
  26. // 'keyLister' is expected to return a list of keys that the consumer of
  27. // this queue "knows about". It is used to decide which items are missing
  28. // when Replace() is called; 'Deleted' deltas are produced for these items.
  29. // It may be nil if you don't need to detect all deletions.
  30. // TODO: consider merging keyLister with this object, tracking a list of
  31. // "known" keys when Pop() is called. Have to think about how that
  32. // affects error retrying.
  33. // NOTE: It is possible to misuse this and cause a race when using an
  34. // external known object source.
  35. // Whether there is a potential race depends on how the comsumer
  36. // modifies knownObjects. In Pop(), process function is called under
  37. // lock, so it is safe to update data structures in it that need to be
  38. // in sync with the queue (e.g. knownObjects).
  39. //
  40. // Example:
  41. // In case of sharedIndexInformer being a consumer
  42. // (https://github.com/kubernetes/kubernetes/blob/0cdd940f/staging/
  43. // src/k8s.io/client-go/tools/cache/shared_informer.go#L192),
  44. // there is no race as knownObjects (s.indexer) is modified safely
  45. // under DeltaFIFO's lock. The only exceptions are GetStore() and
  46. // GetIndexer() methods, which expose ways to modify the underlying
  47. // storage. Currently these two methods are used for creating Lister
  48. // and internal tests.
  49. //
  50. // Also see the comment on DeltaFIFO.
  51. func NewDeltaFIFO(keyFunc KeyFunc, knownObjects KeyListerGetter) *DeltaFIFO {
  52. f := &DeltaFIFO{
  53. items: map[string]Deltas{},
  54. queue: []string{},
  55. keyFunc: keyFunc,
  56. knownObjects: knownObjects,
  57. }
  58. f.cond.L = &f.lock
  59. return f
  60. }
  61. // DeltaFIFO is like FIFO, but allows you to process deletes.
  62. //
  63. // DeltaFIFO is a producer-consumer queue, where a Reflector is
  64. // intended to be the producer, and the consumer is whatever calls
  65. // the Pop() method.
  66. //
  67. // DeltaFIFO solves this use case:
  68. // * You want to process every object change (delta) at most once.
  69. // * When you process an object, you want to see everything
  70. // that's happened to it since you last processed it.
  71. // * You want to process the deletion of objects.
  72. // * You might want to periodically reprocess objects.
  73. //
  74. // DeltaFIFO's Pop(), Get(), and GetByKey() methods return
  75. // interface{} to satisfy the Store/Queue interfaces, but it
  76. // will always return an object of type Deltas.
  77. //
  78. // A note on threading: If you call Pop() in parallel from multiple
  79. // threads, you could end up with multiple threads processing slightly
  80. // different versions of the same object.
  81. //
  82. // A note on the KeyLister used by the DeltaFIFO: It's main purpose is
  83. // to list keys that are "known", for the purpose of figuring out which
  84. // items have been deleted when Replace() or Delete() are called. The deleted
  85. // object will be included in the DeleteFinalStateUnknown markers. These objects
  86. // could be stale.
  87. type DeltaFIFO struct {
  88. // lock/cond protects access to 'items' and 'queue'.
  89. lock sync.RWMutex
  90. cond sync.Cond
  91. // We depend on the property that items in the set are in
  92. // the queue and vice versa, and that all Deltas in this
  93. // map have at least one Delta.
  94. items map[string]Deltas
  95. queue []string
  96. // populated is true if the first batch of items inserted by Replace() has been populated
  97. // or Delete/Add/Update was called first.
  98. populated bool
  99. // initialPopulationCount is the number of items inserted by the first call of Replace()
  100. initialPopulationCount int
  101. // keyFunc is used to make the key used for queued item
  102. // insertion and retrieval, and should be deterministic.
  103. keyFunc KeyFunc
  104. // knownObjects list keys that are "known", for the
  105. // purpose of figuring out which items have been deleted
  106. // when Replace() or Delete() is called.
  107. knownObjects KeyListerGetter
  108. // Indication the queue is closed.
  109. // Used to indicate a queue is closed so a control loop can exit when a queue is empty.
  110. // Currently, not used to gate any of CRED operations.
  111. closed bool
  112. closedLock sync.Mutex
  113. }
  114. var (
  115. _ = Queue(&DeltaFIFO{}) // DeltaFIFO is a Queue
  116. )
  117. var (
  118. // ErrZeroLengthDeltasObject is returned in a KeyError if a Deltas
  119. // object with zero length is encountered (should be impossible,
  120. // but included for completeness).
  121. ErrZeroLengthDeltasObject = errors.New("0 length Deltas object; can't get key")
  122. )
  123. // Close the queue.
  124. func (f *DeltaFIFO) Close() {
  125. f.closedLock.Lock()
  126. defer f.closedLock.Unlock()
  127. f.closed = true
  128. f.cond.Broadcast()
  129. }
  130. // KeyOf exposes f's keyFunc, but also detects the key of a Deltas object or
  131. // DeletedFinalStateUnknown objects.
  132. func (f *DeltaFIFO) KeyOf(obj interface{}) (string, error) {
  133. if d, ok := obj.(Deltas); ok {
  134. if len(d) == 0 {
  135. return "", KeyError{obj, ErrZeroLengthDeltasObject}
  136. }
  137. obj = d.Newest().Object
  138. }
  139. if d, ok := obj.(DeletedFinalStateUnknown); ok {
  140. return d.Key, nil
  141. }
  142. return f.keyFunc(obj)
  143. }
  144. // Return true if an Add/Update/Delete/AddIfNotPresent are called first,
  145. // or an Update called first but the first batch of items inserted by Replace() has been popped
  146. func (f *DeltaFIFO) HasSynced() bool {
  147. f.lock.Lock()
  148. defer f.lock.Unlock()
  149. return f.populated && f.initialPopulationCount == 0
  150. }
  151. // Add inserts an item, and puts it in the queue. The item is only enqueued
  152. // if it doesn't already exist in the set.
  153. func (f *DeltaFIFO) Add(obj interface{}) error {
  154. f.lock.Lock()
  155. defer f.lock.Unlock()
  156. f.populated = true
  157. return f.queueActionLocked(Added, obj)
  158. }
  159. // Update is just like Add, but makes an Updated Delta.
  160. func (f *DeltaFIFO) Update(obj interface{}) error {
  161. f.lock.Lock()
  162. defer f.lock.Unlock()
  163. f.populated = true
  164. return f.queueActionLocked(Updated, obj)
  165. }
  166. // Delete is just like Add, but makes an Deleted Delta. If the item does not
  167. // already exist, it will be ignored. (It may have already been deleted by a
  168. // Replace (re-list), for example.
  169. func (f *DeltaFIFO) Delete(obj interface{}) error {
  170. id, err := f.KeyOf(obj)
  171. if err != nil {
  172. return KeyError{obj, err}
  173. }
  174. f.lock.Lock()
  175. defer f.lock.Unlock()
  176. f.populated = true
  177. if f.knownObjects == nil {
  178. if _, exists := f.items[id]; !exists {
  179. // Presumably, this was deleted when a relist happened.
  180. // Don't provide a second report of the same deletion.
  181. return nil
  182. }
  183. } else {
  184. // We only want to skip the "deletion" action if the object doesn't
  185. // exist in knownObjects and it doesn't have corresponding item in items.
  186. // Note that even if there is a "deletion" action in items, we can ignore it,
  187. // because it will be deduped automatically in "queueActionLocked"
  188. _, exists, err := f.knownObjects.GetByKey(id)
  189. _, itemsExist := f.items[id]
  190. if err == nil && !exists && !itemsExist {
  191. // Presumably, this was deleted when a relist happened.
  192. // Don't provide a second report of the same deletion.
  193. return nil
  194. }
  195. }
  196. return f.queueActionLocked(Deleted, obj)
  197. }
  198. // AddIfNotPresent inserts an item, and puts it in the queue. If the item is already
  199. // present in the set, it is neither enqueued nor added to the set.
  200. //
  201. // This is useful in a single producer/consumer scenario so that the consumer can
  202. // safely retry items without contending with the producer and potentially enqueueing
  203. // stale items.
  204. //
  205. // Important: obj must be a Deltas (the output of the Pop() function). Yes, this is
  206. // different from the Add/Update/Delete functions.
  207. func (f *DeltaFIFO) AddIfNotPresent(obj interface{}) error {
  208. deltas, ok := obj.(Deltas)
  209. if !ok {
  210. return fmt.Errorf("object must be of type deltas, but got: %#v", obj)
  211. }
  212. id, err := f.KeyOf(deltas.Newest().Object)
  213. if err != nil {
  214. return KeyError{obj, err}
  215. }
  216. f.lock.Lock()
  217. defer f.lock.Unlock()
  218. f.addIfNotPresent(id, deltas)
  219. return nil
  220. }
  221. // addIfNotPresent inserts deltas under id if it does not exist, and assumes the caller
  222. // already holds the fifo lock.
  223. func (f *DeltaFIFO) addIfNotPresent(id string, deltas Deltas) {
  224. f.populated = true
  225. if _, exists := f.items[id]; exists {
  226. return
  227. }
  228. f.queue = append(f.queue, id)
  229. f.items[id] = deltas
  230. f.cond.Broadcast()
  231. }
  232. // re-listing and watching can deliver the same update multiple times in any
  233. // order. This will combine the most recent two deltas if they are the same.
  234. func dedupDeltas(deltas Deltas) Deltas {
  235. n := len(deltas)
  236. if n < 2 {
  237. return deltas
  238. }
  239. a := &deltas[n-1]
  240. b := &deltas[n-2]
  241. if out := isDup(a, b); out != nil {
  242. d := append(Deltas{}, deltas[:n-2]...)
  243. return append(d, *out)
  244. }
  245. return deltas
  246. }
  247. // If a & b represent the same event, returns the delta that ought to be kept.
  248. // Otherwise, returns nil.
  249. // TODO: is there anything other than deletions that need deduping?
  250. func isDup(a, b *Delta) *Delta {
  251. if out := isDeletionDup(a, b); out != nil {
  252. return out
  253. }
  254. // TODO: Detect other duplicate situations? Are there any?
  255. return nil
  256. }
  257. // keep the one with the most information if both are deletions.
  258. func isDeletionDup(a, b *Delta) *Delta {
  259. if b.Type != Deleted || a.Type != Deleted {
  260. return nil
  261. }
  262. // Do more sophisticated checks, or is this sufficient?
  263. if _, ok := b.Object.(DeletedFinalStateUnknown); ok {
  264. return a
  265. }
  266. return b
  267. }
  268. // willObjectBeDeletedLocked returns true only if the last delta for the
  269. // given object is Delete. Caller must lock first.
  270. func (f *DeltaFIFO) willObjectBeDeletedLocked(id string) bool {
  271. deltas := f.items[id]
  272. return len(deltas) > 0 && deltas[len(deltas)-1].Type == Deleted
  273. }
  274. // queueActionLocked appends to the delta list for the object.
  275. // Caller must lock first.
  276. func (f *DeltaFIFO) queueActionLocked(actionType DeltaType, obj interface{}) error {
  277. id, err := f.KeyOf(obj)
  278. if err != nil {
  279. return KeyError{obj, err}
  280. }
  281. // If object is supposed to be deleted (last event is Deleted),
  282. // then we should ignore Sync events, because it would result in
  283. // recreation of this object.
  284. if actionType == Sync && f.willObjectBeDeletedLocked(id) {
  285. return nil
  286. }
  287. newDeltas := append(f.items[id], Delta{actionType, obj})
  288. newDeltas = dedupDeltas(newDeltas)
  289. if len(newDeltas) > 0 {
  290. if _, exists := f.items[id]; !exists {
  291. f.queue = append(f.queue, id)
  292. }
  293. f.items[id] = newDeltas
  294. f.cond.Broadcast()
  295. } else {
  296. // We need to remove this from our map (extra items in the queue are
  297. // ignored if they are not in the map).
  298. delete(f.items, id)
  299. }
  300. return nil
  301. }
  302. // List returns a list of all the items; it returns the object
  303. // from the most recent Delta.
  304. // You should treat the items returned inside the deltas as immutable.
  305. func (f *DeltaFIFO) List() []interface{} {
  306. f.lock.RLock()
  307. defer f.lock.RUnlock()
  308. return f.listLocked()
  309. }
  310. func (f *DeltaFIFO) listLocked() []interface{} {
  311. list := make([]interface{}, 0, len(f.items))
  312. for _, item := range f.items {
  313. list = append(list, item.Newest().Object)
  314. }
  315. return list
  316. }
  317. // ListKeys returns a list of all the keys of the objects currently
  318. // in the FIFO.
  319. func (f *DeltaFIFO) ListKeys() []string {
  320. f.lock.RLock()
  321. defer f.lock.RUnlock()
  322. list := make([]string, 0, len(f.items))
  323. for key := range f.items {
  324. list = append(list, key)
  325. }
  326. return list
  327. }
  328. // Get returns the complete list of deltas for the requested item,
  329. // or sets exists=false.
  330. // You should treat the items returned inside the deltas as immutable.
  331. func (f *DeltaFIFO) Get(obj interface{}) (item interface{}, exists bool, err error) {
  332. key, err := f.KeyOf(obj)
  333. if err != nil {
  334. return nil, false, KeyError{obj, err}
  335. }
  336. return f.GetByKey(key)
  337. }
  338. // GetByKey returns the complete list of deltas for the requested item,
  339. // setting exists=false if that list is empty.
  340. // You should treat the items returned inside the deltas as immutable.
  341. func (f *DeltaFIFO) GetByKey(key string) (item interface{}, exists bool, err error) {
  342. f.lock.RLock()
  343. defer f.lock.RUnlock()
  344. d, exists := f.items[key]
  345. if exists {
  346. // Copy item's slice so operations on this slice
  347. // won't interfere with the object we return.
  348. d = copyDeltas(d)
  349. }
  350. return d, exists, nil
  351. }
  352. // Checks if the queue is closed
  353. func (f *DeltaFIFO) IsClosed() bool {
  354. f.closedLock.Lock()
  355. defer f.closedLock.Unlock()
  356. return f.closed
  357. }
  358. // Pop blocks until an item is added to the queue, and then returns it. If
  359. // multiple items are ready, they are returned in the order in which they were
  360. // added/updated. The item is removed from the queue (and the store) before it
  361. // is returned, so if you don't successfully process it, you need to add it back
  362. // with AddIfNotPresent().
  363. // process function is called under lock, so it is safe update data structures
  364. // in it that need to be in sync with the queue (e.g. knownKeys). The PopProcessFunc
  365. // may return an instance of ErrRequeue with a nested error to indicate the current
  366. // item should be requeued (equivalent to calling AddIfNotPresent under the lock).
  367. //
  368. // Pop returns a 'Deltas', which has a complete list of all the things
  369. // that happened to the object (deltas) while it was sitting in the queue.
  370. func (f *DeltaFIFO) Pop(process PopProcessFunc) (interface{}, error) {
  371. f.lock.Lock()
  372. defer f.lock.Unlock()
  373. for {
  374. for len(f.queue) == 0 {
  375. // When the queue is empty, invocation of Pop() is blocked until new item is enqueued.
  376. // When Close() is called, the f.closed is set and the condition is broadcasted.
  377. // Which causes this loop to continue and return from the Pop().
  378. if f.IsClosed() {
  379. return nil, FIFOClosedError
  380. }
  381. f.cond.Wait()
  382. }
  383. id := f.queue[0]
  384. f.queue = f.queue[1:]
  385. if f.initialPopulationCount > 0 {
  386. f.initialPopulationCount--
  387. }
  388. item, ok := f.items[id]
  389. if !ok {
  390. // Item may have been deleted subsequently.
  391. continue
  392. }
  393. delete(f.items, id)
  394. err := process(item)
  395. if e, ok := err.(ErrRequeue); ok {
  396. f.addIfNotPresent(id, item)
  397. err = e.Err
  398. }
  399. // Don't need to copyDeltas here, because we're transferring
  400. // ownership to the caller.
  401. return item, err
  402. }
  403. }
  404. // Replace will delete the contents of 'f', using instead the given map.
  405. // 'f' takes ownership of the map, you should not reference the map again
  406. // after calling this function. f's queue is reset, too; upon return, it
  407. // will contain the items in the map, in no particular order.
  408. func (f *DeltaFIFO) Replace(list []interface{}, resourceVersion string) error {
  409. f.lock.Lock()
  410. defer f.lock.Unlock()
  411. keys := make(sets.String, len(list))
  412. for _, item := range list {
  413. key, err := f.KeyOf(item)
  414. if err != nil {
  415. return KeyError{item, err}
  416. }
  417. keys.Insert(key)
  418. if err := f.queueActionLocked(Sync, item); err != nil {
  419. return fmt.Errorf("couldn't enqueue object: %v", err)
  420. }
  421. }
  422. if f.knownObjects == nil {
  423. // Do deletion detection against our own list.
  424. for k, oldItem := range f.items {
  425. if keys.Has(k) {
  426. continue
  427. }
  428. var deletedObj interface{}
  429. if n := oldItem.Newest(); n != nil {
  430. deletedObj = n.Object
  431. }
  432. if err := f.queueActionLocked(Deleted, DeletedFinalStateUnknown{k, deletedObj}); err != nil {
  433. return err
  434. }
  435. }
  436. if !f.populated {
  437. f.populated = true
  438. f.initialPopulationCount = len(list)
  439. }
  440. return nil
  441. }
  442. // Detect deletions not already in the queue.
  443. knownKeys := f.knownObjects.ListKeys()
  444. queuedDeletions := 0
  445. for _, k := range knownKeys {
  446. if keys.Has(k) {
  447. continue
  448. }
  449. deletedObj, exists, err := f.knownObjects.GetByKey(k)
  450. if err != nil {
  451. deletedObj = nil
  452. klog.Errorf("Unexpected error %v during lookup of key %v, placing DeleteFinalStateUnknown marker without object", err, k)
  453. } else if !exists {
  454. deletedObj = nil
  455. klog.Infof("Key %v does not exist in known objects store, placing DeleteFinalStateUnknown marker without object", k)
  456. }
  457. queuedDeletions++
  458. if err := f.queueActionLocked(Deleted, DeletedFinalStateUnknown{k, deletedObj}); err != nil {
  459. return err
  460. }
  461. }
  462. if !f.populated {
  463. f.populated = true
  464. f.initialPopulationCount = len(list) + queuedDeletions
  465. }
  466. return nil
  467. }
  468. // Resync will send a sync event for each item
  469. func (f *DeltaFIFO) Resync() error {
  470. f.lock.Lock()
  471. defer f.lock.Unlock()
  472. if f.knownObjects == nil {
  473. return nil
  474. }
  475. keys := f.knownObjects.ListKeys()
  476. for _, k := range keys {
  477. if err := f.syncKeyLocked(k); err != nil {
  478. return err
  479. }
  480. }
  481. return nil
  482. }
  483. func (f *DeltaFIFO) syncKey(key string) error {
  484. f.lock.Lock()
  485. defer f.lock.Unlock()
  486. return f.syncKeyLocked(key)
  487. }
  488. func (f *DeltaFIFO) syncKeyLocked(key string) error {
  489. obj, exists, err := f.knownObjects.GetByKey(key)
  490. if err != nil {
  491. klog.Errorf("Unexpected error %v during lookup of key %v, unable to queue object for sync", err, key)
  492. return nil
  493. } else if !exists {
  494. klog.Infof("Key %v does not exist in known objects store, unable to queue object for sync", key)
  495. return nil
  496. }
  497. // If we are doing Resync() and there is already an event queued for that object,
  498. // we ignore the Resync for it. This is to avoid the race, in which the resync
  499. // comes with the previous value of object (since queueing an event for the object
  500. // doesn't trigger changing the underlying store <knownObjects>.
  501. id, err := f.KeyOf(obj)
  502. if err != nil {
  503. return KeyError{obj, err}
  504. }
  505. if len(f.items[id]) > 0 {
  506. return nil
  507. }
  508. if err := f.queueActionLocked(Sync, obj); err != nil {
  509. return fmt.Errorf("couldn't queue object: %v", err)
  510. }
  511. return nil
  512. }
  513. // A KeyListerGetter is anything that knows how to list its keys and look up by key.
  514. type KeyListerGetter interface {
  515. KeyLister
  516. KeyGetter
  517. }
  518. // A KeyLister is anything that knows how to list its keys.
  519. type KeyLister interface {
  520. ListKeys() []string
  521. }
  522. // A KeyGetter is anything that knows how to get the value stored under a given key.
  523. type KeyGetter interface {
  524. GetByKey(key string) (interface{}, bool, error)
  525. }
  526. // DeltaType is the type of a change (addition, deletion, etc)
  527. type DeltaType string
  528. const (
  529. Added DeltaType = "Added"
  530. Updated DeltaType = "Updated"
  531. Deleted DeltaType = "Deleted"
  532. // The other types are obvious. You'll get Sync deltas when:
  533. // * A watch expires/errors out and a new list/watch cycle is started.
  534. // * You've turned on periodic syncs.
  535. // (Anything that trigger's DeltaFIFO's Replace() method.)
  536. Sync DeltaType = "Sync"
  537. )
  538. // Delta is the type stored by a DeltaFIFO. It tells you what change
  539. // happened, and the object's state after* that change.
  540. //
  541. // [*] Unless the change is a deletion, and then you'll get the final
  542. // state of the object before it was deleted.
  543. type Delta struct {
  544. Type DeltaType
  545. Object interface{}
  546. }
  547. // Deltas is a list of one or more 'Delta's to an individual object.
  548. // The oldest delta is at index 0, the newest delta is the last one.
  549. type Deltas []Delta
  550. // Oldest is a convenience function that returns the oldest delta, or
  551. // nil if there are no deltas.
  552. func (d Deltas) Oldest() *Delta {
  553. if len(d) > 0 {
  554. return &d[0]
  555. }
  556. return nil
  557. }
  558. // Newest is a convenience function that returns the newest delta, or
  559. // nil if there are no deltas.
  560. func (d Deltas) Newest() *Delta {
  561. if n := len(d); n > 0 {
  562. return &d[n-1]
  563. }
  564. return nil
  565. }
  566. // copyDeltas returns a shallow copy of d; that is, it copies the slice but not
  567. // the objects in the slice. This allows Get/List to return an object that we
  568. // know won't be clobbered by a subsequent modifications.
  569. func copyDeltas(d Deltas) Deltas {
  570. d2 := make(Deltas, len(d))
  571. copy(d2, d)
  572. return d2
  573. }
  574. // DeletedFinalStateUnknown is placed into a DeltaFIFO in the case where
  575. // an object was deleted but the watch deletion event was missed. In this
  576. // case we don't know the final "resting" state of the object, so there's
  577. // a chance the included `Obj` is stale.
  578. type DeletedFinalStateUnknown struct {
  579. Key string
  580. Obj interface{}
  581. }