Source: lib/conn.js

  1. const _ = require("lodash"),
  2. consts = require("./consts"),
  3. debug = require("debug")("clusterluck:lib:conn"),
  4. EventEmitter = require("events").EventEmitter,
  5. Queue = require("./queue");
  6. const connDefaults = consts.connDefaults;
  7. class Connection extends EventEmitter {
  8. /**
  9. *
  10. * Connection abstraction class. Handles reconnection logic when the client IPC socket disconnects, internal message buffering during reconnection, and state management for safe connection closure.
  11. *
  12. * @class Connection
  13. * @memberof Clusterluck
  14. *
  15. * @param {IPC} ipc - IPC module to create connection over.
  16. * @param {Clusterluck.Node} node - Node this connection communicates with.
  17. * @param {Object} [opts] - Options object for connection.
  18. * @param {Number} [opts.maxLen] - Maximum length of messages that can buffered while IPC socket is down. Defaults to 1024. Once breached, the oldest messages will be dropped until the queue is of this size. For unbounded buffering, set this to `Infinity`.
  19. *
  20. */
  21. constructor(ipc, node, opts=connDefaults) {
  22. super();
  23. opts = _.defaults(opts, connDefaults);
  24. this._ipc = ipc;
  25. this._node = node;
  26. this._queue = new Queue();
  27. this._connecting = false;
  28. this._active = false;
  29. this._streams = new Map();
  30. this._maxLen = opts.maxLen;
  31. }
  32. /**
  33. *
  34. * Initializes IPC client socket to `node`, along with listeners for socket disconnects.
  35. *
  36. * @method start
  37. * @memberof Clusterluck.Connection
  38. * @instance
  39. *
  40. */
  41. start() {
  42. // maybe add routine for removing old messages still in queue to avoid backup
  43. // on catastrophic neighbor failures
  44. var node = this._node;
  45. this._active = true;
  46. this._connecting = true;
  47. this._ipc.connectToNet(node.id(), node.host(), node.port());
  48. this._ipc.of[node.id()].on("connect", this._handleConnect.bind(this));
  49. this._ipc.of[node.id()].on("disconnect", this._handleDisconnect.bind(this));
  50. }
  51. /**
  52. *
  53. * Closes IPC client socket to `node`. Can be done synchronously using the force option, or asynchronously by waiting for an idle/connected state to occur.
  54. *
  55. * @method stop
  56. * @memberof Clusterluck.Connection
  57. * @instance
  58. *
  59. * @param {Boolean} [force] - Whether to forcibly close this connection or not. If true, will bypass waiting for an 'idle' state, immediately flushing the internal message buffer and clobeering state about which streams are still active over this connection. Otherwise, this will asynchronously close, waiting for all messages and streams to finish first.
  60. *
  61. * @return {Clusterluck.Connection} This instance.
  62. *
  63. */
  64. stop(force = false) {
  65. debug("Stopping connection to node " + this._node.id() + (force ? " forcefully" : " gracefully"));
  66. if (!this.idle() && force !== true) {
  67. this.once("idle", this.stop.bind(this));
  68. return this;
  69. }
  70. if (this._connecting === true && force !== true) {
  71. this.once("connect", this.stop.bind(this));
  72. return this;
  73. }
  74. this._connecting = false;
  75. this._active = false;
  76. this._queue.flush();
  77. this._streams = new Map();
  78. this._ipc.disconnect(this._node.id());
  79. return this;
  80. }
  81. /**
  82. *
  83. * Acts as a getter for the node this connection communicates with.
  84. *
  85. * @method node
  86. * @memberof Clusterluck.Connection
  87. * @instance
  88. *
  89. * @return {Clusterluck.Node} Node this instance communicates with.
  90. *
  91. */
  92. node() {
  93. return this._node;
  94. }
  95. /**
  96. *
  97. * Acts as a getter for the internal message buffer.
  98. *
  99. * @method queue
  100. * @memberof Clusterluck.Connection
  101. * @instance
  102. *
  103. * @return {Queue} Internal message buffer of this instance.
  104. *
  105. */
  106. queue() {
  107. return this._queue;
  108. }
  109. /**
  110. *
  111. * Returns whether this connection has been stopped or not.
  112. *
  113. * @method active
  114. * @memberof Clusterluck.Connection
  115. * @instance
  116. *
  117. * @return {Boolean} Whether this connection is active or not.
  118. *
  119. */
  120. active() {
  121. return this._active;
  122. }
  123. /**
  124. *
  125. * Returns whether this connection is in a reconnection state or not.
  126. *
  127. * @method connecting
  128. * @memberof Clusterluck.Connection
  129. * @instance
  130. *
  131. * @return {Boolean} Whether this connection is in the middle of reconnection logic.
  132. *
  133. */
  134. connecting() {
  135. return this._connecting;
  136. }
  137. /**
  138. *
  139. * Returns whether this connection is in an idle state.
  140. *
  141. * @method idle
  142. * @memberof Clusterluck.Connection
  143. * @instance
  144. *
  145. * @return {Boolean} Whether this connection is currently idle.
  146. *
  147. */
  148. idle() {
  149. return this._streams.size === 0 && this._queue.size() === 0;
  150. }
  151. /**
  152. *
  153. * Acts as a getter/setter for the max length of the internal message queue
  154. * for this IPC socket connection.
  155. *
  156. * @method maxLen
  157. * @memberof Clusterluck.Connection
  158. * @instance
  159. *
  160. * @param {Number} [len] - Number to set maximum message queue length to.
  161. *
  162. * @return {Number} The maximum message queue length of this IPC socket.
  163. *
  164. */
  165. maxLen(len) {
  166. if (typeof len === "number" && len >= 0) {
  167. this._maxLen = len;
  168. while (this._queue.size() > this._maxLen) {
  169. this._queue.dequeue();
  170. }
  171. }
  172. return this._maxLen;
  173. }
  174. /**
  175. *
  176. * Sends message `data` under event `event` through this IPC socket.
  177. *
  178. * @method send
  179. * @memberof Clusterluck.Connection
  180. * @instance
  181. *
  182. * @param {String} event - Event to identify IPC message with.
  183. * @param {Object} data - Data to send with this IPC message.
  184. *
  185. * @return {Clusterluck.Connection} This instance.
  186. *
  187. */
  188. send(event, data) {
  189. if (this._active === false) {
  190. return new Error("Cannot write to inactive connection.");
  191. }
  192. if (this._connecting === true) {
  193. if (this._queue.size() >= this._maxLen) {
  194. this._queue.dequeue();
  195. }
  196. this._queue.enqueue({
  197. event: event,
  198. data: data
  199. });
  200. return this;
  201. }
  202. this._ipc.of[this._node.id()].emit(event, data);
  203. this.emit("send", event, data);
  204. this._updateStream(data.stream);
  205. return this;
  206. }
  207. /**
  208. *
  209. * Marks message stream `stream` in order to indicate to this connection beforehand that it is not
  210. * in an idle state.
  211. *
  212. * @method initiateStream
  213. * @memberof Clusterluck.Connection
  214. * @instance
  215. *
  216. * @param {Object} stream - Message stream to mark.
  217. * @param {Object} stream.stream - Unique ID of mesage stream.
  218. *
  219. * @return {Clusterluck.Connection} This instance.
  220. *
  221. */
  222. initiateStream(stream) {
  223. this._streams.set(stream.stream, true);
  224. return this;
  225. }
  226. /**
  227. *
  228. * Handler for when this connection has finished reconnection logic.
  229. *
  230. * @method _handleConnect
  231. * @memberof Clusterluck.Connection
  232. * @private
  233. * @instance
  234. *
  235. */
  236. _handleConnect() {
  237. debug("Connected to TCP connection to node " + this._node.id());
  238. this._connecting = false;
  239. this.emit("connect");
  240. // flush queue after emitting "connect"
  241. var out = this._queue.flush();
  242. out.forEach((msg) => {
  243. this.send(msg.event, msg.data);
  244. });
  245. }
  246. /**
  247. *
  248. * Handler for when this connection has entered reconnection logic.
  249. *
  250. * @method _handleDisconnect
  251. * @memberof Clusterluck.Connection
  252. * @private
  253. * @instance
  254. *
  255. */
  256. _handleDisconnect() {
  257. debug("Disconnected from TCP connection to node " + this._node.id());
  258. if (this._active) {
  259. this._connecting = true;
  260. } else {
  261. this._connecting = false;
  262. }
  263. this.emit("disconnect");
  264. }
  265. /**
  266. *
  267. * Updates the stream state of this instance. If the stream is finished, removes the stream ID. If no stream IDs are left, then an idle event is emitted.
  268. *
  269. * @method _updateStream
  270. * @memberof Clusterluck.Connection
  271. * @private
  272. * @instance
  273. *
  274. * @param {Object} stream - Stream to update internal state about.
  275. *
  276. */
  277. _updateStream(stream) {
  278. if (stream.done && stream.stream) {
  279. this._streams.delete(stream.stream);
  280. if (this._streams.size === 0) {
  281. this.emit("idle");
  282. }
  283. } else if (stream.stream) {
  284. this._streams.set(stream.stream, true);
  285. }
  286. }
  287. }
  288. module.exports = Connection;