parser.go 8.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407
  1. // Copyright 2012-2014 Apcera Inc. All rights reserved.
  2. package nats
  3. import (
  4. "fmt"
  5. )
  6. type msgArg struct {
  7. subject []byte
  8. reply []byte
  9. sid int64
  10. size int
  11. }
// MAX_CONTROL_LINE_SIZE is the size, in bytes, of the parser's fixed scratch
// array (parseState.scratch).
const MAX_CONTROL_LINE_SIZE = 1024
  13. type parseState struct {
  14. state int
  15. as int
  16. drop int
  17. ma msgArg
  18. argBuf []byte
  19. msgBuf []byte
  20. scratch [MAX_CONTROL_LINE_SIZE]byte
  21. }
// Parser states. parse switches on these values; each OP_* state records how
// much of a protocol keyword has been matched so far.
const (
	OP_START = iota // expecting the first byte of an operation

	// "+OK"
	OP_PLUS
	OP_PLUS_O
	OP_PLUS_OK

	// "-ERR <text>"
	OP_MINUS
	OP_MINUS_E
	OP_MINUS_ER
	OP_MINUS_ERR
	OP_MINUS_ERR_SPC // skipping blanks after "-ERR"
	MINUS_ERR_ARG    // collecting the error text up to '\n'

	// "MSG <args>" followed by the payload
	OP_M
	OP_MS
	OP_MSG
	OP_MSG_SPC  // skipping blanks after "MSG"
	MSG_ARG     // collecting the argument line up to '\n'
	MSG_PAYLOAD // consuming ma.size payload bytes
	MSG_END     // skipping to the '\n' that follows the payload

	// "PING" / "PONG"
	OP_P
	OP_PI
	OP_PIN
	OP_PING
	OP_PO
	OP_PON
	OP_PONG
)
// parse is the fast protocol parser engine.
//
// It consumes one read buffer, advancing the state machine held in nc.ps one
// byte at a time. The state persists between calls, so an operation that is
// split across buffers is resumed by the next call. Keywords are matched
// case-insensitively. Completed operations are dispatched to processMsg,
// processOK, processErr, processPing and processPong.
//
// It returns the error from processMsgArgs if a MSG argument line is
// rejected, a parse error if an unexpected byte is seen while matching a
// keyword, and nil otherwise.
func (nc *Conn) parse(buf []byte) error {
	var i int
	var b byte

	// Use an index loop rather than range so that i can be moved forward
	// past a message payload.
	for i = 0; i < len(buf); i++ {
		b = buf[i]

		switch nc.ps.state {
		case OP_START:
			switch b {
			case 'M', 'm':
				nc.ps.state = OP_M
			case 'P', 'p':
				nc.ps.state = OP_P
			case '+':
				nc.ps.state = OP_PLUS
			case '-':
				nc.ps.state = OP_MINUS
			default:
				goto parseErr
			}
		case OP_M:
			switch b {
			case 'S', 's':
				nc.ps.state = OP_MS
			default:
				goto parseErr
			}
		case OP_MS:
			switch b {
			case 'G', 'g':
				nc.ps.state = OP_MSG
			default:
				goto parseErr
			}
		case OP_MSG:
			switch b {
			case ' ', '\t':
				nc.ps.state = OP_MSG_SPC
			default:
				goto parseErr
			}
		case OP_MSG_SPC:
			switch b {
			case ' ', '\t':
				continue
			default:
				// First non-blank byte: remember where the arguments start.
				nc.ps.state = MSG_ARG
				nc.ps.as = i
			}
		case MSG_ARG:
			switch b {
			case '\r':
				// Trim the '\r' when slicing the argument line below.
				nc.ps.drop = 1
			case '\n':
				// Use the accumulated copy if the argument line was split
				// across buffers; otherwise slice it straight out of buf.
				var arg []byte
				if nc.ps.argBuf != nil {
					arg = nc.ps.argBuf
				} else {
					arg = buf[nc.ps.as : i-nc.ps.drop]
				}
				if err := nc.processMsgArgs(arg); err != nil {
					return err
				}
				nc.ps.drop, nc.ps.as, nc.ps.state = 0, i+1, MSG_PAYLOAD

				// Jump ahead with the index so that, after the loop's i++,
				// i sits on the first byte past the payload. If this
				// overruns what is left in buf we fall out of the loop and
				// handle the split payload below.
				i = nc.ps.as + nc.ps.ma.size - 1
			default:
				// Only accumulate while resuming a split argument line.
				if nc.ps.argBuf != nil {
					nc.ps.argBuf = append(nc.ps.argBuf, b)
				}
			}
		case MSG_PAYLOAD:
			if nc.ps.msgBuf != nil {
				// The payload is being reassembled from several buffers.
				if len(nc.ps.msgBuf) >= nc.ps.ma.size {
					nc.processMsg(nc.ps.msgBuf)
					nc.ps.argBuf, nc.ps.msgBuf, nc.ps.state = nil, nil, MSG_END
				} else {
					// Copy as much as we can to the buffer and skip ahead.
					toCopy := nc.ps.ma.size - len(nc.ps.msgBuf)
					avail := len(buf) - i

					if avail < toCopy {
						toCopy = avail
					}

					if toCopy > 0 {
						start := len(nc.ps.msgBuf)
						// Extend the length first: copy only writes into
						// the destination's existing length.
						nc.ps.msgBuf = nc.ps.msgBuf[:start+toCopy]
						copy(nc.ps.msgBuf[start:], buf[i:i+toCopy])
						// Update our index; the loop's i++ moves past the
						// last byte copied.
						i = (i + toCopy) - 1
					} else {
						nc.ps.msgBuf = append(nc.ps.msgBuf, b)
					}
				}
			} else if i-nc.ps.as >= nc.ps.ma.size {
				// The whole payload lies inside buf: hand it over in place.
				nc.processMsg(buf[nc.ps.as:i])
				nc.ps.argBuf, nc.ps.msgBuf, nc.ps.state = nil, nil, MSG_END
			}
		case MSG_END:
			// Skip everything up to and including the terminating '\n'.
			switch b {
			case '\n':
				nc.ps.drop, nc.ps.as, nc.ps.state = 0, i+1, OP_START
			default:
				continue
			}
		case OP_PLUS:
			switch b {
			case 'O', 'o':
				nc.ps.state = OP_PLUS_O
			default:
				goto parseErr
			}
		case OP_PLUS_O:
			switch b {
			case 'K', 'k':
				nc.ps.state = OP_PLUS_OK
			default:
				goto parseErr
			}
		case OP_PLUS_OK:
			// Bytes other than '\n' are ignored in this state.
			switch b {
			case '\n':
				nc.processOK()
				nc.ps.drop, nc.ps.state = 0, OP_START
			}
		case OP_MINUS:
			switch b {
			case 'E', 'e':
				nc.ps.state = OP_MINUS_E
			default:
				goto parseErr
			}
		case OP_MINUS_E:
			switch b {
			case 'R', 'r':
				nc.ps.state = OP_MINUS_ER
			default:
				goto parseErr
			}
		case OP_MINUS_ER:
			switch b {
			case 'R', 'r':
				nc.ps.state = OP_MINUS_ERR
			default:
				goto parseErr
			}
		case OP_MINUS_ERR:
			switch b {
			case ' ', '\t':
				nc.ps.state = OP_MINUS_ERR_SPC
			default:
				goto parseErr
			}
		case OP_MINUS_ERR_SPC:
			switch b {
			case ' ', '\t':
				continue
			default:
				// First non-blank byte: remember where the error text starts.
				nc.ps.state = MINUS_ERR_ARG
				nc.ps.as = i
			}
		case MINUS_ERR_ARG:
			switch b {
			case '\r':
				nc.ps.drop = 1
			case '\n':
				// Same split/unsplit choice as for MSG_ARG; the accumulated
				// buffer is released here because nothing follows the line.
				var arg []byte
				if nc.ps.argBuf != nil {
					arg = nc.ps.argBuf
					nc.ps.argBuf = nil
				} else {
					arg = buf[nc.ps.as : i-nc.ps.drop]
				}
				nc.processErr(string(arg))
				nc.ps.drop, nc.ps.as, nc.ps.state = 0, i+1, OP_START
			default:
				if nc.ps.argBuf != nil {
					nc.ps.argBuf = append(nc.ps.argBuf, b)
				}
			}
		case OP_P:
			switch b {
			case 'I', 'i':
				nc.ps.state = OP_PI
			case 'O', 'o':
				nc.ps.state = OP_PO
			default:
				goto parseErr
			}
		case OP_PO:
			switch b {
			case 'N', 'n':
				nc.ps.state = OP_PON
			default:
				goto parseErr
			}
		case OP_PON:
			switch b {
			case 'G', 'g':
				nc.ps.state = OP_PONG
			default:
				goto parseErr
			}
		case OP_PONG:
			// Bytes other than '\n' are ignored in this state.
			switch b {
			case '\n':
				nc.processPong()
				nc.ps.drop, nc.ps.state = 0, OP_START
			}
		case OP_PI:
			switch b {
			case 'N', 'n':
				nc.ps.state = OP_PIN
			default:
				goto parseErr
			}
		case OP_PIN:
			switch b {
			case 'G', 'g':
				nc.ps.state = OP_PING
			default:
				goto parseErr
			}
		case OP_PING:
			// Bytes other than '\n' are ignored in this state.
			switch b {
			case '\n':
				nc.processPing()
				nc.ps.drop, nc.ps.state = 0, OP_START
			}
		default:
			goto parseErr
		}
	}

	// Check for split buffer scenarios: buf ended in the middle of an
	// argument line, so save what we have seen so far in the scratch space.
	if (nc.ps.state == MSG_ARG || nc.ps.state == MINUS_ERR_ARG) && nc.ps.argBuf == nil {
		nc.ps.argBuf = nc.ps.scratch[:0]
		nc.ps.argBuf = append(nc.ps.argBuf, buf[nc.ps.as:i-nc.ps.drop]...)
		// FIXME, check max len
	}

	// Check for split msg: buf ended before the whole payload arrived.
	if nc.ps.state == MSG_PAYLOAD && nc.ps.msgBuf == nil {
		// We need to clone the msgArg if it is still referencing the
		// read buffer and we are not able to process the msg.
		if nc.ps.argBuf == nil {
			nc.cloneMsgArg()
		}

		// If we will overflow the scratch buffer, just create a
		// new buffer to hold the split message.
		if nc.ps.ma.size > cap(nc.ps.scratch)-len(nc.ps.argBuf) {
			lrem := len(buf[nc.ps.as:])

			nc.ps.msgBuf = make([]byte, lrem, nc.ps.ma.size)
			copy(nc.ps.msgBuf, buf[nc.ps.as:])
		} else {
			// Otherwise place the payload in scratch right after the saved
			// arguments.
			nc.ps.msgBuf = nc.ps.scratch[len(nc.ps.argBuf):len(nc.ps.argBuf)]
			nc.ps.msgBuf = append(nc.ps.msgBuf, (buf[nc.ps.as:])...)
		}
	}

	return nil

parseErr:
	// Report the state we were in and the unparsed remainder of buf.
	return fmt.Errorf("nats: Parse Error [%d]: '%s'", nc.ps.state, buf[i:])
}
  312. // cloneMsgArg is used when the split buffer scenario has the pubArg in the existing read buffer, but
  313. // we need to hold onto it into the next read.
  314. func (nc *Conn) cloneMsgArg() {
  315. nc.ps.argBuf = nc.ps.scratch[:0]
  316. nc.ps.argBuf = append(nc.ps.argBuf, nc.ps.ma.subject...)
  317. nc.ps.argBuf = append(nc.ps.argBuf, nc.ps.ma.reply...)
  318. nc.ps.ma.subject = nc.ps.argBuf[:len(nc.ps.ma.subject)]
  319. if nc.ps.ma.reply != nil {
  320. nc.ps.ma.reply = nc.ps.argBuf[len(nc.ps.ma.subject):]
  321. }
  322. }
// argsLenMax is the length of the fixed array processMsgArgs uses to hold
// the split arguments; it matches the largest argument count it accepts.
const argsLenMax = 4
  324. func (nc *Conn) processMsgArgs(arg []byte) error {
  325. // Unroll splitArgs to avoid runtime/heap issues
  326. a := [argsLenMax][]byte{}
  327. args := a[:0]
  328. start := -1
  329. for i, b := range arg {
  330. switch b {
  331. case ' ', '\t', '\r', '\n':
  332. if start >= 0 {
  333. args = append(args, arg[start:i])
  334. start = -1
  335. }
  336. default:
  337. if start < 0 {
  338. start = i
  339. }
  340. }
  341. }
  342. if start >= 0 {
  343. args = append(args, arg[start:])
  344. }
  345. switch len(args) {
  346. case 3:
  347. nc.ps.ma.subject = args[0]
  348. nc.ps.ma.sid = parseInt64(args[1])
  349. nc.ps.ma.reply = nil
  350. nc.ps.ma.size = int(parseInt64(args[2]))
  351. case 4:
  352. nc.ps.ma.subject = args[0]
  353. nc.ps.ma.sid = parseInt64(args[1])
  354. nc.ps.ma.reply = args[2]
  355. nc.ps.ma.size = int(parseInt64(args[3]))
  356. default:
  357. return fmt.Errorf("nats: processMsgArgs Parse Error: '%s'", arg)
  358. }
  359. if nc.ps.ma.sid < 0 {
  360. return fmt.Errorf("nats: processMsgArgs Bad or Missing Sid: '%s'", arg)
  361. }
  362. if nc.ps.ma.size < 0 {
  363. return fmt.Errorf("nats: processMsgArgs Bad or Missing Size: '%s'", arg)
  364. }
  365. return nil
  366. }
  367. // Ascii numbers 0-9
  368. const (
  369. ascii_0 = 48
  370. ascii_9 = 57
  371. )
  372. // parseInt64 expects decimal positive numbers. We
  373. // return -1 to signal error
  374. func parseInt64(d []byte) (n int64) {
  375. if len(d) == 0 {
  376. return -1
  377. }
  378. for _, dec := range d {
  379. if dec < ascii_0 || dec > ascii_9 {
  380. return -1
  381. }
  382. n = n*10 + (int64(dec) - ascii_0)
  383. }
  384. return n
  385. }