Constructor
new NetKernel(ipc, id, host, port)
Networking kernel implementation, responsible for routing messages to and from internal and external nodes, which processors can then read from. Messages can be either synchronous or asynchronous, and can be sent to a single node or to a list of nodes simultaneously. On start, listens on hostname `host` and port `port` for IPC messages. Maintains two maps: the sink map contains information about which nodes this kernel has IPC connections to, while the source map contains the sockets of nodes this kernel has connections from. To ensure messages only come from members of a cluster, the kernel holds a secret cookie used to calculate HMAC signatures of incoming and outgoing messages. Any message that doesn't match the HMAC signature calculated with this cookie is ignored.
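
The cookie check described above can be sketched with Node's built-in `crypto` module. This is only an illustration, not the library's implementation: the helper names `sign` and `verify`, the SHA-256 digest, and the message shape are all assumptions.

```javascript
const crypto = require("node:crypto");

// Hypothetical helpers, not part of the NetKernel API.
// Assumption: SHA-256 is used here purely for illustration.
function sign(cookie, payload) {
  return crypto.createHmac("sha256", cookie).update(payload).digest("hex");
}

function verify(cookie, payload, checksum) {
  const expected = Buffer.from(sign(cookie, payload), "hex");
  const received = Buffer.from(String(checksum), "hex");
  // timingSafeEqual throws on a length mismatch, so compare lengths first.
  return expected.length === received.length &&
    crypto.timingSafeEqual(expected, received);
}

const cookie = "cluster-secret";
const body = JSON.stringify({event: "job", data: "hello"});
const checksum = sign(cookie, body);
```

A receiver holding the same cookie accepts `body` with `checksum`, while a receiver with a different cookie, or one that sees a modified body, rejects it.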
Parameters:
Name | Type | Description |
---|---|---|
ipc | IPC | IPC instance to route node communication through. |
id | String | Unique identifier of this node. |
host | String | The hostname this node will listen on. Can be either an IP address (longname) or a hostname (shortname). |
port | Number | The port this node will listen on. |
Methods
abcast(nodes, event, data) → {Clusterluck.NetKernel}
Makes an asynchronous call to an array of external nodes `nodes`, streaming `data` over. Errors are ignored, as is the case where no connection exists to a node in `nodes`.
Parameters:
Name | Type | Description |
---|---|---|
nodes | Array | Nodes to send `data` to. |
event | String | Event this message is sent under. |
data | Stream \| Buffer \| String \| Number \| Boolean \| Object \| Array | Data to send with this message. |
Returns:
This instance.
call(node, event, data, [cb]) → {Stream}
Makes a synchronous call to external node `node`, streaming `data` over and then waiting for a complete response. To accomplish this, a tag is passed that uniquely identifies the returnee. This netkernel then listens for messages whose event is this tag and passes their data into the return stream, closing the stream when a `done` message is passed for this tag. If `cb` is passed, the return stream is collected into a single Buffer and passed to `cb`. Otherwise, the return stream is given to the caller for manual data collection and error handling.
Parameters:
Name | Type | Attributes | Description |
---|---|---|---|
node | Clusterluck.Node | | Node to send `data` to. |
event | String | | Event this message is sent under. |
data | Stream \| Buffer \| String \| Number \| Boolean \| Object \| Array | | Data to send with this message. |
cb | function | <optional> | Optional callback to collect stream data and handle error reporting. Useful for smaller payloads with minimal memory footprints. Takes two parameters: the first is an error, set when an error occurs at any point in the request, and the second is a Buffer returned on successful completion. |
Returns:
If `cb` is not passed, the return stream.
- Type
- Stream
Examples
// with callback
kernel.call(node, "job", "hello", (err, data) => {
  // data is a Buffer on success
  // ...
});
// w/o callback
var rstream = kernel.call(node, "job", "hello");
rstream.on("data", (data) => {
  // data handler ...
}).on("error", (error) => {
  // error handler ...
}).on("end", () => {
  // end handler ...
});
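
The tag-based collection that `call` performs when `cb` is passed might look roughly like the sketch below. It uses a plain `EventEmitter` as a stand-in for the kernel's message bus. The function name `collectReply` and the message shape (`data`, `done`, `error`) are assumptions made for illustration, not the library's wire format.

```javascript
const { EventEmitter } = require("node:events");

// Hypothetical message shape: {data, done, error}. Not the library's format.
function collectReply(bus, tag, cb) {
  const chunks = [];
  const onMessage = (msg) => {
    if (msg.error) {
      // Stop listening on the first error and report it.
      bus.removeListener(tag, onMessage);
      return cb(msg.error);
    }
    if (msg.data) chunks.push(Buffer.from(msg.data));
    if (msg.done) {
      // A done message closes the reply: hand back one Buffer.
      bus.removeListener(tag, onMessage);
      cb(null, Buffer.concat(chunks));
    }
  };
  bus.on(tag, onMessage);
}

const bus = new EventEmitter();
let result = null;
collectReply(bus, "tag-1", (err, data) => { result = err || data; });
bus.emit("tag-1", {data: "hel"});
bus.emit("tag-1", {data: "lo"});
bus.emit("tag-1", {done: true});
```

Once the done message arrives, the listener for the tag is removed, so late messages carrying the same tag are simply dropped.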
callSingleton(node, event, data, cb)
Makes a synchronous call to external node `node`, sending `data` over and then waiting for a complete response. To accomplish this, a tag is passed that uniquely identifies the returnee. This netkernel then listens for a single message whose event is this tag and passes its data into the return callback.
Parameters:
Name | Type | Description |
---|---|---|
node | Clusterluck.Node | Node to send `data` to. |
event | String | Event this message is sent under. |
data | Buffer \| Stream \| String \| Number \| Object \| Array \| Boolean | Data to send with this message. |
cb | function | Callback that receives the response. |
Example
// with callback
kernel.callSingleton(node, "job", "hello", (err, data) => {
  // data can be any JSON or Buffer
  // ...
}, 5000);
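
Waiting for exactly one tagged reply can be sketched with `EventEmitter.once`. The helper `awaitSingle` is hypothetical. The trailing `5000` in the example above looks like a timeout in milliseconds, but the documented signature does not list it, so the timeout handling here is an assumption.

```javascript
const { EventEmitter } = require("node:events");

// Hypothetical helper, not the library's implementation.
function awaitSingle(bus, tag, cb, timeout) {
  let timer = null;
  const onReply = (msg) => {
    clearTimeout(timer);
    cb(null, msg);
  };
  // once() delivers the first message for this tag and then unsubscribes.
  bus.once(tag, onReply);
  if (timeout) {
    timer = setTimeout(() => {
      bus.removeListener(tag, onReply);
      cb(new Error("Timed out waiting for reply to " + tag));
    }, timeout);
  }
}

const replies = new EventEmitter();
const received = [];
awaitSingle(replies, "tag-1", (err, msg) => received.push(err || msg), 5000);
replies.emit("tag-1", {ok: true});
replies.emit("tag-1", {ok: false}); // ignored: the listener is already gone
```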
cast(node, event, data) → {Clusterluck.NetKernel}
Makes an asynchronous call to external node `node`, streaming `data` over. Errors are ignored, as is the case where no connection exists to `node`.
Parameters:
Name | Type | Description |
---|---|---|
node | Clusterluck.Node | Node to send `data` to. |
event | String | Event this message is sent under. |
data | Stream \| Buffer \| String \| Number \| Boolean \| Object \| Array | Data to send with this message. |
Returns:
This instance.
connect(node, [cb])
Creates a socket connection to external node `node`, which this netkernel can use to send messages to `node`. If `node` is identical to `this.self()` or a connection already exists, this returns immediately. Otherwise, creates the connection and inserts `node` into this netkernel's sink map.
Parameters:
Name | Type | Attributes | Description |
---|---|---|---|
node | Clusterluck.Node | | Node to create socket connection with. |
cb | function | <optional> | Callback to call once this process has finished. |
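
The "return immediately if already connected" behaviour can be illustrated with a small stand-in for the sink map. `SinkRegistry` and the shape of the stored record are hypothetical. The real kernel opens a socket, which this sketch leaves out.

```javascript
// Hypothetical stand-in for a kernel's sink map; not the library's code.
class SinkRegistry {
  constructor(selfId) {
    this.selfId = selfId;
    this.sinks = new Map(); // node id => connection record
  }

  connect(node, cb) {
    // Connecting to ourselves or to an already-known node is a no-op.
    if (node.id !== this.selfId && !this.sinks.has(node.id)) {
      this.sinks.set(node.id, {node: node, openedAt: Date.now()});
    }
    if (typeof cb === "function") cb();
  }
}

const registry = new SinkRegistry("self");
registry.connect({id: "self", host: "localhost", port: 7022});
registry.connect({id: "peer", host: "localhost", port: 7023});
registry.connect({id: "peer", host: "localhost", port: 7023});
```

After these three calls the map holds a single entry for `peer`: the self connection and the repeated connection are both skipped.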
connection(node) → {Clusterluck.Connection}
Grabs the connection object between this netkernel and `node`.
Parameters:
Name | Type | Description |
---|---|---|
node | Clusterluck.Node | Node to grab connection of. |
Returns:
Connection of `node`, or null if it doesn't exist.
cookie([cookie]) → {String}
Acts as a getter/setter for the cookie this netkernel uses for cluster member verification.
Parameters:
Name | Type | Attributes | Description |
---|---|---|---|
cookie | String | <optional> | Cookie to set on this netkernel. |
Returns:
Cookie of this instance.
- Type
- String
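
The combined getter/setter convention used by `cookie` and the other accessors on this page can be sketched as follows. `KernelLike` is a hypothetical class written for illustration, and treating `undefined` as "no argument" is an assumption about how the accessor tells a get from a set.

```javascript
// Hypothetical class showing the getter/setter convention only.
class KernelLike {
  constructor(cookie) {
    this._cookie = cookie;
  }

  cookie(cookie) {
    // With an argument: store it. Either way: return the current value.
    if (cookie !== undefined) this._cookie = cookie;
    return this._cookie;
  }
}

const k = new KernelLike("first");
const before = k.cookie();        // getter form
const after = k.cookie("second"); // setter form, returns the new value
```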
disconnect(node) → {Clusterluck.NetKernel}
Disconnects the socket connection to `node`. If `node` is identical to `this.self()` or a connection doesn't exist, this returns immediately. Otherwise, terminates the connection and removes `node` from this netkernel's sink and source maps.
Parameters:
Name | Type | Description |
---|---|---|
node | Clusterluck.Node | Node to disconnect from. |
Returns:
This instance.
host([host]) → {String}
Acts as a getter/setter for the hostname of this netkernel, modifying the host value of the `self` node if used as a setter. Can be either a shortname (hostname) or a longname (IP address).
Parameters:
Name | Type | Attributes | Description |
---|---|---|---|
host | String | <optional> | Host to set on this netkernel. |
Returns:
Host of this instance.
- Type
- String
id([id]) → {String}
Acts as a getter/setter for the unique identifier of this netkernel, modifying the ID value of the `self` node if used as a setter.
Parameters:
Name | Type | Attributes | Description |
---|---|---|---|
id | String | <optional> | ID to set on this netkernel. |
Returns:
ID of this instance.
- Type
- String
ipc([ipc]) → {IPC}
Acts as a getter/setter for the IPC instance of this netkernel. Setting it is only safe while this instance is in an unstarted state.
Parameters:
Name | Type | Attributes | Description |
---|---|---|---|
ipc | IPC | <optional> | IPC instance to set on this netkernel. |
Returns:
IPC instance of this kernel.
- Type
- IPC
isConnected(node) → {Boolean}
Checks if a connection exists with `node`.
Parameters:
Name | Type | Description |
---|---|---|
node | Clusterluck.Node | Node to check existing connection to. |
Returns:
Whether a connection exists.
- Type
- Boolean
multicall(nodes, event, data, [cb]) → {Array}
Makes a synchronous call to a list of external nodes `nodes`, streaming `data` over and then waiting for a complete response from all nodes (using `this.call(...)` on each node in `nodes`). If `cb` is passed, each return stream is collected into a single Buffer and the resulting array is passed to `cb`. Otherwise, the list of return streams is given to the caller for manual data collection and error handling. If `cb` is passed and any node in the list incurs an error at any point, `cb` is called with this error and any further processing on the other nodes is ignored.
Parameters:
Name | Type | Attributes | Description |
---|---|---|---|
nodes | Array | | Nodes to send `data` to. |
event | String | | Event this message is sent under. |
data | Stream \| Buffer \| String \| Number \| Boolean \| Object \| Array | | Data to send with each message. |
cb | function | <optional> | Optional callback to collect stream data and handle error reporting on each node in `nodes`. |
Returns:
If `cb` is not passed, an array of return streams.
- Type
- Array
Examples
// with callback
kernel.multicall([node1, node2], "job", "hello", (err, data) => {
  // data is an array of Buffers on success
  // ...
});
// w/o callback
var rstreamList = kernel.multicall([node1, node2], "job", "hello");
rstreamList.forEach((rstream) => {
  rstream.on("data", (data) => {
    // data handler ...
  }).on("error", (error) => {
    // error handler ...
  }).on("end", () => {
    // end handler ...
  });
});
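
The fail-fast behaviour described for `multicall` with a callback (first error wins, remaining results are ignored) can be sketched without any networking. `collectAll`, `ok`, and `fail` are hypothetical helpers. Each "call" is a function that takes a Node-style callback, standing in for one per-node request.

```javascript
// Hypothetical aggregation helper; not the library's implementation.
function collectAll(calls, cb) {
  const results = new Array(calls.length);
  let pending = calls.length;
  let finished = false;
  if (pending === 0) return cb(null, results);
  calls.forEach((call, i) => {
    call((err, data) => {
      if (finished) return; // ignore anything after the first error
      if (err) {
        finished = true;
        return cb(err);
      }
      results[i] = data; // keep results in the same order as the node list
      pending -= 1;
      if (pending === 0) {
        finished = true;
        cb(null, results);
      }
    });
  });
}

// Stand-ins for per-node requests.
const ok = (value) => (done) => done(null, Buffer.from(value));
const fail = (message) => (done) => done(new Error(message));

let success = null;
collectAll([ok("a"), ok("b")], (err, data) => { success = data; });

const errors = [];
collectAll([ok("a"), fail("boom"), fail("again")], (err) => { errors.push(err); });
```

In the second call the callback runs exactly once, with the first error; the later failure is dropped.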
multicallSingleton(nodes, event, data, cb)
Makes a synchronous call to a list of external nodes `nodes`, sending `data` over and then waiting for a complete response from all nodes (using `this.call(...)` on each node in `nodes`).
Parameters:
Name | Type | Description |
---|---|---|
nodes | Array | Nodes to send `data` to. |
event | String | Event this message is sent under. |
data | Buffer \| String \| Number \| Object \| Array \| Boolean | Data to send with each message. |
cb | function | Callback to wait for message data and handle error reporting on each node in `nodes`. |
Example
kernel.multicallSingleton([node1, node2], "job", "hello", (err, data) => {
  // data is an array of JSON and/or Buffers on success
  // ...
});
ping(node, cb)
Makes a synchronous ping to external node `node`. To accomplish this, a tag is passed that uniquely identifies the returnee. This netkernel then listens for a message whose event is this tag and passes its data into the callback.
Parameters:
Name | Type | Description |
---|---|---|
node | Clusterluck.Node | Node to ping. |
cb | function | Callback that receives the ping response. |
port([port]) → {Number}
Acts as a getter/setter for the port of this netkernel, modifying the port value of the `self` node if used as a setter.
Parameters:
Name | Type | Attributes | Description |
---|---|---|---|
port | Number | <optional> | Port to set on this netkernel. |
Returns:
Port of this instance.
- Type
- Number
reply(from, data) → {Clusterluck.NetKernel}
Replies to a synchronous message received on this netkernel. If responding to a request made with `callSingleton`, a stream should not be passed in, as the requester is only listening for a single message (whereas a stream has at least two messages).
Parameters:
Name | Type | Description |
---|---|---|
from | Object | From object received on synchronous message. This contains a tag to uniquely identify the request on the sender's end. |
data | Stream \| Buffer \| String \| Number \| Boolean \| Object \| Array | Data to send with this message. |
Returns:
This instance.
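
Why a stream reply is a poor fit for a `callSingleton` request can be seen in a small sketch. The framing below (one message per chunk plus a final done marker) is an assumption used to illustrate the "at least two messages" remark, and `frameStream` is a hypothetical helper.

```javascript
const { EventEmitter } = require("node:events");

// Hypothetical framing: one message per chunk, plus a final done marker.
function frameStream(tag, chunks) {
  const messages = chunks.map((chunk) => ({tag: tag, data: chunk, done: false}));
  messages.push({tag: tag, data: null, done: true});
  return messages;
}

const requester = new EventEmitter();
const seen = [];
// A singleton-style requester listens for exactly one message on its tag.
requester.once("tag-1", (msg) => seen.push(msg));

const frames = frameStream("tag-1", ["hello"]);
frames.forEach((msg) => requester.emit(msg.tag, msg));
```

Even a one-chunk stream yields two frames here, and the singleton listener only ever sees the first of them.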
self([node]) → {Clusterluck.Node}
Acts as a getter/setter for the node representing this netkernel, modifying the id, port, and host of this instance if used as a setter.
Parameters:
Name | Type | Attributes | Description |
---|---|---|---|
node | Clusterluck.Node | <optional> | Node to set as "self" on this netkernel. |
Returns:
Self-referencing node of this instance.
- Type
- Clusterluck.Node
sinks([sinks]) → {Map}
Acts as a getter/setter for the sink map of external nodes this netkernel has connections to.
Parameters:
Name | Type | Attributes | Description |
---|---|---|---|
sinks | Map | <optional> | Map of (node id => node) to set on this netkernel. |
Returns:
Map of external node connections of this instance.
- Type
- Map
sources([sources]) → {Map}
Acts as a getter/setter for the source map of sockets this netkernel receives data from.
Parameters:
Name | Type | Attributes | Description |
---|---|---|---|
sources | Map | <optional> | Map of (node => socket connections) to set on this netkernel. |
Returns:
Map of socket connections of this instance.
- Type
- Map
start([opts]) → {Clusterluck.NetKernel}
Starts the IPC server on the configured host and port, as well as the routing routine. Can be configured with a cluster cookie (for HMAC signature cluster member checks), the retry interval for failed message sends, the number of times to retry sending a message, and whether to use TLS on TCP sockets between nodes or not.
Parameters:
Name | Type | Attributes | Description |
---|---|---|---|
opts | Object | <optional> | Options object for IPC server configuration between nodes. |
Returns:
This instance.
stop() → {Clusterluck.NetKernel}
Stops the IPC server on this node and empties any sockets receiving data from external nodes. Maintains connections for sending messages to external nodes. All listening processes should be terminated before calling this function, in order to safely finish any job streams.
Returns:
This instance.
Events
NetKernel:_ready
Emitted when this instance's IPC server has started and is ready to receive messages from other nodes.
NetKernel:_skip
Emitted whenever this instance has received a message that a) contained invalid JSON or b) had a mismatched HMAC checksum in the body.
Properties:
Name | Type | Description |
---|---|---|
data | Object | Contents of skipped message. |
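
The two skip conditions above (invalid JSON, mismatched checksum) can be sketched as a small receive step that emits a skip event instead of routing the message. `receive`, the `isAuthentic` predicate, and the payload passed to the listener are hypothetical; only the `_skip` event name is taken from this page.

```javascript
const { EventEmitter } = require("node:events");

// Hypothetical receive step; not the library's implementation.
function receive(emitter, raw, isAuthentic) {
  let message;
  try {
    message = JSON.parse(raw);
  } catch (err) {
    emitter.emit("_skip", {raw: raw, reason: "invalid JSON"});
    return null;
  }
  if (!isAuthentic(message)) {
    emitter.emit("_skip", {raw: raw, reason: "checksum mismatch"});
    return null;
  }
  return message; // safe to route to a handler
}

const kernelLike = new EventEmitter();
const skipped = [];
kernelLike.on("_skip", (info) => skipped.push(info.reason));

// Stand-in for the real HMAC comparison.
const isAuthentic = (msg) => msg.checksum === "expected";

receive(kernelLike, "{not json", isAuthentic);
receive(kernelLike, JSON.stringify({checksum: "wrong"}), isAuthentic);
const accepted = receive(kernelLike, JSON.stringify({checksum: "expected"}), isAuthentic);
```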
NetKernel:_stop
Emitted when this instance has stopped its IPC server and stopped receiving messages from other nodes.
NetKernel:user_defined
Emitted whenever this instance has received a message that has passed security checks and is ready to be routed to a GenServer/event handler.
Properties:
Name | Type | Description |
---|---|---|
id | String | Local target of this message (corresponding to a GenServer). |
data | Buffer | Data buffer of message. |
stream | Object | Stream object for this message. |
from | Object | From object received on a message. This contains a tag to uniquely identify the request on the sender's end. |