const isString = require( "lodash.isstring" ); const isFunction = require( "lodash.isfunction" ); const autobahn = require( "autobahn" ); /** * @typedef RPC * @type {Object} * @property {string} name The name of the RPC. * @property {function} func The function to execute. */ /** * @typedef options * @type {Object} * @property {Object} connect See {@link https://github.com/crossbario/autobahn-js/blob/master/doc/reference.md#connection-options|connection options} * @property {string} [connect.url="ws://localhost:8080/ws"] Crossbar "url" to connect to. * @property {string} [connect.realm="realm1"] Crossbar "realm" for the "url". * @property {Object} [publish={}] See {@link https://github.com/crossbario/autobahn-js/blob/master/doc/reference.md#publish|publish options} * @property {Object} [subscribe={}] See {@link https://github.com/crossbario/autobahn-js/blob/master/doc/reference.md#subscribe|subscribe options} * @property {Object} [call={}] See {@link https://github.com/crossbario/autobahn-js/blob/master/doc/reference.md#call|call options} * @property {Object} [register={}] See {@link https://github.com/crossbario/autobahn-js/blob/master/doc/reference.md#register|register options} */ /** * @public * @author Pedro Miguel P. S. Martins * @version 1.2.0 * @module crossbarjs * @desc * Encapsulates crossbar publish/subscribe and register/unregister/call functionality into a facade, easier to use and reason about. */ const crossbarFactory = () => { const DEFAULT_OPTS = Object.freeze( { connect: { "url": "ws://localhost:8080/ws", "realm": "realm1" }, publish: {}, subscribe: {}, call: {}, register: {} } ); const subscritionMap = new Map(), registrationMap = new Map(); let connection, options = Object.assign( {}, DEFAULT_OPTS ); /** * @public * @function connect * @param {Object=} [connectOpts] Connection object with the * options to connect. * The provided Object must have both * an <code>url</code> and a * <code>realm</code> properties to * properly connect or else it fails. * If no object is passed, the * function will use the default object. * @param {string} [connectOpts.url = "ws://localhost:8080/ws"] The connection 'url' as described in autobahn connection options. * @param {string} [connectOpts.realm = "realm1"] The connection 'realm' as described in autobahn connection options. * @returns {Promise} * * @description * Connects this instance to the given direction. * Resolves if a connection is established **and** opened successfully. * If it fails to open the connection, it rejects with a reason and an optional details object. * * @see {@link https://github.com/crossbario/autobahn-js/blob/master/doc/reference.md#connection-options|autobahn-js connection options} * * @example <caption>Creates a connection with the default parameters:</caption> * const crossbarjs = require("crossbarjs"); * * const crossbar = crossbarjs(); * crossbar.connect() * .then(() => console.log("Great Success!")) * .catch((reason, details) => { * console.log(`Failed becasue ${reason}: ${JSON.stringify(details)}`); * }); * * @example <caption>Creates a connections with custom parameters:</caption> * const crossbarjs = require("crossbarjs"); * * const crossbar = crossbarjs(); * const connectParams = {url: "myURL", realm: "Lovecraft"}; * crossbar.connect(connectParams) * .then(() => console.log("Great Success!")) * .catch((reason, details) => { * console.log(`Failed becasue ${reason}: ${JSON.stringify(details)}`); * }); * * @example <caption>Additionally, you may also change the "options.connect":</caption> * const crossbarjs = require("crossbarjs"); * * const crossbar = crossbarjs(); * * crossbar.setOpts({ * connect: { url: "myURL", realm: "Lovecraft" } * }); * * crossbar.connect() * .then(() => console.log("Great Success!")) * .catch((reason, details) => { * console.log(`Failed becasue ${reason}: ${JSON.stringify(details)}`); * }); */ const connect = function ( connectOpts = options.connect ) { return new Promise( ( resolve, reject ) => { connection = new autobahn.Connection( connectOpts ); connection.onopen = () => { events.onOpen(); resolve(); connection.onopen = () => { events.onOpen(); recover() .then( events.onRecover ) .catch( events.onError ); }; }; connection.onclose = ( reason, details ) => { events.onClose( reason, details ); reject( reason, details ); connection.onclose = events.onClose; }; connection.open(); } ); }; /** * @private * @function recover * * @description * This is responsible for the recovery proccess. Every time a connection is recovered, this function will re-subscribe and re-register everything to the functionality of the service is not interrupted. */ const recover = () => recoverFromMap( "subscribe", subscritionMap ) .then( () => recoverFromMap( "register", registrationMap ) ); const recoverFromMap = async function ( action, map ) { for ( const [ key, data ] of map ) { await add( action, key, data.cb, options[ action ], { set: () => {} } ).catch( err => events.onError( err ) ); } }; const events = { onClose: () => {}, onOpen: () => {}, onRecover: () => {}, onError: () => {} }; /** * @public * @function onClose * @param {function} fun The function to be called when a connection closes. * @throws {TypeError} If <code>fun</code> is not a function. * * @description * Hook for when the connection closes. This usually happens when crossbar itself dies or closes its connections. * The passed function will receive 2 parameters, <code>reason</code>, a human readable reason for why the connection was closed, and a second optional parameter <code>details</code>, an object containing the details of why the connection was closed. This second parameter is not always passed. * * @example <caption>Creating a hook that logs when a connection was closed:</caption> * const crossbarjs = require("crossbarjs"); * * const crossbar = crossbarjs(); * crossbar.connect() * .then(() => crossbar.onClose(console.log)) //if crossbar dies, this gets fired * .catch((reason, details) => { * console.log(`Failed becasue ${reason}: ${JSON.stringify(details)}`); * }); */ const onClose = fun => { if ( !isFunction( fun ) ) throw new TypeError( `${fun} must be a Function.` ); events.onClose = fun; }; /** * @public * @function onOpen * @param {function} fun The function to be called when a connection opens. * @throws {TypeError} If <code>fun</code> is not a function. * * @description * Hook for when the connection opens. This usually happens when the application first connects to crossbar and when the connection is lost and later on recovered. * The passed function will receive no parameters. * * @example <caption>Creating a hook that logs when a connection opens:</caption> * const crossbarjs = require("crossbarjs"); * * const crossbar = crossbarjs(); * crossbar.onOpen(() => console.log("I'm alive!")); * crossbar.connect() * .catch((reason, details) => { * console.log(`Failed becasue ${reason}: ${JSON.stringify(details)}`); * }); */ const onOpen = fun => { events.onOpen = fun; }; /** * @public * @function onRecover * @param {function} fun The function to be called when a connection recovers. * @throws {TypeError} If <code>fun</code> is not a function. * * @description * Hook for when the connection recovers. A connection recovers when it has closed unexpectadly and then reconnects activating the recover proceaduer, that re-subscribes and re-registers any calls previously done automatically. * The passed function will receive no parameters. * * @example <caption>Creating a hook that logs when a connection recovers:</caption> * const crossbarjs = require("crossbarjs"); * * const crossbar = crossbarjs(); * crossbar.onRecover(() => console.log("I'm back baby!")); * crossbar.connect() * .catch((reason, details) => { * console.log(`Failed becasue ${reason}: ${JSON.stringify(details)}`); * }); * //kill crossbar * //start crossbar * //message should appear */ const onRecover = fun => { events.onRecover = fun; }; /** * @public * @function onError * @param {function} fun The function to be called when an Error occurs. * @throws {TypeError} If <code>fun</code> is not a function. * * @description * Hook for when an error occurs. Errors may occur when crossbarjs is attempting automatic reconnection or becasue some other component failed. * The passed function will receive the <code>error</code> as a parameter. * * @example <caption>Creating a hook that logs when an error occurs:</caption> * const crossbarjs = require("crossbarjs"); * * const crossbar = crossbarjs(); * crossbar.onRecover(error => console.log(`Got error: ${error}`)); */ const onError = fun => { events.onError = fun; }; /** * @public * @function disconnect * @param {string} [reason="wamp.goodbye.normal"] WAMP URI providing a closing reason e.g. 'com.myapp.close.signout' to the server side. * @param {string} [message] Human-readable closing message. * @returns {Promise} * * @description * Closes the crossbar connection. Resolves once the connection is closed or rejects if there was an error closing. * * @example <caption>Simply disconnect:</caption> * * //imagine we have previously connected * crossbar.disconnect() * .then(() => console.log("disconnected!")) * .catch(console.log); * * @example <caption>Disconnect after connecting:</caption> * const crossbarjs = require("crossbarjs"); * * const crossbar = crossbarjs(); * crossbar.connect() * .then(() => console.log("connected!")) * .then(() => crossbar.disconnect("com.myapp.close.signout", "client does not like our service !!!!")) * .then(() => console.log("disconnected!")) * .catch(console.log); * * @example <caption>Error while disconnecting:</caption> * const crossbarjs = require("crossbarjs"); * * const crossbar = crossbarjs(); * crossbar.disconnect() * .catch(console.log); //error, we never connected in the first place! */ const disconnect = function ( reason, message ) { return new Promise( ( resolve, reject ) => { connection.onclose = resolve; try { connection.close( reason, message ); } catch ( error ) { reject( error ); } } ); }; /** * @public * @function getSession * @returns {Session} * * @description * Returns the current autobahn.Session object. * Ideally you shouldn't need to use it with the current interface, but in case you need you can have direct access to it. * * @see {@link https://github.com/crossbario/autobahn-js/blob/master/doc/reference.md#sessions|autobahn-js sessions} * * @example <caption>Using a session:</caption> * //Assuming we have previously connected * const session = crossbar.getSession(); * console.log(`Session id is: ${session.id}`); */ const getSession = function () { return connection.session; }; /** * @public * @function getConnection * @returns {Connection} * * @description Returns the current autobahn.Connection object. * * @see {@link https://github.com/crossbario/autobahn-js/blob/master/doc/reference.md#connections|autobahn-js connections} * * @example <caption>Using a connection:</caption> * //Assuming we have previously connected * const conn = crossbar.getConnection(); */ const getConnection = function () { return connection; }; /** * @public * @function register * @param {(string|RPC[])} args It can either receive two arguments, * a string and a function, to register * one RPC, or it can receive an array of * RPC objects, to register them all. * @returns {Promise} * * @description * Registers the given RPCs, biinding each RPC to a name. * It can either register a single RPC, or an array of RPC objects. * Resolves if all RPCs were registered successfully or rejects if one of them fails. * * @example <caption>Registering a single RPC:</caption> * //Assuming we have previously connected * const myHello = () => { * console.log("Hello World"); * } * * crossbar.register("hello", myHello) * .then(() => console.log("great success!")) * .catch(console.log); * * @example <caption>Registering multiple RPCs:</caption> * //Assuming we have previously connected * const myHello = () => { * console.log("Hello World"); * } * * const myGoodbye = () => { * console.log("Goodbye World!"); * }; * * const RPCs = [ * { name: "hello" , func: myHello }, * { name: "bye" , func: myGoodbye } * ]; * * crossbar.register(RPCs) * .then(() => console.log("great success!")) * .catch(console.log); */ const register = function ( ...args ) { const argsArray = Array.from( args ); if ( Array.isArray( argsArray[ 0 ] ) ) return registerMany( argsArray[ 0 ] ); if ( isString( argsArray[ 0 ] ) && isFunction( argsArray[ 1 ] ) && argsArray.length === 2 ) return registerOne( argsArray[ 0 ], argsArray[ 1 ] ); return Promise.reject( new Error( "Unrecognized parameters" ) ); }; const registerOne = function ( name, func ) { if ( !isString( name ) ) { return Promise.reject( new TypeError( `${name} must be a String.` ) ); } if ( !isFunction( func ) ) { return Promise.reject( new TypeError( `${func} must be a Function.` ) ); } return add( "register", name, deCrossbarify( func ), options.register, registrationMap ); }; const registerMany = async function ( rpcList ) { for ( const rpc of rpcList ) { await registerOne( rpc.name, rpc.func ) .catch( err => { throw new Error( `Failed to register "${rpc.name}": ${JSON.stringify(err)}` ); } ); } }; /** * @public * @function unregister * @param {...string} args The names of the RPCs to unregister * @returns {Promise} * * @description * Unregisters the RPC with the given name, or all the RPCs with the names provided in the array. * Returns a promise once all RPCs have be unregistered successfully or rejects if one of them fails. * * @example <caption>Unregister a single RPC:</caption> * //Assuming we have previously connected and registered "hello" * crossbar.unregister("hello") * .then(() => console.log("great success!")) * .catch(console.log); * * @example <caption>Unregister multiple RPCs:</caption> * //Assuming we have previously connected and registered the RPCs with the given names * crossbar.unregister("hello", "bye") * .then(() => console.log("great success!")) * .catch(console.log); */ const unregister = function ( ...args ) { return unregisterMany( args ); }; const unregisterOne = function ( name ) { if ( !isString( name ) ) { return Promise.reject( new TypeError( `${name} must be a String.` ) ); } if ( !registrationMap.has( name ) ) { return Promise.reject( new Error( `${name} is not registered.` ) ); } return remove( "unregister", name, registrationMap ); }; const unregisterMany = async function ( rpcNamesList ) { for ( const rpcName of rpcNamesList ) { await unregisterOne( rpcName ) .catch( err => { throw new Error( `Failed to unregister "${rpcName.name}": ${JSON.stringify(err)}` ); } ); } }; /** * @public * @function call * @param {string} rpcName The name of the RPC we wish to call. * @param {...Object} args Variable number of arguments we wish to * pass. * @returns {Promise} * * @description * Calls the RPC with the given name, providing the given arguments. * Resolves if it succeeds, rejects otherwise. * * @example <caption>Call an RPC with no arguments:</caption> * //Assuming we have previously connected and registered the RPC "hello" * * const hello = () => { * console.log("Hello World"); * }; * * crossbar.call("hello") * .then(() => console.log("great success!")) * .catch(console.log); * * @example <caption>Call an RPC with multiple arguments:</caption> * //Assuming we have previously connected and registered the RPC "add" * * const add = (n1, n2) => n1 + n2; * * crossbar.call("add", 1, 2) * .then(sum => console.log(`sum is: ${sum}`)) * .catch(console.log); */ const call = function ( rpcName, ...args ) { try { return getSession().call( rpcName, args, {}, options.call ); } catch ( error ) { return Promise.reject( error ); } }; /** * @public * @function getOpts * @returns {options} * * @description Returns a clone of the options object. * * @example <caption>Get a clone of the options object:</caption> * let opts = crossbar.getOpts(); * opts = {}; //this wont alter the object being used in crossbarjs */ const getOpts = function () { return Object.assign( {}, options ); }; /** * @public * @function setOpts * @param {Object} newOpts The options we want to add. * * @description * Concatenates the given options object with the current one. * This is the only way to change the <code>options</code> object. * * @see {options} * * @example <caption>Add publish parameters to the options object:</caption> * crossbar.setOpts({ * publish: { some options } * }); * console.log(JSON.stringify(crossbar.getOpts())); * //will print * //{ * // connect: { * // "url": "ws://localhost:8080/ws", * // "realm": "realm1" * // }, * // publish: { some options }, * // subscribe: {}, * // call: {}, * // register: {} * //} */ const setOpts = function ( newOpts ) { Object.assign( options, newOpts ); }; /** * @public * @function setOptsDefault * * @description Resets the options object to its default state. * * @see {options} */ const setOptsDefault = function () { options = Object.assign( {}, DEFAULT_OPTS ); }; /** * @public * @function publish * @param {string} topic The topic of the message. * @param {...Object} params The parameters that the subscribed * functions will receive. * @returns {Promise} * * @description * Publishes the given topic with the given list of variable parameters. * Resolves if it succeeds, rejects otherwise. * * @example <caption>Publish a topic:</caption> * //Assuming we are already connected * crossbar.publish("add", 1, 2) * .then(() => console.log("Published!")) * .catch(console.log); */ const publish = function ( topic, ...params ) { //autobahn-js only returns promise under specific circumstances. We // fix that here. let res; try { res = getSession().publish( topic, params, {}, options.publish ); } catch ( error ) { return Promise.reject( error ); } return options.publish.acknowledge !== undefined ? res : Promise.resolve(); }; /** * @public * @function subscribe * @param {string} topic The topic to wich we want to subscribe. * @param {function} callback The function to execute every time we * receive a message. * @returns {Promise} * * @description * Subscribes to the given topic, executing the function every time crossbar receives a message. * Resolves if the subscription was successful, rejects otherwise. * * @example <caption>Subscribe to the topic "add". See <code>publish</code>:</caption> * //Assuming we are already connected * const myAdd = (n1, n2) => n1 + n2; * * crossbar.subscribe("add", myAdd); * .then(() => console.log("Subscribed!")) * .catch(console.log); */ const subscribe = function ( topic, callback ) { if ( subscritionMap.has( topic ) ) { return Promise.reject( new Error( `Already subscribed to ${topic}` ) ); } return add( "subscribe", topic, deCrossbarify( callback ), options.subscribe, subscritionMap ); }; /** * @private * @function add * @param {string} action A session's function name to execute. In * theory it should have been the function * itself, but since there were some * context issues, I decided to pass the * function's name and then execute it. * @param {string} id The id of the thing we will be adding. * @param {function} callback The function we associate with the given * id. * @param {Object} options Options object for the action. * @param {Map} map The map that will save the association * betwwen the id and the result of the * action. * @returns {Promise} * * @description * Introduced after codeclimate code quality analysis as a means to remove duplication betwwen regiterOne and subscribe, since they both have the same structure. * */ const add = ( action, id, callback, options, map ) => { return getSession()[ action ]( id, callback, options ) .then( result => { map.set( id, { cb: callback, opResult: result } ); } ); }; /** * @private * @function add * @param {string} action A session's function name to execute. In * theory it should have been the function * itself, but since there were some * context issues, I decided to pass the * function's name and then execute it. * @param {string} id The id of the thing we will be removing. * @param {Map} map The map containing the id. * @returns {Promise} * * @description * Introduced after codeclimate code quality analysis as a means to remove duplication betwwen regiterOne and subscribe, since they both have the same structure. * */ const remove = ( action, id, map ) => { return getSession()[ action ]( map.get( id ).opResult ) .then( () => { map.delete( id ); } ); }; /** * @private * @function deCrossbarify * @param {function} callback The function with the actual parameters. * @returns {function} * * @description * <p> * To register and subscribe to crossbar events, you either need to have all arguments in an array, or in a object. * This approach is counter intuitive and cumbersome, and many beginners have issues with it. * </p> * <p> * This function takes the array argument, and spreads it to the given function. * This way people can have functions with all the arguments listed as subscribers and RPCs. * The code is thus cleaner and easier to reason about. * </p> */ const deCrossbarify = callback => args => callback.call( null, ...args ); /** * @public * @function unsubscribe * @param {string} topic The topic to which we want to unsubscribe. * @returns {Promise} * * @description Unsubscribes from the given topic. Resolves if successful, * rejects otherwise. * * @example <caption>Unsubscribe to the topic "add". See <code>subscribe</code>:</caption> * //Assuming we are already connected * crossbar.unsubscribe("add"); * .then(() => console.log("Unsubscribed!")) * .catch(console.log); */ const unsubscribe = function ( topic ) { if ( !subscritionMap.has( topic ) ) { return Promise.reject( new Error( `Not subscribed to ${topic}` ) ); } return remove( "unsubscribe", topic, subscritionMap ); }; return Object.freeze( { connect, disconnect, getSession, getConnection, register, unregister, call, setOpts, getOpts, setOptsDefault, publish, subscribe, unsubscribe, onOpen, onClose, onRecover, onError } ); }; module.exports = crossbarFactory;