permessage-deflate.js 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521
  1. 'use strict';
  2. const safeBuffer = require('safe-buffer');
  3. const Limiter = require('async-limiter');
  4. const zlib = require('zlib');
  5. const bufferUtil = require('./buffer-util');
  6. const Buffer = safeBuffer.Buffer;
  7. const TRAILER = Buffer.from([0x00, 0x00, 0xff, 0xff]);
  8. const EMPTY_BLOCK = Buffer.from([0x00]);
  9. const kWriteInProgress = Symbol('write-in-progress');
  10. const kPendingClose = Symbol('pending-close');
  11. const kTotalLength = Symbol('total-length');
  12. const kCallback = Symbol('callback');
  13. const kBuffers = Symbol('buffers');
  14. const kError = Symbol('error');
  15. const kOwner = Symbol('owner');
  16. //
  17. // We limit zlib concurrency, which prevents severe memory fragmentation
  18. // as documented in https://github.com/nodejs/node/issues/8871#issuecomment-250915913
  19. // and https://github.com/websockets/ws/issues/1202
  20. //
  21. // Intentionally global; it's the global thread pool that's an issue.
  22. //
  23. let zlibLimiter;
  24. /**
  25. * permessage-deflate implementation.
  26. */
  27. class PerMessageDeflate {
  28. /**
  29. * Creates a PerMessageDeflate instance.
  30. *
  31. * @param {Object} options Configuration options
  32. * @param {Boolean} options.serverNoContextTakeover Request/accept disabling
  33. * of server context takeover
  34. * @param {Boolean} options.clientNoContextTakeover Advertise/acknowledge
  35. * disabling of client context takeover
  36. * @param {(Boolean|Number)} options.serverMaxWindowBits Request/confirm the
  37. * use of a custom server window size
  38. * @param {(Boolean|Number)} options.clientMaxWindowBits Advertise support
  39. * for, or request, a custom client window size
  40. * @param {Object} options.zlibDeflateOptions Options to pass to zlib on deflate
  41. * @param {Object} options.zlibInflateOptions Options to pass to zlib on inflate
  42. * @param {Number} options.threshold Size (in bytes) below which messages
  43. * should not be compressed
  44. * @param {Number} options.concurrencyLimit The number of concurrent calls to
  45. * zlib
  46. * @param {Boolean} isServer Create the instance in either server or client
  47. * mode
  48. * @param {Number} maxPayload The maximum allowed message length
  49. */
  50. constructor (options, isServer, maxPayload) {
  51. this._maxPayload = maxPayload | 0;
  52. this._options = options || {};
  53. this._threshold = this._options.threshold !== undefined
  54. ? this._options.threshold
  55. : 1024;
  56. this._isServer = !!isServer;
  57. this._deflate = null;
  58. this._inflate = null;
  59. this.params = null;
  60. if (!zlibLimiter) {
  61. const concurrency = this._options.concurrencyLimit !== undefined
  62. ? this._options.concurrencyLimit
  63. : 10;
  64. zlibLimiter = new Limiter({ concurrency });
  65. }
  66. }
  67. /**
  68. * @type {String}
  69. */
  70. static get extensionName () {
  71. return 'permessage-deflate';
  72. }
  73. /**
  74. * Create an extension negotiation offer.
  75. *
  76. * @return {Object} Extension parameters
  77. * @public
  78. */
  79. offer () {
  80. const params = {};
  81. if (this._options.serverNoContextTakeover) {
  82. params.server_no_context_takeover = true;
  83. }
  84. if (this._options.clientNoContextTakeover) {
  85. params.client_no_context_takeover = true;
  86. }
  87. if (this._options.serverMaxWindowBits) {
  88. params.server_max_window_bits = this._options.serverMaxWindowBits;
  89. }
  90. if (this._options.clientMaxWindowBits) {
  91. params.client_max_window_bits = this._options.clientMaxWindowBits;
  92. } else if (this._options.clientMaxWindowBits == null) {
  93. params.client_max_window_bits = true;
  94. }
  95. return params;
  96. }
  97. /**
  98. * Accept an extension negotiation offer/response.
  99. *
  100. * @param {Array} configurations The extension negotiation offers/reponse
  101. * @return {Object} Accepted configuration
  102. * @public
  103. */
  104. accept (configurations) {
  105. configurations = this.normalizeParams(configurations);
  106. this.params = this._isServer
  107. ? this.acceptAsServer(configurations)
  108. : this.acceptAsClient(configurations);
  109. return this.params;
  110. }
  111. /**
  112. * Releases all resources used by the extension.
  113. *
  114. * @public
  115. */
  116. cleanup () {
  117. if (this._inflate) {
  118. if (this._inflate[kWriteInProgress]) {
  119. this._inflate[kPendingClose] = true;
  120. } else {
  121. this._inflate.close();
  122. this._inflate = null;
  123. }
  124. }
  125. if (this._deflate) {
  126. if (this._deflate[kWriteInProgress]) {
  127. this._deflate[kPendingClose] = true;
  128. } else {
  129. this._deflate.close();
  130. this._deflate = null;
  131. }
  132. }
  133. }
  134. /**
  135. * Accept an extension negotiation offer.
  136. *
  137. * @param {Array} offers The extension negotiation offers
  138. * @return {Object} Accepted configuration
  139. * @private
  140. */
  141. acceptAsServer (offers) {
  142. const opts = this._options;
  143. const accepted = offers.find((params) => {
  144. if (
  145. (opts.serverNoContextTakeover === false &&
  146. params.server_no_context_takeover) ||
  147. (params.server_max_window_bits &&
  148. (opts.serverMaxWindowBits === false ||
  149. (typeof opts.serverMaxWindowBits === 'number' &&
  150. opts.serverMaxWindowBits > params.server_max_window_bits))) ||
  151. (typeof opts.clientMaxWindowBits === 'number' &&
  152. !params.client_max_window_bits)
  153. ) {
  154. return false;
  155. }
  156. return true;
  157. });
  158. if (!accepted) {
  159. throw new Error('None of the extension offers can be accepted');
  160. }
  161. if (opts.serverNoContextTakeover) {
  162. accepted.server_no_context_takeover = true;
  163. }
  164. if (opts.clientNoContextTakeover) {
  165. accepted.client_no_context_takeover = true;
  166. }
  167. if (typeof opts.serverMaxWindowBits === 'number') {
  168. accepted.server_max_window_bits = opts.serverMaxWindowBits;
  169. }
  170. if (typeof opts.clientMaxWindowBits === 'number') {
  171. accepted.client_max_window_bits = opts.clientMaxWindowBits;
  172. } else if (
  173. accepted.client_max_window_bits === true ||
  174. opts.clientMaxWindowBits === false
  175. ) {
  176. delete accepted.client_max_window_bits;
  177. }
  178. return accepted;
  179. }
  180. /**
  181. * Accept the extension negotiation response.
  182. *
  183. * @param {Array} response The extension negotiation response
  184. * @return {Object} Accepted configuration
  185. * @private
  186. */
  187. acceptAsClient (response) {
  188. const params = response[0];
  189. if (
  190. this._options.clientNoContextTakeover === false &&
  191. params.client_no_context_takeover
  192. ) {
  193. throw new Error('Unexpected parameter "client_no_context_takeover"');
  194. }
  195. if (!params.client_max_window_bits) {
  196. if (typeof this._options.clientMaxWindowBits === 'number') {
  197. params.client_max_window_bits = this._options.clientMaxWindowBits;
  198. }
  199. } else if (
  200. this._options.clientMaxWindowBits === false ||
  201. (typeof this._options.clientMaxWindowBits === 'number' &&
  202. params.client_max_window_bits > this._options.clientMaxWindowBits)
  203. ) {
  204. throw new Error(
  205. 'Unexpected or invalid parameter "client_max_window_bits"'
  206. );
  207. }
  208. return params;
  209. }
  210. /**
  211. * Normalize parameters.
  212. *
  213. * @param {Array} configurations The extension negotiation offers/reponse
  214. * @return {Array} The offers/response with normalized parameters
  215. * @private
  216. */
  217. normalizeParams (configurations) {
  218. configurations.forEach((params) => {
  219. Object.keys(params).forEach((key) => {
  220. var value = params[key];
  221. if (value.length > 1) {
  222. throw new Error(`Parameter "${key}" must have only a single value`);
  223. }
  224. value = value[0];
  225. if (key === 'client_max_window_bits') {
  226. if (value !== true) {
  227. const num = +value;
  228. if (!Number.isInteger(num) || num < 8 || num > 15) {
  229. throw new TypeError(
  230. `Invalid value for parameter "${key}": ${value}`
  231. );
  232. }
  233. value = num;
  234. } else if (!this._isServer) {
  235. throw new TypeError(
  236. `Invalid value for parameter "${key}": ${value}`
  237. );
  238. }
  239. } else if (key === 'server_max_window_bits') {
  240. const num = +value;
  241. if (!Number.isInteger(num) || num < 8 || num > 15) {
  242. throw new TypeError(
  243. `Invalid value for parameter "${key}": ${value}`
  244. );
  245. }
  246. value = num;
  247. } else if (
  248. key === 'client_no_context_takeover' ||
  249. key === 'server_no_context_takeover'
  250. ) {
  251. if (value !== true) {
  252. throw new TypeError(
  253. `Invalid value for parameter "${key}": ${value}`
  254. );
  255. }
  256. } else {
  257. throw new Error(`Unknown parameter "${key}"`);
  258. }
  259. params[key] = value;
  260. });
  261. });
  262. return configurations;
  263. }
  264. /**
  265. * Decompress data. Concurrency limited by async-limiter.
  266. *
  267. * @param {Buffer} data Compressed data
  268. * @param {Boolean} fin Specifies whether or not this is the last fragment
  269. * @param {Function} callback Callback
  270. * @public
  271. */
  272. decompress (data, fin, callback) {
  273. zlibLimiter.push((done) => {
  274. this._decompress(data, fin, (err, result) => {
  275. done();
  276. callback(err, result);
  277. });
  278. });
  279. }
  280. /**
  281. * Compress data. Concurrency limited by async-limiter.
  282. *
  283. * @param {Buffer} data Data to compress
  284. * @param {Boolean} fin Specifies whether or not this is the last fragment
  285. * @param {Function} callback Callback
  286. * @public
  287. */
  288. compress (data, fin, callback) {
  289. zlibLimiter.push((done) => {
  290. this._compress(data, fin, (err, result) => {
  291. done();
  292. callback(err, result);
  293. });
  294. });
  295. }
  296. /**
  297. * Decompress data.
  298. *
  299. * @param {Buffer} data Compressed data
  300. * @param {Boolean} fin Specifies whether or not this is the last fragment
  301. * @param {Function} callback Callback
  302. * @private
  303. */
  304. _decompress (data, fin, callback) {
  305. const endpoint = this._isServer ? 'client' : 'server';
  306. if (!this._inflate) {
  307. const key = `${endpoint}_max_window_bits`;
  308. const windowBits = typeof this.params[key] !== 'number'
  309. ? zlib.Z_DEFAULT_WINDOWBITS
  310. : this.params[key];
  311. this._inflate = zlib.createInflateRaw(
  312. Object.assign(
  313. {},
  314. this._options.zlibInflateOptions,
  315. { windowBits }
  316. )
  317. );
  318. this._inflate[kTotalLength] = 0;
  319. this._inflate[kBuffers] = [];
  320. this._inflate[kOwner] = this;
  321. this._inflate.on('error', inflateOnError);
  322. this._inflate.on('data', inflateOnData);
  323. }
  324. this._inflate[kCallback] = callback;
  325. this._inflate[kWriteInProgress] = true;
  326. this._inflate.write(data);
  327. if (fin) this._inflate.write(TRAILER);
  328. this._inflate.flush(() => {
  329. const err = this._inflate[kError];
  330. if (err) {
  331. this._inflate.close();
  332. this._inflate = null;
  333. callback(err);
  334. return;
  335. }
  336. const data = bufferUtil.concat(
  337. this._inflate[kBuffers],
  338. this._inflate[kTotalLength]
  339. );
  340. if (
  341. (fin && this.params[`${endpoint}_no_context_takeover`]) ||
  342. this._inflate[kPendingClose]
  343. ) {
  344. this._inflate.close();
  345. this._inflate = null;
  346. } else {
  347. this._inflate[kWriteInProgress] = false;
  348. this._inflate[kTotalLength] = 0;
  349. this._inflate[kBuffers] = [];
  350. }
  351. callback(null, data);
  352. });
  353. }
  354. /**
  355. * Compress data.
  356. *
  357. * @param {Buffer} data Data to compress
  358. * @param {Boolean} fin Specifies whether or not this is the last fragment
  359. * @param {Function} callback Callback
  360. * @private
  361. */
  362. _compress (data, fin, callback) {
  363. if (!data || data.length === 0) {
  364. process.nextTick(callback, null, EMPTY_BLOCK);
  365. return;
  366. }
  367. const endpoint = this._isServer ? 'server' : 'client';
  368. if (!this._deflate) {
  369. const key = `${endpoint}_max_window_bits`;
  370. const windowBits = typeof this.params[key] !== 'number'
  371. ? zlib.Z_DEFAULT_WINDOWBITS
  372. : this.params[key];
  373. this._deflate = zlib.createDeflateRaw(
  374. Object.assign(
  375. // TODO deprecate memLevel/level and recommend zlibDeflateOptions instead
  376. {
  377. memLevel: this._options.memLevel,
  378. level: this._options.level
  379. },
  380. this._options.zlibDeflateOptions,
  381. { windowBits }
  382. )
  383. );
  384. this._deflate[kTotalLength] = 0;
  385. this._deflate[kBuffers] = [];
  386. //
  387. // `zlib.DeflateRaw` emits an `'error'` event only when an attempt to use
  388. // it is made after it has already been closed. This cannot happen here,
  389. // so we only add a listener for the `'data'` event.
  390. //
  391. this._deflate.on('data', deflateOnData);
  392. }
  393. this._deflate[kWriteInProgress] = true;
  394. this._deflate.write(data);
  395. this._deflate.flush(zlib.Z_SYNC_FLUSH, () => {
  396. var data = bufferUtil.concat(
  397. this._deflate[kBuffers],
  398. this._deflate[kTotalLength]
  399. );
  400. if (fin) data = data.slice(0, data.length - 4);
  401. if (
  402. (fin && this.params[`${endpoint}_no_context_takeover`]) ||
  403. this._deflate[kPendingClose]
  404. ) {
  405. this._deflate.close();
  406. this._deflate = null;
  407. } else {
  408. this._deflate[kWriteInProgress] = false;
  409. this._deflate[kTotalLength] = 0;
  410. this._deflate[kBuffers] = [];
  411. }
  412. callback(null, data);
  413. });
  414. }
  415. }
  416. module.exports = PerMessageDeflate;
  417. /**
  418. * The listener of the `zlib.DeflateRaw` stream `'data'` event.
  419. *
  420. * @param {Buffer} chunk A chunk of data
  421. * @private
  422. */
  423. function deflateOnData (chunk) {
  424. this[kBuffers].push(chunk);
  425. this[kTotalLength] += chunk.length;
  426. }
  427. /**
  428. * The listener of the `zlib.InflateRaw` stream `'data'` event.
  429. *
  430. * @param {Buffer} chunk A chunk of data
  431. * @private
  432. */
  433. function inflateOnData (chunk) {
  434. this[kTotalLength] += chunk.length;
  435. if (
  436. this[kOwner]._maxPayload < 1 ||
  437. this[kTotalLength] <= this[kOwner]._maxPayload
  438. ) {
  439. this[kBuffers].push(chunk);
  440. return;
  441. }
  442. this[kError] = new RangeError('Max payload size exceeded');
  443. this[kError].closeCode = 1009;
  444. this.removeListener('data', inflateOnData);
  445. this.reset();
  446. }
  447. /**
  448. * The listener of the `zlib.InflateRaw` stream `'error'` event.
  449. *
  450. * @param {Error} err The emitted error
  451. * @private
  452. */
  453. function inflateOnError (err) {
  454. //
  455. // There is no need to call `Zlib#close()` as the handle is automatically
  456. // closed when an error is emitted.
  457. //
  458. this[kOwner]._inflate = null;
  459. this[kCallback](err);
  460. }