http2_client.go 41 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
7127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361136213631364136513661367136813691370137113721373137413751376137713781379138013811382
  1. /*
  2. *
  3. * Copyright 2014 gRPC authors.
  4. *
  5. * Licensed under the Apache License, Version 2.0 (the "License");
  6. * you may not use this file except in compliance with the License.
  7. * You may obtain a copy of the License at
  8. *
  9. * http://www.apache.org/licenses/LICENSE-2.0
  10. *
  11. * Unless required by applicable law or agreed to in writing, software
  12. * distributed under the License is distributed on an "AS IS" BASIS,
  13. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. * See the License for the specific language governing permissions and
  15. * limitations under the License.
  16. *
  17. */
  18. package transport
  19. import (
  20. "context"
  21. "fmt"
  22. "io"
  23. "math"
  24. "net"
  25. "strconv"
  26. "strings"
  27. "sync"
  28. "sync/atomic"
  29. "time"
  30. "golang.org/x/net/http2"
  31. "golang.org/x/net/http2/hpack"
  32. "google.golang.org/grpc/codes"
  33. "google.golang.org/grpc/credentials"
  34. "google.golang.org/grpc/internal/channelz"
  35. "google.golang.org/grpc/internal/syscall"
  36. "google.golang.org/grpc/keepalive"
  37. "google.golang.org/grpc/metadata"
  38. "google.golang.org/grpc/peer"
  39. "google.golang.org/grpc/stats"
  40. "google.golang.org/grpc/status"
  41. )
// http2Client implements the ClientTransport interface with HTTP2.
//
// One http2Client wraps a single net.Conn. It is built by newHTTP2Client,
// which also starts the reader goroutine, the optional keepalive goroutine and
// the loopy writer goroutine.
type http2Client struct {
	// ctx is derived from the context handed to newHTTP2Client; cancel is
	// invoked by Close and when construction fails.
	ctx     context.Context
	cancel  context.CancelFunc
	ctxDone <-chan struct{} // Cache the ctx.Done() chan.
	// userAgent is sent as the "user-agent" header of every stream.
	userAgent string
	// md is the address metadata taken from TargetInfo.Metadata. When it holds
	// a *metadata.MD its entries are appended to every stream's headers.
	md   interface{}
	conn net.Conn // underlying communication channel
	// loopy is assigned by the writer goroutine started in newHTTP2Client.
	loopy      *loopyWriter
	remoteAddr net.Addr
	localAddr  net.Addr
	authInfo   credentials.AuthInfo // auth info about the connection

	readerDone chan struct{} // sync point to enable testing.
	writerDone chan struct{} // sync point to enable testing.
	// goAway is closed to notify the upper layer (i.e., addrConn.transportMonitor)
	// that the server sent GoAway on this transport.
	goAway chan struct{}
	// awakenKeepalive is used to wake up keepalive after it has gone dormant.
	// It has capacity 1 and is kept full so that sends fail unless the
	// keepalive goroutine has drained it.
	awakenKeepalive chan struct{}

	framer *framer
	// controlBuf delivers all the control related tasks (e.g., window
	// updates, reset streams, and various settings) to the controller.
	controlBuf *controlBuffer
	fc         *trInFlow
	// The scheme used: https if TLS is on, http otherwise.
	scheme string

	// isSecure is set once a transport credentials handshake has succeeded.
	isSecure bool

	// perRPCCreds holds the dial-time per-RPC credentials, including the one
	// contributed by a credentials bundle when present.
	perRPCCreds []credentials.PerRPCCredentials

	// Boolean to keep track of reading activity on transport.
	// 1 is true and 0 is false.
	activity         uint32 // Accessed atomically.
	kp               keepalive.ClientParameters
	keepaliveEnabled bool

	statsHandler stats.Handler

	// initialWindowSize is the per-stream receive window used for new streams.
	initialWindowSize int32

	// configured by peer through SETTINGS_MAX_HEADER_LIST_SIZE
	maxSendHeaderListSize *uint32

	// bdpEst is only created when neither window size was set explicitly.
	bdpEst *bdpEstimator
	// onPrefaceReceipt is a callback that client transport calls upon
	// receiving server preface to signal that a successful HTTP2
	// connection was established.
	onPrefaceReceipt func()

	// The following stream-accounting fields are, in the code shown here, only
	// touched from callbacks passed to controlBuf.executeAndPut.
	maxConcurrentStreams uint32
	// streamQuota is the number of additional streams that may be started; it
	// can go negative if the server decreases the limit.
	streamQuota int64
	// streamsQuotaAvailable is signalled with a non-blocking send when quota
	// is positive and streams are waiting for it.
	streamsQuotaAvailable chan struct{}
	// waitingStreams counts NewStream calls that are waiting for quota.
	waitingStreams uint32
	// nextID is the ID for the next stream; it starts at 1 and advances by 2.
	nextID uint32

	mu            sync.Mutex // guard the following variables
	state         transportState
	activeStreams map[uint32]*Stream
	// prevGoAwayID records the Last-Stream-ID in the previous GoAway frame.
	prevGoAwayID uint32
	// goAwayReason records the http2.ErrCode and debug data received with the
	// GoAway frame.
	goAwayReason GoAwayReason

	// Fields below are for channelz metric collection.
	channelzID int64 // channelz unique identification number
	czData     *channelzData

	onGoAway func(GoAwayReason)
	// onClose is invoked at the end of Close.
	onClose func()
}
  103. func dial(ctx context.Context, fn func(context.Context, string) (net.Conn, error), addr string) (net.Conn, error) {
  104. if fn != nil {
  105. return fn(ctx, addr)
  106. }
  107. return (&net.Dialer{}).DialContext(ctx, "tcp", addr)
  108. }
  109. func isTemporary(err error) bool {
  110. switch err := err.(type) {
  111. case interface {
  112. Temporary() bool
  113. }:
  114. return err.Temporary()
  115. case interface {
  116. Timeout() bool
  117. }:
  118. // Timeouts may be resolved upon retry, and are thus treated as
  119. // temporary.
  120. return err.Timeout()
  121. }
  122. return true
  123. }
  124. // newHTTP2Client constructs a connected ClientTransport to addr based on HTTP2
  125. // and starts to receive messages on it. Non-nil error returns if construction
  126. // fails.
  127. func newHTTP2Client(connectCtx, ctx context.Context, addr TargetInfo, opts ConnectOptions, onPrefaceReceipt func(), onGoAway func(GoAwayReason), onClose func()) (_ *http2Client, err error) {
  128. scheme := "http"
  129. ctx, cancel := context.WithCancel(ctx)
  130. defer func() {
  131. if err != nil {
  132. cancel()
  133. }
  134. }()
  135. conn, err := dial(connectCtx, opts.Dialer, addr.Addr)
  136. if err != nil {
  137. if opts.FailOnNonTempDialError {
  138. return nil, connectionErrorf(isTemporary(err), err, "transport: error while dialing: %v", err)
  139. }
  140. return nil, connectionErrorf(true, err, "transport: Error while dialing %v", err)
  141. }
  142. // Any further errors will close the underlying connection
  143. defer func(conn net.Conn) {
  144. if err != nil {
  145. conn.Close()
  146. }
  147. }(conn)
  148. kp := opts.KeepaliveParams
  149. // Validate keepalive parameters.
  150. if kp.Time == 0 {
  151. kp.Time = defaultClientKeepaliveTime
  152. }
  153. if kp.Timeout == 0 {
  154. kp.Timeout = defaultClientKeepaliveTimeout
  155. }
  156. keepaliveEnabled := false
  157. if kp.Time != infinity {
  158. if err = syscall.SetTCPUserTimeout(conn, kp.Timeout); err != nil {
  159. return nil, connectionErrorf(false, err, "transport: failed to set TCP_USER_TIMEOUT: %v", err)
  160. }
  161. keepaliveEnabled = true
  162. }
  163. var (
  164. isSecure bool
  165. authInfo credentials.AuthInfo
  166. )
  167. transportCreds := opts.TransportCredentials
  168. perRPCCreds := opts.PerRPCCredentials
  169. if b := opts.CredsBundle; b != nil {
  170. if t := b.TransportCredentials(); t != nil {
  171. transportCreds = t
  172. }
  173. if t := b.PerRPCCredentials(); t != nil {
  174. perRPCCreds = append(perRPCCreds, t)
  175. }
  176. }
  177. if transportCreds != nil {
  178. scheme = "https"
  179. conn, authInfo, err = transportCreds.ClientHandshake(connectCtx, addr.Authority, conn)
  180. if err != nil {
  181. return nil, connectionErrorf(isTemporary(err), err, "transport: authentication handshake failed: %v", err)
  182. }
  183. isSecure = true
  184. }
  185. dynamicWindow := true
  186. icwz := int32(initialWindowSize)
  187. if opts.InitialConnWindowSize >= defaultWindowSize {
  188. icwz = opts.InitialConnWindowSize
  189. dynamicWindow = false
  190. }
  191. writeBufSize := opts.WriteBufferSize
  192. readBufSize := opts.ReadBufferSize
  193. maxHeaderListSize := defaultClientMaxHeaderListSize
  194. if opts.MaxHeaderListSize != nil {
  195. maxHeaderListSize = *opts.MaxHeaderListSize
  196. }
  197. t := &http2Client{
  198. ctx: ctx,
  199. ctxDone: ctx.Done(), // Cache Done chan.
  200. cancel: cancel,
  201. userAgent: opts.UserAgent,
  202. md: addr.Metadata,
  203. conn: conn,
  204. remoteAddr: conn.RemoteAddr(),
  205. localAddr: conn.LocalAddr(),
  206. authInfo: authInfo,
  207. readerDone: make(chan struct{}),
  208. writerDone: make(chan struct{}),
  209. goAway: make(chan struct{}),
  210. awakenKeepalive: make(chan struct{}, 1),
  211. framer: newFramer(conn, writeBufSize, readBufSize, maxHeaderListSize),
  212. fc: &trInFlow{limit: uint32(icwz)},
  213. scheme: scheme,
  214. activeStreams: make(map[uint32]*Stream),
  215. isSecure: isSecure,
  216. perRPCCreds: perRPCCreds,
  217. kp: kp,
  218. statsHandler: opts.StatsHandler,
  219. initialWindowSize: initialWindowSize,
  220. onPrefaceReceipt: onPrefaceReceipt,
  221. nextID: 1,
  222. maxConcurrentStreams: defaultMaxStreamsClient,
  223. streamQuota: defaultMaxStreamsClient,
  224. streamsQuotaAvailable: make(chan struct{}, 1),
  225. czData: new(channelzData),
  226. onGoAway: onGoAway,
  227. onClose: onClose,
  228. keepaliveEnabled: keepaliveEnabled,
  229. }
  230. t.controlBuf = newControlBuffer(t.ctxDone)
  231. if opts.InitialWindowSize >= defaultWindowSize {
  232. t.initialWindowSize = opts.InitialWindowSize
  233. dynamicWindow = false
  234. }
  235. if dynamicWindow {
  236. t.bdpEst = &bdpEstimator{
  237. bdp: initialWindowSize,
  238. updateFlowControl: t.updateFlowControl,
  239. }
  240. }
  241. // Make sure awakenKeepalive can't be written upon.
  242. // keepalive routine will make it writable, if need be.
  243. t.awakenKeepalive <- struct{}{}
  244. if t.statsHandler != nil {
  245. t.ctx = t.statsHandler.TagConn(t.ctx, &stats.ConnTagInfo{
  246. RemoteAddr: t.remoteAddr,
  247. LocalAddr: t.localAddr,
  248. })
  249. connBegin := &stats.ConnBegin{
  250. Client: true,
  251. }
  252. t.statsHandler.HandleConn(t.ctx, connBegin)
  253. }
  254. if channelz.IsOn() {
  255. t.channelzID = channelz.RegisterNormalSocket(t, opts.ChannelzParentID, fmt.Sprintf("%s -> %s", t.localAddr, t.remoteAddr))
  256. }
  257. if t.keepaliveEnabled {
  258. go t.keepalive()
  259. }
  260. // Start the reader goroutine for incoming message. Each transport has
  261. // a dedicated goroutine which reads HTTP2 frame from network. Then it
  262. // dispatches the frame to the corresponding stream entity.
  263. go t.reader()
  264. // Send connection preface to server.
  265. n, err := t.conn.Write(clientPreface)
  266. if err != nil {
  267. t.Close()
  268. return nil, connectionErrorf(true, err, "transport: failed to write client preface: %v", err)
  269. }
  270. if n != len(clientPreface) {
  271. t.Close()
  272. return nil, connectionErrorf(true, err, "transport: preface mismatch, wrote %d bytes; want %d", n, len(clientPreface))
  273. }
  274. var ss []http2.Setting
  275. if t.initialWindowSize != defaultWindowSize {
  276. ss = append(ss, http2.Setting{
  277. ID: http2.SettingInitialWindowSize,
  278. Val: uint32(t.initialWindowSize),
  279. })
  280. }
  281. if opts.MaxHeaderListSize != nil {
  282. ss = append(ss, http2.Setting{
  283. ID: http2.SettingMaxHeaderListSize,
  284. Val: *opts.MaxHeaderListSize,
  285. })
  286. }
  287. err = t.framer.fr.WriteSettings(ss...)
  288. if err != nil {
  289. t.Close()
  290. return nil, connectionErrorf(true, err, "transport: failed to write initial settings frame: %v", err)
  291. }
  292. // Adjust the connection flow control window if needed.
  293. if delta := uint32(icwz - defaultWindowSize); delta > 0 {
  294. if err := t.framer.fr.WriteWindowUpdate(0, delta); err != nil {
  295. t.Close()
  296. return nil, connectionErrorf(true, err, "transport: failed to write window update: %v", err)
  297. }
  298. }
  299. if err := t.framer.writer.Flush(); err != nil {
  300. return nil, err
  301. }
  302. go func() {
  303. t.loopy = newLoopyWriter(clientSide, t.framer, t.controlBuf, t.bdpEst)
  304. err := t.loopy.run()
  305. if err != nil {
  306. errorf("transport: loopyWriter.run returning. Err: %v", err)
  307. }
  308. // If it's a connection error, let reader goroutine handle it
  309. // since there might be data in the buffers.
  310. if _, ok := err.(net.Error); !ok {
  311. t.conn.Close()
  312. }
  313. close(t.writerDone)
  314. }()
  315. return t, nil
  316. }
  317. func (t *http2Client) newStream(ctx context.Context, callHdr *CallHdr) *Stream {
  318. // TODO(zhaoq): Handle uint32 overflow of Stream.id.
  319. s := &Stream{
  320. done: make(chan struct{}),
  321. method: callHdr.Method,
  322. sendCompress: callHdr.SendCompress,
  323. buf: newRecvBuffer(),
  324. headerChan: make(chan struct{}),
  325. contentSubtype: callHdr.ContentSubtype,
  326. }
  327. s.wq = newWriteQuota(defaultWriteQuota, s.done)
  328. s.requestRead = func(n int) {
  329. t.adjustWindow(s, uint32(n))
  330. }
  331. // The client side stream context should have exactly the same life cycle with the user provided context.
  332. // That means, s.ctx should be read-only. And s.ctx is done iff ctx is done.
  333. // So we use the original context here instead of creating a copy.
  334. s.ctx = ctx
  335. s.trReader = &transportReader{
  336. reader: &recvBufferReader{
  337. ctx: s.ctx,
  338. ctxDone: s.ctx.Done(),
  339. recv: s.buf,
  340. closeStream: func(err error) {
  341. t.CloseStream(s, err)
  342. },
  343. },
  344. windowHandler: func(n int) {
  345. t.updateWindow(s, uint32(n))
  346. },
  347. }
  348. return s
  349. }
  350. func (t *http2Client) getPeer() *peer.Peer {
  351. pr := &peer.Peer{
  352. Addr: t.remoteAddr,
  353. }
  354. // Attach Auth info if there is any.
  355. if t.authInfo != nil {
  356. pr.AuthInfo = t.authInfo
  357. }
  358. return pr
  359. }
  360. func (t *http2Client) createHeaderFields(ctx context.Context, callHdr *CallHdr) ([]hpack.HeaderField, error) {
  361. aud := t.createAudience(callHdr)
  362. authData, err := t.getTrAuthData(ctx, aud)
  363. if err != nil {
  364. return nil, err
  365. }
  366. callAuthData, err := t.getCallAuthData(ctx, aud, callHdr)
  367. if err != nil {
  368. return nil, err
  369. }
  370. // TODO(mmukhi): Benchmark if the performance gets better if count the metadata and other header fields
  371. // first and create a slice of that exact size.
  372. // Make the slice of certain predictable size to reduce allocations made by append.
  373. hfLen := 7 // :method, :scheme, :path, :authority, content-type, user-agent, te
  374. hfLen += len(authData) + len(callAuthData)
  375. headerFields := make([]hpack.HeaderField, 0, hfLen)
  376. headerFields = append(headerFields, hpack.HeaderField{Name: ":method", Value: "POST"})
  377. headerFields = append(headerFields, hpack.HeaderField{Name: ":scheme", Value: t.scheme})
  378. headerFields = append(headerFields, hpack.HeaderField{Name: ":path", Value: callHdr.Method})
  379. headerFields = append(headerFields, hpack.HeaderField{Name: ":authority", Value: callHdr.Host})
  380. headerFields = append(headerFields, hpack.HeaderField{Name: "content-type", Value: contentType(callHdr.ContentSubtype)})
  381. headerFields = append(headerFields, hpack.HeaderField{Name: "user-agent", Value: t.userAgent})
  382. headerFields = append(headerFields, hpack.HeaderField{Name: "te", Value: "trailers"})
  383. if callHdr.PreviousAttempts > 0 {
  384. headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-previous-rpc-attempts", Value: strconv.Itoa(callHdr.PreviousAttempts)})
  385. }
  386. if callHdr.SendCompress != "" {
  387. headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-encoding", Value: callHdr.SendCompress})
  388. }
  389. if dl, ok := ctx.Deadline(); ok {
  390. // Send out timeout regardless its value. The server can detect timeout context by itself.
  391. // TODO(mmukhi): Perhaps this field should be updated when actually writing out to the wire.
  392. timeout := time.Until(dl)
  393. headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-timeout", Value: encodeTimeout(timeout)})
  394. }
  395. for k, v := range authData {
  396. headerFields = append(headerFields, hpack.HeaderField{Name: k, Value: encodeMetadataHeader(k, v)})
  397. }
  398. for k, v := range callAuthData {
  399. headerFields = append(headerFields, hpack.HeaderField{Name: k, Value: encodeMetadataHeader(k, v)})
  400. }
  401. if b := stats.OutgoingTags(ctx); b != nil {
  402. headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-tags-bin", Value: encodeBinHeader(b)})
  403. }
  404. if b := stats.OutgoingTrace(ctx); b != nil {
  405. headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-trace-bin", Value: encodeBinHeader(b)})
  406. }
  407. if md, added, ok := metadata.FromOutgoingContextRaw(ctx); ok {
  408. var k string
  409. for _, vv := range added {
  410. for i, v := range vv {
  411. if i%2 == 0 {
  412. k = v
  413. continue
  414. }
  415. // HTTP doesn't allow you to set pseudoheaders after non pseudoheaders were set.
  416. if isReservedHeader(k) {
  417. continue
  418. }
  419. headerFields = append(headerFields, hpack.HeaderField{Name: strings.ToLower(k), Value: encodeMetadataHeader(k, v)})
  420. }
  421. }
  422. for k, vv := range md {
  423. // HTTP doesn't allow you to set pseudoheaders after non pseudoheaders were set.
  424. if isReservedHeader(k) {
  425. continue
  426. }
  427. for _, v := range vv {
  428. headerFields = append(headerFields, hpack.HeaderField{Name: k, Value: encodeMetadataHeader(k, v)})
  429. }
  430. }
  431. }
  432. if md, ok := t.md.(*metadata.MD); ok {
  433. for k, vv := range *md {
  434. if isReservedHeader(k) {
  435. continue
  436. }
  437. for _, v := range vv {
  438. headerFields = append(headerFields, hpack.HeaderField{Name: k, Value: encodeMetadataHeader(k, v)})
  439. }
  440. }
  441. }
  442. return headerFields, nil
  443. }
  444. func (t *http2Client) createAudience(callHdr *CallHdr) string {
  445. // Create an audience string only if needed.
  446. if len(t.perRPCCreds) == 0 && callHdr.Creds == nil {
  447. return ""
  448. }
  449. // Construct URI required to get auth request metadata.
  450. // Omit port if it is the default one.
  451. host := strings.TrimSuffix(callHdr.Host, ":443")
  452. pos := strings.LastIndex(callHdr.Method, "/")
  453. if pos == -1 {
  454. pos = len(callHdr.Method)
  455. }
  456. return "https://" + host + callHdr.Method[:pos]
  457. }
  458. func (t *http2Client) getTrAuthData(ctx context.Context, audience string) (map[string]string, error) {
  459. authData := map[string]string{}
  460. for _, c := range t.perRPCCreds {
  461. data, err := c.GetRequestMetadata(ctx, audience)
  462. if err != nil {
  463. if _, ok := status.FromError(err); ok {
  464. return nil, err
  465. }
  466. return nil, status.Errorf(codes.Unauthenticated, "transport: %v", err)
  467. }
  468. for k, v := range data {
  469. // Capital header names are illegal in HTTP/2.
  470. k = strings.ToLower(k)
  471. authData[k] = v
  472. }
  473. }
  474. return authData, nil
  475. }
  476. func (t *http2Client) getCallAuthData(ctx context.Context, audience string, callHdr *CallHdr) (map[string]string, error) {
  477. callAuthData := map[string]string{}
  478. // Check if credentials.PerRPCCredentials were provided via call options.
  479. // Note: if these credentials are provided both via dial options and call
  480. // options, then both sets of credentials will be applied.
  481. if callCreds := callHdr.Creds; callCreds != nil {
  482. if !t.isSecure && callCreds.RequireTransportSecurity() {
  483. return nil, status.Error(codes.Unauthenticated, "transport: cannot send secure credentials on an insecure connection")
  484. }
  485. data, err := callCreds.GetRequestMetadata(ctx, audience)
  486. if err != nil {
  487. return nil, status.Errorf(codes.Internal, "transport: %v", err)
  488. }
  489. for k, v := range data {
  490. // Capital header names are illegal in HTTP/2
  491. k = strings.ToLower(k)
  492. callAuthData[k] = v
  493. }
  494. }
  495. return callAuthData, nil
  496. }
  497. // NewStream creates a stream and registers it into the transport as "active"
  498. // streams.
  499. func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Stream, err error) {
  500. ctx = peer.NewContext(ctx, t.getPeer())
  501. headerFields, err := t.createHeaderFields(ctx, callHdr)
  502. if err != nil {
  503. return nil, err
  504. }
  505. s := t.newStream(ctx, callHdr)
  506. cleanup := func(err error) {
  507. if s.swapState(streamDone) == streamDone {
  508. // If it was already done, return.
  509. return
  510. }
  511. // The stream was unprocessed by the server.
  512. atomic.StoreUint32(&s.unprocessed, 1)
  513. s.write(recvMsg{err: err})
  514. close(s.done)
  515. // If headerChan isn't closed, then close it.
  516. if atomic.SwapUint32(&s.headerDone, 1) == 0 {
  517. close(s.headerChan)
  518. }
  519. }
  520. hdr := &headerFrame{
  521. hf: headerFields,
  522. endStream: false,
  523. initStream: func(id uint32) (bool, error) {
  524. t.mu.Lock()
  525. if state := t.state; state != reachable {
  526. t.mu.Unlock()
  527. // Do a quick cleanup.
  528. err := error(errStreamDrain)
  529. if state == closing {
  530. err = ErrConnClosing
  531. }
  532. cleanup(err)
  533. return false, err
  534. }
  535. t.activeStreams[id] = s
  536. if channelz.IsOn() {
  537. atomic.AddInt64(&t.czData.streamsStarted, 1)
  538. atomic.StoreInt64(&t.czData.lastStreamCreatedTime, time.Now().UnixNano())
  539. }
  540. var sendPing bool
  541. // If the number of active streams change from 0 to 1, then check if keepalive
  542. // has gone dormant. If so, wake it up.
  543. if len(t.activeStreams) == 1 && t.keepaliveEnabled {
  544. select {
  545. case t.awakenKeepalive <- struct{}{}:
  546. sendPing = true
  547. // Fill the awakenKeepalive channel again as this channel must be
  548. // kept non-writable except at the point that the keepalive()
  549. // goroutine is waiting either to be awaken or shutdown.
  550. t.awakenKeepalive <- struct{}{}
  551. default:
  552. }
  553. }
  554. t.mu.Unlock()
  555. return sendPing, nil
  556. },
  557. onOrphaned: cleanup,
  558. wq: s.wq,
  559. }
  560. firstTry := true
  561. var ch chan struct{}
  562. checkForStreamQuota := func(it interface{}) bool {
  563. if t.streamQuota <= 0 { // Can go negative if server decreases it.
  564. if firstTry {
  565. t.waitingStreams++
  566. }
  567. ch = t.streamsQuotaAvailable
  568. return false
  569. }
  570. if !firstTry {
  571. t.waitingStreams--
  572. }
  573. t.streamQuota--
  574. h := it.(*headerFrame)
  575. h.streamID = t.nextID
  576. t.nextID += 2
  577. s.id = h.streamID
  578. s.fc = &inFlow{limit: uint32(t.initialWindowSize)}
  579. if t.streamQuota > 0 && t.waitingStreams > 0 {
  580. select {
  581. case t.streamsQuotaAvailable <- struct{}{}:
  582. default:
  583. }
  584. }
  585. return true
  586. }
  587. var hdrListSizeErr error
  588. checkForHeaderListSize := func(it interface{}) bool {
  589. if t.maxSendHeaderListSize == nil {
  590. return true
  591. }
  592. hdrFrame := it.(*headerFrame)
  593. var sz int64
  594. for _, f := range hdrFrame.hf {
  595. if sz += int64(f.Size()); sz > int64(*t.maxSendHeaderListSize) {
  596. hdrListSizeErr = status.Errorf(codes.Internal, "header list size to send violates the maximum size (%d bytes) set by server", *t.maxSendHeaderListSize)
  597. return false
  598. }
  599. }
  600. return true
  601. }
  602. for {
  603. success, err := t.controlBuf.executeAndPut(func(it interface{}) bool {
  604. if !checkForStreamQuota(it) {
  605. return false
  606. }
  607. if !checkForHeaderListSize(it) {
  608. return false
  609. }
  610. return true
  611. }, hdr)
  612. if err != nil {
  613. return nil, err
  614. }
  615. if success {
  616. break
  617. }
  618. if hdrListSizeErr != nil {
  619. return nil, hdrListSizeErr
  620. }
  621. firstTry = false
  622. select {
  623. case <-ch:
  624. case <-s.ctx.Done():
  625. return nil, ContextErr(s.ctx.Err())
  626. case <-t.goAway:
  627. return nil, errStreamDrain
  628. case <-t.ctx.Done():
  629. return nil, ErrConnClosing
  630. }
  631. }
  632. if t.statsHandler != nil {
  633. outHeader := &stats.OutHeader{
  634. Client: true,
  635. FullMethod: callHdr.Method,
  636. RemoteAddr: t.remoteAddr,
  637. LocalAddr: t.localAddr,
  638. Compression: callHdr.SendCompress,
  639. }
  640. t.statsHandler.HandleRPC(s.ctx, outHeader)
  641. }
  642. return s, nil
  643. }
  644. // CloseStream clears the footprint of a stream when the stream is not needed any more.
  645. // This must not be executed in reader's goroutine.
  646. func (t *http2Client) CloseStream(s *Stream, err error) {
  647. var (
  648. rst bool
  649. rstCode http2.ErrCode
  650. )
  651. if err != nil {
  652. rst = true
  653. rstCode = http2.ErrCodeCancel
  654. }
  655. t.closeStream(s, err, rst, rstCode, status.Convert(err), nil, false)
  656. }
  657. func (t *http2Client) closeStream(s *Stream, err error, rst bool, rstCode http2.ErrCode, st *status.Status, mdata map[string][]string, eosReceived bool) {
  658. // Set stream status to done.
  659. if s.swapState(streamDone) == streamDone {
  660. // If it was already done, return. If multiple closeStream calls
  661. // happen simultaneously, wait for the first to finish.
  662. <-s.done
  663. return
  664. }
  665. // status and trailers can be updated here without any synchronization because the stream goroutine will
  666. // only read it after it sees an io.EOF error from read or write and we'll write those errors
  667. // only after updating this.
  668. s.status = st
  669. if len(mdata) > 0 {
  670. s.trailer = mdata
  671. }
  672. if err != nil {
  673. // This will unblock reads eventually.
  674. s.write(recvMsg{err: err})
  675. }
  676. // If headerChan isn't closed, then close it.
  677. if atomic.SwapUint32(&s.headerDone, 1) == 0 {
  678. s.noHeaders = true
  679. close(s.headerChan)
  680. }
  681. cleanup := &cleanupStream{
  682. streamID: s.id,
  683. onWrite: func() {
  684. t.mu.Lock()
  685. if t.activeStreams != nil {
  686. delete(t.activeStreams, s.id)
  687. }
  688. t.mu.Unlock()
  689. if channelz.IsOn() {
  690. if eosReceived {
  691. atomic.AddInt64(&t.czData.streamsSucceeded, 1)
  692. } else {
  693. atomic.AddInt64(&t.czData.streamsFailed, 1)
  694. }
  695. }
  696. },
  697. rst: rst,
  698. rstCode: rstCode,
  699. }
  700. addBackStreamQuota := func(interface{}) bool {
  701. t.streamQuota++
  702. if t.streamQuota > 0 && t.waitingStreams > 0 {
  703. select {
  704. case t.streamsQuotaAvailable <- struct{}{}:
  705. default:
  706. }
  707. }
  708. return true
  709. }
  710. t.controlBuf.executeAndPut(addBackStreamQuota, cleanup)
  711. // This will unblock write.
  712. close(s.done)
  713. }
  714. // Close kicks off the shutdown process of the transport. This should be called
  715. // only once on a transport. Once it is called, the transport should not be
  716. // accessed any more.
  717. //
  718. // This method blocks until the addrConn that initiated this transport is
  719. // re-connected. This happens because t.onClose() begins reconnect logic at the
  720. // addrConn level and blocks until the addrConn is successfully connected.
  721. func (t *http2Client) Close() error {
  722. t.mu.Lock()
  723. // Make sure we only Close once.
  724. if t.state == closing {
  725. t.mu.Unlock()
  726. return nil
  727. }
  728. t.state = closing
  729. streams := t.activeStreams
  730. t.activeStreams = nil
  731. t.mu.Unlock()
  732. t.controlBuf.finish()
  733. t.cancel()
  734. err := t.conn.Close()
  735. if channelz.IsOn() {
  736. channelz.RemoveEntry(t.channelzID)
  737. }
  738. // Notify all active streams.
  739. for _, s := range streams {
  740. t.closeStream(s, ErrConnClosing, false, http2.ErrCodeNo, status.New(codes.Unavailable, ErrConnClosing.Desc), nil, false)
  741. }
  742. if t.statsHandler != nil {
  743. connEnd := &stats.ConnEnd{
  744. Client: true,
  745. }
  746. t.statsHandler.HandleConn(t.ctx, connEnd)
  747. }
  748. t.onClose()
  749. return err
  750. }
  751. // GracefulClose sets the state to draining, which prevents new streams from
  752. // being created and causes the transport to be closed when the last active
  753. // stream is closed. If there are no active streams, the transport is closed
  754. // immediately. This does nothing if the transport is already draining or
  755. // closing.
  756. func (t *http2Client) GracefulClose() error {
  757. t.mu.Lock()
  758. // Make sure we move to draining only from active.
  759. if t.state == draining || t.state == closing {
  760. t.mu.Unlock()
  761. return nil
  762. }
  763. t.state = draining
  764. active := len(t.activeStreams)
  765. t.mu.Unlock()
  766. if active == 0 {
  767. return t.Close()
  768. }
  769. t.controlBuf.put(&incomingGoAway{})
  770. return nil
  771. }
  772. // Write formats the data into HTTP2 data frame(s) and sends it out. The caller
  773. // should proceed only if Write returns nil.
  774. func (t *http2Client) Write(s *Stream, hdr []byte, data []byte, opts *Options) error {
  775. if opts.Last {
  776. // If it's the last message, update stream state.
  777. if !s.compareAndSwapState(streamActive, streamWriteDone) {
  778. return errStreamDone
  779. }
  780. } else if s.getState() != streamActive {
  781. return errStreamDone
  782. }
  783. df := &dataFrame{
  784. streamID: s.id,
  785. endStream: opts.Last,
  786. }
  787. if hdr != nil || data != nil { // If it's not an empty data frame.
  788. // Add some data to grpc message header so that we can equally
  789. // distribute bytes across frames.
  790. emptyLen := http2MaxFrameLen - len(hdr)
  791. if emptyLen > len(data) {
  792. emptyLen = len(data)
  793. }
  794. hdr = append(hdr, data[:emptyLen]...)
  795. data = data[emptyLen:]
  796. df.h, df.d = hdr, data
  797. // TODO(mmukhi): The above logic in this if can be moved to loopyWriter's data handler.
  798. if err := s.wq.get(int32(len(hdr) + len(data))); err != nil {
  799. return err
  800. }
  801. }
  802. return t.controlBuf.put(df)
  803. }
  804. func (t *http2Client) getStream(f http2.Frame) (*Stream, bool) {
  805. t.mu.Lock()
  806. defer t.mu.Unlock()
  807. s, ok := t.activeStreams[f.Header().StreamID]
  808. return s, ok
  809. }
  810. // adjustWindow sends out extra window update over the initial window size
  811. // of stream if the application is requesting data larger in size than
  812. // the window.
  813. func (t *http2Client) adjustWindow(s *Stream, n uint32) {
  814. if w := s.fc.maybeAdjust(n); w > 0 {
  815. t.controlBuf.put(&outgoingWindowUpdate{streamID: s.id, increment: w})
  816. }
  817. }
  818. // updateWindow adjusts the inbound quota for the stream.
  819. // Window updates will be sent out when the cumulative quota
  820. // exceeds the corresponding threshold.
  821. func (t *http2Client) updateWindow(s *Stream, n uint32) {
  822. if w := s.fc.onRead(n); w > 0 {
  823. t.controlBuf.put(&outgoingWindowUpdate{streamID: s.id, increment: w})
  824. }
  825. }
  826. // updateFlowControl updates the incoming flow control windows
  827. // for the transport and the stream based on the current bdp
  828. // estimation.
  829. func (t *http2Client) updateFlowControl(n uint32) {
  830. t.mu.Lock()
  831. for _, s := range t.activeStreams {
  832. s.fc.newLimit(n)
  833. }
  834. t.mu.Unlock()
  835. updateIWS := func(interface{}) bool {
  836. t.initialWindowSize = int32(n)
  837. return true
  838. }
  839. t.controlBuf.executeAndPut(updateIWS, &outgoingWindowUpdate{streamID: 0, increment: t.fc.newLimit(n)})
  840. t.controlBuf.put(&outgoingSettings{
  841. ss: []http2.Setting{
  842. {
  843. ID: http2.SettingInitialWindowSize,
  844. Val: n,
  845. },
  846. },
  847. })
  848. }
  849. func (t *http2Client) handleData(f *http2.DataFrame) {
  850. size := f.Header().Length
  851. var sendBDPPing bool
  852. if t.bdpEst != nil {
  853. sendBDPPing = t.bdpEst.add(size)
  854. }
  855. // Decouple connection's flow control from application's read.
  856. // An update on connection's flow control should not depend on
  857. // whether user application has read the data or not. Such a
  858. // restriction is already imposed on the stream's flow control,
  859. // and therefore the sender will be blocked anyways.
  860. // Decoupling the connection flow control will prevent other
  861. // active(fast) streams from starving in presence of slow or
  862. // inactive streams.
  863. //
  864. if w := t.fc.onData(size); w > 0 {
  865. t.controlBuf.put(&outgoingWindowUpdate{
  866. streamID: 0,
  867. increment: w,
  868. })
  869. }
  870. if sendBDPPing {
  871. // Avoid excessive ping detection (e.g. in an L7 proxy)
  872. // by sending a window update prior to the BDP ping.
  873. if w := t.fc.reset(); w > 0 {
  874. t.controlBuf.put(&outgoingWindowUpdate{
  875. streamID: 0,
  876. increment: w,
  877. })
  878. }
  879. t.controlBuf.put(bdpPing)
  880. }
  881. // Select the right stream to dispatch.
  882. s, ok := t.getStream(f)
  883. if !ok {
  884. return
  885. }
  886. if size > 0 {
  887. if err := s.fc.onData(size); err != nil {
  888. t.closeStream(s, io.EOF, true, http2.ErrCodeFlowControl, status.New(codes.Internal, err.Error()), nil, false)
  889. return
  890. }
  891. if f.Header().Flags.Has(http2.FlagDataPadded) {
  892. if w := s.fc.onRead(size - uint32(len(f.Data()))); w > 0 {
  893. t.controlBuf.put(&outgoingWindowUpdate{s.id, w})
  894. }
  895. }
  896. // TODO(bradfitz, zhaoq): A copy is required here because there is no
  897. // guarantee f.Data() is consumed before the arrival of next frame.
  898. // Can this copy be eliminated?
  899. if len(f.Data()) > 0 {
  900. data := make([]byte, len(f.Data()))
  901. copy(data, f.Data())
  902. s.write(recvMsg{data: data})
  903. }
  904. }
  905. // The server has closed the stream without sending trailers. Record that
  906. // the read direction is closed, and set the status appropriately.
  907. if f.FrameHeader.Flags.Has(http2.FlagDataEndStream) {
  908. t.closeStream(s, io.EOF, false, http2.ErrCodeNo, status.New(codes.Internal, "server closed the stream without sending trailers"), nil, true)
  909. }
  910. }
  911. func (t *http2Client) handleRSTStream(f *http2.RSTStreamFrame) {
  912. s, ok := t.getStream(f)
  913. if !ok {
  914. return
  915. }
  916. if f.ErrCode == http2.ErrCodeRefusedStream {
  917. // The stream was unprocessed by the server.
  918. atomic.StoreUint32(&s.unprocessed, 1)
  919. }
  920. statusCode, ok := http2ErrConvTab[f.ErrCode]
  921. if !ok {
  922. warningf("transport: http2Client.handleRSTStream found no mapped gRPC status for the received http2 error %v", f.ErrCode)
  923. statusCode = codes.Unknown
  924. }
  925. if statusCode == codes.Canceled {
  926. // Our deadline was already exceeded, and that was likely the cause of
  927. // this cancelation. Alter the status code accordingly.
  928. if d, ok := s.ctx.Deadline(); ok && d.After(time.Now()) {
  929. statusCode = codes.DeadlineExceeded
  930. }
  931. }
  932. t.closeStream(s, io.EOF, false, http2.ErrCodeNo, status.Newf(statusCode, "stream terminated by RST_STREAM with error code: %v", f.ErrCode), nil, false)
  933. }
  934. func (t *http2Client) handleSettings(f *http2.SettingsFrame, isFirst bool) {
  935. if f.IsAck() {
  936. return
  937. }
  938. var maxStreams *uint32
  939. var ss []http2.Setting
  940. var updateFuncs []func()
  941. f.ForeachSetting(func(s http2.Setting) error {
  942. switch s.ID {
  943. case http2.SettingMaxConcurrentStreams:
  944. maxStreams = new(uint32)
  945. *maxStreams = s.Val
  946. case http2.SettingMaxHeaderListSize:
  947. updateFuncs = append(updateFuncs, func() {
  948. t.maxSendHeaderListSize = new(uint32)
  949. *t.maxSendHeaderListSize = s.Val
  950. })
  951. default:
  952. ss = append(ss, s)
  953. }
  954. return nil
  955. })
  956. if isFirst && maxStreams == nil {
  957. maxStreams = new(uint32)
  958. *maxStreams = math.MaxUint32
  959. }
  960. sf := &incomingSettings{
  961. ss: ss,
  962. }
  963. if maxStreams != nil {
  964. updateStreamQuota := func() {
  965. delta := int64(*maxStreams) - int64(t.maxConcurrentStreams)
  966. t.maxConcurrentStreams = *maxStreams
  967. t.streamQuota += delta
  968. if delta > 0 && t.waitingStreams > 0 {
  969. close(t.streamsQuotaAvailable) // wake all of them up.
  970. t.streamsQuotaAvailable = make(chan struct{}, 1)
  971. }
  972. }
  973. updateFuncs = append(updateFuncs, updateStreamQuota)
  974. }
  975. t.controlBuf.executeAndPut(func(interface{}) bool {
  976. for _, f := range updateFuncs {
  977. f()
  978. }
  979. return true
  980. }, sf)
  981. }
  982. func (t *http2Client) handlePing(f *http2.PingFrame) {
  983. if f.IsAck() {
  984. // Maybe it's a BDP ping.
  985. if t.bdpEst != nil {
  986. t.bdpEst.calculate(f.Data)
  987. }
  988. return
  989. }
  990. pingAck := &ping{ack: true}
  991. copy(pingAck.data[:], f.Data[:])
  992. t.controlBuf.put(pingAck)
  993. }
  994. func (t *http2Client) handleGoAway(f *http2.GoAwayFrame) {
  995. t.mu.Lock()
  996. if t.state == closing {
  997. t.mu.Unlock()
  998. return
  999. }
  1000. if f.ErrCode == http2.ErrCodeEnhanceYourCalm {
  1001. infof("Client received GoAway with http2.ErrCodeEnhanceYourCalm.")
  1002. }
  1003. id := f.LastStreamID
  1004. if id > 0 && id%2 != 1 {
  1005. t.mu.Unlock()
  1006. t.Close()
  1007. return
  1008. }
  1009. // A client can receive multiple GoAways from the server (see
  1010. // https://github.com/grpc/grpc-go/issues/1387). The idea is that the first
  1011. // GoAway will be sent with an ID of MaxInt32 and the second GoAway will be
  1012. // sent after an RTT delay with the ID of the last stream the server will
  1013. // process.
  1014. //
  1015. // Therefore, when we get the first GoAway we don't necessarily close any
  1016. // streams. While in case of second GoAway we close all streams created after
  1017. // the GoAwayId. This way streams that were in-flight while the GoAway from
  1018. // server was being sent don't get killed.
  1019. select {
  1020. case <-t.goAway: // t.goAway has been closed (i.e.,multiple GoAways).
  1021. // If there are multiple GoAways the first one should always have an ID greater than the following ones.
  1022. if id > t.prevGoAwayID {
  1023. t.mu.Unlock()
  1024. t.Close()
  1025. return
  1026. }
  1027. default:
  1028. t.setGoAwayReason(f)
  1029. close(t.goAway)
  1030. t.state = draining
  1031. t.controlBuf.put(&incomingGoAway{})
  1032. // This has to be a new goroutine because we're still using the current goroutine to read in the transport.
  1033. t.onGoAway(t.goAwayReason)
  1034. }
  1035. // All streams with IDs greater than the GoAwayId
  1036. // and smaller than the previous GoAway ID should be killed.
  1037. upperLimit := t.prevGoAwayID
  1038. if upperLimit == 0 { // This is the first GoAway Frame.
  1039. upperLimit = math.MaxUint32 // Kill all streams after the GoAway ID.
  1040. }
  1041. for streamID, stream := range t.activeStreams {
  1042. if streamID > id && streamID <= upperLimit {
  1043. // The stream was unprocessed by the server.
  1044. atomic.StoreUint32(&stream.unprocessed, 1)
  1045. t.closeStream(stream, errStreamDrain, false, http2.ErrCodeNo, statusGoAway, nil, false)
  1046. }
  1047. }
  1048. t.prevGoAwayID = id
  1049. active := len(t.activeStreams)
  1050. t.mu.Unlock()
  1051. if active == 0 {
  1052. t.Close()
  1053. }
  1054. }
  1055. // setGoAwayReason sets the value of t.goAwayReason based
  1056. // on the GoAway frame received.
  1057. // It expects a lock on transport's mutext to be held by
  1058. // the caller.
  1059. func (t *http2Client) setGoAwayReason(f *http2.GoAwayFrame) {
  1060. t.goAwayReason = GoAwayNoReason
  1061. switch f.ErrCode {
  1062. case http2.ErrCodeEnhanceYourCalm:
  1063. if string(f.DebugData()) == "too_many_pings" {
  1064. t.goAwayReason = GoAwayTooManyPings
  1065. }
  1066. }
  1067. }
  1068. func (t *http2Client) GetGoAwayReason() GoAwayReason {
  1069. t.mu.Lock()
  1070. defer t.mu.Unlock()
  1071. return t.goAwayReason
  1072. }
  1073. func (t *http2Client) handleWindowUpdate(f *http2.WindowUpdateFrame) {
  1074. t.controlBuf.put(&incomingWindowUpdate{
  1075. streamID: f.Header().StreamID,
  1076. increment: f.Increment,
  1077. })
  1078. }
  1079. // operateHeaders takes action on the decoded headers.
  1080. func (t *http2Client) operateHeaders(frame *http2.MetaHeadersFrame) {
  1081. s, ok := t.getStream(frame)
  1082. if !ok {
  1083. return
  1084. }
  1085. atomic.StoreUint32(&s.bytesReceived, 1)
  1086. var state decodeState
  1087. if err := state.decodeHeader(frame); err != nil {
  1088. t.closeStream(s, err, true, http2.ErrCodeProtocol, status.New(codes.Internal, err.Error()), nil, false)
  1089. // Something wrong. Stops reading even when there is remaining.
  1090. return
  1091. }
  1092. endStream := frame.StreamEnded()
  1093. var isHeader bool
  1094. defer func() {
  1095. if t.statsHandler != nil {
  1096. if isHeader {
  1097. inHeader := &stats.InHeader{
  1098. Client: true,
  1099. WireLength: int(frame.Header().Length),
  1100. }
  1101. t.statsHandler.HandleRPC(s.ctx, inHeader)
  1102. } else {
  1103. inTrailer := &stats.InTrailer{
  1104. Client: true,
  1105. WireLength: int(frame.Header().Length),
  1106. }
  1107. t.statsHandler.HandleRPC(s.ctx, inTrailer)
  1108. }
  1109. }
  1110. }()
  1111. // If headers haven't been received yet.
  1112. if atomic.SwapUint32(&s.headerDone, 1) == 0 {
  1113. if !endStream {
  1114. // Headers frame is not actually a trailers-only frame.
  1115. isHeader = true
  1116. // These values can be set without any synchronization because
  1117. // stream goroutine will read it only after seeing a closed
  1118. // headerChan which we'll close after setting this.
  1119. s.recvCompress = state.encoding
  1120. if len(state.mdata) > 0 {
  1121. s.header = state.mdata
  1122. }
  1123. } else {
  1124. s.noHeaders = true
  1125. }
  1126. close(s.headerChan)
  1127. }
  1128. if !endStream {
  1129. return
  1130. }
  1131. // if client received END_STREAM from server while stream was still active, send RST_STREAM
  1132. rst := s.getState() == streamActive
  1133. t.closeStream(s, io.EOF, rst, http2.ErrCodeNo, state.status(), state.mdata, true)
  1134. }
  1135. // reader runs as a separate goroutine in charge of reading data from network
  1136. // connection.
  1137. //
  1138. // TODO(zhaoq): currently one reader per transport. Investigate whether this is
  1139. // optimal.
  1140. // TODO(zhaoq): Check the validity of the incoming frame sequence.
  1141. func (t *http2Client) reader() {
  1142. defer close(t.readerDone)
  1143. // Check the validity of server preface.
  1144. frame, err := t.framer.fr.ReadFrame()
  1145. if err != nil {
  1146. t.Close() // this kicks off resetTransport, so must be last before return
  1147. return
  1148. }
  1149. t.conn.SetReadDeadline(time.Time{}) // reset deadline once we get the settings frame (we didn't time out, yay!)
  1150. if t.keepaliveEnabled {
  1151. atomic.CompareAndSwapUint32(&t.activity, 0, 1)
  1152. }
  1153. sf, ok := frame.(*http2.SettingsFrame)
  1154. if !ok {
  1155. t.Close() // this kicks off resetTransport, so must be last before return
  1156. return
  1157. }
  1158. t.onPrefaceReceipt()
  1159. t.handleSettings(sf, true)
  1160. // loop to keep reading incoming messages on this transport.
  1161. for {
  1162. frame, err := t.framer.fr.ReadFrame()
  1163. if t.keepaliveEnabled {
  1164. atomic.CompareAndSwapUint32(&t.activity, 0, 1)
  1165. }
  1166. if err != nil {
  1167. // Abort an active stream if the http2.Framer returns a
  1168. // http2.StreamError. This can happen only if the server's response
  1169. // is malformed http2.
  1170. if se, ok := err.(http2.StreamError); ok {
  1171. t.mu.Lock()
  1172. s := t.activeStreams[se.StreamID]
  1173. t.mu.Unlock()
  1174. if s != nil {
  1175. // use error detail to provide better err message
  1176. code := http2ErrConvTab[se.Code]
  1177. msg := t.framer.fr.ErrorDetail().Error()
  1178. t.closeStream(s, status.Error(code, msg), true, http2.ErrCodeProtocol, status.New(code, msg), nil, false)
  1179. }
  1180. continue
  1181. } else {
  1182. // Transport error.
  1183. t.Close()
  1184. return
  1185. }
  1186. }
  1187. switch frame := frame.(type) {
  1188. case *http2.MetaHeadersFrame:
  1189. t.operateHeaders(frame)
  1190. case *http2.DataFrame:
  1191. t.handleData(frame)
  1192. case *http2.RSTStreamFrame:
  1193. t.handleRSTStream(frame)
  1194. case *http2.SettingsFrame:
  1195. t.handleSettings(frame, false)
  1196. case *http2.PingFrame:
  1197. t.handlePing(frame)
  1198. case *http2.GoAwayFrame:
  1199. t.handleGoAway(frame)
  1200. case *http2.WindowUpdateFrame:
  1201. t.handleWindowUpdate(frame)
  1202. default:
  1203. errorf("transport: http2Client.reader got unhandled frame type %v.", frame)
  1204. }
  1205. }
  1206. }
  1207. // keepalive running in a separate goroutune makes sure the connection is alive by sending pings.
  1208. func (t *http2Client) keepalive() {
  1209. p := &ping{data: [8]byte{}}
  1210. timer := time.NewTimer(t.kp.Time)
  1211. for {
  1212. select {
  1213. case <-timer.C:
  1214. if atomic.CompareAndSwapUint32(&t.activity, 1, 0) {
  1215. timer.Reset(t.kp.Time)
  1216. continue
  1217. }
  1218. // Check if keepalive should go dormant.
  1219. t.mu.Lock()
  1220. if len(t.activeStreams) < 1 && !t.kp.PermitWithoutStream {
  1221. // Make awakenKeepalive writable.
  1222. <-t.awakenKeepalive
  1223. t.mu.Unlock()
  1224. select {
  1225. case <-t.awakenKeepalive:
  1226. // If the control gets here a ping has been sent
  1227. // need to reset the timer with keepalive.Timeout.
  1228. case <-t.ctx.Done():
  1229. return
  1230. }
  1231. } else {
  1232. t.mu.Unlock()
  1233. if channelz.IsOn() {
  1234. atomic.AddInt64(&t.czData.kpCount, 1)
  1235. }
  1236. // Send ping.
  1237. t.controlBuf.put(p)
  1238. }
  1239. // By the time control gets here a ping has been sent one way or the other.
  1240. timer.Reset(t.kp.Timeout)
  1241. select {
  1242. case <-timer.C:
  1243. if atomic.CompareAndSwapUint32(&t.activity, 1, 0) {
  1244. timer.Reset(t.kp.Time)
  1245. continue
  1246. }
  1247. t.Close()
  1248. return
  1249. case <-t.ctx.Done():
  1250. if !timer.Stop() {
  1251. <-timer.C
  1252. }
  1253. return
  1254. }
  1255. case <-t.ctx.Done():
  1256. if !timer.Stop() {
  1257. <-timer.C
  1258. }
  1259. return
  1260. }
  1261. }
  1262. }
  1263. func (t *http2Client) Error() <-chan struct{} {
  1264. return t.ctx.Done()
  1265. }
  1266. func (t *http2Client) GoAway() <-chan struct{} {
  1267. return t.goAway
  1268. }
  1269. func (t *http2Client) ChannelzMetric() *channelz.SocketInternalMetric {
  1270. s := channelz.SocketInternalMetric{
  1271. StreamsStarted: atomic.LoadInt64(&t.czData.streamsStarted),
  1272. StreamsSucceeded: atomic.LoadInt64(&t.czData.streamsSucceeded),
  1273. StreamsFailed: atomic.LoadInt64(&t.czData.streamsFailed),
  1274. MessagesSent: atomic.LoadInt64(&t.czData.msgSent),
  1275. MessagesReceived: atomic.LoadInt64(&t.czData.msgRecv),
  1276. KeepAlivesSent: atomic.LoadInt64(&t.czData.kpCount),
  1277. LastLocalStreamCreatedTimestamp: time.Unix(0, atomic.LoadInt64(&t.czData.lastStreamCreatedTime)),
  1278. LastMessageSentTimestamp: time.Unix(0, atomic.LoadInt64(&t.czData.lastMsgSentTime)),
  1279. LastMessageReceivedTimestamp: time.Unix(0, atomic.LoadInt64(&t.czData.lastMsgRecvTime)),
  1280. LocalFlowControlWindow: int64(t.fc.getSize()),
  1281. SocketOptions: channelz.GetSocketOption(t.conn),
  1282. LocalAddr: t.localAddr,
  1283. RemoteAddr: t.remoteAddr,
  1284. // RemoteName :
  1285. }
  1286. if au, ok := t.authInfo.(credentials.ChannelzSecurityInfo); ok {
  1287. s.Security = au.GetSecurityValue()
  1288. }
  1289. s.RemoteFlowControlWindow = t.getOutFlowWindow()
  1290. return &s
  1291. }
  1292. func (t *http2Client) IncrMsgSent() {
  1293. atomic.AddInt64(&t.czData.msgSent, 1)
  1294. atomic.StoreInt64(&t.czData.lastMsgSentTime, time.Now().UnixNano())
  1295. }
  1296. func (t *http2Client) IncrMsgRecv() {
  1297. atomic.AddInt64(&t.czData.msgRecv, 1)
  1298. atomic.StoreInt64(&t.czData.lastMsgRecvTime, time.Now().UnixNano())
  1299. }
  1300. func (t *http2Client) getOutFlowWindow() int64 {
  1301. resp := make(chan uint32, 1)
  1302. timer := time.NewTimer(time.Second)
  1303. defer timer.Stop()
  1304. t.controlBuf.put(&outFlowControlSizeRequest{resp})
  1305. select {
  1306. case sz := <-resp:
  1307. return int64(sz)
  1308. case <-t.ctxDone:
  1309. return -1
  1310. case <-timer.C:
  1311. return -2
  1312. }
  1313. }