reflector.go 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378
  1. /*
  2. Copyright 2014 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package cache
  14. import (
  15. "errors"
  16. "fmt"
  17. "io"
  18. "math/rand"
  19. "net"
  20. "net/url"
  21. "reflect"
  22. "strconv"
  23. "strings"
  24. "sync"
  25. "sync/atomic"
  26. "syscall"
  27. "time"
  28. apierrs "k8s.io/apimachinery/pkg/api/errors"
  29. "k8s.io/apimachinery/pkg/api/meta"
  30. metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  31. "k8s.io/apimachinery/pkg/runtime"
  32. "k8s.io/apimachinery/pkg/util/clock"
  33. "k8s.io/apimachinery/pkg/util/naming"
  34. utilruntime "k8s.io/apimachinery/pkg/util/runtime"
  35. "k8s.io/apimachinery/pkg/util/wait"
  36. "k8s.io/apimachinery/pkg/watch"
  37. "k8s.io/klog"
  38. )
  39. // Reflector watches a specified resource and causes all changes to be reflected in the given store.
  40. type Reflector struct {
  41. // name identifies this reflector. By default it will be a file:line if possible.
  42. name string
  43. // metrics tracks basic metric information about the reflector
  44. metrics *reflectorMetrics
  45. // The type of object we expect to place in the store.
  46. expectedType reflect.Type
  47. // The destination to sync up with the watch source
  48. store Store
  49. // listerWatcher is used to perform lists and watches.
  50. listerWatcher ListerWatcher
  51. // period controls timing between one watch ending and
  52. // the beginning of the next one.
  53. period time.Duration
  54. resyncPeriod time.Duration
  55. ShouldResync func() bool
  56. // clock allows tests to manipulate time
  57. clock clock.Clock
  58. // lastSyncResourceVersion is the resource version token last
  59. // observed when doing a sync with the underlying store
  60. // it is thread safe, but not synchronized with the underlying store
  61. lastSyncResourceVersion string
  62. // lastSyncResourceVersionMutex guards read/write access to lastSyncResourceVersion
  63. lastSyncResourceVersionMutex sync.RWMutex
  64. }
  65. var (
  66. // We try to spread the load on apiserver by setting timeouts for
  67. // watch requests - it is random in [minWatchTimeout, 2*minWatchTimeout].
  68. minWatchTimeout = 5 * time.Minute
  69. )
  70. // NewNamespaceKeyedIndexerAndReflector creates an Indexer and a Reflector
  71. // The indexer is configured to key on namespace
  72. func NewNamespaceKeyedIndexerAndReflector(lw ListerWatcher, expectedType interface{}, resyncPeriod time.Duration) (indexer Indexer, reflector *Reflector) {
  73. indexer = NewIndexer(MetaNamespaceKeyFunc, Indexers{"namespace": MetaNamespaceIndexFunc})
  74. reflector = NewReflector(lw, expectedType, indexer, resyncPeriod)
  75. return indexer, reflector
  76. }
  77. // NewReflector creates a new Reflector object which will keep the given store up to
  78. // date with the server's contents for the given resource. Reflector promises to
  79. // only put things in the store that have the type of expectedType, unless expectedType
  80. // is nil. If resyncPeriod is non-zero, then lists will be executed after every
  81. // resyncPeriod, so that you can use reflectors to periodically process everything as
  82. // well as incrementally processing the things that change.
  83. func NewReflector(lw ListerWatcher, expectedType interface{}, store Store, resyncPeriod time.Duration) *Reflector {
  84. return NewNamedReflector(naming.GetNameFromCallsite(internalPackages...), lw, expectedType, store, resyncPeriod)
  85. }
  86. // reflectorDisambiguator is used to disambiguate started reflectors.
  87. // initialized to an unstable value to ensure meaning isn't attributed to the suffix.
  88. var reflectorDisambiguator = int64(time.Now().UnixNano() % 12345)
  89. // NewNamedReflector same as NewReflector, but with a specified name for logging
  90. func NewNamedReflector(name string, lw ListerWatcher, expectedType interface{}, store Store, resyncPeriod time.Duration) *Reflector {
  91. reflectorSuffix := atomic.AddInt64(&reflectorDisambiguator, 1)
  92. r := &Reflector{
  93. name: name,
  94. // we need this to be unique per process (some names are still the same) but obvious who it belongs to
  95. metrics: newReflectorMetrics(makeValidPrometheusMetricLabel(fmt.Sprintf("reflector_"+name+"_%d", reflectorSuffix))),
  96. listerWatcher: lw,
  97. store: store,
  98. expectedType: reflect.TypeOf(expectedType),
  99. period: time.Second,
  100. resyncPeriod: resyncPeriod,
  101. clock: &clock.RealClock{},
  102. }
  103. return r
  104. }
  105. func makeValidPrometheusMetricLabel(in string) string {
  106. // this isn't perfect, but it removes our common characters
  107. return strings.NewReplacer("/", "_", ".", "_", "-", "_", ":", "_").Replace(in)
  108. }
  109. // internalPackages are packages that ignored when creating a default reflector name. These packages are in the common
  110. // call chains to NewReflector, so they'd be low entropy names for reflectors
  111. var internalPackages = []string{"client-go/tools/cache/"}
  112. // Run starts a watch and handles watch events. Will restart the watch if it is closed.
  113. // Run will exit when stopCh is closed.
  114. func (r *Reflector) Run(stopCh <-chan struct{}) {
  115. klog.V(3).Infof("Starting reflector %v (%s) from %s", r.expectedType, r.resyncPeriod, r.name)
  116. wait.Until(func() {
  117. if err := r.ListAndWatch(stopCh); err != nil {
  118. utilruntime.HandleError(err)
  119. }
  120. }, r.period, stopCh)
  121. }
  122. var (
  123. // nothing will ever be sent down this channel
  124. neverExitWatch <-chan time.Time = make(chan time.Time)
  125. // Used to indicate that watching stopped so that a resync could happen.
  126. errorResyncRequested = errors.New("resync channel fired")
  127. // Used to indicate that watching stopped because of a signal from the stop
  128. // channel passed in from a client of the reflector.
  129. errorStopRequested = errors.New("Stop requested")
  130. )
  131. // resyncChan returns a channel which will receive something when a resync is
  132. // required, and a cleanup function.
  133. func (r *Reflector) resyncChan() (<-chan time.Time, func() bool) {
  134. if r.resyncPeriod == 0 {
  135. return neverExitWatch, func() bool { return false }
  136. }
  137. // The cleanup function is required: imagine the scenario where watches
  138. // always fail so we end up listing frequently. Then, if we don't
  139. // manually stop the timer, we could end up with many timers active
  140. // concurrently.
  141. t := r.clock.NewTimer(r.resyncPeriod)
  142. return t.C(), t.Stop
  143. }
  144. // ListAndWatch first lists all items and get the resource version at the moment of call,
  145. // and then use the resource version to watch.
  146. // It returns error if ListAndWatch didn't even try to initialize watch.
  147. func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
  148. klog.V(3).Infof("Listing and watching %v from %s", r.expectedType, r.name)
  149. var resourceVersion string
  150. // Explicitly set "0" as resource version - it's fine for the List()
  151. // to be served from cache and potentially be delayed relative to
  152. // etcd contents. Reflector framework will catch up via Watch() eventually.
  153. options := metav1.ListOptions{ResourceVersion: "0"}
  154. r.metrics.numberOfLists.Inc()
  155. start := r.clock.Now()
  156. list, err := r.listerWatcher.List(options)
  157. if err != nil {
  158. return fmt.Errorf("%s: Failed to list %v: %v", r.name, r.expectedType, err)
  159. }
  160. r.metrics.listDuration.Observe(time.Since(start).Seconds())
  161. listMetaInterface, err := meta.ListAccessor(list)
  162. if err != nil {
  163. return fmt.Errorf("%s: Unable to understand list result %#v: %v", r.name, list, err)
  164. }
  165. resourceVersion = listMetaInterface.GetResourceVersion()
  166. items, err := meta.ExtractList(list)
  167. if err != nil {
  168. return fmt.Errorf("%s: Unable to understand list result %#v (%v)", r.name, list, err)
  169. }
  170. r.metrics.numberOfItemsInList.Observe(float64(len(items)))
  171. if err := r.syncWith(items, resourceVersion); err != nil {
  172. return fmt.Errorf("%s: Unable to sync list result: %v", r.name, err)
  173. }
  174. r.setLastSyncResourceVersion(resourceVersion)
  175. resyncerrc := make(chan error, 1)
  176. cancelCh := make(chan struct{})
  177. defer close(cancelCh)
  178. go func() {
  179. resyncCh, cleanup := r.resyncChan()
  180. defer func() {
  181. cleanup() // Call the last one written into cleanup
  182. }()
  183. for {
  184. select {
  185. case <-resyncCh:
  186. case <-stopCh:
  187. return
  188. case <-cancelCh:
  189. return
  190. }
  191. if r.ShouldResync == nil || r.ShouldResync() {
  192. klog.V(4).Infof("%s: forcing resync", r.name)
  193. if err := r.store.Resync(); err != nil {
  194. resyncerrc <- err
  195. return
  196. }
  197. }
  198. cleanup()
  199. resyncCh, cleanup = r.resyncChan()
  200. }
  201. }()
  202. for {
  203. // give the stopCh a chance to stop the loop, even in case of continue statements further down on errors
  204. select {
  205. case <-stopCh:
  206. return nil
  207. default:
  208. }
  209. timeoutSeconds := int64(minWatchTimeout.Seconds() * (rand.Float64() + 1.0))
  210. options = metav1.ListOptions{
  211. ResourceVersion: resourceVersion,
  212. // We want to avoid situations of hanging watchers. Stop any wachers that do not
  213. // receive any events within the timeout window.
  214. TimeoutSeconds: &timeoutSeconds,
  215. }
  216. r.metrics.numberOfWatches.Inc()
  217. w, err := r.listerWatcher.Watch(options)
  218. if err != nil {
  219. switch err {
  220. case io.EOF:
  221. // watch closed normally
  222. case io.ErrUnexpectedEOF:
  223. klog.V(1).Infof("%s: Watch for %v closed with unexpected EOF: %v", r.name, r.expectedType, err)
  224. default:
  225. utilruntime.HandleError(fmt.Errorf("%s: Failed to watch %v: %v", r.name, r.expectedType, err))
  226. }
  227. // If this is "connection refused" error, it means that most likely apiserver is not responsive.
  228. // It doesn't make sense to re-list all objects because most likely we will be able to restart
  229. // watch where we ended.
  230. // If that's the case wait and resend watch request.
  231. if urlError, ok := err.(*url.Error); ok {
  232. if opError, ok := urlError.Err.(*net.OpError); ok {
  233. if errno, ok := opError.Err.(syscall.Errno); ok && errno == syscall.ECONNREFUSED {
  234. time.Sleep(time.Second)
  235. continue
  236. }
  237. }
  238. }
  239. return nil
  240. }
  241. if err := r.watchHandler(w, &resourceVersion, resyncerrc, stopCh); err != nil {
  242. if err != errorStopRequested {
  243. klog.Warningf("%s: watch of %v ended with: %v", r.name, r.expectedType, err)
  244. }
  245. return nil
  246. }
  247. }
  248. }
  249. // syncWith replaces the store's items with the given list.
  250. func (r *Reflector) syncWith(items []runtime.Object, resourceVersion string) error {
  251. found := make([]interface{}, 0, len(items))
  252. for _, item := range items {
  253. found = append(found, item)
  254. }
  255. return r.store.Replace(found, resourceVersion)
  256. }
  257. // watchHandler watches w and keeps *resourceVersion up to date.
  258. func (r *Reflector) watchHandler(w watch.Interface, resourceVersion *string, errc chan error, stopCh <-chan struct{}) error {
  259. start := r.clock.Now()
  260. eventCount := 0
  261. // Stopping the watcher should be idempotent and if we return from this function there's no way
  262. // we're coming back in with the same watch interface.
  263. defer w.Stop()
  264. // update metrics
  265. defer func() {
  266. r.metrics.numberOfItemsInWatch.Observe(float64(eventCount))
  267. r.metrics.watchDuration.Observe(time.Since(start).Seconds())
  268. }()
  269. loop:
  270. for {
  271. select {
  272. case <-stopCh:
  273. return errorStopRequested
  274. case err := <-errc:
  275. return err
  276. case event, ok := <-w.ResultChan():
  277. if !ok {
  278. break loop
  279. }
  280. if event.Type == watch.Error {
  281. return apierrs.FromObject(event.Object)
  282. }
  283. if e, a := r.expectedType, reflect.TypeOf(event.Object); e != nil && e != a {
  284. utilruntime.HandleError(fmt.Errorf("%s: expected type %v, but watch event object had type %v", r.name, e, a))
  285. continue
  286. }
  287. meta, err := meta.Accessor(event.Object)
  288. if err != nil {
  289. utilruntime.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", r.name, event))
  290. continue
  291. }
  292. newResourceVersion := meta.GetResourceVersion()
  293. switch event.Type {
  294. case watch.Added:
  295. err := r.store.Add(event.Object)
  296. if err != nil {
  297. utilruntime.HandleError(fmt.Errorf("%s: unable to add watch event object (%#v) to store: %v", r.name, event.Object, err))
  298. }
  299. case watch.Modified:
  300. err := r.store.Update(event.Object)
  301. if err != nil {
  302. utilruntime.HandleError(fmt.Errorf("%s: unable to update watch event object (%#v) to store: %v", r.name, event.Object, err))
  303. }
  304. case watch.Deleted:
  305. // TODO: Will any consumers need access to the "last known
  306. // state", which is passed in event.Object? If so, may need
  307. // to change this.
  308. err := r.store.Delete(event.Object)
  309. if err != nil {
  310. utilruntime.HandleError(fmt.Errorf("%s: unable to delete watch event object (%#v) from store: %v", r.name, event.Object, err))
  311. }
  312. default:
  313. utilruntime.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", r.name, event))
  314. }
  315. *resourceVersion = newResourceVersion
  316. r.setLastSyncResourceVersion(newResourceVersion)
  317. eventCount++
  318. }
  319. }
  320. watchDuration := r.clock.Now().Sub(start)
  321. if watchDuration < 1*time.Second && eventCount == 0 {
  322. r.metrics.numberOfShortWatches.Inc()
  323. return fmt.Errorf("very short watch: %s: Unexpected watch close - watch lasted less than a second and no items received", r.name)
  324. }
  325. klog.V(4).Infof("%s: Watch close - %v total %v items received", r.name, r.expectedType, eventCount)
  326. return nil
  327. }
  328. // LastSyncResourceVersion is the resource version observed when last sync with the underlying store
  329. // The value returned is not synchronized with access to the underlying store and is not thread-safe
  330. func (r *Reflector) LastSyncResourceVersion() string {
  331. r.lastSyncResourceVersionMutex.RLock()
  332. defer r.lastSyncResourceVersionMutex.RUnlock()
  333. return r.lastSyncResourceVersion
  334. }
  335. func (r *Reflector) setLastSyncResourceVersion(v string) {
  336. r.lastSyncResourceVersionMutex.Lock()
  337. defer r.lastSyncResourceVersionMutex.Unlock()
  338. r.lastSyncResourceVersion = v
  339. rv, err := strconv.Atoi(v)
  340. if err == nil {
  341. r.metrics.lastResourceVersion.Set(float64(rv))
  342. }
  343. }