enc.go 7.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249
  1. // Copyright 2012-2015 Apcera Inc. All rights reserved.
  2. package nats
  3. import (
  4. "errors"
  5. "fmt"
  6. "reflect"
  7. "sync"
  8. "time"
  9. // Default Encoders
  10. . "github.com/nats-io/nats/encoders/builtin"
  11. )
// Encoder is the interface every registered encoder must implement.
type Encoder interface {
	// Encode converts v into the payload bytes for a message on subject.
	Encode(subject string, v interface{}) ([]byte, error)
	// Decode fills the value vPtr refers to from data received on subject.
	Decode(subject string, data []byte, vPtr interface{}) error
}
  17. var encMap map[string]Encoder
  18. var encLock sync.Mutex
// Indexed names into the registered encoders. These are the keys under which
// init registers the built-in encoders.
const (
	JSON_ENCODER    = "json"
	GOB_ENCODER     = "gob"
	DEFAULT_ENCODER = "default"
)
  25. func init() {
  26. encMap = make(map[string]Encoder)
  27. // Register json, gob and default encoder
  28. RegisterEncoder(JSON_ENCODER, &JsonEncoder{})
  29. RegisterEncoder(GOB_ENCODER, &GobEncoder{})
  30. RegisterEncoder(DEFAULT_ENCODER, &DefaultEncoder{})
  31. }
// EncodedConn is the preferred way to interface with NATS. It wraps a bare
// connection to a nats server and has an extendable encoder system that will
// encode and decode messages from raw Go types.
type EncodedConn struct {
	// Conn is the wrapped connection that all operations are forwarded to.
	Conn *Conn
	// Enc encodes outgoing values and decodes incoming payloads.
	Enc Encoder
}
  39. // NewEncodedConn will wrap an existing Connection and utilize the appropriate registered
  40. // encoder.
  41. func NewEncodedConn(c *Conn, encType string) (*EncodedConn, error) {
  42. if c == nil {
  43. return nil, errors.New("nats: Nil Connection")
  44. }
  45. if c.IsClosed() {
  46. return nil, ErrConnectionClosed
  47. }
  48. ec := &EncodedConn{Conn: c, Enc: EncoderForType(encType)}
  49. if ec.Enc == nil {
  50. return nil, fmt.Errorf("No encoder registered for '%s'", encType)
  51. }
  52. return ec, nil
  53. }
  54. // RegisterEncoder will register the encType with the given Encoder. Useful for customization.
  55. func RegisterEncoder(encType string, enc Encoder) {
  56. encLock.Lock()
  57. defer encLock.Unlock()
  58. encMap[encType] = enc
  59. }
  60. // EncoderForType will return the registered Encoder for the encType.
  61. func EncoderForType(encType string) Encoder {
  62. encLock.Lock()
  63. defer encLock.Unlock()
  64. return encMap[encType]
  65. }
  66. // Publish publishes the data argument to the given subject. The data argument
  67. // will be encoded using the associated encoder.
  68. func (c *EncodedConn) Publish(subject string, v interface{}) error {
  69. b, err := c.Enc.Encode(subject, v)
  70. if err != nil {
  71. return err
  72. }
  73. return c.Conn.publish(subject, _EMPTY_, b)
  74. }
  75. // PublishRequest will perform a Publish() expecting a response on the
  76. // reply subject. Use Request() for automatically waiting for a response
  77. // inline.
  78. func (c *EncodedConn) PublishRequest(subject, reply string, v interface{}) error {
  79. b, err := c.Enc.Encode(subject, v)
  80. if err != nil {
  81. return err
  82. }
  83. return c.Conn.publish(subject, reply, b)
  84. }
  85. // Request will create an Inbox and perform a Request() call
  86. // with the Inbox reply for the data v. A response will be
  87. // decoded into the vPtrResponse.
  88. func (c *EncodedConn) Request(subject string, v interface{}, vPtr interface{}, timeout time.Duration) error {
  89. b, err := c.Enc.Encode(subject, v)
  90. if err != nil {
  91. return err
  92. }
  93. m, err := c.Conn.Request(subject, b, timeout)
  94. if err != nil {
  95. return err
  96. }
  97. if reflect.TypeOf(vPtr) == emptyMsgType {
  98. mPtr := vPtr.(*Msg)
  99. *mPtr = *m
  100. } else {
  101. err = c.Enc.Decode(m.Subject, m.Data, vPtr)
  102. }
  103. return err
  104. }
// Handler is a specific callback used for Subscribe. It is generalized to
// an interface{}, but its format and arguments are discovered at runtime
// and the correct callback is performed, including decoding the message
// payload with the connection's Encoder into the type of the Handler's
// last argument.
//
// Handlers are expected to have one of four signatures.
//
//	type person struct {
//		Name string `json:"name,omitempty"`
//		Age  uint   `json:"age,omitempty"`
//	}
//
//	handler := func(m *Msg)
//	handler := func(p *person)
//	handler := func(subject string, p *person)
//	handler := func(subject, reply string, p *person)
//
// These forms allow a callback to request a raw Msg ptr, where the message
// from the wire is passed through untouched, or to have the payload decoded
// into the given struct, e.g. person.
// There are also variants where the callback wants either the subject, or the
// subject and the reply subject.
type Handler interface{}
  128. // Dissect the cb Handler's signature
  129. func argInfo(cb Handler) (reflect.Type, int) {
  130. cbType := reflect.TypeOf(cb)
  131. if cbType.Kind() != reflect.Func {
  132. panic("nats: Handler needs to be a func")
  133. }
  134. numArgs := cbType.NumIn()
  135. if numArgs == 0 {
  136. return nil, numArgs
  137. }
  138. return cbType.In(numArgs - 1), numArgs
  139. }
// emptyMsgType is the reflect.Type of *Msg. Request and subscribe compare
// argument types against it to detect callers that want the raw message.
var emptyMsgType = reflect.TypeOf(&Msg{})
  141. // Subscribe will create a subscription on the given subject and process incoming
  142. // messages using the specified Handler. The Handler should be a func that matches
  143. // a signature from the description of Handler from above.
  144. func (c *EncodedConn) Subscribe(subject string, cb Handler) (*Subscription, error) {
  145. return c.subscribe(subject, _EMPTY_, cb)
  146. }
  147. // QueueSubscribe will create a queue subscription on the given subject and process
  148. // incoming messages using the specified Handler. The Handler should be a func that
  149. // matches a signature from the description of Handler from above.
  150. func (c *EncodedConn) QueueSubscribe(subject, queue string, cb Handler) (*Subscription, error) {
  151. return c.subscribe(subject, queue, cb)
  152. }
  153. // Internal implementation that all public functions will use.
  154. func (c *EncodedConn) subscribe(subject, queue string, cb Handler) (*Subscription, error) {
  155. if cb == nil {
  156. return nil, errors.New("nats: Handler required for EncodedConn Subscription")
  157. }
  158. argType, numArgs := argInfo(cb)
  159. if argType == nil {
  160. return nil, errors.New("nats: Handler requires at least one argument")
  161. }
  162. cbValue := reflect.ValueOf(cb)
  163. wantsRaw := (argType == emptyMsgType)
  164. natsCB := func(m *Msg) {
  165. var oV []reflect.Value
  166. if wantsRaw {
  167. oV = []reflect.Value{reflect.ValueOf(m)}
  168. } else {
  169. var oPtr reflect.Value
  170. if argType.Kind() != reflect.Ptr {
  171. oPtr = reflect.New(argType)
  172. } else {
  173. oPtr = reflect.New(argType.Elem())
  174. }
  175. if err := c.Enc.Decode(m.Subject, m.Data, oPtr.Interface()); err != nil {
  176. if c.Conn.Opts.AsyncErrorCB != nil {
  177. c.Conn.ach <- func() {
  178. c.Conn.Opts.AsyncErrorCB(c.Conn, m.Sub, errors.New("nats: Got an error trying to unmarshal: "+err.Error()))
  179. }
  180. }
  181. return
  182. }
  183. if argType.Kind() != reflect.Ptr {
  184. oPtr = reflect.Indirect(oPtr)
  185. }
  186. // Callback Arity
  187. switch numArgs {
  188. case 1:
  189. oV = []reflect.Value{oPtr}
  190. case 2:
  191. subV := reflect.ValueOf(m.Subject)
  192. oV = []reflect.Value{subV, oPtr}
  193. case 3:
  194. subV := reflect.ValueOf(m.Subject)
  195. replyV := reflect.ValueOf(m.Reply)
  196. oV = []reflect.Value{subV, replyV, oPtr}
  197. }
  198. }
  199. cbValue.Call(oV)
  200. }
  201. return c.Conn.subscribe(subject, queue, natsCB, nil)
  202. }
  203. // FlushTimeout allows a Flush operation to have an associated timeout.
  204. func (c *EncodedConn) FlushTimeout(timeout time.Duration) (err error) {
  205. return c.Conn.FlushTimeout(timeout)
  206. }
  207. // Flush will perform a round trip to the server and return when it
  208. // receives the internal reply.
  209. func (c *EncodedConn) Flush() error {
  210. return c.Conn.Flush()
  211. }
  212. // Close will close the connection to the server. This call will release
  213. // all blocking calls, such as Flush(), etc.
  214. func (c *EncodedConn) Close() {
  215. c.Conn.Close()
  216. }
  217. // LastError reports the last error encountered via the Connection.
  218. func (c *EncodedConn) LastError() error {
  219. return c.Conn.err
  220. }