123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475 |
- package api
- import (
- "bytes"
- "crypto/tls"
- "encoding/json"
- "fmt"
- "io"
- "log"
- "net"
- "net/http"
- "net/url"
- "os"
- "strconv"
- "strings"
- "time"
- "github.com/hashicorp/go-cleanhttp"
- )
- // QueryOptions are used to parameterize a query
- type QueryOptions struct {
- // Providing a datacenter overwrites the DC provided
- // by the Config
- Datacenter string
- // AllowStale allows any Consul server (non-leader) to service
- // a read. This allows for lower latency and higher throughput
- AllowStale bool
- // RequireConsistent forces the read to be fully consistent.
- // This is more expensive but prevents ever performing a stale
- // read.
- RequireConsistent bool
- // WaitIndex is used to enable a blocking query. Waits
- // until the timeout or the next index is reached
- WaitIndex uint64
- // WaitTime is used to bound the duration of a wait.
- // Defaults to that of the Config, but can be overridden.
- WaitTime time.Duration
- // Token is used to provide a per-request ACL token
- // which overrides the agent's default token.
- Token string
- // Near is used to provide a node name that will sort the results
- // in ascending order based on the estimated round trip time from
- // that node. Setting this to "_agent" will use the agent's node
- // for the sort.
- Near string
- }
- // WriteOptions are used to parameterize a write
- type WriteOptions struct {
- // Providing a datacenter overwrites the DC provided
- // by the Config
- Datacenter string
- // Token is used to provide a per-request ACL token
- // which overrides the agent's default token.
- Token string
- }
- // QueryMeta is used to return meta data about a query
- type QueryMeta struct {
- // LastIndex. This can be used as a WaitIndex to perform
- // a blocking query
- LastIndex uint64
- // Time of last contact from the leader for the
- // server servicing the request
- LastContact time.Duration
- // Is there a known leader
- KnownLeader bool
- // How long did the request take
- RequestTime time.Duration
- }
- // WriteMeta is used to return meta data about a write
- type WriteMeta struct {
- // How long did the request take
- RequestTime time.Duration
- }
- // HttpBasicAuth is used to authenticate http client with HTTP Basic Authentication
- type HttpBasicAuth struct {
- // Username to use for HTTP Basic Authentication
- Username string
- // Password to use for HTTP Basic Authentication
- Password string
- }
- // Config is used to configure the creation of a client
- type Config struct {
- // Address is the address of the Consul server
- Address string
- // Scheme is the URI scheme for the Consul server
- Scheme string
- // Datacenter to use. If not provided, the default agent datacenter is used.
- Datacenter string
- // HttpClient is the client to use. Default will be
- // used if not provided.
- HttpClient *http.Client
- // HttpAuth is the auth info to use for http access.
- HttpAuth *HttpBasicAuth
- // WaitTime limits how long a Watch will block. If not provided,
- // the agent default values will be used.
- WaitTime time.Duration
- // Token is used to provide a per-request ACL token
- // which overrides the agent's default token.
- Token string
- }
- // DefaultConfig returns a default configuration for the client
- func DefaultConfig() *Config {
- config := &Config{
- Address: "127.0.0.1:8500",
- Scheme: "http",
- HttpClient: cleanhttp.DefaultClient(),
- }
- if addr := os.Getenv("CONSUL_HTTP_ADDR"); addr != "" {
- config.Address = addr
- }
- if token := os.Getenv("CONSUL_HTTP_TOKEN"); token != "" {
- config.Token = token
- }
- if auth := os.Getenv("CONSUL_HTTP_AUTH"); auth != "" {
- var username, password string
- if strings.Contains(auth, ":") {
- split := strings.SplitN(auth, ":", 2)
- username = split[0]
- password = split[1]
- } else {
- username = auth
- }
- config.HttpAuth = &HttpBasicAuth{
- Username: username,
- Password: password,
- }
- }
- if ssl := os.Getenv("CONSUL_HTTP_SSL"); ssl != "" {
- enabled, err := strconv.ParseBool(ssl)
- if err != nil {
- log.Printf("[WARN] client: could not parse CONSUL_HTTP_SSL: %s", err)
- }
- if enabled {
- config.Scheme = "https"
- }
- }
- if verify := os.Getenv("CONSUL_HTTP_SSL_VERIFY"); verify != "" {
- doVerify, err := strconv.ParseBool(verify)
- if err != nil {
- log.Printf("[WARN] client: could not parse CONSUL_HTTP_SSL_VERIFY: %s", err)
- }
- if !doVerify {
- transport := cleanhttp.DefaultTransport()
- transport.TLSClientConfig = &tls.Config{
- InsecureSkipVerify: true,
- }
- config.HttpClient.Transport = transport
- }
- }
- return config
- }
- // Client provides a client to the Consul API
- type Client struct {
- config Config
- }
- // NewClient returns a new client
- func NewClient(config *Config) (*Client, error) {
- // bootstrap the config
- defConfig := DefaultConfig()
- if len(config.Address) == 0 {
- config.Address = defConfig.Address
- }
- if len(config.Scheme) == 0 {
- config.Scheme = defConfig.Scheme
- }
- if config.HttpClient == nil {
- config.HttpClient = defConfig.HttpClient
- }
- if parts := strings.SplitN(config.Address, "unix://", 2); len(parts) == 2 {
- trans := cleanhttp.DefaultTransport()
- trans.Dial = func(_, _ string) (net.Conn, error) {
- return net.Dial("unix", parts[1])
- }
- config.HttpClient = &http.Client{
- Transport: trans,
- }
- config.Address = parts[1]
- }
- client := &Client{
- config: *config,
- }
- return client, nil
- }
- // request is used to help build up a request
- type request struct {
- config *Config
- method string
- url *url.URL
- params url.Values
- body io.Reader
- obj interface{}
- }
- // setQueryOptions is used to annotate the request with
- // additional query options
- func (r *request) setQueryOptions(q *QueryOptions) {
- if q == nil {
- return
- }
- if q.Datacenter != "" {
- r.params.Set("dc", q.Datacenter)
- }
- if q.AllowStale {
- r.params.Set("stale", "")
- }
- if q.RequireConsistent {
- r.params.Set("consistent", "")
- }
- if q.WaitIndex != 0 {
- r.params.Set("index", strconv.FormatUint(q.WaitIndex, 10))
- }
- if q.WaitTime != 0 {
- r.params.Set("wait", durToMsec(q.WaitTime))
- }
- if q.Token != "" {
- r.params.Set("token", q.Token)
- }
- if q.Near != "" {
- r.params.Set("near", q.Near)
- }
- }
- // durToMsec converts a duration to a millisecond specified string. If the
- // user selected a positive value that rounds to 0 ms, then we will use 1 ms
- // so they get a short delay, otherwise Consul will translate the 0 ms into
- // a huge default delay.
- func durToMsec(dur time.Duration) string {
- ms := dur / time.Millisecond
- if dur > 0 && ms == 0 {
- ms = 1
- }
- return fmt.Sprintf("%dms", ms)
- }
- // serverError is a string we look for to detect 500 errors.
- const serverError = "Unexpected response code: 500"
- // IsServerError returns true for 500 errors from the Consul servers, these are
- // usually retryable at a later time.
- func IsServerError(err error) bool {
- if err == nil {
- return false
- }
- // TODO (slackpad) - Make a real error type here instead of using
- // a string check.
- return strings.Contains(err.Error(), serverError)
- }
- // setWriteOptions is used to annotate the request with
- // additional write options
- func (r *request) setWriteOptions(q *WriteOptions) {
- if q == nil {
- return
- }
- if q.Datacenter != "" {
- r.params.Set("dc", q.Datacenter)
- }
- if q.Token != "" {
- r.params.Set("token", q.Token)
- }
- }
- // toHTTP converts the request to an HTTP request
- func (r *request) toHTTP() (*http.Request, error) {
- // Encode the query parameters
- r.url.RawQuery = r.params.Encode()
- // Check if we should encode the body
- if r.body == nil && r.obj != nil {
- if b, err := encodeBody(r.obj); err != nil {
- return nil, err
- } else {
- r.body = b
- }
- }
- // Create the HTTP request
- req, err := http.NewRequest(r.method, r.url.RequestURI(), r.body)
- if err != nil {
- return nil, err
- }
- req.URL.Host = r.url.Host
- req.URL.Scheme = r.url.Scheme
- req.Host = r.url.Host
- // Setup auth
- if r.config.HttpAuth != nil {
- req.SetBasicAuth(r.config.HttpAuth.Username, r.config.HttpAuth.Password)
- }
- return req, nil
- }
- // newRequest is used to create a new request
- func (c *Client) newRequest(method, path string) *request {
- r := &request{
- config: &c.config,
- method: method,
- url: &url.URL{
- Scheme: c.config.Scheme,
- Host: c.config.Address,
- Path: path,
- },
- params: make(map[string][]string),
- }
- if c.config.Datacenter != "" {
- r.params.Set("dc", c.config.Datacenter)
- }
- if c.config.WaitTime != 0 {
- r.params.Set("wait", durToMsec(r.config.WaitTime))
- }
- if c.config.Token != "" {
- r.params.Set("token", r.config.Token)
- }
- return r
- }
- // doRequest runs a request with our client
- func (c *Client) doRequest(r *request) (time.Duration, *http.Response, error) {
- req, err := r.toHTTP()
- if err != nil {
- return 0, nil, err
- }
- start := time.Now()
- resp, err := c.config.HttpClient.Do(req)
- diff := time.Now().Sub(start)
- return diff, resp, err
- }
- // Query is used to do a GET request against an endpoint
- // and deserialize the response into an interface using
- // standard Consul conventions.
- func (c *Client) query(endpoint string, out interface{}, q *QueryOptions) (*QueryMeta, error) {
- r := c.newRequest("GET", endpoint)
- r.setQueryOptions(q)
- rtt, resp, err := requireOK(c.doRequest(r))
- if err != nil {
- return nil, err
- }
- defer resp.Body.Close()
- qm := &QueryMeta{}
- parseQueryMeta(resp, qm)
- qm.RequestTime = rtt
- if err := decodeBody(resp, out); err != nil {
- return nil, err
- }
- return qm, nil
- }
- // write is used to do a PUT request against an endpoint
- // and serialize/deserialized using the standard Consul conventions.
- func (c *Client) write(endpoint string, in, out interface{}, q *WriteOptions) (*WriteMeta, error) {
- r := c.newRequest("PUT", endpoint)
- r.setWriteOptions(q)
- r.obj = in
- rtt, resp, err := requireOK(c.doRequest(r))
- if err != nil {
- return nil, err
- }
- defer resp.Body.Close()
- wm := &WriteMeta{RequestTime: rtt}
- if out != nil {
- if err := decodeBody(resp, &out); err != nil {
- return nil, err
- }
- }
- return wm, nil
- }
- // parseQueryMeta is used to help parse query meta-data
- func parseQueryMeta(resp *http.Response, q *QueryMeta) error {
- header := resp.Header
- // Parse the X-Consul-Index
- index, err := strconv.ParseUint(header.Get("X-Consul-Index"), 10, 64)
- if err != nil {
- return fmt.Errorf("Failed to parse X-Consul-Index: %v", err)
- }
- q.LastIndex = index
- // Parse the X-Consul-LastContact
- last, err := strconv.ParseUint(header.Get("X-Consul-LastContact"), 10, 64)
- if err != nil {
- return fmt.Errorf("Failed to parse X-Consul-LastContact: %v", err)
- }
- q.LastContact = time.Duration(last) * time.Millisecond
- // Parse the X-Consul-KnownLeader
- switch header.Get("X-Consul-KnownLeader") {
- case "true":
- q.KnownLeader = true
- default:
- q.KnownLeader = false
- }
- return nil
- }
- // decodeBody is used to JSON decode a body
- func decodeBody(resp *http.Response, out interface{}) error {
- dec := json.NewDecoder(resp.Body)
- return dec.Decode(out)
- }
- // encodeBody is used to encode a request body
- func encodeBody(obj interface{}) (io.Reader, error) {
- buf := bytes.NewBuffer(nil)
- enc := json.NewEncoder(buf)
- if err := enc.Encode(obj); err != nil {
- return nil, err
- }
- return buf, nil
- }
- // requireOK is used to wrap doRequest and check for a 200
- func requireOK(d time.Duration, resp *http.Response, e error) (time.Duration, *http.Response, error) {
- if e != nil {
- if resp != nil {
- resp.Body.Close()
- }
- return d, nil, e
- }
- if resp.StatusCode != 200 {
- var buf bytes.Buffer
- io.Copy(&buf, resp.Body)
- resp.Body.Close()
- return d, nil, fmt.Errorf("Unexpected response code: %d (%s)", resp.StatusCode, buf.Bytes())
- }
- return d, resp, nil
- }
|