// receiver.js
  1. 'use strict';
  2. const safeBuffer = require('safe-buffer');
  3. const PerMessageDeflate = require('./permessage-deflate');
  4. const bufferUtil = require('./buffer-util');
  5. const validation = require('./validation');
  6. const constants = require('./constants');
  7. const Buffer = safeBuffer.Buffer;
  8. const GET_INFO = 0;
  9. const GET_PAYLOAD_LENGTH_16 = 1;
  10. const GET_PAYLOAD_LENGTH_64 = 2;
  11. const GET_MASK = 3;
  12. const GET_DATA = 4;
  13. const INFLATING = 5;
  14. /**
  15. * HyBi Receiver implementation.
  16. */
  17. class Receiver {
  18. /**
  19. * Creates a Receiver instance.
  20. *
  21. * @param {Object} extensions An object containing the negotiated extensions
  22. * @param {Number} maxPayload The maximum allowed message length
  23. * @param {String} binaryType The type for binary data
  24. */
  25. constructor (extensions, maxPayload, binaryType) {
  26. this._binaryType = binaryType || constants.BINARY_TYPES[0];
  27. this._extensions = extensions || {};
  28. this._maxPayload = maxPayload | 0;
  29. this._bufferedBytes = 0;
  30. this._buffers = [];
  31. this._compressed = false;
  32. this._payloadLength = 0;
  33. this._fragmented = 0;
  34. this._masked = false;
  35. this._fin = false;
  36. this._mask = null;
  37. this._opcode = 0;
  38. this._totalPayloadLength = 0;
  39. this._messageLength = 0;
  40. this._fragments = [];
  41. this._cleanupCallback = null;
  42. this._isCleaningUp = false;
  43. this._hadError = false;
  44. this._loop = false;
  45. this.add = this.add.bind(this);
  46. this.onmessage = null;
  47. this.onclose = null;
  48. this.onerror = null;
  49. this.onping = null;
  50. this.onpong = null;
  51. this._state = GET_INFO;
  52. }
  53. /**
  54. * Consumes `n` bytes from the buffered data, calls `cleanup` if necessary.
  55. *
  56. * @param {Number} n The number of bytes to consume
  57. * @return {(Buffer|null)} The consumed bytes or `null` if `n` bytes are not
  58. * available
  59. * @private
  60. */
  61. consume (n) {
  62. if (this._bufferedBytes < n) {
  63. this._loop = false;
  64. if (this._isCleaningUp) this.cleanup(this._cleanupCallback);
  65. return null;
  66. }
  67. this._bufferedBytes -= n;
  68. if (n === this._buffers[0].length) return this._buffers.shift();
  69. if (n < this._buffers[0].length) {
  70. const buf = this._buffers[0];
  71. this._buffers[0] = buf.slice(n);
  72. return buf.slice(0, n);
  73. }
  74. const dst = Buffer.allocUnsafe(n);
  75. do {
  76. const buf = this._buffers[0];
  77. if (n >= buf.length) {
  78. this._buffers.shift().copy(dst, dst.length - n);
  79. } else {
  80. buf.copy(dst, dst.length - n, 0, n);
  81. this._buffers[0] = buf.slice(n);
  82. }
  83. n -= buf.length;
  84. } while (n > 0);
  85. return dst;
  86. }
  87. /**
  88. * Adds new data to the parser.
  89. *
  90. * @param {Buffer} chunk A chunk of data
  91. * @public
  92. */
  93. add (chunk) {
  94. this._bufferedBytes += chunk.length;
  95. this._buffers.push(chunk);
  96. this.startLoop();
  97. }
  98. /**
  99. * Starts the parsing loop.
  100. *
  101. * @private
  102. */
  103. startLoop () {
  104. this._loop = true;
  105. do {
  106. switch (this._state) {
  107. case GET_INFO:
  108. this.getInfo();
  109. break;
  110. case GET_PAYLOAD_LENGTH_16:
  111. this.getPayloadLength16();
  112. break;
  113. case GET_PAYLOAD_LENGTH_64:
  114. this.getPayloadLength64();
  115. break;
  116. case GET_MASK:
  117. this.getMask();
  118. break;
  119. case GET_DATA:
  120. this.getData();
  121. break;
  122. default: // `INFLATING`
  123. this._loop = false;
  124. }
  125. } while (this._loop);
  126. }
  127. /**
  128. * Reads the first two bytes of a frame.
  129. *
  130. * @private
  131. */
  132. getInfo () {
  133. const buf = this.consume(2);
  134. if (buf === null) return;
  135. if ((buf[0] & 0x30) !== 0x00) {
  136. this.error(
  137. new RangeError('Invalid WebSocket frame: RSV2 and RSV3 must be clear'),
  138. 1002
  139. );
  140. return;
  141. }
  142. const compressed = (buf[0] & 0x40) === 0x40;
  143. if (compressed && !this._extensions[PerMessageDeflate.extensionName]) {
  144. this.error(
  145. new RangeError('Invalid WebSocket frame: RSV1 must be clear'),
  146. 1002
  147. );
  148. return;
  149. }
  150. this._fin = (buf[0] & 0x80) === 0x80;
  151. this._opcode = buf[0] & 0x0f;
  152. this._payloadLength = buf[1] & 0x7f;
  153. if (this._opcode === 0x00) {
  154. if (compressed) {
  155. this.error(
  156. new RangeError('Invalid WebSocket frame: RSV1 must be clear'),
  157. 1002
  158. );
  159. return;
  160. }
  161. if (!this._fragmented) {
  162. this.error(
  163. new RangeError('Invalid WebSocket frame: invalid opcode 0'),
  164. 1002
  165. );
  166. return;
  167. } else {
  168. this._opcode = this._fragmented;
  169. }
  170. } else if (this._opcode === 0x01 || this._opcode === 0x02) {
  171. if (this._fragmented) {
  172. this.error(
  173. new RangeError(
  174. `Invalid WebSocket frame: invalid opcode ${this._opcode}`
  175. ),
  176. 1002
  177. );
  178. return;
  179. }
  180. this._compressed = compressed;
  181. } else if (this._opcode > 0x07 && this._opcode < 0x0b) {
  182. if (!this._fin) {
  183. this.error(
  184. new RangeError('Invalid WebSocket frame: FIN must be set'),
  185. 1002
  186. );
  187. return;
  188. }
  189. if (compressed) {
  190. this.error(
  191. new RangeError('Invalid WebSocket frame: RSV1 must be clear'),
  192. 1002
  193. );
  194. return;
  195. }
  196. if (this._payloadLength > 0x7d) {
  197. this.error(
  198. new RangeError(
  199. `Invalid WebSocket frame: invalid payload length ` +
  200. `${this._payloadLength}`
  201. ),
  202. 1002
  203. );
  204. return;
  205. }
  206. } else {
  207. this.error(
  208. new RangeError(
  209. `Invalid WebSocket frame: invalid opcode ${this._opcode}`
  210. ),
  211. 1002
  212. );
  213. return;
  214. }
  215. if (!this._fin && !this._fragmented) this._fragmented = this._opcode;
  216. this._masked = (buf[1] & 0x80) === 0x80;
  217. if (this._payloadLength === 126) this._state = GET_PAYLOAD_LENGTH_16;
  218. else if (this._payloadLength === 127) this._state = GET_PAYLOAD_LENGTH_64;
  219. else this.haveLength();
  220. }
  221. /**
  222. * Gets extended payload length (7+16).
  223. *
  224. * @private
  225. */
  226. getPayloadLength16 () {
  227. const buf = this.consume(2);
  228. if (buf === null) return;
  229. this._payloadLength = buf.readUInt16BE(0, true);
  230. this.haveLength();
  231. }
  232. /**
  233. * Gets extended payload length (7+64).
  234. *
  235. * @private
  236. */
  237. getPayloadLength64 () {
  238. const buf = this.consume(8);
  239. if (buf === null) return;
  240. const num = buf.readUInt32BE(0, true);
  241. //
  242. // The maximum safe integer in JavaScript is 2^53 - 1. An error is returned
  243. // if payload length is greater than this number.
  244. //
  245. if (num > Math.pow(2, 53 - 32) - 1) {
  246. this.error(
  247. new RangeError(
  248. 'Unsupported WebSocket frame: payload length > 2^53 - 1'
  249. ),
  250. 1009
  251. );
  252. return;
  253. }
  254. this._payloadLength = num * Math.pow(2, 32) + buf.readUInt32BE(4, true);
  255. this.haveLength();
  256. }
  257. /**
  258. * Payload length has been read.
  259. *
  260. * @private
  261. */
  262. haveLength () {
  263. if (this._opcode < 0x08 && this.maxPayloadExceeded(this._payloadLength)) {
  264. return;
  265. }
  266. if (this._masked) this._state = GET_MASK;
  267. else this._state = GET_DATA;
  268. }
  269. /**
  270. * Reads mask bytes.
  271. *
  272. * @private
  273. */
  274. getMask () {
  275. this._mask = this.consume(4);
  276. if (this._mask === null) return;
  277. this._state = GET_DATA;
  278. }
  279. /**
  280. * Reads data bytes.
  281. *
  282. * @private
  283. */
  284. getData () {
  285. var data = constants.EMPTY_BUFFER;
  286. if (this._payloadLength) {
  287. data = this.consume(this._payloadLength);
  288. if (data === null) return;
  289. if (this._masked) bufferUtil.unmask(data, this._mask);
  290. }
  291. if (this._opcode > 0x07) {
  292. this.controlMessage(data);
  293. } else if (this._compressed) {
  294. this._state = INFLATING;
  295. this.decompress(data);
  296. } else if (this.pushFragment(data)) {
  297. this.dataMessage();
  298. }
  299. }
  300. /**
  301. * Decompresses data.
  302. *
  303. * @param {Buffer} data Compressed data
  304. * @private
  305. */
  306. decompress (data) {
  307. const perMessageDeflate = this._extensions[PerMessageDeflate.extensionName];
  308. perMessageDeflate.decompress(data, this._fin, (err, buf) => {
  309. if (err) {
  310. this.error(err, err.closeCode === 1009 ? 1009 : 1007);
  311. return;
  312. }
  313. if (this.pushFragment(buf)) this.dataMessage();
  314. this.startLoop();
  315. });
  316. }
  317. /**
  318. * Handles a data message.
  319. *
  320. * @private
  321. */
  322. dataMessage () {
  323. if (this._fin) {
  324. const messageLength = this._messageLength;
  325. const fragments = this._fragments;
  326. this._totalPayloadLength = 0;
  327. this._messageLength = 0;
  328. this._fragmented = 0;
  329. this._fragments = [];
  330. if (this._opcode === 2) {
  331. var data;
  332. if (this._binaryType === 'nodebuffer') {
  333. data = toBuffer(fragments, messageLength);
  334. } else if (this._binaryType === 'arraybuffer') {
  335. data = toArrayBuffer(toBuffer(fragments, messageLength));
  336. } else {
  337. data = fragments;
  338. }
  339. this.onmessage(data);
  340. } else {
  341. const buf = toBuffer(fragments, messageLength);
  342. if (!validation.isValidUTF8(buf)) {
  343. this.error(
  344. new Error('Invalid WebSocket frame: invalid UTF-8 sequence'),
  345. 1007
  346. );
  347. return;
  348. }
  349. this.onmessage(buf.toString());
  350. }
  351. }
  352. this._state = GET_INFO;
  353. }
  354. /**
  355. * Handles a control message.
  356. *
  357. * @param {Buffer} data Data to handle
  358. * @private
  359. */
  360. controlMessage (data) {
  361. if (this._opcode === 0x08) {
  362. if (data.length === 0) {
  363. this._loop = false;
  364. this.onclose(1005, '');
  365. this.cleanup(this._cleanupCallback);
  366. } else if (data.length === 1) {
  367. this.error(
  368. new RangeError('Invalid WebSocket frame: invalid payload length 1'),
  369. 1002
  370. );
  371. } else {
  372. const code = data.readUInt16BE(0, true);
  373. if (!validation.isValidStatusCode(code)) {
  374. this.error(
  375. new RangeError(
  376. `Invalid WebSocket frame: invalid status code ${code}`
  377. ),
  378. 1002
  379. );
  380. return;
  381. }
  382. const buf = data.slice(2);
  383. if (!validation.isValidUTF8(buf)) {
  384. this.error(
  385. new Error('Invalid WebSocket frame: invalid UTF-8 sequence'),
  386. 1007
  387. );
  388. return;
  389. }
  390. this._loop = false;
  391. this.onclose(code, buf.toString());
  392. this.cleanup(this._cleanupCallback);
  393. }
  394. return;
  395. }
  396. if (this._opcode === 0x09) this.onping(data);
  397. else this.onpong(data);
  398. this._state = GET_INFO;
  399. }
  400. /**
  401. * Handles an error.
  402. *
  403. * @param {Error} err The error
  404. * @param {Number} code Close code
  405. * @private
  406. */
  407. error (err, code) {
  408. this._hadError = true;
  409. this._loop = false;
  410. this.onerror(err, code);
  411. this.cleanup(this._cleanupCallback);
  412. }
  413. /**
  414. * Checks payload size, disconnects socket when it exceeds `maxPayload`.
  415. *
  416. * @param {Number} length Payload length
  417. * @private
  418. */
  419. maxPayloadExceeded (length) {
  420. if (length === 0 || this._maxPayload < 1) return false;
  421. const fullLength = this._totalPayloadLength + length;
  422. if (fullLength <= this._maxPayload) {
  423. this._totalPayloadLength = fullLength;
  424. return false;
  425. }
  426. this.error(new RangeError('Max payload size exceeded'), 1009);
  427. return true;
  428. }
  429. /**
  430. * Appends a fragment in the fragments array after checking that the sum of
  431. * fragment lengths does not exceed `maxPayload`.
  432. *
  433. * @param {Buffer} fragment The fragment to add
  434. * @return {Boolean} `true` if `maxPayload` is not exceeded, else `false`
  435. * @private
  436. */
  437. pushFragment (fragment) {
  438. if (fragment.length === 0) return true;
  439. const totalLength = this._messageLength + fragment.length;
  440. if (this._maxPayload < 1 || totalLength <= this._maxPayload) {
  441. this._messageLength = totalLength;
  442. this._fragments.push(fragment);
  443. return true;
  444. }
  445. this.error(new RangeError('Max payload size exceeded'), 1009);
  446. return false;
  447. }
  448. /**
  449. * Releases resources used by the receiver.
  450. *
  451. * @param {Function} cb Callback
  452. * @public
  453. */
  454. cleanup (cb) {
  455. if (this._extensions === null) {
  456. if (cb) cb();
  457. return;
  458. }
  459. if (!this._hadError && (this._loop || this._state === INFLATING)) {
  460. this._cleanupCallback = cb;
  461. this._isCleaningUp = true;
  462. return;
  463. }
  464. this._extensions = null;
  465. this._fragments = null;
  466. this._buffers = null;
  467. this._mask = null;
  468. this._cleanupCallback = null;
  469. this.onmessage = null;
  470. this.onclose = null;
  471. this.onerror = null;
  472. this.onping = null;
  473. this.onpong = null;
  474. if (cb) cb();
  475. }
  476. }
  477. module.exports = Receiver;
  478. /**
  479. * Makes a buffer from a list of fragments.
  480. *
  481. * @param {Buffer[]} fragments The list of fragments composing the message
  482. * @param {Number} messageLength The length of the message
  483. * @return {Buffer}
  484. * @private
  485. */
  486. function toBuffer (fragments, messageLength) {
  487. if (fragments.length === 1) return fragments[0];
  488. if (fragments.length > 1) return bufferUtil.concat(fragments, messageLength);
  489. return constants.EMPTY_BUFFER;
  490. }
  491. /**
  492. * Converts a buffer to an `ArrayBuffer`.
  493. *
  494. * @param {Buffer} The buffer to convert
  495. * @return {ArrayBuffer} Converted buffer
  496. */
  497. function toArrayBuffer (buf) {
  498. if (buf.byteOffset === 0 && buf.byteLength === buf.buffer.byteLength) {
  499. return buf.buffer;
  500. }
  501. return buf.buffer.slice(buf.byteOffset, buf.byteOffset + buf.byteLength);
  502. }