lock.go 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380
  1. package api
  2. import (
  3. "fmt"
  4. "sync"
  5. "time"
  6. )
  const (
	// LockFlagValue is a magic value stored in the Flags field of a key to
	// mark it as being used for a lock. It lets a lock detect that the key is
	// already being used for something else, such as a semaphore.
	LockFlagValue = 0x2ddccbc058a50c18

	// DefaultLockSessionName is the Session Name assigned to a session the
	// lock creates for itself when LockOptions.SessionName is empty.
	DefaultLockSessionName = "Consul API Lock"

	// DefaultLockSessionTTL is the TTL given to a session the lock creates for
	// itself when LockOptions.SessionTTL is empty. A TTL is used because such
	// a session has no other health check to depend upon.
	DefaultLockSessionTTL = "15s"

	// DefaultLockWaitTime bounds each blocking query made while waiting for
	// the lock to become acquirable. It therefore also bounds how long it
	// takes for a Lock acquisition to notice cancellation.
	DefaultLockWaitTime = 15 * time.Second

	// DefaultLockRetryTime is the pause after a failed acquisition before the
	// next attempt. It keeps the lock from hot-looping while a lock-delay is
	// in effect.
	DefaultLockRetryTime = 5 * time.Second

	// DefaultMonitorRetryTime is the pause after a failed monitor check of a
	// held lock (a 500 response code) before checking again. This lets the
	// monitor ride out brief periods of unavailability. Retries are governed
	// by LockOptions.MonitorRetries, which defaults to 0 and so disables the
	// feature. This setting affects both locks and semaphores.
	DefaultMonitorRetryTime = 2 * time.Second
)
  var (
	// ErrLockHeld is returned when trying to lock a Lock that is already held,
	// and when trying to destroy a Lock that is currently held.
	ErrLockHeld = fmt.Errorf("Lock already held")

	// ErrLockNotHeld is returned when trying to unlock a Lock that is not held.
	ErrLockNotHeld = fmt.Errorf("Lock not held")

	// ErrLockConflict is returned when the existing key's flags do not match
	// LockFlagValue, i.e. the key is not being used as a lock.
	ErrLockConflict = fmt.Errorf("Existing key does not match lock use")

	// ErrLockInUse is returned when trying to destroy a lock key that is
	// still in use.
	ErrLockInUse = fmt.Errorf("Lock in use")
)
  46. // Lock is used to implement client-side leader election. It is follows the
  47. // algorithm as described here: https://www.consul.io/docs/guides/leader-election.html.
  48. type Lock struct {
  49. c *Client
  50. opts *LockOptions
  51. isHeld bool
  52. sessionRenew chan struct{}
  53. lockSession string
  54. l sync.Mutex
  55. }
  // LockOptions is used to parameterize the Lock behavior. Only Key is
// required; the remaining fields are optional.
type LockOptions struct {
	// Key must be set, and the client must have write permissions on it.
	Key string
	// Value is an optional value to associate with the lock.
	Value []byte
	// Session is an optional existing session to lock with; if empty, one is
	// created.
	Session string
	// SessionName names a created session; defaults to DefaultLockSessionName.
	SessionName string
	// SessionTTL is the TTL of a created session; defaults to
	// DefaultLockSessionTTL.
	SessionTTL string
	// MonitorRetries is how many times the lock monitor retries a failed
	// check; the default of 0 means no retries.
	MonitorRetries int
	// MonitorRetryTime is the pause between monitor retries; defaults to
	// DefaultMonitorRetryTime.
	MonitorRetryTime time.Duration
	// LockWaitTime bounds each blocking wait for the lock; defaults to
	// DefaultLockWaitTime.
	LockWaitTime time.Duration
	// LockTryOnce limits acquisition to roughly one LockWaitTime; the default
	// of false means try forever.
	LockTryOnce bool
}
  68. // LockKey returns a handle to a lock struct which can be used
  69. // to acquire and release the mutex. The key used must have
  70. // write permissions.
  71. func (c *Client) LockKey(key string) (*Lock, error) {
  72. opts := &LockOptions{
  73. Key: key,
  74. }
  75. return c.LockOpts(opts)
  76. }
  77. // LockOpts returns a handle to a lock struct which can be used
  78. // to acquire and release the mutex. The key used must have
  79. // write permissions.
  80. func (c *Client) LockOpts(opts *LockOptions) (*Lock, error) {
  81. if opts.Key == "" {
  82. return nil, fmt.Errorf("missing key")
  83. }
  84. if opts.SessionName == "" {
  85. opts.SessionName = DefaultLockSessionName
  86. }
  87. if opts.SessionTTL == "" {
  88. opts.SessionTTL = DefaultLockSessionTTL
  89. } else {
  90. if _, err := time.ParseDuration(opts.SessionTTL); err != nil {
  91. return nil, fmt.Errorf("invalid SessionTTL: %v", err)
  92. }
  93. }
  94. if opts.MonitorRetryTime == 0 {
  95. opts.MonitorRetryTime = DefaultMonitorRetryTime
  96. }
  97. if opts.LockWaitTime == 0 {
  98. opts.LockWaitTime = DefaultLockWaitTime
  99. }
  100. l := &Lock{
  101. c: c,
  102. opts: opts,
  103. }
  104. return l, nil
  105. }
  106. // Lock attempts to acquire the lock and blocks while doing so.
  107. // Providing a non-nil stopCh can be used to abort the lock attempt.
  108. // Returns a channel that is closed if our lock is lost or an error.
  109. // This channel could be closed at any time due to session invalidation,
  110. // communication errors, operator intervention, etc. It is NOT safe to
  111. // assume that the lock is held until Unlock() unless the Session is specifically
  112. // created without any associated health checks. By default Consul sessions
  113. // prefer liveness over safety and an application must be able to handle
  114. // the lock being lost.
  115. func (l *Lock) Lock(stopCh <-chan struct{}) (<-chan struct{}, error) {
  116. // Hold the lock as we try to acquire
  117. l.l.Lock()
  118. defer l.l.Unlock()
  119. // Check if we already hold the lock
  120. if l.isHeld {
  121. return nil, ErrLockHeld
  122. }
  123. // Check if we need to create a session first
  124. l.lockSession = l.opts.Session
  125. if l.lockSession == "" {
  126. if s, err := l.createSession(); err != nil {
  127. return nil, fmt.Errorf("failed to create session: %v", err)
  128. } else {
  129. l.sessionRenew = make(chan struct{})
  130. l.lockSession = s
  131. session := l.c.Session()
  132. go session.RenewPeriodic(l.opts.SessionTTL, s, nil, l.sessionRenew)
  133. // If we fail to acquire the lock, cleanup the session
  134. defer func() {
  135. if !l.isHeld {
  136. close(l.sessionRenew)
  137. l.sessionRenew = nil
  138. }
  139. }()
  140. }
  141. }
  142. // Setup the query options
  143. kv := l.c.KV()
  144. qOpts := &QueryOptions{
  145. WaitTime: l.opts.LockWaitTime,
  146. }
  147. start := time.Now()
  148. attempts := 0
  149. WAIT:
  150. // Check if we should quit
  151. select {
  152. case <-stopCh:
  153. return nil, nil
  154. default:
  155. }
  156. // Handle the one-shot mode.
  157. if l.opts.LockTryOnce && attempts > 0 {
  158. elapsed := time.Now().Sub(start)
  159. if elapsed > qOpts.WaitTime {
  160. return nil, nil
  161. }
  162. qOpts.WaitTime -= elapsed
  163. }
  164. attempts++
  165. // Look for an existing lock, blocking until not taken
  166. pair, meta, err := kv.Get(l.opts.Key, qOpts)
  167. if err != nil {
  168. return nil, fmt.Errorf("failed to read lock: %v", err)
  169. }
  170. if pair != nil && pair.Flags != LockFlagValue {
  171. return nil, ErrLockConflict
  172. }
  173. locked := false
  174. if pair != nil && pair.Session == l.lockSession {
  175. goto HELD
  176. }
  177. if pair != nil && pair.Session != "" {
  178. qOpts.WaitIndex = meta.LastIndex
  179. goto WAIT
  180. }
  181. // Try to acquire the lock
  182. pair = l.lockEntry(l.lockSession)
  183. locked, _, err = kv.Acquire(pair, nil)
  184. if err != nil {
  185. return nil, fmt.Errorf("failed to acquire lock: %v", err)
  186. }
  187. // Handle the case of not getting the lock
  188. if !locked {
  189. // Determine why the lock failed
  190. qOpts.WaitIndex = 0
  191. pair, meta, err = kv.Get(l.opts.Key, qOpts)
  192. if pair != nil && pair.Session != "" {
  193. //If the session is not null, this means that a wait can safely happen
  194. //using a long poll
  195. qOpts.WaitIndex = meta.LastIndex
  196. goto WAIT
  197. } else {
  198. // If the session is empty and the lock failed to acquire, then it means
  199. // a lock-delay is in effect and a timed wait must be used
  200. select {
  201. case <-time.After(DefaultLockRetryTime):
  202. goto WAIT
  203. case <-stopCh:
  204. return nil, nil
  205. }
  206. }
  207. }
  208. HELD:
  209. // Watch to ensure we maintain leadership
  210. leaderCh := make(chan struct{})
  211. go l.monitorLock(l.lockSession, leaderCh)
  212. // Set that we own the lock
  213. l.isHeld = true
  214. // Locked! All done
  215. return leaderCh, nil
  216. }
  217. // Unlock released the lock. It is an error to call this
  218. // if the lock is not currently held.
  219. func (l *Lock) Unlock() error {
  220. // Hold the lock as we try to release
  221. l.l.Lock()
  222. defer l.l.Unlock()
  223. // Ensure the lock is actually held
  224. if !l.isHeld {
  225. return ErrLockNotHeld
  226. }
  227. // Set that we no longer own the lock
  228. l.isHeld = false
  229. // Stop the session renew
  230. if l.sessionRenew != nil {
  231. defer func() {
  232. close(l.sessionRenew)
  233. l.sessionRenew = nil
  234. }()
  235. }
  236. // Get the lock entry, and clear the lock session
  237. lockEnt := l.lockEntry(l.lockSession)
  238. l.lockSession = ""
  239. // Release the lock explicitly
  240. kv := l.c.KV()
  241. _, _, err := kv.Release(lockEnt, nil)
  242. if err != nil {
  243. return fmt.Errorf("failed to release lock: %v", err)
  244. }
  245. return nil
  246. }
  247. // Destroy is used to cleanup the lock entry. It is not necessary
  248. // to invoke. It will fail if the lock is in use.
  249. func (l *Lock) Destroy() error {
  250. // Hold the lock as we try to release
  251. l.l.Lock()
  252. defer l.l.Unlock()
  253. // Check if we already hold the lock
  254. if l.isHeld {
  255. return ErrLockHeld
  256. }
  257. // Look for an existing lock
  258. kv := l.c.KV()
  259. pair, _, err := kv.Get(l.opts.Key, nil)
  260. if err != nil {
  261. return fmt.Errorf("failed to read lock: %v", err)
  262. }
  263. // Nothing to do if the lock does not exist
  264. if pair == nil {
  265. return nil
  266. }
  267. // Check for possible flag conflict
  268. if pair.Flags != LockFlagValue {
  269. return ErrLockConflict
  270. }
  271. // Check if it is in use
  272. if pair.Session != "" {
  273. return ErrLockInUse
  274. }
  275. // Attempt the delete
  276. didRemove, _, err := kv.DeleteCAS(pair, nil)
  277. if err != nil {
  278. return fmt.Errorf("failed to remove lock: %v", err)
  279. }
  280. if !didRemove {
  281. return ErrLockInUse
  282. }
  283. return nil
  284. }
  285. // createSession is used to create a new managed session
  286. func (l *Lock) createSession() (string, error) {
  287. session := l.c.Session()
  288. se := &SessionEntry{
  289. Name: l.opts.SessionName,
  290. TTL: l.opts.SessionTTL,
  291. }
  292. id, _, err := session.Create(se, nil)
  293. if err != nil {
  294. return "", err
  295. }
  296. return id, nil
  297. }
  298. // lockEntry returns a formatted KVPair for the lock
  299. func (l *Lock) lockEntry(session string) *KVPair {
  300. return &KVPair{
  301. Key: l.opts.Key,
  302. Value: l.opts.Value,
  303. Session: session,
  304. Flags: LockFlagValue,
  305. }
  306. }
  307. // monitorLock is a long running routine to monitor a lock ownership
  308. // It closes the stopCh if we lose our leadership.
  309. func (l *Lock) monitorLock(session string, stopCh chan struct{}) {
  310. defer close(stopCh)
  311. kv := l.c.KV()
  312. opts := &QueryOptions{RequireConsistent: true}
  313. WAIT:
  314. retries := l.opts.MonitorRetries
  315. RETRY:
  316. pair, meta, err := kv.Get(l.opts.Key, opts)
  317. if err != nil {
  318. // If configured we can try to ride out a brief Consul unavailability
  319. // by doing retries. Note that we have to attempt the retry in a non-
  320. // blocking fashion so that we have a clean place to reset the retry
  321. // counter if service is restored.
  322. if retries > 0 && IsServerError(err) {
  323. time.Sleep(l.opts.MonitorRetryTime)
  324. retries--
  325. opts.WaitIndex = 0
  326. goto RETRY
  327. }
  328. return
  329. }
  330. if pair != nil && pair.Session == session {
  331. opts.WaitIndex = meta.LastIndex
  332. goto WAIT
  333. }
  334. }