123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208 |
- /*
- Copyright 2014 The Kubernetes Authors.
- Licensed under the Apache License, Version 2.0 (the "License");
- you may not use this file except in compliance with the License.
- You may obtain a copy of the License at
- http://www.apache.org/licenses/LICENSE-2.0
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
- */
- package cache
- import (
- "sync"
- "time"
- "k8s.io/apimachinery/pkg/util/clock"
- "k8s.io/klog"
- )
- // ExpirationCache implements the store interface
- // 1. All entries are automatically time stamped on insert
- // a. The key is computed based off the original item/keyFunc
- // b. The value inserted under that key is the timestamped item
- // 2. Expiration happens lazily on read based on the expiration policy
- // a. No item can be inserted into the store while we're expiring
- // *any* item in the cache.
- // 3. Time-stamps are stripped off unexpired entries before return
- // Note that the ExpirationCache is inherently slower than a normal
- // threadSafeStore because it takes a write lock every time it checks if
- // an item has expired.
- type ExpirationCache struct {
- cacheStorage ThreadSafeStore
- keyFunc KeyFunc
- clock clock.Clock
- expirationPolicy ExpirationPolicy
- // expirationLock is a write lock used to guarantee that we don't clobber
- // newly inserted objects because of a stale expiration timestamp comparison
- expirationLock sync.Mutex
- }
- // ExpirationPolicy dictates when an object expires. Currently only abstracted out
- // so unittests don't rely on the system clock.
- type ExpirationPolicy interface {
- IsExpired(obj *timestampedEntry) bool
- }
- // TTLPolicy implements a ttl based ExpirationPolicy.
- type TTLPolicy struct {
- // >0: Expire entries with an age > ttl
- // <=0: Don't expire any entry
- Ttl time.Duration
- // Clock used to calculate ttl expiration
- Clock clock.Clock
- }
- // IsExpired returns true if the given object is older than the ttl, or it can't
- // determine its age.
- func (p *TTLPolicy) IsExpired(obj *timestampedEntry) bool {
- return p.Ttl > 0 && p.Clock.Since(obj.timestamp) > p.Ttl
- }
- // timestampedEntry is the only type allowed in a ExpirationCache.
- type timestampedEntry struct {
- obj interface{}
- timestamp time.Time
- }
- // getTimestampedEntry returns the timestampedEntry stored under the given key.
- func (c *ExpirationCache) getTimestampedEntry(key string) (*timestampedEntry, bool) {
- item, _ := c.cacheStorage.Get(key)
- if tsEntry, ok := item.(*timestampedEntry); ok {
- return tsEntry, true
- }
- return nil, false
- }
- // getOrExpire retrieves the object from the timestampedEntry if and only if it hasn't
- // already expired. It holds a write lock across deletion.
- func (c *ExpirationCache) getOrExpire(key string) (interface{}, bool) {
- // Prevent all inserts from the time we deem an item as "expired" to when we
- // delete it, so an un-expired item doesn't sneak in under the same key, just
- // before the Delete.
- c.expirationLock.Lock()
- defer c.expirationLock.Unlock()
- timestampedItem, exists := c.getTimestampedEntry(key)
- if !exists {
- return nil, false
- }
- if c.expirationPolicy.IsExpired(timestampedItem) {
- klog.V(4).Infof("Entry %v: %+v has expired", key, timestampedItem.obj)
- c.cacheStorage.Delete(key)
- return nil, false
- }
- return timestampedItem.obj, true
- }
- // GetByKey returns the item stored under the key, or sets exists=false.
- func (c *ExpirationCache) GetByKey(key string) (interface{}, bool, error) {
- obj, exists := c.getOrExpire(key)
- return obj, exists, nil
- }
- // Get returns unexpired items. It purges the cache of expired items in the
- // process.
- func (c *ExpirationCache) Get(obj interface{}) (interface{}, bool, error) {
- key, err := c.keyFunc(obj)
- if err != nil {
- return nil, false, KeyError{obj, err}
- }
- obj, exists := c.getOrExpire(key)
- return obj, exists, nil
- }
- // List retrieves a list of unexpired items. It purges the cache of expired
- // items in the process.
- func (c *ExpirationCache) List() []interface{} {
- items := c.cacheStorage.List()
- list := make([]interface{}, 0, len(items))
- for _, item := range items {
- obj := item.(*timestampedEntry).obj
- if key, err := c.keyFunc(obj); err != nil {
- list = append(list, obj)
- } else if obj, exists := c.getOrExpire(key); exists {
- list = append(list, obj)
- }
- }
- return list
- }
- // ListKeys returns a list of all keys in the expiration cache.
- func (c *ExpirationCache) ListKeys() []string {
- return c.cacheStorage.ListKeys()
- }
- // Add timestamps an item and inserts it into the cache, overwriting entries
- // that might exist under the same key.
- func (c *ExpirationCache) Add(obj interface{}) error {
- c.expirationLock.Lock()
- defer c.expirationLock.Unlock()
- key, err := c.keyFunc(obj)
- if err != nil {
- return KeyError{obj, err}
- }
- c.cacheStorage.Add(key, ×tampedEntry{obj, c.clock.Now()})
- return nil
- }
- // Update has not been implemented yet for lack of a use case, so this method
- // simply calls `Add`. This effectively refreshes the timestamp.
- func (c *ExpirationCache) Update(obj interface{}) error {
- return c.Add(obj)
- }
- // Delete removes an item from the cache.
- func (c *ExpirationCache) Delete(obj interface{}) error {
- c.expirationLock.Lock()
- defer c.expirationLock.Unlock()
- key, err := c.keyFunc(obj)
- if err != nil {
- return KeyError{obj, err}
- }
- c.cacheStorage.Delete(key)
- return nil
- }
- // Replace will convert all items in the given list to TimestampedEntries
- // before attempting the replace operation. The replace operation will
- // delete the contents of the ExpirationCache `c`.
- func (c *ExpirationCache) Replace(list []interface{}, resourceVersion string) error {
- c.expirationLock.Lock()
- defer c.expirationLock.Unlock()
- items := make(map[string]interface{}, len(list))
- ts := c.clock.Now()
- for _, item := range list {
- key, err := c.keyFunc(item)
- if err != nil {
- return KeyError{item, err}
- }
- items[key] = ×tampedEntry{item, ts}
- }
- c.cacheStorage.Replace(items, resourceVersion)
- return nil
- }
- // Resync will touch all objects to put them into the processing queue
- func (c *ExpirationCache) Resync() error {
- return c.cacheStorage.Resync()
- }
- // NewTTLStore creates and returns a ExpirationCache with a TTLPolicy
- func NewTTLStore(keyFunc KeyFunc, ttl time.Duration) Store {
- return &ExpirationCache{
- cacheStorage: NewThreadSafeStore(Indexers{}, Indices{}),
- keyFunc: keyFunc,
- clock: clock.RealClock{},
- expirationPolicy: &TTLPolicy{ttl, clock.RealClock{}},
- }
- }
|