kv.go 6.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240
  1. package api
  2. import (
  3. "bytes"
  4. "fmt"
  5. "io"
  6. "net/http"
  7. "strconv"
  8. "strings"
  9. )
  10. // KVPair is used to represent a single K/V entry
  11. type KVPair struct {
  12. Key string
  13. CreateIndex uint64
  14. ModifyIndex uint64
  15. LockIndex uint64
  16. Flags uint64
  17. Value []byte
  18. Session string
  19. }
// KVPairs is a list of pointers to KVPair entries. List returns its
// results as this type.
type KVPairs []*KVPair
// KV is used to manipulate the K/V API. All of its methods address
// paths under "/v1/kv/".
type KV struct {
	c *Client // client used to build and send every request
}
  26. // KV is used to return a handle to the K/V apis
  27. func (c *Client) KV() *KV {
  28. return &KV{c}
  29. }
  30. // Get is used to lookup a single key
  31. func (k *KV) Get(key string, q *QueryOptions) (*KVPair, *QueryMeta, error) {
  32. resp, qm, err := k.getInternal(key, nil, q)
  33. if err != nil {
  34. return nil, nil, err
  35. }
  36. if resp == nil {
  37. return nil, qm, nil
  38. }
  39. defer resp.Body.Close()
  40. var entries []*KVPair
  41. if err := decodeBody(resp, &entries); err != nil {
  42. return nil, nil, err
  43. }
  44. if len(entries) > 0 {
  45. return entries[0], qm, nil
  46. }
  47. return nil, qm, nil
  48. }
  49. // List is used to lookup all keys under a prefix
  50. func (k *KV) List(prefix string, q *QueryOptions) (KVPairs, *QueryMeta, error) {
  51. resp, qm, err := k.getInternal(prefix, map[string]string{"recurse": ""}, q)
  52. if err != nil {
  53. return nil, nil, err
  54. }
  55. if resp == nil {
  56. return nil, qm, nil
  57. }
  58. defer resp.Body.Close()
  59. var entries []*KVPair
  60. if err := decodeBody(resp, &entries); err != nil {
  61. return nil, nil, err
  62. }
  63. return entries, qm, nil
  64. }
  65. // Keys is used to list all the keys under a prefix. Optionally,
  66. // a separator can be used to limit the responses.
  67. func (k *KV) Keys(prefix, separator string, q *QueryOptions) ([]string, *QueryMeta, error) {
  68. params := map[string]string{"keys": ""}
  69. if separator != "" {
  70. params["separator"] = separator
  71. }
  72. resp, qm, err := k.getInternal(prefix, params, q)
  73. if err != nil {
  74. return nil, nil, err
  75. }
  76. if resp == nil {
  77. return nil, qm, nil
  78. }
  79. defer resp.Body.Close()
  80. var entries []string
  81. if err := decodeBody(resp, &entries); err != nil {
  82. return nil, nil, err
  83. }
  84. return entries, qm, nil
  85. }
  86. func (k *KV) getInternal(key string, params map[string]string, q *QueryOptions) (*http.Response, *QueryMeta, error) {
  87. r := k.c.newRequest("GET", "/v1/kv/"+key)
  88. r.setQueryOptions(q)
  89. for param, val := range params {
  90. r.params.Set(param, val)
  91. }
  92. rtt, resp, err := k.c.doRequest(r)
  93. if err != nil {
  94. return nil, nil, err
  95. }
  96. qm := &QueryMeta{}
  97. parseQueryMeta(resp, qm)
  98. qm.RequestTime = rtt
  99. if resp.StatusCode == 404 {
  100. resp.Body.Close()
  101. return nil, qm, nil
  102. } else if resp.StatusCode != 200 {
  103. resp.Body.Close()
  104. return nil, nil, fmt.Errorf("Unexpected response code: %d", resp.StatusCode)
  105. }
  106. return resp, qm, nil
  107. }
  108. // Put is used to write a new value. Only the
  109. // Key, Flags and Value is respected.
  110. func (k *KV) Put(p *KVPair, q *WriteOptions) (*WriteMeta, error) {
  111. params := make(map[string]string, 1)
  112. if p.Flags != 0 {
  113. params["flags"] = strconv.FormatUint(p.Flags, 10)
  114. }
  115. _, wm, err := k.put(p.Key, params, p.Value, q)
  116. return wm, err
  117. }
  118. // CAS is used for a Check-And-Set operation. The Key,
  119. // ModifyIndex, Flags and Value are respected. Returns true
  120. // on success or false on failures.
  121. func (k *KV) CAS(p *KVPair, q *WriteOptions) (bool, *WriteMeta, error) {
  122. params := make(map[string]string, 2)
  123. if p.Flags != 0 {
  124. params["flags"] = strconv.FormatUint(p.Flags, 10)
  125. }
  126. params["cas"] = strconv.FormatUint(p.ModifyIndex, 10)
  127. return k.put(p.Key, params, p.Value, q)
  128. }
  129. // Acquire is used for a lock acquisition operation. The Key,
  130. // Flags, Value and Session are respected. Returns true
  131. // on success or false on failures.
  132. func (k *KV) Acquire(p *KVPair, q *WriteOptions) (bool, *WriteMeta, error) {
  133. params := make(map[string]string, 2)
  134. if p.Flags != 0 {
  135. params["flags"] = strconv.FormatUint(p.Flags, 10)
  136. }
  137. params["acquire"] = p.Session
  138. return k.put(p.Key, params, p.Value, q)
  139. }
  140. // Release is used for a lock release operation. The Key,
  141. // Flags, Value and Session are respected. Returns true
  142. // on success or false on failures.
  143. func (k *KV) Release(p *KVPair, q *WriteOptions) (bool, *WriteMeta, error) {
  144. params := make(map[string]string, 2)
  145. if p.Flags != 0 {
  146. params["flags"] = strconv.FormatUint(p.Flags, 10)
  147. }
  148. params["release"] = p.Session
  149. return k.put(p.Key, params, p.Value, q)
  150. }
  151. func (k *KV) put(key string, params map[string]string, body []byte, q *WriteOptions) (bool, *WriteMeta, error) {
  152. if len(key) > 0 && key[0] == '/' {
  153. return false, nil, fmt.Errorf("Invalid key. Key must not begin with a '/': %s", key)
  154. }
  155. r := k.c.newRequest("PUT", "/v1/kv/"+key)
  156. r.setWriteOptions(q)
  157. for param, val := range params {
  158. r.params.Set(param, val)
  159. }
  160. r.body = bytes.NewReader(body)
  161. rtt, resp, err := requireOK(k.c.doRequest(r))
  162. if err != nil {
  163. return false, nil, err
  164. }
  165. defer resp.Body.Close()
  166. qm := &WriteMeta{}
  167. qm.RequestTime = rtt
  168. var buf bytes.Buffer
  169. if _, err := io.Copy(&buf, resp.Body); err != nil {
  170. return false, nil, fmt.Errorf("Failed to read response: %v", err)
  171. }
  172. res := strings.Contains(string(buf.Bytes()), "true")
  173. return res, qm, nil
  174. }
  175. // Delete is used to delete a single key
  176. func (k *KV) Delete(key string, w *WriteOptions) (*WriteMeta, error) {
  177. _, qm, err := k.deleteInternal(key, nil, w)
  178. return qm, err
  179. }
  180. // DeleteCAS is used for a Delete Check-And-Set operation. The Key
  181. // and ModifyIndex are respected. Returns true on success or false on failures.
  182. func (k *KV) DeleteCAS(p *KVPair, q *WriteOptions) (bool, *WriteMeta, error) {
  183. params := map[string]string{
  184. "cas": strconv.FormatUint(p.ModifyIndex, 10),
  185. }
  186. return k.deleteInternal(p.Key, params, q)
  187. }
  188. // DeleteTree is used to delete all keys under a prefix
  189. func (k *KV) DeleteTree(prefix string, w *WriteOptions) (*WriteMeta, error) {
  190. _, qm, err := k.deleteInternal(prefix, map[string]string{"recurse": ""}, w)
  191. return qm, err
  192. }
  193. func (k *KV) deleteInternal(key string, params map[string]string, q *WriteOptions) (bool, *WriteMeta, error) {
  194. r := k.c.newRequest("DELETE", "/v1/kv/"+key)
  195. r.setWriteOptions(q)
  196. for param, val := range params {
  197. r.params.Set(param, val)
  198. }
  199. rtt, resp, err := requireOK(k.c.doRequest(r))
  200. if err != nil {
  201. return false, nil, err
  202. }
  203. defer resp.Body.Close()
  204. qm := &WriteMeta{}
  205. qm.RequestTime = rtt
  206. var buf bytes.Buffer
  207. if _, err := io.Copy(&buf, resp.Body); err != nil {
  208. return false, nil, fmt.Errorf("Failed to read response: %v", err)
  209. }
  210. res := strings.Contains(string(buf.Bytes()), "true")
  211. return res, qm, nil
  212. }