/*
Copyright 2015 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package cache

import (
	"sync"
	"time"

	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/apimachinery/pkg/util/clock"
	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
	"k8s.io/apimachinery/pkg/util/wait"
)

// Config contains all the settings for a Controller.
type Config struct {
	// The queue for your objects; either a FIFO or
	// a DeltaFIFO. Your Process() function should accept
	// the output of this Queue's Pop() method.
	Queue

	// Something that can list and watch your objects.
	ListerWatcher

	// Something that can process your objects.
	Process ProcessFunc

	// The type of your objects.
	ObjectType runtime.Object

	// Reprocess everything at least this often.
	// Note that if it takes longer for you to clear the queue than this
	// period, you will end up processing items in the order determined
	// by FIFO.Replace(). Currently, this is random. If this is a
	// problem, we can change that replacement policy to append new
	// things to the end of the queue instead of replacing the entire
	// queue.
	FullResyncPeriod time.Duration

	// ShouldResync, if specified, is invoked when the controller's reflector
	// determines the next periodic sync should occur. If this returns true,
	// it means the reflector should proceed with the resync.
	ShouldResync ShouldResyncFunc

	// If true, when Process() returns an error, re-enqueue the object.
	// TODO: add interface to let you inject a delay/backoff or drop
	//       the object completely if desired. Pass the object in
	//       question to this interface as a parameter.
	RetryOnError bool
}

// ShouldResyncFunc is a type of function that indicates if a reflector should perform a
// resync or not. It can be used by a shared informer to support multiple event handlers
// with custom resync periods.
type ShouldResyncFunc func() bool

// ProcessFunc processes a single object.
type ProcessFunc func(obj interface{}) error

// controller is the concrete type returned by New; it implements Controller.
type controller struct {
	config         Config
	reflector      *Reflector
	reflectorMutex sync.RWMutex
	clock          clock.Clock
}

// Controller is a generic controller framework.
type Controller interface {
	Run(stopCh <-chan struct{})
	HasSynced() bool
	LastSyncResourceVersion() string
}

// New makes a new Controller from the given Config.
func New(c *Config) Controller {
	ctlr := &controller{
		config: *c,
		clock:  &clock.RealClock{},
	}
	return ctlr
}

// Run begins processing items, and will continue until a value is sent down stopCh.
// It's an error to call Run more than once.
// Run blocks; call via go.
func (c *controller) Run(stopCh <-chan struct{}) {
	defer utilruntime.HandleCrash()
	go func() {
		<-stopCh
		c.config.Queue.Close()
	}()
	r := NewReflector(
		c.config.ListerWatcher,
		c.config.ObjectType,
		c.config.Queue,
		c.config.FullResyncPeriod,
	)
	r.ShouldResync = c.config.ShouldResync
	r.clock = c.clock

	c.reflectorMutex.Lock()
	c.reflector = r
	c.reflectorMutex.Unlock()

	var wg wait.Group
	defer wg.Wait()

	wg.StartWithChannel(stopCh, r.Run)

	wait.Until(c.processLoop, time.Second, stopCh)
}
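
// exampleRunController is an illustrative sketch, not part of this package's
// API: it shows how a Config can be wired up by hand and handed to New and
// Run. Callers normally use NewInformer (below) instead; lw, objType, and
// stopCh are assumed to be supplied by the caller.
func exampleRunController(lw ListerWatcher, objType runtime.Object, stopCh <-chan struct{}) {
	// Track the client's view of the objects, and feed incoming changes
	// through a DeltaFIFO keyed the same way as the store.
	store := NewStore(DeletionHandlingMetaNamespaceKeyFunc)
	fifo := NewDeltaFIFO(MetaNamespaceKeyFunc, store)

	cfg := &Config{
		Queue:            fifo,
		ListerWatcher:    lw,
		ObjectType:       objType,
		FullResyncPeriod: 30 * time.Second,
		Process: func(obj interface{}) error {
			// obj is the Deltas value popped from the DeltaFIFO; a real
			// Process function would apply each delta to the store here.
			return nil
		},
	}

	// Run blocks until stopCh is closed.
	New(cfg).Run(stopCh)
}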

// HasSynced returns true once this controller has completed an initial resource listing.
func (c *controller) HasSynced() bool {
	return c.config.Queue.HasSynced()
}

// LastSyncResourceVersion reports the resource version observed by the
// controller's reflector, if one has been created.
func (c *controller) LastSyncResourceVersion() string {
	c.reflectorMutex.RLock()
	defer c.reflectorMutex.RUnlock()
	if c.reflector == nil {
		return ""
	}
	return c.reflector.LastSyncResourceVersion()
}

// processLoop drains the work queue.
// TODO: Consider doing the processing in parallel. This will require a little thought
// to make sure that we don't end up processing the same object multiple times
// concurrently.
//
// TODO: Plumb through the stopCh here (and down to the queue) so that this can
// actually exit when the controller is stopped. Or just give up on this stuff
// ever being stoppable. Converting this whole package to use Context would
// also be helpful.
func (c *controller) processLoop() {
	for {
		obj, err := c.config.Queue.Pop(PopProcessFunc(c.config.Process))
		if err != nil {
			if err == FIFOClosedError {
				return
			}
			if c.config.RetryOnError {
				// This is the safe way to re-enqueue.
				c.config.Queue.AddIfNotPresent(obj)
			}
		}
	}
}

// ResourceEventHandler can handle notifications for events that happen to a
// resource. The events are informational only, so you can't return an
// error.
//  * OnAdd is called when an object is added.
//  * OnUpdate is called when an object is modified. Note that oldObj is the
//    last known state of the object-- it is possible that several changes
//    were combined together, so you can't use this to see every single
//    change. OnUpdate is also called when a re-list happens, and it will
//    get called even if nothing changed. This is useful for periodically
//    evaluating or syncing something.
//  * OnDelete will get the final state of the item if it is known, otherwise
//    it will get an object of type DeletedFinalStateUnknown. This can
//    happen if the watch is closed and misses the delete event and we don't
//    notice the deletion until the subsequent re-list.
type ResourceEventHandler interface {
	OnAdd(obj interface{})
	OnUpdate(oldObj, newObj interface{})
	OnDelete(obj interface{})
}

// ResourceEventHandlerFuncs is an adaptor to let you easily specify as many or
// as few of the notification functions as you want while still implementing
// ResourceEventHandler.
type ResourceEventHandlerFuncs struct {
	AddFunc    func(obj interface{})
	UpdateFunc func(oldObj, newObj interface{})
	DeleteFunc func(obj interface{})
}

// OnAdd calls AddFunc if it's not nil.
func (r ResourceEventHandlerFuncs) OnAdd(obj interface{}) {
	if r.AddFunc != nil {
		r.AddFunc(obj)
	}
}

// OnUpdate calls UpdateFunc if it's not nil.
func (r ResourceEventHandlerFuncs) OnUpdate(oldObj, newObj interface{}) {
	if r.UpdateFunc != nil {
		r.UpdateFunc(oldObj, newObj)
	}
}

// OnDelete calls DeleteFunc if it's not nil.
func (r ResourceEventHandlerFuncs) OnDelete(obj interface{}) {
	if r.DeleteFunc != nil {
		r.DeleteFunc(obj)
	}
}
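
// exampleHandlerFuncs is an illustrative sketch, not part of this package's
// API: it shows that ResourceEventHandlerFuncs lets a caller supply only the
// callbacks it cares about; the omitted UpdateFunc is simply never invoked.
func exampleHandlerFuncs() ResourceEventHandler {
	return ResourceEventHandlerFuncs{
		AddFunc:    func(obj interface{}) { /* react to adds */ },
		DeleteFunc: func(obj interface{}) { /* react to deletes, possibly DeletedFinalStateUnknown */ },
	}
}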

// FilteringResourceEventHandler applies the provided filter to all events coming
// in, ensuring the appropriate nested handler method is invoked. An object
// that starts passing the filter after an update is considered an add, and an
// object that stops passing the filter after an update is considered a delete.
type FilteringResourceEventHandler struct {
	FilterFunc func(obj interface{}) bool
	Handler    ResourceEventHandler
}

// OnAdd calls the nested handler only if the filter succeeds.
func (r FilteringResourceEventHandler) OnAdd(obj interface{}) {
	if !r.FilterFunc(obj) {
		return
	}
	r.Handler.OnAdd(obj)
}

// OnUpdate ensures the proper handler is called depending on whether the filter matches.
func (r FilteringResourceEventHandler) OnUpdate(oldObj, newObj interface{}) {
	newer := r.FilterFunc(newObj)
	older := r.FilterFunc(oldObj)
	switch {
	case newer && older:
		r.Handler.OnUpdate(oldObj, newObj)
	case newer && !older:
		r.Handler.OnAdd(newObj)
	case !newer && older:
		r.Handler.OnDelete(oldObj)
	default:
		// do nothing
	}
}

// OnDelete calls the nested handler only if the filter succeeds.
func (r FilteringResourceEventHandler) OnDelete(obj interface{}) {
	if !r.FilterFunc(obj) {
		return
	}
	r.Handler.OnDelete(obj)
}
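
// exampleFilteredHandler is an illustrative sketch, not part of this package's
// API: it wraps an existing handler so that only objects accepted by keep are
// delivered. Per the semantics above, an update whose new state starts (or
// stops) passing the filter reaches the wrapped handler as an add (or delete).
func exampleFilteredHandler(h ResourceEventHandler, keep func(obj interface{}) bool) ResourceEventHandler {
	return FilteringResourceEventHandler{
		FilterFunc: keep,
		Handler:    h,
	}
}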

// DeletionHandlingMetaNamespaceKeyFunc checks for
// DeletedFinalStateUnknown objects before calling
// MetaNamespaceKeyFunc.
func DeletionHandlingMetaNamespaceKeyFunc(obj interface{}) (string, error) {
	if d, ok := obj.(DeletedFinalStateUnknown); ok {
		return d.Key, nil
	}
	return MetaNamespaceKeyFunc(obj)
}
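
// exampleTombstoneAwareKey is an illustrative sketch, not part of this
// package's API: a DeleteFunc may receive either the deleted object or a
// DeletedFinalStateUnknown tombstone, and DeletionHandlingMetaNamespaceKeyFunc
// yields a usable key in both cases.
func exampleTombstoneAwareKey(obj interface{}) {
	key, err := DeletionHandlingMetaNamespaceKeyFunc(obj)
	if err != nil {
		utilruntime.HandleError(err)
		return
	}
	_ = key // a real handler would enqueue or look up this key
}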

// NewInformer returns a Store and a controller for populating the store
// while also providing event notifications. You should only use the returned
// Store for Get/List operations; Add/Modify/Deletes will cause the event
// notifications to be faulty.
//
// Parameters:
//  * lw is list and watch functions for the source of the resource you want to
//    be informed of.
//  * objType is an object of the type that you expect to receive.
//  * resyncPeriod: if non-zero, will re-list this often (you will get OnUpdate
//    calls, even if nothing changed). Otherwise, re-list will be delayed as
//    long as possible (until the upstream source closes the watch or times out,
//    or you stop the controller).
//  * h is the object you want notifications sent to.
func NewInformer(
	lw ListerWatcher,
	objType runtime.Object,
	resyncPeriod time.Duration,
	h ResourceEventHandler,
) (Store, Controller) {
	// This will hold the client state, as we know it.
	clientState := NewStore(DeletionHandlingMetaNamespaceKeyFunc)

	// This will hold incoming changes. Note how we pass clientState in as a
	// KeyLister, that way resync operations will result in the correct set
	// of update/delete deltas.
	fifo := NewDeltaFIFO(MetaNamespaceKeyFunc, clientState)

	cfg := &Config{
		Queue:            fifo,
		ListerWatcher:    lw,
		ObjectType:       objType,
		FullResyncPeriod: resyncPeriod,
		RetryOnError:     false,

		Process: func(obj interface{}) error {
			// from oldest to newest
			for _, d := range obj.(Deltas) {
				switch d.Type {
				case Sync, Added, Updated:
					if old, exists, err := clientState.Get(d.Object); err == nil && exists {
						if err := clientState.Update(d.Object); err != nil {
							return err
						}
						h.OnUpdate(old, d.Object)
					} else {
						if err := clientState.Add(d.Object); err != nil {
							return err
						}
						h.OnAdd(d.Object)
					}
				case Deleted:
					if err := clientState.Delete(d.Object); err != nil {
						return err
					}
					h.OnDelete(d.Object)
				}
			}
			return nil
		},
	}
	return clientState, New(cfg)
}
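
// exampleNewInformer is an illustrative sketch, not part of this package's
// API, of typical NewInformer usage; lw, objType, and stopCh are assumed to be
// supplied by the caller (e.g. a ListWatch over some resource and a pointer to
// its Go type).
func exampleNewInformer(lw ListerWatcher, objType runtime.Object, stopCh <-chan struct{}) Store {
	store, ctrl := NewInformer(
		lw,
		objType,
		30*time.Second, // resync period, as described in the doc comment above
		ResourceEventHandlerFuncs{
			AddFunc:    func(obj interface{}) { /* object was added or listed */ },
			UpdateFunc: func(oldObj, newObj interface{}) { /* object changed or was resynced */ },
			DeleteFunc: func(obj interface{}) { /* object (or its tombstone) was removed */ },
		},
	)
	go ctrl.Run(stopCh)

	// Use the returned Store for Get/List only; writing to it would make the
	// event notifications inconsistent.
	return store
}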

// NewIndexerInformer returns an Indexer and a controller for populating the index
// while also providing event notifications. You should only use the returned
// Indexer for Get/List operations; Add/Modify/Deletes will cause the event
// notifications to be faulty.
//
// Parameters:
//  * lw is list and watch functions for the source of the resource you want to
//    be informed of.
//  * objType is an object of the type that you expect to receive.
//  * resyncPeriod: if non-zero, will re-list this often (you will get OnUpdate
//    calls, even if nothing changed). Otherwise, re-list will be delayed as
//    long as possible (until the upstream source closes the watch or times out,
//    or you stop the controller).
//  * h is the object you want notifications sent to.
//  * indexers is the indexer for the received object type.
func NewIndexerInformer(
	lw ListerWatcher,
	objType runtime.Object,
	resyncPeriod time.Duration,
	h ResourceEventHandler,
	indexers Indexers,
) (Indexer, Controller) {
	// This will hold the client state, as we know it.
	clientState := NewIndexer(DeletionHandlingMetaNamespaceKeyFunc, indexers)

	// This will hold incoming changes. Note how we pass clientState in as a
	// KeyLister, that way resync operations will result in the correct set
	// of update/delete deltas.
	fifo := NewDeltaFIFO(MetaNamespaceKeyFunc, clientState)

	cfg := &Config{
		Queue:            fifo,
		ListerWatcher:    lw,
		ObjectType:       objType,
		FullResyncPeriod: resyncPeriod,
		RetryOnError:     false,

		Process: func(obj interface{}) error {
			// from oldest to newest
			for _, d := range obj.(Deltas) {
				switch d.Type {
				case Sync, Added, Updated:
					if old, exists, err := clientState.Get(d.Object); err == nil && exists {
						if err := clientState.Update(d.Object); err != nil {
							return err
						}
						h.OnUpdate(old, d.Object)
					} else {
						if err := clientState.Add(d.Object); err != nil {
							return err
						}
						h.OnAdd(d.Object)
					}
				case Deleted:
					if err := clientState.Delete(d.Object); err != nil {
						return err
					}
					h.OnDelete(d.Object)
				}
			}
			return nil
		},
	}
	return clientState, New(cfg)
}
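
// exampleNewIndexerInformer is an illustrative sketch, not part of this
// package's API, of typical NewIndexerInformer usage with the namespace index
// shipped in this package; lw, objType, and stopCh are assumed to be supplied
// by the caller.
func exampleNewIndexerInformer(lw ListerWatcher, objType runtime.Object, stopCh <-chan struct{}) Indexer {
	indexer, ctrl := NewIndexerInformer(
		lw,
		objType,
		30*time.Second,
		ResourceEventHandlerFuncs{
			AddFunc: func(obj interface{}) { /* react to adds */ },
		},
		Indexers{NamespaceIndex: MetaNamespaceIndexFunc},
	)
	go ctrl.Run(stopCh)

	// Once ctrl.HasSynced() reports true, indexer.ByIndex(NamespaceIndex, ns)
	// returns the cached objects in namespace ns.
	return indexer
}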