Source: index.js

var _ = require("lodash"),
    ipc = require("node-ipc"),
    os = require("os"),
    lib = require("./lib");

var utils = lib.utils;

var consts = lib.consts;

/**
 *
 * Constructs a new instance of the consistent hash ring class.
 *
 * @method createCHash
 * @memberof Clusterluck
 *
 * @param {Number} rfactor - Replication factor for every node inserted into the ring. Defaults to 3.
 * @param {Number} pfactor - Persistence factor for every node inserted into the ring (used when calling .next on a consistent hash ring). Defaults to 2.
 *
 * @return {Clusterluck.CHash} A consistent hash ring instance.
 *
 * @example
 * let chash = clusterluck.createCHash(3, 2);
 * assert.equal(chash.rfactor(), 3);
 * assert.equal(chash.pfactor(), 2);
 *
 */
function createCHash(rfactor = 3, pfactor = 2) {
  return new lib.chash(rfactor, pfactor);
}

/**
 *
 * Constructs a new instance of the vector clock class.
 *
 * @method createVClock
 * @memberof Clusterluck
 *
 * @param {String} id - Identifier to insert this new vector clock on creation.
 * @param {Number} count - Count to initialize `id` at in this new vector clock.
 *
 * @return {Clusterluck.VectorClock} A vector clock instance.
 *
 * @example
 * let vclock = clusterluck.createVClock();
 * assert.equal(vclock.size(), 0);
 * vclock = clutserluck.createVClock("id", 1);
 * assert.equal(vclock.size(), 1);
 * assert.ok(vclock.has("id"));
 *
 */
function createVClock(id, count) {
  return new lib.vclock(id, count);
}

/**
 *
 * Constructs an instance of a gossip processor against network kernel `kernel`.
 *
 * @method createGossip
 * @memberof Clusterluck
 *
 * @param {Clusterluck.NetKernel} kernel - Network kernel this new gossip processor instance will listen for jobs against.
 * @param {Object} [opts] - Gossip ring options to instantiate with. Affects vector clock trimming options, consistent hash ring instantiation, how often to gossip ring state against the cluster, and when/where to flush state to disk.
 * @param {Number} [opts.rfactor] - Replication factor for every node inserted into the ring. Defaults to 3.
 * @param {Number} [opts.pfactor] - Persistence factor for every node inserted into the ring (used when calling .next on a consistent hash ring). Defaults to 2.
 * @param {Number} [opts.interval] - Interval to select a random node from the cluster and gossip the state of the ring with, with a granularity of milliseconds. Defaults to 1000.
 * @param {Number} [opts.flushInterval] - Interval to flush the state of the ring to disk, with a granularity of milliseconds. Defaults to 1000.
 * @param {String} [opts.flushPath] - Path string to flush the state of the ring to; if set to `null`, the gossip ring will just skip flushing state to disk. Defaults to `null`.
 * @param {Object} [opts.vclockOpts] - Vector clock options for trimming; occurs at the same interval as `interval`. Defaults to `clusterluck.consts.vclockOpts`.
 * @param {Object} [opts.connOpts] - Connection options for when connecting to new nodes.
 *
 * @return {Clusterluck.GossipRing} A new gossip ring instance.
 *
 * @example
 * // initializes gossip ring with defaults found in `clusterluck.consts.gossipOpts`
 * let gossip = clusterluck.createGossip(kernel);
 * assert.equal(gossip.ring().rfactor(), 3);
 * assert.equal(gossip.ring().pfactor(), 2);
 * assert.deepEqual(gossip.kernel(), kernel);
 *
 */
function createGossip(kernel, opts) {
  opts = _.defaultsDeep(utils.isPlainObject(opts) ? _.cloneDeep(opts) : {}, consts.gossipOpts);
  var chash = createCHash(opts.rfactor, opts.pfactor).insert(kernel.self());
  var vclock = createVClock();
  return new lib.gossip(kernel, chash, vclock, opts);
}

/**
 *
 * Constructs an instance of a network kernel with `id`, listening on hostname `host` and port `port`.
 *
 * @method createKernel
 * @memberof Clusterluck
 *
 * @param {String} id - Identifier for the node associated with this network kernel. Needs to be unique across the cluster, since nodes are addressed by id this way.
 * @param {String} host - Hostname for this network kernel to bind to. Can be an IPV4 address, IPV6 address, or a hostname. Hostname resolution isn't done when checking the existence of a node inside a cluster, so this hostname is taken literally for the lifetime of the node (i.e. localhost vs. 127.0.0.1 vs `> hostname`). Defaults to `os.hostname()`.
 * @param {Number} port - Port for this network kernel to listen on. Defaults to 7022.
 * @param {Object} [opts] - Network kernel options to instantiate with. Affects whether the server runs with TLS or just TCP, on what interval to attempt reconnect logic on a closed socket, and how many times to retry.
 * @param {String} [opts.networkHost] - Default network hostname to set on this network kernel. Defaults to `os.hostname()`.
 * @param {Number} [opts.networkPort] - Default network port to listen on for this network kernel. Defaults to 7022.
 * @param {Number} [opts.retry] - Default amount of time to wait before retrying a connection attempt between two nodes. Defaults to 5000.
 * @param {Object} [opts.tls] - TLS options to set on this network kernel. Defaults to `null`.
 * @param {Number} [opts.maxRetries] - Maximum number of attempts to reconnect to a node; currently, Infinity is the most stable option, since the Connection class only listens for the 'connect' and 'disconnect' events on the underlying IPC socket. Defaults to Infinity.
 * @param {Boolean} [opts.silent] - Whether to silence underlying IPC logs emitted by the `node-ipc` module. Defaults to true.
 *
 * @return {Clusterluck.NetKernel} A new network kernel instance.
 *
 * @example
 * let kernel = clusterluck.createKernel("foo", "localhost", 7022);
 * assert.equal(kernel.id(), "foo");
 * assert.equal(kernel.host(), "localhost");
 * assert.equal(kernel.port(), 7022);
 * assert.ok(kernel.self().equals(new Node("foo", "localhost", 7022)));
 *
 */
function createKernel(id, host = os.hostname(), port = 7022, opts = {}) {
  opts = _.defaultsDeep(utils.isPlainObject(opts) ? _.cloneDeep(opts) : {}, consts.kernelOpts);
  var inst = new ipc.IPC();
  inst.config.networkHost = host || opts.networkHost;
  inst.config.networkPort = port || opts.networkPort;
  inst.config.retry = opts.retry;
  inst.config.maxRetries = opts.maxRetries;
  inst.config.tls = opts.tls;
  inst.config.silent = opts.silent;
  return new lib.kernel(inst, id, inst.config.networkHost, inst.config.networkPort);
}

/**
 *
 * Constructs an instance of a command server, which responds to CLI commands.
 *
 * @method createCommServer
 * @memberof Clusterluck
 * 
 * @param {Clusterluck.Gossip} gossip - Gossip processor for this command server to report/manipulate the state of.
 * @param {Clusterluck.Kernel} kernel - Network kernel this command server uses to reply over to a CLI process' IPC socket.
 *
 * @return {Clusterluck.CommandServer} A new command server instance.
 *
 * @example
 * let comms = clusterluck.createCommServer(gossip, kernel);
 * assert.deepEqual(comms.gossip(), gossip);
 * assert.deepEqual(comms.kernel(), kernel);
 *
 */
function createCommServer(gossip, kernel) {
  return new lib.command_server(gossip, kernel);
}

/**
 *
 * Constructs an instance of a cluster node, the preferred and encompassing way to start/stop the underlying IPC node, as well as refer to underlying actors in the cluster (gossip ring, kernel, command server, forthcoming actors, etc.).
 *
 * @method createCluster
 * @memberof Clusterluck
 *
 * @param {String} Identifier for the node associated with this network kernel. Needs to be unique across the cluster, since nodes are addressed by id this way.
 * @param {String} host - Hostname for this network kernel to bind to. Can be an IPV4 address, IPV6 address, or a hostname. Hostname resolution isn't done when checking the existence of a node inside a cluster, so this hostname is taken literally for the lifetime of the node (i.e. localhost vs. 127.0.0.1 vs `> hostname`). Defaults to `os.hostname()`.
 * @param {Number} port - Port for this network kernel to listen on. Defaults to 7022.
 * @param {Object} [opts] - Options object that controls configuration options for the constructed network kernel and gossip ring.
 * @param {Object} [opts.kernelOpts] - Refer to `createKernel` for an explanation of available options.
 * @param {Object} [opts.gossipOpts] - Refer to `createGossip` for an explanation of available options.
 *
 * @return {Clusterluck.ClusterNode} A new cluster node instance.
 *
 * @example
 * let node = clusterluck.createCluster("foo", "localhost", 7022);
 * assert.equal(node.kernel().id(), "foo");
 * assert.equal(node.kernel().host(), "localhost");
 * assert.equal(node.kernel().port(), 7022);
 * assert.ok(node.kernel().self().equals(new Node("foo", "localhost", 7022)));
 * assert.equal(node.gossip().ring().rfactor(), 3);
 * assert.equal(node.gossip().ring().pfactor(), 2);
 *
 */
function createCluster(id, host = os.hostname(), port = 7022, opts = {}) {
  opts = utils.isPlainObject(opts) ? _.cloneDeep(opts) : {};
  var kernel = createKernel(id, host, port, opts.kernelOpts);
  var gossip = createGossip(kernel, opts.gossipOpts);
  var comms = createCommServer(gossip, kernel);
  return new lib.cluster_node(kernel, gossip, comms);
}

/**
 *
 * Constructs a generic server instance. Generic servers listen to the network kernel for events targetted at it's name/ID. For example, the gossip ring is a generic server that listens for events on the ID of the ring it belongs to.
 *
 * @method createGenServer
 * @memberof Clusterluck
 *
 * @param {Clusterluck.ClusterNode} cluster - Cluster for this generic server to bind to.
 * @param {Object} [opts] - Options object for creating generic server.
 * @param {Number} [opts.streamTimeout] - Timeframe a generic server will receive data for a given stream before invalidating it.
 *
 * @return {Clusterluck.GenServer} A new generic server instance.
 *
 * @example
 * let server = clusterluck.createGenServer(cluster);
 * // based on how messages are parsed, will operate on event 'command_name' sent by another actor to this node
 * server.on("command_name", handlerForCommand);
 * // will listen on server.kernel() for messages emitted on event 'foo'.
 * server.start("foo");
 *
 */
function createGenServer(cluster, opts) {
  opts = utils.isPlainObject(opts) ? _.cloneDeep(opts) : {};
  return new lib.gen_server(cluster.kernel(), opts);
}

/**
 *
 * Constructs a dtable instance, an in-memory key/value storage that persists to disk periodically.
 *
 * @method createDTable
 * @memberof Clusterluck
 *
 * @param {Object} opts - Options object for creating dtable.
 * @param {String} opts.path - Directory store table snapshot and log files under.
 * @param {Number} [opts.writeThreshold] - Number of write operations to the log file before triggering a snapshot flush to disk. Defaults to 100 writes.
 * @param {Number} [opts.autoSave] - Number of milliseconds this table will wait in an idle state before triggering a snapshot flush to disk. Defaults to 180000 milliseconds.
 * @param {Number} [opts.fsyncInterval] - Internval in milliseconds to fsync the log file. Defaults to 1000 milliseconds.
 * @param {Boolean} [opts.compress] - Whether to run RDB snapshot streams through a GZIP compression stream. Defaults to `false`.
 * @param {Function} [opts.encodeFn] - Encoding function to use when serializing writes to the AOF file and when saving to the RDB snapshot. Defaults to `DTable.encodeValue`.
 * @param {Function} [opts.decodeFn] - Decoding function to use when loading contents from disk. Defaults to `DTable.decodeValue`.
 * @param {String} [opts.name] - Name to start table with; can be used as a replacement for passing `name` to the start function. Required to be passed if you don't want a race condition between table loads and the idle interval that runs to trigger RDB snapshot logic. Defaults to `undefined`.
 *
 * @return {Clusterluck.DTable} A new dtable instance.
 *
 * @example
 * let table = clusterluck.createDTable({
 *   path: "/path/to/dir",
 *   writeThreshold: 100,
 *   autoSave: 180000,
 *   fsyncInterval: 1000
 * });
 * table.start("foo");
 *
 * @example
 * let table = clusterluck.createDTable({
 *   path: "/path/to/dir",
 *   writeThreshold: 100,
 *   autoSave: 180000,
 *   fsyncInterval: 1000,
 *   name: "TABLE_NAME"
 * });
 * table.load((err) => {
 *   if (err) process.exit(1);
 *   table.start();
 * });
 *
 */
function createDTable(opts) {
  opts = utils.isPlainObject(opts) ? _.cloneDeep(opts) : {};
  return new lib.dtable(opts);
}

/**
 *
 * Constructs a mtable instance, an in-memory key/value storage.
 *
 * @method createMTable
 * @memberof Clusterluck
 *
 * @return {Clusterluck.MTable} A new mtable instance.
 *
 * @example
 * let table = clusterluck.createMTable();
 * table.start("foo");
 *
 */
function createMTable() {
  return new lib.mtable();
}

/**
 *
 * Constructs a DLM server instance. Handles creating read locks, write locks, as well as removing such locks across a cluster of nodes. See documentation of the DLMServer class for how locks are routed and partitioned across the cluster.
 *
 * @method createDLM
 * @memberof Clusterluck
 *
 * @param {Clusterluck.ClusterNode} cluster - Cluster for this generic server to bind to.
 * @param {Object} [opts] - Options object for creating DLM server.
 * @param {Number} [opts.rquorum] - Quorum for read lock requests.
 * @param {Number} [opts.wquorum] - Quorum for write lock requests.
 * @param {Number} [opts.minWaitTimeout] - Minimum amount of time in milliseconds to wait for a retry on a locking request.
 * @param {Number} [opts.maxWaitTimeout] - Maximum amount of time in milliseconds to wait for a retry on a locking request.
 * @param {Boolean} [opts.disk] - Whether to persist lock state to disk. If `true` is passed, the following options will be read.
 * @param {String} [opts.path] - Path for underlying DTable instance to flush state to.
 * @param {Number} [opts.writeThreshold] - Write threshold of underlying DTable instance.
 * @param {Number} [opts.autoSave] - Autosave interval of underlying DTable instance.
 * @param {Number} [opts.fsyncInterval] - Fsync interval of underlying DTable instance.
 * @param {Boolean} [opts.compress] - Whether to run RDB snapshot streams through a GZIP compression stream. Defaults to `false`.
 * @param {String} [opts.name] - Name to start table with; can be used as a replacement for passing `name` to the start function. Required to be passed if you don't want a race condition between table loads and the idle interval that runs to trigger RDB snapshot logic. Defaults to `undefined`.
 *
 * @return {Clusterluck.DLMServer} A new generic server instance.
 *
 * @example
 * let server = clusterluck.createDLM(cluster, {disk: true, path: "/path/to/dir"});
 * server.load((err) => {
 *   server.start("foo");
 * });
 *
 */
function createDLM(cluster, opts) {
  opts = utils.isPlainObject(opts) ? _.cloneDeep(opts) : {};
  return new lib.dlm.DLMServer(cluster.gossip(), cluster.kernel(), opts);
}

/**
 *
 * Constructs a DSM server instance. Handles creating/reading/destroying semaphores, as well as posting and closing semaphores with requesters/actors. See documentation of the DSMServer class for how semaphores are routed and partitioned across the cluster.
 *
 * @method createDSM
 * @memberof Clusterluck
 *
 * @param {Clusterluck.ClusterNode} cluster - Cluster for this generic server to bind to.
 * @param {Object} [opts] - Options object for creating DSM server.
 * @param {Number} [opts.minWaitTimeout] - Minimum amount of time in milliseconds to wait for a retry on a post request.
 * @param {Number} [opts.maxWaitTimeout] - Maximum amount of time in milliseconds to wait for a retry on a post request.
 * @param {Boolean} [opts.disk] - Whether to persist semaphore state to disk. If `true` is passed, the following options will be read.
 * @param {String} [opts.path] - Path for underlying DTable instance to flush state to.
 * @param {Number} [opts.writeThreshold] - Write threshold of underlying DTable instance.
 * @param {Number} [opts.autoSave] - Autosave interval of underlying DTable instance.
 * @param {Number} [opts.fsyncInterval] - Fsync interval of underlying DTable instance.
 * @param {Boolean} [opts.compress] - Whether to run RDB snapshot streams through a GZIP compression stream. Defaults to `false`.
 * @param {String} [opts.name] - Name to start table with; can be used as a replacement for passing `name` to the start function. Required to be passed if you don't want a race condition between table loads and the idle interval that runs to trigger RDB snapshot logic. Defaults to `undefined`.
 *
 * @return {Clusterluck.DSMServer} A new generic server instance.
 *
 * @example
 * let server = clusterluck.createDSM(cluster, {disk: true, path: "/path/to/dir"});
 * server.load((err) => {
 *   server.start("foo");
 * });
 *
 */
function createDSM(cluster, opts) {
  opts = utils.isPlainObject(opts) ? _.cloneDeep(opts) : {};
  return new lib.dsem.DSMServer(cluster.gossip(), cluster.kernel(), opts);
}

module.exports = {
  CHash: lib.chash,
  ClusterNode: lib.cluster_node,
  GenServer: lib.gen_server,
  GossipRing: lib.gossip,
  NetKernel: lib.kernel,
  Node: lib.node,
  VectorClock: lib.vclock,
  DTable: lib.dtable,
  MTable: lib.mtable,
  DLMServer: lib.dlm.DLMServer,
  DSMServer: lib.dsem.DSMServer,
  Lock: lib.dlm.Lock,
  Semaphore: lib.dsem.Semaphore,
  createCHash: createCHash,
  createVClock: createVClock,
  createGossip: createGossip,
  createCluster: createCluster,
  createKernel: createKernel,
  createGenServer: createGenServer,
  createDTable: createDTable,
  createMTable: createMTable,
  createDLM: createDLM,
  createDSM: createDSM,
  consts: consts
};