buffered-sender.js 2.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687
  1. 'use strict';
  2. var inherits = require('inherits')
  3. , EventEmitter = require('events').EventEmitter
  4. ;
  5. var debug = function() {};
  6. if (process.env.NODE_ENV !== 'production') {
  7. debug = require('debug')('sockjs-client:buffered-sender');
  8. }
  9. function BufferedSender(url, sender) {
  10. debug(url);
  11. EventEmitter.call(this);
  12. this.sendBuffer = [];
  13. this.sender = sender;
  14. this.url = url;
  15. }
  16. inherits(BufferedSender, EventEmitter);
  17. BufferedSender.prototype.send = function(message) {
  18. debug('send', message);
  19. this.sendBuffer.push(message);
  20. if (!this.sendStop) {
  21. this.sendSchedule();
  22. }
  23. };
  24. // For polling transports in a situation when in the message callback,
  25. // new message is being send. If the sending connection was started
  26. // before receiving one, it is possible to saturate the network and
  27. // timeout due to the lack of receiving socket. To avoid that we delay
  28. // sending messages by some small time, in order to let receiving
  29. // connection be started beforehand. This is only a halfmeasure and
  30. // does not fix the big problem, but it does make the tests go more
  31. // stable on slow networks.
  32. BufferedSender.prototype.sendScheduleWait = function() {
  33. debug('sendScheduleWait');
  34. var self = this;
  35. var tref;
  36. this.sendStop = function() {
  37. debug('sendStop');
  38. self.sendStop = null;
  39. clearTimeout(tref);
  40. };
  41. tref = setTimeout(function() {
  42. debug('timeout');
  43. self.sendStop = null;
  44. self.sendSchedule();
  45. }, 25);
  46. };
  47. BufferedSender.prototype.sendSchedule = function() {
  48. debug('sendSchedule', this.sendBuffer.length);
  49. var self = this;
  50. if (this.sendBuffer.length > 0) {
  51. var payload = '[' + this.sendBuffer.join(',') + ']';
  52. this.sendStop = this.sender(this.url, payload, function(err) {
  53. self.sendStop = null;
  54. if (err) {
  55. debug('error', err);
  56. self.emit('close', err.code || 1006, 'Sending error: ' + err);
  57. self.close();
  58. } else {
  59. self.sendScheduleWait();
  60. }
  61. });
  62. this.sendBuffer = [];
  63. }
  64. };
  65. BufferedSender.prototype._cleanup = function() {
  66. debug('_cleanup');
  67. this.removeAllListeners();
  68. };
  69. BufferedSender.prototype.close = function() {
  70. debug('close');
  71. this._cleanup();
  72. if (this.sendStop) {
  73. this.sendStop();
  74. this.sendStop = null;
  75. }
  76. };
  77. module.exports = BufferedSender;