session.go 5.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217
  1. package api
  2. import (
  3. "errors"
  4. "fmt"
  5. "time"
  6. )
  7. const (
  8. // SessionBehaviorRelease is the default behavior and causes
  9. // all associated locks to be released on session invalidation.
  10. SessionBehaviorRelease = "release"
  11. // SessionBehaviorDelete is new in Consul 0.5 and changes the
  12. // behavior to delete all associated locks on session invalidation.
  13. // It can be used in a way similar to Ephemeral Nodes in ZooKeeper.
  14. SessionBehaviorDelete = "delete"
  15. )
  16. var ErrSessionExpired = errors.New("session expired")
  17. // SessionEntry represents a session in consul
  18. type SessionEntry struct {
  19. CreateIndex uint64
  20. ID string
  21. Name string
  22. Node string
  23. Checks []string
  24. LockDelay time.Duration
  25. Behavior string
  26. TTL string
  27. }
  28. // Session can be used to query the Session endpoints
  29. type Session struct {
  30. c *Client
  31. }
  32. // Session returns a handle to the session endpoints
  33. func (c *Client) Session() *Session {
  34. return &Session{c}
  35. }
  36. // CreateNoChecks is like Create but is used specifically to create
  37. // a session with no associated health checks.
  38. func (s *Session) CreateNoChecks(se *SessionEntry, q *WriteOptions) (string, *WriteMeta, error) {
  39. body := make(map[string]interface{})
  40. body["Checks"] = []string{}
  41. if se != nil {
  42. if se.Name != "" {
  43. body["Name"] = se.Name
  44. }
  45. if se.Node != "" {
  46. body["Node"] = se.Node
  47. }
  48. if se.LockDelay != 0 {
  49. body["LockDelay"] = durToMsec(se.LockDelay)
  50. }
  51. if se.Behavior != "" {
  52. body["Behavior"] = se.Behavior
  53. }
  54. if se.TTL != "" {
  55. body["TTL"] = se.TTL
  56. }
  57. }
  58. return s.create(body, q)
  59. }
  60. // Create makes a new session. Providing a session entry can
  61. // customize the session. It can also be nil to use defaults.
  62. func (s *Session) Create(se *SessionEntry, q *WriteOptions) (string, *WriteMeta, error) {
  63. var obj interface{}
  64. if se != nil {
  65. body := make(map[string]interface{})
  66. obj = body
  67. if se.Name != "" {
  68. body["Name"] = se.Name
  69. }
  70. if se.Node != "" {
  71. body["Node"] = se.Node
  72. }
  73. if se.LockDelay != 0 {
  74. body["LockDelay"] = durToMsec(se.LockDelay)
  75. }
  76. if len(se.Checks) > 0 {
  77. body["Checks"] = se.Checks
  78. }
  79. if se.Behavior != "" {
  80. body["Behavior"] = se.Behavior
  81. }
  82. if se.TTL != "" {
  83. body["TTL"] = se.TTL
  84. }
  85. }
  86. return s.create(obj, q)
  87. }
  88. func (s *Session) create(obj interface{}, q *WriteOptions) (string, *WriteMeta, error) {
  89. var out struct{ ID string }
  90. wm, err := s.c.write("/v1/session/create", obj, &out, q)
  91. if err != nil {
  92. return "", nil, err
  93. }
  94. return out.ID, wm, nil
  95. }
  96. // Destroy invalidates a given session
  97. func (s *Session) Destroy(id string, q *WriteOptions) (*WriteMeta, error) {
  98. wm, err := s.c.write("/v1/session/destroy/"+id, nil, nil, q)
  99. if err != nil {
  100. return nil, err
  101. }
  102. return wm, nil
  103. }
  104. // Renew renews the TTL on a given session
  105. func (s *Session) Renew(id string, q *WriteOptions) (*SessionEntry, *WriteMeta, error) {
  106. r := s.c.newRequest("PUT", "/v1/session/renew/"+id)
  107. r.setWriteOptions(q)
  108. rtt, resp, err := s.c.doRequest(r)
  109. if err != nil {
  110. return nil, nil, err
  111. }
  112. defer resp.Body.Close()
  113. wm := &WriteMeta{RequestTime: rtt}
  114. if resp.StatusCode == 404 {
  115. return nil, wm, nil
  116. } else if resp.StatusCode != 200 {
  117. return nil, nil, fmt.Errorf("Unexpected response code: %d", resp.StatusCode)
  118. }
  119. var entries []*SessionEntry
  120. if err := decodeBody(resp, &entries); err != nil {
  121. return nil, nil, fmt.Errorf("Failed to read response: %v", err)
  122. }
  123. if len(entries) > 0 {
  124. return entries[0], wm, nil
  125. }
  126. return nil, wm, nil
  127. }
  128. // RenewPeriodic is used to periodically invoke Session.Renew on a
  129. // session until a doneCh is closed. This is meant to be used in a long running
  130. // goroutine to ensure a session stays valid.
  131. func (s *Session) RenewPeriodic(initialTTL string, id string, q *WriteOptions, doneCh chan struct{}) error {
  132. ttl, err := time.ParseDuration(initialTTL)
  133. if err != nil {
  134. return err
  135. }
  136. waitDur := ttl / 2
  137. lastRenewTime := time.Now()
  138. var lastErr error
  139. for {
  140. if time.Since(lastRenewTime) > ttl {
  141. return lastErr
  142. }
  143. select {
  144. case <-time.After(waitDur):
  145. entry, _, err := s.Renew(id, q)
  146. if err != nil {
  147. waitDur = time.Second
  148. lastErr = err
  149. continue
  150. }
  151. if entry == nil {
  152. return ErrSessionExpired
  153. }
  154. // Handle the server updating the TTL
  155. ttl, _ = time.ParseDuration(entry.TTL)
  156. waitDur = ttl / 2
  157. lastRenewTime = time.Now()
  158. case <-doneCh:
  159. // Attempt a session destroy
  160. s.Destroy(id, q)
  161. return nil
  162. }
  163. }
  164. }
  165. // Info looks up a single session
  166. func (s *Session) Info(id string, q *QueryOptions) (*SessionEntry, *QueryMeta, error) {
  167. var entries []*SessionEntry
  168. qm, err := s.c.query("/v1/session/info/"+id, &entries, q)
  169. if err != nil {
  170. return nil, nil, err
  171. }
  172. if len(entries) > 0 {
  173. return entries[0], qm, nil
  174. }
  175. return nil, qm, nil
  176. }
  177. // List gets sessions for a node
  178. func (s *Session) Node(node string, q *QueryOptions) ([]*SessionEntry, *QueryMeta, error) {
  179. var entries []*SessionEntry
  180. qm, err := s.c.query("/v1/session/node/"+node, &entries, q)
  181. if err != nil {
  182. return nil, nil, err
  183. }
  184. return entries, qm, nil
  185. }
  186. // List gets all active sessions
  187. func (s *Session) List(q *QueryOptions) ([]*SessionEntry, *QueryMeta, error) {
  188. var entries []*SessionEntry
  189. qm, err := s.c.query("/v1/session/list", &entries, q)
  190. if err != nil {
  191. return nil, nil, err
  192. }
  193. return entries, qm, nil
  194. }