api.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475
  1. package api
  2. import (
  3. "bytes"
  4. "crypto/tls"
  5. "encoding/json"
  6. "fmt"
  7. "io"
  8. "log"
  9. "net"
  10. "net/http"
  11. "net/url"
  12. "os"
  13. "strconv"
  14. "strings"
  15. "time"
  16. "github.com/hashicorp/go-cleanhttp"
  17. )
  // QueryOptions carries the optional, per-request settings for a read.
  type QueryOptions struct {
  	// Datacenter, when set, is used instead of the datacenter
  	// taken from the Config.
  	Datacenter string

  	// AllowStale lets any Consul server, not only the leader, answer
  	// the read, which gives lower latency and higher throughput.
  	AllowStale bool

  	// RequireConsistent forces a fully consistent read. It is more
  	// expensive but prevents a stale read from ever being performed.
  	RequireConsistent bool

  	// WaitIndex enables a blocking query, which waits until the next
  	// index is reached or the timeout expires.
  	WaitIndex uint64

  	// WaitTime bounds the duration of a wait. It defaults to the
  	// value from the Config but can be overridden here.
  	WaitTime time.Duration

  	// Token is a per-request ACL token that overrides the agent's
  	// default token.
  	Token string

  	// Near names a node; results are sorted in ascending order of the
  	// estimated round trip time from that node. The value "_agent"
  	// sorts relative to the agent's own node.
  	Near string
  }
  // WriteOptions carries the optional, per-request settings for a write.
  type WriteOptions struct {
  	// Datacenter, when set, is used instead of the datacenter
  	// taken from the Config.
  	Datacenter string

  	// Token is a per-request ACL token that overrides the agent's
  	// default token.
  	Token string
  }
  // QueryMeta holds the metadata returned alongside a query result.
  type QueryMeta struct {
  	// LastIndex can be passed back as a WaitIndex to perform a
  	// blocking query.
  	LastIndex uint64

  	// LastContact is the time since the server that serviced the
  	// request was last contacted by the leader.
  	LastContact time.Duration

  	// KnownLeader reports whether there is a known leader.
  	KnownLeader bool

  	// RequestTime is how long the request took.
  	RequestTime time.Duration
  }
  // WriteMeta holds the metadata returned alongside a write result.
  type WriteMeta struct {
  	// RequestTime is how long the request took.
  	RequestTime time.Duration
  }
  // HttpBasicAuth holds the credentials the http client uses for
  // HTTP Basic Authentication.
  type HttpBasicAuth struct {
  	// Username is the HTTP Basic Authentication user name.
  	Username string

  	// Password is the HTTP Basic Authentication password.
  	Password string
  }
  79. // Config is used to configure the creation of a client
  80. type Config struct {
  81. // Address is the address of the Consul server
  82. Address string
  83. // Scheme is the URI scheme for the Consul server
  84. Scheme string
  85. // Datacenter to use. If not provided, the default agent datacenter is used.
  86. Datacenter string
  87. // HttpClient is the client to use. Default will be
  88. // used if not provided.
  89. HttpClient *http.Client
  90. // HttpAuth is the auth info to use for http access.
  91. HttpAuth *HttpBasicAuth
  92. // WaitTime limits how long a Watch will block. If not provided,
  93. // the agent default values will be used.
  94. WaitTime time.Duration
  95. // Token is used to provide a per-request ACL token
  96. // which overrides the agent's default token.
  97. Token string
  98. }
  99. // DefaultConfig returns a default configuration for the client
  100. func DefaultConfig() *Config {
  101. config := &Config{
  102. Address: "127.0.0.1:8500",
  103. Scheme: "http",
  104. HttpClient: cleanhttp.DefaultClient(),
  105. }
  106. if addr := os.Getenv("CONSUL_HTTP_ADDR"); addr != "" {
  107. config.Address = addr
  108. }
  109. if token := os.Getenv("CONSUL_HTTP_TOKEN"); token != "" {
  110. config.Token = token
  111. }
  112. if auth := os.Getenv("CONSUL_HTTP_AUTH"); auth != "" {
  113. var username, password string
  114. if strings.Contains(auth, ":") {
  115. split := strings.SplitN(auth, ":", 2)
  116. username = split[0]
  117. password = split[1]
  118. } else {
  119. username = auth
  120. }
  121. config.HttpAuth = &HttpBasicAuth{
  122. Username: username,
  123. Password: password,
  124. }
  125. }
  126. if ssl := os.Getenv("CONSUL_HTTP_SSL"); ssl != "" {
  127. enabled, err := strconv.ParseBool(ssl)
  128. if err != nil {
  129. log.Printf("[WARN] client: could not parse CONSUL_HTTP_SSL: %s", err)
  130. }
  131. if enabled {
  132. config.Scheme = "https"
  133. }
  134. }
  135. if verify := os.Getenv("CONSUL_HTTP_SSL_VERIFY"); verify != "" {
  136. doVerify, err := strconv.ParseBool(verify)
  137. if err != nil {
  138. log.Printf("[WARN] client: could not parse CONSUL_HTTP_SSL_VERIFY: %s", err)
  139. }
  140. if !doVerify {
  141. transport := cleanhttp.DefaultTransport()
  142. transport.TLSClientConfig = &tls.Config{
  143. InsecureSkipVerify: true,
  144. }
  145. config.HttpClient.Transport = transport
  146. }
  147. }
  148. return config
  149. }
  150. // Client provides a client to the Consul API
  151. type Client struct {
  152. config Config
  153. }
  154. // NewClient returns a new client
  155. func NewClient(config *Config) (*Client, error) {
  156. // bootstrap the config
  157. defConfig := DefaultConfig()
  158. if len(config.Address) == 0 {
  159. config.Address = defConfig.Address
  160. }
  161. if len(config.Scheme) == 0 {
  162. config.Scheme = defConfig.Scheme
  163. }
  164. if config.HttpClient == nil {
  165. config.HttpClient = defConfig.HttpClient
  166. }
  167. if parts := strings.SplitN(config.Address, "unix://", 2); len(parts) == 2 {
  168. trans := cleanhttp.DefaultTransport()
  169. trans.Dial = func(_, _ string) (net.Conn, error) {
  170. return net.Dial("unix", parts[1])
  171. }
  172. config.HttpClient = &http.Client{
  173. Transport: trans,
  174. }
  175. config.Address = parts[1]
  176. }
  177. client := &Client{
  178. config: *config,
  179. }
  180. return client, nil
  181. }
  182. // request is used to help build up a request
  183. type request struct {
  184. config *Config
  185. method string
  186. url *url.URL
  187. params url.Values
  188. body io.Reader
  189. obj interface{}
  190. }
  191. // setQueryOptions is used to annotate the request with
  192. // additional query options
  193. func (r *request) setQueryOptions(q *QueryOptions) {
  194. if q == nil {
  195. return
  196. }
  197. if q.Datacenter != "" {
  198. r.params.Set("dc", q.Datacenter)
  199. }
  200. if q.AllowStale {
  201. r.params.Set("stale", "")
  202. }
  203. if q.RequireConsistent {
  204. r.params.Set("consistent", "")
  205. }
  206. if q.WaitIndex != 0 {
  207. r.params.Set("index", strconv.FormatUint(q.WaitIndex, 10))
  208. }
  209. if q.WaitTime != 0 {
  210. r.params.Set("wait", durToMsec(q.WaitTime))
  211. }
  212. if q.Token != "" {
  213. r.params.Set("token", q.Token)
  214. }
  215. if q.Near != "" {
  216. r.params.Set("near", q.Near)
  217. }
  218. }
  219. // durToMsec converts a duration to a millisecond specified string. If the
  220. // user selected a positive value that rounds to 0 ms, then we will use 1 ms
  221. // so they get a short delay, otherwise Consul will translate the 0 ms into
  222. // a huge default delay.
  223. func durToMsec(dur time.Duration) string {
  224. ms := dur / time.Millisecond
  225. if dur > 0 && ms == 0 {
  226. ms = 1
  227. }
  228. return fmt.Sprintf("%dms", ms)
  229. }
  // serverError is the text searched for to detect 500 errors.
  const serverError = "Unexpected response code: 500"

  // IsServerError reports whether err is a 500 error from the Consul
  // servers; these are usually retryable at a later time. A nil err is
  // never a server error.
  func IsServerError(err error) bool {
  	// TODO (slackpad) - Make a real error type here instead of using
  	// a string check.
  	return err != nil && strings.Contains(err.Error(), serverError)
  }
  242. // setWriteOptions is used to annotate the request with
  243. // additional write options
  244. func (r *request) setWriteOptions(q *WriteOptions) {
  245. if q == nil {
  246. return
  247. }
  248. if q.Datacenter != "" {
  249. r.params.Set("dc", q.Datacenter)
  250. }
  251. if q.Token != "" {
  252. r.params.Set("token", q.Token)
  253. }
  254. }
  255. // toHTTP converts the request to an HTTP request
  256. func (r *request) toHTTP() (*http.Request, error) {
  257. // Encode the query parameters
  258. r.url.RawQuery = r.params.Encode()
  259. // Check if we should encode the body
  260. if r.body == nil && r.obj != nil {
  261. if b, err := encodeBody(r.obj); err != nil {
  262. return nil, err
  263. } else {
  264. r.body = b
  265. }
  266. }
  267. // Create the HTTP request
  268. req, err := http.NewRequest(r.method, r.url.RequestURI(), r.body)
  269. if err != nil {
  270. return nil, err
  271. }
  272. req.URL.Host = r.url.Host
  273. req.URL.Scheme = r.url.Scheme
  274. req.Host = r.url.Host
  275. // Setup auth
  276. if r.config.HttpAuth != nil {
  277. req.SetBasicAuth(r.config.HttpAuth.Username, r.config.HttpAuth.Password)
  278. }
  279. return req, nil
  280. }
  281. // newRequest is used to create a new request
  282. func (c *Client) newRequest(method, path string) *request {
  283. r := &request{
  284. config: &c.config,
  285. method: method,
  286. url: &url.URL{
  287. Scheme: c.config.Scheme,
  288. Host: c.config.Address,
  289. Path: path,
  290. },
  291. params: make(map[string][]string),
  292. }
  293. if c.config.Datacenter != "" {
  294. r.params.Set("dc", c.config.Datacenter)
  295. }
  296. if c.config.WaitTime != 0 {
  297. r.params.Set("wait", durToMsec(r.config.WaitTime))
  298. }
  299. if c.config.Token != "" {
  300. r.params.Set("token", r.config.Token)
  301. }
  302. return r
  303. }
  304. // doRequest runs a request with our client
  305. func (c *Client) doRequest(r *request) (time.Duration, *http.Response, error) {
  306. req, err := r.toHTTP()
  307. if err != nil {
  308. return 0, nil, err
  309. }
  310. start := time.Now()
  311. resp, err := c.config.HttpClient.Do(req)
  312. diff := time.Now().Sub(start)
  313. return diff, resp, err
  314. }
  315. // Query is used to do a GET request against an endpoint
  316. // and deserialize the response into an interface using
  317. // standard Consul conventions.
  318. func (c *Client) query(endpoint string, out interface{}, q *QueryOptions) (*QueryMeta, error) {
  319. r := c.newRequest("GET", endpoint)
  320. r.setQueryOptions(q)
  321. rtt, resp, err := requireOK(c.doRequest(r))
  322. if err != nil {
  323. return nil, err
  324. }
  325. defer resp.Body.Close()
  326. qm := &QueryMeta{}
  327. parseQueryMeta(resp, qm)
  328. qm.RequestTime = rtt
  329. if err := decodeBody(resp, out); err != nil {
  330. return nil, err
  331. }
  332. return qm, nil
  333. }
  334. // write is used to do a PUT request against an endpoint
  335. // and serialize/deserialized using the standard Consul conventions.
  336. func (c *Client) write(endpoint string, in, out interface{}, q *WriteOptions) (*WriteMeta, error) {
  337. r := c.newRequest("PUT", endpoint)
  338. r.setWriteOptions(q)
  339. r.obj = in
  340. rtt, resp, err := requireOK(c.doRequest(r))
  341. if err != nil {
  342. return nil, err
  343. }
  344. defer resp.Body.Close()
  345. wm := &WriteMeta{RequestTime: rtt}
  346. if out != nil {
  347. if err := decodeBody(resp, &out); err != nil {
  348. return nil, err
  349. }
  350. }
  351. return wm, nil
  352. }
  353. // parseQueryMeta is used to help parse query meta-data
  354. func parseQueryMeta(resp *http.Response, q *QueryMeta) error {
  355. header := resp.Header
  356. // Parse the X-Consul-Index
  357. index, err := strconv.ParseUint(header.Get("X-Consul-Index"), 10, 64)
  358. if err != nil {
  359. return fmt.Errorf("Failed to parse X-Consul-Index: %v", err)
  360. }
  361. q.LastIndex = index
  362. // Parse the X-Consul-LastContact
  363. last, err := strconv.ParseUint(header.Get("X-Consul-LastContact"), 10, 64)
  364. if err != nil {
  365. return fmt.Errorf("Failed to parse X-Consul-LastContact: %v", err)
  366. }
  367. q.LastContact = time.Duration(last) * time.Millisecond
  368. // Parse the X-Consul-KnownLeader
  369. switch header.Get("X-Consul-KnownLeader") {
  370. case "true":
  371. q.KnownLeader = true
  372. default:
  373. q.KnownLeader = false
  374. }
  375. return nil
  376. }
  // decodeBody JSON decodes the response body into out and returns the
  // decoder's error, if any. It does not close the body.
  func decodeBody(resp *http.Response, out interface{}) error {
  	return json.NewDecoder(resp.Body).Decode(out)
  }
  // encodeBody JSON encodes obj into an in-memory buffer and returns it
  // as the request body. When encoding fails it returns a nil reader and
  // the encoder's error.
  func encodeBody(obj interface{}) (io.Reader, error) {
  	out := new(bytes.Buffer)
  	if err := json.NewEncoder(out).Encode(obj); err != nil {
  		return nil, err
  	}
  	return out, nil
  }
  // requireOK wraps the results of doRequest and only lets a 200
  // response through. If e is set it is returned as is; for any other
  // status code the body is read into the returned error. In both cases
  // the response body is closed and a nil response is returned. The
  // duration d is always passed through unchanged.
  func requireOK(d time.Duration, resp *http.Response, e error) (time.Duration, *http.Response, error) {
  	switch {
  	case e != nil:
  		// A response may still accompany an error; release it.
  		if resp != nil {
  			resp.Body.Close()
  		}
  		return d, nil, e

  	case resp.StatusCode != 200:
  		// Put the response body into the error message.
  		body := new(bytes.Buffer)
  		io.Copy(body, resp.Body)
  		resp.Body.Close()
  		return d, nil, fmt.Errorf("Unexpected response code: %d (%s)", resp.StatusCode, body.Bytes())
  	}

  	return d, resp, nil
  }