// stream.js (16 KB)
  'use strict'

  var transport = require('../spdy-transport')

  var assert = require('assert')
  var util = require('util')

  var debug = {
    client: require('debug')('spdy:stream:client'),
    server: require('debug')('spdy:stream:server')
  }
  var Duplex = require('readable-stream').Duplex
  // Duplex stream representing one logical stream multiplexed over
  // `connection`.
  //
  // Reads `id`, `method`, `path`, `host`, `headers`, `parent`, `request`,
  // `priority`, `readable` and `writable` from `options`. Protocol-level
  // bookkeeping is kept on `this._spdyState`; most of it is copied or derived
  // from `connection._spdyState`.
  function Stream (connection, options) {
    Duplex.call(this)

    var connectionState = connection._spdyState

    var state = {}
    this._spdyState = state

    this.id = options.id
    this.method = options.method
    this.path = options.path
    this.host = options.host
    this.headers = options.headers || {}
    this.connection = connection
    this.parent = options.parent || null

    state.socket = null
    state.protocol = connectionState.protocol
    state.constants = state.protocol.constants

    // See _initPriority()
    state.priority = null

    state.version = this.connection.getVersion()
    state.isServer = this.connection.isServer()
    state.debug = state.isServer ? debug.server : debug.client

    state.framer = connectionState.framer
    state.parser = connectionState.parser

    state.request = options.request
    state.needResponse = options.request
    // Per-stream window is cloned from the connection's template; the
    // session window is the connection's own window object (shared).
    state.window = connectionState.streamWindow.clone(options.id)
    state.sessionWindow = connectionState.window
    state.maxChunk = connectionState.maxChunk

    // Can't send incoming request
    // (See `.send()` method)
    state.sent = !state.request

    // Both directions are enabled unless explicitly set to `false`
    state.readable = options.readable !== false
    state.writable = options.writable !== false

    state.aborted = false

    // Counter and deferred-callback queue used by _hardCork()/_hardUncork()
    state.corked = 0
    state.corkQueue = []

    state.timeout = new transport.utils.Timeout(this)

    this.on('finish', this._onFinish)
    this.on('end', this._onEnd)

    var self = this
    function _onWindowOverflow () {
      self._onWindowOverflow()
    }

    state.window.recv.on('overflow', _onWindowOverflow)
    state.window.send.on('overflow', _onWindowOverflow)

    this._initPriority(options.priority)

    // A non-readable stream is ended immediately
    if (!state.readable) { this.push(null) }
    // A non-writable stream is marked as already ended and finished
    if (!state.writable) {
      this._writableState.ended = true
      this._writableState.finished = true
    }
  }
  61. util.inherits(Stream, Duplex)
  62. exports.Stream = Stream
  // Store `socket` on the stream as `this.socket`.
  Stream.prototype._init = function _init (socket) {
    this.socket = socket
  }
  // Register this stream in the connection's priority tree and remember the
  // returned node in `state.priority`.
  //
  // Without `priority` the root's `addDefault(id)` is used; otherwise only
  // `parent`, `weight` and `exclusive` are forwarded to `root.add()`.
  Stream.prototype._initPriority = function _initPriority (priority) {
    var state = this._spdyState
    var connectionState = this.connection._spdyState
    var root = connectionState.priorityRoot

    if (!priority) {
      state.priority = root.addDefault(this.id)
      return
    }

    state.priority = root.add({
      id: this.id,
      parent: priority.parent,
      weight: priority.weight,
      exclusive: priority.exclusive
    })
  }
  // Dispatch an incoming frame to the handler matching `frame.type`.
  //
  // Frames are dropped once the stream is aborted. Any accepted frame resets
  // the timeout, and a frame with `fin` set ends the readable side after its
  // handler ran. Unknown frame types are ignored (apart from `fin` handling).
  Stream.prototype._handleFrame = function _handleFrame (frame) {
    var state = this._spdyState

    // Ignore any kind of data after abort
    if (state.aborted) {
      state.debug('id=%d ignoring frame=%s after abort', this.id, frame.type)
      return
    }

    // Restart the timer on incoming frames
    state.timeout.reset()

    switch (frame.type) {
      case 'DATA':
        this._handleData(frame)
        break
      case 'HEADERS':
        this._handleHeaders(frame)
        break
      case 'RST':
        this._handleRST(frame)
        break
      case 'WINDOW_UPDATE':
        this._handleWindowUpdate(frame)
        break
      case 'PRIORITY':
        this._handlePriority(frame)
        break
      case 'PUSH_PROMISE':
        this._handlePushPromise(frame)
        break
    }

    if (frame.fin) {
      state.debug('id=%d end', this.id)
      this.push(null)
    }
  }
  // Returns `true` when the stream is aborted, in which case `callback` is
  // scheduled (next tick) with a 'Stream write aborted' error. Returns `false`
  // and leaves `callback` untouched otherwise.
  function checkAborted (stream, state, callback) {
    if (state.aborted) {
      state.debug('id=%d abort write', stream.id)
      process.nextTick(function () {
        callback(new Error('Stream write aborted'))
      })
      return true
    }

    return false
  }
  // Send one chunk of `data` as a non-final DATA frame.
  //
  // The chunk size is first subtracted from the stream's send window; the
  // frame is written from that update's callback. The abort flag is checked
  // both before reserving window and again right before framing, since the
  // window callback may run later.
  function _send (stream, state, data, callback) {
    if (checkAborted(stream, state, callback)) {
      return
    }

    state.debug('id=%d presend=%d', stream.id, data.length)

    state.timeout.reset()

    state.window.send.update(-data.length, function () {
      if (checkAborted(stream, state, callback)) {
        return
      }

      state.debug('id=%d send=%d', stream.id, data.length)

      state.timeout.reset()

      state.framer.dataFrame({
        id: stream.id,
        priority: state.priority.getPriority(),
        fin: false,
        data: data
      }, function (err) {
        state.debug('id=%d postsend=%d', stream.id, data.length)
        callback(err)
      })
    })
  }
  // Writable-side hook: sends `data` as DATA frames.
  //
  // Sends the pending request first if needed. While the stream is hard-corked
  // the write is parked on `state.corkQueue` and replayed on uncork; otherwise
  // the data is split into window-sized chunks and passed to `_send`. `enc` is
  // only forwarded on replay and not otherwise used.
  Stream.prototype._write = function _write (data, enc, callback) {
    var state = this._spdyState

    // Send the request if it wasn't sent
    if (!state.sent) { this.send() }

    // Writes should come after pending control frames (response and headers)
    if (state.corked !== 0) {
      var self = this
      state.corkQueue.push(function () {
        self._write(data, enc, callback)
      })
      return
    }

    // Split DATA in chunks to prevent window from going negative
    this._splitStart(data, _send, callback)
  }
  // Start chunked delivery of `data` from offset 0; see `_split()`.
  Stream.prototype._splitStart = function _splitStart (data, onChunk, callback) {
    return this._split(data, 0, onChunk, callback)
  }
  // Deliver `data` starting at `offset` to `onChunk` one slice at a time.
  //
  // `onChunk(stream, state, chunk, done)` is called for each slice; the next
  // slice is cut only after `done()` fires, and `done(err)` stops the walk and
  // forwards `err` to `callback`. When all data is consumed `callback` is
  // invoked on the next tick without arguments.
  Stream.prototype._split = function _split (data, offset, onChunk, callback) {
    if (offset === data.length) {
      return process.nextTick(callback)
    }

    var state = this._spdyState
    var local = state.window.send
    var session = state.sessionWindow.send

    // A window that is currently empty (or negative) falls back to its
    // maximum size instead of producing a zero-length chunk.
    var availSession = Math.max(0, session.getCurrent())
    if (availSession === 0) {
      availSession = session.getMax()
    }
    var availLocal = Math.max(0, local.getCurrent())
    if (availLocal === 0) {
      availLocal = local.getMax()
    }

    // Chunk size is bounded by both windows and by `state.maxChunk`
    var avail = Math.min(availSession, availLocal, state.maxChunk)

    var self = this

    if (avail === 0) {
      // Nothing can be sent right now: retry from the send window's update
      // callback (presumably invoked once the window has capacity again).
      state.window.send.update(0, function () {
        self._split(data, offset, onChunk, callback)
      })
      return
    }

    var size = Math.min(data.length - offset, avail)
    var chunk = data.slice(offset, offset + size)

    onChunk(this, state, chunk, function (err) {
      if (err) { return callback(err) }

      // Get the next chunk
      self._split(data, offset + size, onChunk, callback)
    })
  }
  // Readable-side hook: when the receive window reports that it is draining,
  // grow it by its current delta and send a matching WINDOW_UPDATE frame.
  Stream.prototype._read = function _read () {
    var state = this._spdyState

    if (!state.window.recv.isDraining()) {
      return
    }

    var delta = state.window.recv.getDelta()

    state.debug('id=%d window emptying, update by %d', this.id, delta)

    state.window.recv.update(delta)
    state.framer.windowUpdateFrame({
      id: this.id,
      delta: delta
    })
  }
  // Handle an incoming DATA frame: charge its length against the receive
  // window and push the payload to the readable side. DATA arriving on a
  // non-readable or already ended stream is answered with RST STREAM_CLOSED.
  Stream.prototype._handleData = function _handleData (frame) {
    var state = this._spdyState

    // DATA on ended or not readable stream!
    if (!state.readable || this._readableState.ended) {
      state.framer.rstFrame({ id: this.id, code: 'STREAM_CLOSED' })
      return
    }

    state.debug('id=%d recv=%d', this.id, frame.data.length)
    state.window.recv.update(-frame.data.length)

    this.push(frame.data)
  }
  // Handle an incoming RST frame: any code other than 'CANCEL' is surfaced as
  // an 'error' event, then the stream is aborted.
  Stream.prototype._handleRST = function _handleRST (frame) {
    if (frame.code !== 'CANCEL') {
      this.emit('error', new Error('Got RST: ' + frame.code))
    }
    this.abort()
  }
  // Handle an incoming WINDOW_UPDATE frame by applying `frame.delta` to the
  // stream's send window.
  Stream.prototype._handleWindowUpdate = function _handleWindowUpdate (frame) {
    var state = this._spdyState

    state.window.send.update(frame.delta)
  }
  // Invoked when one of the stream's flow-control windows emits 'overflow'.
  // Resets the stream with FLOW_CONTROL_ERROR, marks it aborted and reports
  // the failure through an 'error' event.
  Stream.prototype._onWindowOverflow = function _onWindowOverflow () {
    var state = this._spdyState
    var wasAborted = state.aborted

    state.debug('id=%d window overflow', this.id)
    state.framer.rstFrame({ id: this.id, code: 'FLOW_CONTROL_ERROR' })

    // `state.aborted` is the flag that frame handling, writes and
    // `_checkEnded()` consult. Setting only `this.aborted` left the stream
    // processing frames and writes after the reset had been sent.
    state.aborted = true
    // Kept for callers that read the public property
    this.aborted = true

    if (!wasAborted) {
      // `_maybeClose()` and `.abort()` both return early once the stream is
      // marked aborted, so 'close' has to be scheduled here (same shape as in
      // `.abort()`): clear the timeout and emit on the next tick.
      state.timeout.set(0)

      var self = this
      process.nextTick(function () {
        self.emit('close', new Error('Aborted, code: FLOW_CONTROL_ERROR'))
      })
    }

    this.emit('error', new Error('HTTP2 window overflow'))
  }
  // Replace the stream's priority node: remove the current one, register a
  // new one from `frame.priority`, then emit 'priority'.
  Stream.prototype._handlePriority = function _handlePriority (frame) {
    var state = this._spdyState

    state.priority.remove()
    state.priority = null
    this._initPriority(frame.priority)

    // Mostly for testing purposes
    this.emit('priority', frame.priority)
  }
  // Handle an incoming HEADERS frame.
  //
  // On a non-readable or ended stream the frame is answered with RST
  // STREAM_CLOSED. While a response is still awaited the frame is treated as
  // that response; otherwise its headers are emitted as a 'headers' event.
  Stream.prototype._handleHeaders = function _handleHeaders (frame) {
    var state = this._spdyState

    if (!state.readable || this._readableState.ended) {
      state.framer.rstFrame({ id: this.id, code: 'STREAM_CLOSED' })
      return
    }

    if (state.needResponse) {
      return this._handleResponse(frame)
    }

    this.emit('headers', frame.headers)
  }
  // Handle the HEADERS frame that carries the response.
  //
  // A frame without a `:status` header is answered with RST PROTOCOL_ERROR and
  // the stream keeps waiting for a response. Otherwise 'response' is emitted
  // with the status coerced to an integer and the full header object.
  Stream.prototype._handleResponse = function _handleResponse (frame) {
    var state = this._spdyState

    if (frame.headers[':status'] === undefined) {
      state.framer.rstFrame({ id: this.id, code: 'PROTOCOL_ERROR' })
      return
    }

    state.needResponse = false
    this.emit('response', frame.headers[':status'] | 0, frame.headers)
  }
  // 'finish' listener: closes the writable half on the wire.
  //
  // If the request was never sent it is sent now; otherwise an empty DATA
  // frame with `fin` set is written. While hard-corked the whole handler is
  // deferred via `state.corkQueue` (including the `_maybeClose()` call).
  Stream.prototype._onFinish = function _onFinish () {
    var state = this._spdyState

    // Send the request if it wasn't sent
    if (!state.sent) {
      // NOTE: will send HEADERS with FIN flag
      this.send()
    } else {
      // Just an `.end()` without any writes will trigger immediate `finish` event
      // without any calls to `_write()`.
      if (state.corked !== 0) {
        var self = this
        state.corkQueue.push(function () {
          self._onFinish()
        })
        return
      }

      state.framer.dataFrame({
        id: this.id,
        priority: state.priority.getPriority(),
        fin: true,
        data: Buffer.alloc(0)
      })
    }

    this._maybeClose()
  }
  // 'end' listener: the readable half is done, so check whether the stream
  // can be closed.
  Stream.prototype._onEnd = function _onEnd () {
    this._maybeClose()
  }
  // Returns `true` if frames may still be sent on this stream.
  //
  // The stream counts as ended when it is aborted, not writable, or its
  // writable side has finished. In that case `false` is returned and, if a
  // `callback` was given, it is invoked on the next tick with an error.
  Stream.prototype._checkEnded = function _checkEnded (callback) {
    var state = this._spdyState

    var ended = state.aborted ||
      !state.writable ||
      this._writableState.finished

    if (!ended) {
      return true
    }

    if (!callback) {
      return false
    }

    var err = new Error('Ended stream can\'t send frames')
    process.nextTick(function () {
      callback(err)
    })

    return false
  }
  // Emit 'close' once both halves are done: the readable side is ended (or
  // the stream is not readable) and the writable side has finished. The
  // timeout is cleared first. Does nothing on an aborted stream.
  Stream.prototype._maybeClose = function _maybeClose () {
    var state = this._spdyState

    // .abort() emits `close`
    if (state.aborted) {
      return
    }

    if ((!state.readable || this._readableState.ended) &&
        this._writableState.finished) {
      // Clear timeout
      state.timeout.set(0)

      this.emit('close')
    }
  }
  // Handle an incoming PUSH_PROMISE frame: create the promised (non-writable)
  // stream on the connection and announce it with a 'pushPromise' event.
  // The new stream is aborted when nobody listens for that event.
  Stream.prototype._handlePushPromise = function _handlePushPromise (frame) {
    var push = this.connection._createStream({
      id: frame.promisedId,
      parent: this,
      push: true,
      request: true,
      method: frame.headers[':method'],
      path: frame.headers[':path'],
      host: frame.headers[':authority'],
      priority: frame.priority,
      headers: frame.headers,
      writable: false
    })

    // GOAWAY
    if (this.connection._isGoaway(push.id)) {
      return
    }

    // `emit()` returns false when the event had no listeners
    if (!this.emit('pushPromise', push)) {
      push.abort()
    }
  }
  // Cork the writable side and bump the hard-cork counter. While the counter
  // is non-zero, `_write()` and `_onFinish()` queue themselves instead of
  // sending.
  Stream.prototype._hardCork = function _hardCork () {
    var state = this._spdyState

    this.cork()
    state.corked++
  }
  // Undo one `_hardCork()`. When the counter drops to zero the queued
  // callbacks are run in insertion order.
  Stream.prototype._hardUncork = function _hardUncork () {
    var state = this._spdyState

    this.uncork()
    state.corked--
    if (state.corked !== 0) {
      return
    }

    // Invoke callbacks. The queue is swapped out first, so anything queued by
    // a callback lands in the fresh array rather than in this loop.
    var queue = state.corkQueue
    state.corkQueue = []
    for (var i = 0; i < queue.length; i++) {
      queue[i]()
    }
  }
  // Send the push frame announcing this stream on its parent stream.
  //
  // The stream stays hard-corked until the framer reports completion;
  // `callback(err)` is then invoked (it is required here, not optional).
  Stream.prototype._sendPush = function _sendPush (status, response, callback) {
    var self = this
    var state = this._spdyState

    this._hardCork()
    state.framer.pushFrame({
      id: this.parent.id,
      promisedId: this.id,
      priority: state.priority.toJSON(),
      path: this.path,
      host: this.host,
      method: this.method,
      status: status,
      headers: this.headers,
      response: response
    }, function (err) {
      self._hardUncork()

      callback(err)
    })
  }
  // Returns the `sent` flag: whether the request was sent (always true for
  // streams created without `options.request`).
  Stream.prototype._wasSent = function _wasSent () {
    var state = this._spdyState
    return state.sent
  }
  379. // Public API
  // Send the request headers for this stream.
  //
  // May only happen once: a second call reports 'Stream was already sent' to
  // `callback` on the next tick. The stream is hard-corked until the framer
  // has written the request frame. `callback` is optional.
  Stream.prototype.send = function send (callback) {
    var state = this._spdyState

    if (state.sent) {
      var err = new Error('Stream was already sent')
      process.nextTick(function () {
        if (callback) {
          callback(err)
        }
      })
      return
    }

    state.sent = true
    state.timeout.reset()

    // GET requests should always be auto-finished
    if (this.method === 'GET') {
      this._writableState.ended = true
      this._writableState.finished = true
    }

    // TODO(indunty): ideally it should just take a stream object as an input
    var self = this
    this._hardCork()
    state.framer.requestFrame({
      id: this.id,
      method: this.method,
      path: this.path,
      host: this.host,
      priority: state.priority.toJSON(),
      headers: this.headers,
      // FIN is set when the writable side is already finished
      fin: this._writableState.finished
    }, function (err) {
      self._hardUncork()

      if (!callback) {
        return
      }

      callback(err)
    })
  }
  // Send a response (`status` plus `headers`) on this stream.
  //
  // Asserts that the stream is not a request stream. Does nothing beyond
  // resetting the timeout when the stream can no longer send frames (the
  // error is then delivered through `callback` by `_checkEnded()`). The
  // stream is hard-corked until the framer has written the response frame.
  // `callback` is optional.
  Stream.prototype.respond = function respond (status, headers, callback) {
    var self = this
    var state = this._spdyState
    assert(!state.request, 'Can\'t respond on request')

    state.timeout.reset()

    if (!this._checkEnded(callback)) { return }

    var frame = {
      id: this.id,
      status: status,
      headers: headers
    }
    this._hardCork()
    state.framer.responseFrame(frame, function (err) {
      self._hardUncork()
      if (callback) { callback(err) }
    })
  }
  // Change the maximum size of the receive window to `size`. If that leaves a
  // non-zero delta, a WINDOW_UPDATE frame carrying it is sent and the window
  // is updated by the same amount. No-op on a stream that can't send frames.
  Stream.prototype.setWindow = function setWindow (size) {
    var state = this._spdyState

    state.timeout.reset()

    if (!this._checkEnded()) {
      return
    }

    state.debug('id=%d force window max=%d', this.id, size)
    state.window.recv.setMax(size)

    var delta = state.window.recv.getDelta()
    if (delta === 0) { return }

    state.framer.windowUpdateFrame({
      id: this.id,
      delta: delta
    })
    state.window.recv.update(delta)
  }
  // Send additional headers on this stream.
  //
  // Before the request has been sent, `headers` are merged into a fresh copy
  // of `this.headers` (the previous object is left untouched) and `callback`
  // fires on the next tick. Afterwards a HEADERS frame is written while the
  // stream is hard-corked. `callback` is optional.
  Stream.prototype.sendHeaders = function sendHeaders (headers, callback) {
    var self = this
    var state = this._spdyState

    state.timeout.reset()

    if (!this._checkEnded(callback)) {
      return
    }

    // Request wasn't yet send, coalesce headers
    if (!state.sent) {
      this.headers = Object.assign({}, this.headers, headers)
      process.nextTick(function () {
        if (callback) {
          callback(null)
        }
      })
      return
    }

    this._hardCork()
    state.framer.headersFrame({
      id: this.id,
      headers: headers
    }, function (err) {
      self._hardUncork()
      if (callback) { callback(err) }
    })
  }
  // Destroy hook: aborts the stream with the default code. Any arguments
  // passed to it are ignored.
  // NOTE(review): if the stream base class hands an (err, callback) pair to
  // this hook, that callback is never invoked here — confirm this is intended.
  Stream.prototype._destroy = function destroy () {
    this.abort()
  }
  // Abort the stream: send RST with `code` (default 'CANCEL'), clear the
  // timeout and emit 'close' on the next tick.
  //
  // Accepts `.abort(code, callback)` or `.abort(callback)`. When the stream is
  // already fully closed or already aborted nothing is sent and `callback`
  // (if any) is simply scheduled. Otherwise `callback(null)` runs right
  // before 'close' is emitted.
  Stream.prototype.abort = function abort (code, callback) {
    var state = this._spdyState

    // .abort(callback)
    if (typeof code === 'function') {
      callback = code
      code = null
    }

    if (this._readableState.ended && this._writableState.finished) {
      state.debug('id=%d already closed', this.id)
      if (callback) {
        process.nextTick(callback)
      }
      return
    }

    if (state.aborted) {
      state.debug('id=%d already aborted', this.id)
      if (callback) { process.nextTick(callback) }
      return
    }

    state.aborted = true
    state.debug('id=%d abort', this.id)

    this.setTimeout(0)

    var abortCode = code || 'CANCEL'

    state.framer.rstFrame({
      id: this.id,
      code: abortCode
    })

    var self = this
    process.nextTick(function () {
      if (callback) {
        callback(null)
      }
      self.emit('close', new Error('Aborted, code: ' + abortCode))
    })
  }
  // Change the stream's priority to `info`, first locally and then on the
  // peer by sending a PRIORITY frame. No-op on a stream that can't send
  // frames.
  Stream.prototype.setPriority = function setPriority (info) {
    var state = this._spdyState

    state.timeout.reset()

    if (!this._checkEnded()) {
      return
    }

    state.debug('id=%d priority change', this.id, info)

    var frame = { id: this.id, priority: info }

    // Change priority on this side
    this._handlePriority(frame)

    // And on the other too
    state.framer.priorityFrame(frame)
  }
  // Create a push stream for `uri` via the connection and return it.
  //
  // Both this stream and the new push stream are hard-corked until the
  // connection reports completion. This stream is always uncorked then; the
  // push stream only when there was no error. With a `callback` the result is
  // reported as `callback(err, push)`; without one, an error is emitted on
  // the push stream. Returns `undefined` when the stream can't send frames.
  Stream.prototype.pushPromise = function pushPromise (uri, callback) {
    if (!this._checkEnded(callback)) {
      return
    }

    var self = this
    this._hardCork()
    var push = this.connection.pushPromise(this, uri, function (err) {
      self._hardUncork()
      if (!err) {
        push._hardUncork()
      }

      if (callback) {
        return callback(err, push)
      }

      if (err) { push.emit('error', err) }
    })
    push._hardCork()

    return push
  }
  // Set the upper bound on the chunk size used when `_split()` slices
  // outgoing data.
  Stream.prototype.setMaxChunk = function setMaxChunk (size) {
    var state = this._spdyState
    state.maxChunk = size
  }
  // Forward `delay` and the optional `callback` to the stream's timeout
  // object. Elsewhere in this file a delay of 0 is used to clear the timeout.
  Stream.prototype.setTimeout = function setTimeout (delay, callback) {
    var state = this._spdyState
    state.timeout.set(delay, callback)
  }