Source: crossbar.js

const isString = require( "lodash.isstring" );
const isFunction = require( "lodash.isfunction" );
const autobahn = require( "autobahn" );

/**
 *  @typedef  RPC
 *  @type     {Object}
 *  @property {string}    name  The name of the RPC.
 *  @property {function}  func  The function to execute.
 */

/**
 *  @typedef  options
 *  @type     {Object}
 *  @property {Object}  connect             See {@link https://github.com/crossbario/autobahn-js/blob/master/doc/reference.md#connection-options|connection options}
 *  @property {string}  [connect.url="ws://localhost:8080/ws"]  Crossbar "url" to connect to.
 *  @property {string}  [connect.realm="realm1"]                Crossbar "realm" for the "url".
 *  @property {Object}  [publish={}]        See {@link https://github.com/crossbario/autobahn-js/blob/master/doc/reference.md#publish|publish options}
 *  @property {Object}  [subscribe={}]      See {@link https://github.com/crossbario/autobahn-js/blob/master/doc/reference.md#subscribe|subscribe options}
 *  @property {Object}  [call={}]           See {@link https://github.com/crossbario/autobahn-js/blob/master/doc/reference.md#call|call options}
 *  @property {Object}  [register={}]       See {@link https://github.com/crossbario/autobahn-js/blob/master/doc/reference.md#register|register options}
 */

/**
 *  @public
 *  @author   Pedro Miguel P. S. Martins
 *  @version  1.2.0
 *  @module   crossbarjs
 *  @desc
 *  Encapsulates crossbar publish/subscribe and register/unregister/call functionality into a facade, easier to use and reason about.
 */
const crossbarFactory = () => {

    const DEFAULT_OPTS = Object.freeze( {
        connect: {
            "url": "ws://localhost:8080/ws",
            "realm": "realm1"
        },
        publish: {},
        subscribe: {},
        call: {},
        register: {}
    } );

    const subscritionMap = new Map(),
        registrationMap = new Map();

    let connection,
        options = Object.assign( {}, DEFAULT_OPTS );

    /**
     *  @public
     *  @function connect
     *  @param    {Object=}  [connectOpts]  Connection object with the
     *                                      options to connect.
     *                                      The provided Object must have both
     *                                      an <code>url</code> and a
     *                                      <code>realm</code> properties to
     *                                      properly connect or else it fails.
     *                                      If no  object is passed, the
     *                                      function will use the default object.
     *  @param    {string}  [connectOpts.url = "ws://localhost:8080/ws"]  The connection 'url' as described in autobahn connection options.
     *  @param    {string}  [connectOpts.realm = "realm1"]                The connection 'realm' as described in autobahn connection options.
     *  @returns  {Promise}
     *
     *  @description
     *  Connects this instance to the given direction.
     *  Resolves if a connection is established **and** opened successfully.
     *  If it fails to open the connection, it rejects with a reason and an optional  details object.
     *
     *  @see          {@link https://github.com/crossbario/autobahn-js/blob/master/doc/reference.md#connection-options|autobahn-js connection options}
     *
     *  @example  <caption>Creates a connection with the default parameters:</caption>
     *  const crossbarjs = require("crossbarjs");
     *
     *  const crossbar  = crossbarjs();
     *  crossbar.connect()
     *    .then(() => console.log("Great Success!"))
     *    .catch((reason, details) => {
     *        console.log(`Failed becasue ${reason}: ${JSON.stringify(details)}`);
     *    });
     *
     *  @example  <caption>Creates a connections with custom parameters:</caption>
     *  const crossbarjs = require("crossbarjs");
     *
     *  const crossbar  = crossbarjs();
     *  const connectParams = {url: "myURL", realm: "Lovecraft"};
     *  crossbar.connect(connectParams)
     *    .then(() => console.log("Great Success!"))
     *    .catch((reason, details) => {
     *        console.log(`Failed becasue ${reason}: ${JSON.stringify(details)}`);
     *    });
     *
     *  @example  <caption>Additionally, you may also change the "options.connect":</caption>
     *  const crossbarjs = require("crossbarjs");
     *
     *  const crossbar  = crossbarjs();
     *
     *  crossbar.setOpts({
     *    connect: { url: "myURL", realm: "Lovecraft" }
     *  });
     *
     *  crossbar.connect()
     *    .then(() => console.log("Great Success!"))
     *    .catch((reason, details) => {
     *        console.log(`Failed becasue ${reason}: ${JSON.stringify(details)}`);
     *    });
     */
    const connect = function ( connectOpts = options.connect ) {
        return new Promise( ( resolve, reject ) => {
            connection = new autobahn.Connection( connectOpts );
            connection.onopen = () => {
                events.onOpen();
                resolve();

                connection.onopen = () => {
                    events.onOpen();
                    recover()
                        .then( events.onRecover )
                        .catch( events.onError );
                };
            };

            connection.onclose = ( reason, details ) => {
                events.onClose( reason, details );
                reject( reason, details );
                connection.onclose = events.onClose;
            };
            connection.open();
        } );
    };

    /**
     * @private
     * @function  recover
     *
     * @description
     * This is responsible for the recovery proccess. Every time a connection is recovered, this function will re-subscribe and re-register everything to the functionality of the service is not interrupted.
     */
    const recover = () =>
        recoverFromMap( "subscribe", subscritionMap )
            .then( () => recoverFromMap( "register", registrationMap ) );

    const recoverFromMap = async function ( action, map ) {
        for ( const [ key, data ] of map ) {
            await add(
                action,
                key,
                data.cb,
                options[ action ], { set: () => {} }
            ).catch( err => events.onError( err ) );
        }
    };

    const events = {
        onClose: () => {},
        onOpen: () => {},
        onRecover: () => {},
        onError: () => {}
    };

    /**
     * @public
     * @function  onClose
     * @param     {function}   fun   The function to be called when a connection closes.
     * @throws    {TypeError} If <code>fun</code> is not a function.
     *
     * @description
     * Hook for when the connection closes. This usually happens when crossbar itself dies or closes its connections.
     * The passed function will receive 2 parameters, <code>reason</code>, a human readable reason for why the connection was closed, and a second optional parameter <code>details</code>, an object containing the details of why the connection was closed. This second parameter is not always passed.
     *
     * @example <caption>Creating a hook that logs when a connection was closed:</caption>
     * const crossbarjs = require("crossbarjs");
     *
     * const crossbar  = crossbarjs();
     * crossbar.connect()
     *    .then(() => crossbar.onClose(console.log))  //if crossbar dies, this gets fired
     *    .catch((reason, details) => {
     *        console.log(`Failed becasue ${reason}: ${JSON.stringify(details)}`);
     * });
     */
    const onClose = fun => {
        if ( !isFunction( fun ) )
            throw new TypeError( `${fun} must be a Function.` );
        events.onClose = fun;
    };

    /**
     * @public
     * @function  onOpen
     * @param     {function}   fun   The function to be called when a connection opens.
     * @throws    {TypeError} If <code>fun</code> is not a function.
     *
     * @description
     * Hook for when the connection opens. This usually happens when the application first connects to crossbar and when the connection is lost and later on recovered.
     * The passed function will receive no parameters.
     *
     * @example <caption>Creating a hook that logs when a connection opens:</caption>
     * const crossbarjs = require("crossbarjs");
     *
     * const crossbar  = crossbarjs();
     * crossbar.onOpen(() => console.log("I'm alive!"));
     * crossbar.connect()
     *    .catch((reason, details) => {
     *        console.log(`Failed becasue ${reason}: ${JSON.stringify(details)}`);
     * });
     */
    const onOpen = fun => {
        events.onOpen = fun;
    };

    /**
     * @public
     * @function  onRecover
     * @param     {function}   fun   The function to be called when a connection recovers.
     * @throws    {TypeError} If <code>fun</code> is not a function.
     *
     * @description
     * Hook for when the connection recovers. A connection recovers when it has closed unexpectadly and then reconnects activating the recover proceaduer, that re-subscribes and re-registers any calls previously done automatically.
     * The passed function will receive no parameters.
     *
     * @example <caption>Creating a hook that logs when a connection recovers:</caption>
     * const crossbarjs = require("crossbarjs");
     *
     * const crossbar  = crossbarjs();
     * crossbar.onRecover(() => console.log("I'm back baby!"));
     * crossbar.connect()
     *    .catch((reason, details) => {
     *        console.log(`Failed becasue ${reason}: ${JSON.stringify(details)}`);
     * });
     * //kill crossbar
     * //start crossbar
     * //message should appear
     */
    const onRecover = fun => {
        events.onRecover = fun;
    };

    /**
     * @public
     * @function  onError
     * @param     {function}   fun   The function to be called when an Error occurs.
     * @throws    {TypeError} If <code>fun</code> is not a function.
     *
     * @description
     * Hook for when an error occurs. Errors may occur when crossbarjs is attempting automatic reconnection or becasue some other component failed.
     * The passed function will receive the <code>error</code> as a parameter.
     *
     * @example <caption>Creating a hook that logs when an error occurs:</caption>
     * const crossbarjs = require("crossbarjs");
     *
     * const crossbar  = crossbarjs();
     * crossbar.onRecover(error => console.log(`Got error: ${error}`));
     */
    const onError = fun => {
        events.onError = fun;
    };

    /**
     *  @public
     *  @function disconnect
     *  @param    {string}  [reason="wamp.goodbye.normal"]  WAMP URI providing a closing reason e.g. 'com.myapp.close.signout' to the server side.
     *  @param    {string}  [message]                       Human-readable closing message.
     *  @returns {Promise}
     *
     *  @description
     *  Closes the crossbar connection. Resolves once the connection is closed or rejects if there was an error closing.
     *
     *  @example <caption>Simply disconnect:</caption>
     *
     *  //imagine we have previously connected
     *  crossbar.disconnect()
     *    .then(() => console.log("disconnected!"))
     *    .catch(console.log);
     *
     *  @example <caption>Disconnect after connecting:</caption>
     *  const crossbarjs = require("crossbarjs");
     *
     *  const crossbar  = crossbarjs();
     *  crossbar.connect()
     *    .then(() => console.log("connected!"))
     *    .then(() => crossbar.disconnect("com.myapp.close.signout", "client does not like our service !!!!"))
     *    .then(() => console.log("disconnected!"))
     *    .catch(console.log);
     *
     * @example <caption>Error while disconnecting:</caption>
     *  const crossbarjs = require("crossbarjs");
     *
     *  const crossbar  = crossbarjs();
     *  crossbar.disconnect()
     *    .catch(console.log);  //error, we never connected in the first place!
     */
    const disconnect = function ( reason, message ) {
        return new Promise( ( resolve, reject ) => {
            connection.onclose = resolve;
            try {
                connection.close( reason, message );
            } catch ( error ) {
                reject( error );
            }
        } );
    };

    /**
     *  @public
     *  @function getSession
     *  @returns {Session}
     *
     *  @description
     *  Returns the current autobahn.Session object.
     *  Ideally you shouldn't need to use it with the current interface, but in case you need you can have direct access to it.
     *
     *  @see {@link https://github.com/crossbario/autobahn-js/blob/master/doc/reference.md#sessions|autobahn-js sessions}
     *
     *  @example <caption>Using a session:</caption>
     *  //Assuming we have previously connected
     *  const session = crossbar.getSession();
     *  console.log(`Session id is: ${session.id}`);
     */
    const getSession = function () {
        return connection.session;
    };

    /**
     *  @public
     *  @function getConnection
     *  @returns {Connection}
     *
     *  @description  Returns the current autobahn.Connection object.
     *
     *  @see  {@link  https://github.com/crossbario/autobahn-js/blob/master/doc/reference.md#connections|autobahn-js connections}
     *
     *  @example <caption>Using a connection:</caption>
     *  //Assuming we have previously connected
     *  const conn = crossbar.getConnection();
     */
    const getConnection = function () {
        return connection;
    };

    /**
     *  @public
     *  @function register
     *  @param    {(string|RPC[])}  args  It can either receive two arguments,
     *                                    a string and a function, to register
     *                                    one RPC, or it can receive an array of
     *                                    RPC objects, to register them all.
     *  @returns  {Promise}
     *
     *  @description
     *  Registers the given RPCs, biinding each RPC to a name.
     *  It can either register a single RPC, or an array of RPC objects.
     *  Resolves if all RPCs were registered successfully or rejects if one of them fails.
     *
     *  @example <caption>Registering a single RPC:</caption>
     *  //Assuming we have previously connected
     *  const myHello = () => {
     *      console.log("Hello World");
     *  }
     *
     *  crossbar.register("hello", myHello)
     *      .then(() => console.log("great success!"))
     *      .catch(console.log);
     *
     *  @example <caption>Registering multiple RPCs:</caption>
     *  //Assuming we have previously connected
     *  const myHello = () => {
     *      console.log("Hello World");
     *  }
     *
     *  const myGoodbye = () => {
     *      console.log("Goodbye World!");
     *  };
     *
     *  const RPCs = [
     *      { name: "hello" , func: myHello   },
     *      { name: "bye"   , func: myGoodbye }
     *  ];
     *
     *  crossbar.register(RPCs)
     *      .then(() => console.log("great success!"))
     *      .catch(console.log);
     */
    const register = function ( ...args ) {
        const argsArray = Array.from( args );

        if ( Array.isArray( argsArray[ 0 ] ) )
            return registerMany( argsArray[ 0 ] );

        if ( isString( argsArray[ 0 ] ) && isFunction( argsArray[ 1 ] ) && argsArray.length === 2 )
            return registerOne( argsArray[ 0 ], argsArray[ 1 ] );

        return Promise.reject( new Error( "Unrecognized parameters" ) );
    };

    const registerOne = function ( name, func ) {
        if ( !isString( name ) ) {
            return Promise.reject( new TypeError( `${name} must be a String.` ) );
        }

        if ( !isFunction( func ) ) {
            return Promise.reject( new TypeError( `${func} must be a Function.` ) );
        }

        return add(
            "register",
            name,
            deCrossbarify( func ),
            options.register,
            registrationMap
        );
    };

    const registerMany = async function ( rpcList ) {
        for ( const rpc of rpcList ) {
            await registerOne( rpc.name, rpc.func )
                .catch( err => {
                    throw new Error( `Failed to register "${rpc.name}":
                      ${JSON.stringify(err)}` );
                } );
        }
    };

    /**
     *  @public
     *  @function unregister
     *  @param  {...string} args  The names of the RPCs to unregister
     *  @returns {Promise}
     *
     *  @description
     *  Unregisters the RPC with the given name, or all the RPCs with the names provided in the array.
     *  Returns a promise once all RPCs have be unregistered successfully or rejects if one of them fails.
     *
     *  @example <caption>Unregister a single RPC:</caption>
     *  //Assuming we have previously connected and registered "hello"
     *  crossbar.unregister("hello")
     *      .then(() => console.log("great success!"))
     *      .catch(console.log);
     *
     *  @example <caption>Unregister multiple RPCs:</caption>
     *  //Assuming we have previously connected and registered the RPCs with the given names
     *  crossbar.unregister("hello", "bye")
     *      .then(() => console.log("great success!"))
     *      .catch(console.log);
     */
    const unregister = function ( ...args ) {
        return unregisterMany( args );
    };

    const unregisterOne = function ( name ) {
        if ( !isString( name ) ) {
            return Promise.reject( new TypeError( `${name} must be a String.` ) );
        }

        if ( !registrationMap.has( name ) ) {
            return Promise.reject( new Error( `${name} is not registered.` ) );
        }

        return remove(
            "unregister",
            name,
            registrationMap
        );
    };

    const unregisterMany = async function ( rpcNamesList ) {
        for ( const rpcName of rpcNamesList ) {
            await unregisterOne( rpcName )
                .catch( err => {
                    throw new Error( `Failed to unregister "${rpcName.name}":
                      ${JSON.stringify(err)}` );
                } );
        }
    };

    /**
     *  @public
     *  @function call
     *  @param    {string}    rpcName The name of the RPC we wish to call.
     *  @param    {...Object} args    Variable number of arguments we wish to
     *                                pass.
     *  @returns  {Promise}
     *
     *  @description
     *  Calls the RPC with the given name, providing the given arguments.
     *  Resolves if it succeeds, rejects otherwise.
     *
     *  @example <caption>Call an RPC with no arguments:</caption>
     *  //Assuming we have previously connected and registered the RPC "hello"
     *
     *  const hello = () => {
     *      console.log("Hello World");
     *  };
     *
     *  crossbar.call("hello")
     *      .then(() => console.log("great success!"))
     *      .catch(console.log);
     *
     *  @example <caption>Call an RPC with multiple arguments:</caption>
     *  //Assuming we have previously connected and registered the RPC "add"
     *
     *  const add = (n1, n2) => n1 + n2;
     *
     *  crossbar.call("add", 1, 2)
     *      .then(sum => console.log(`sum is: ${sum}`))
     *      .catch(console.log);
     */
    const call = function ( rpcName, ...args ) {
        try {
            return getSession().call( rpcName, args, {}, options.call );
        } catch ( error ) {
            return Promise.reject( error );
        }
    };

    /**
     *  @public
     *  @function getOpts
     *  @returns  {options}
     *
     *  @description  Returns a clone of the options object.
     *
     *  @example <caption>Get a clone of the options object:</caption>
     *  let opts = crossbar.getOpts();
     *  opts = {};  //this wont alter the object being used in crossbarjs
     */
    const getOpts = function () {
        return Object.assign( {}, options );
    };

    /**
     *  @public
     *  @function setOpts
     *  @param    {Object}  newOpts The options we want to add.
     *
     *  @description
     *  Concatenates the given options object with the current one.
     *  This is the only way to change the <code>options</code> object.
     *
     *  @see {options}
     *
     *  @example <caption>Add publish parameters to the options object:</caption>
     *  crossbar.setOpts({
     *      publish: { some options }
     *  });
     *  console.log(JSON.stringify(crossbar.getOpts()));
     *  //will print
     *  //{
     *  //  connect: {
     *  //    "url": "ws://localhost:8080/ws",
     *  //    "realm": "realm1"
     *  //  },
     *  //  publish: { some options },
     *  //  subscribe: {},
     *  //  call: {},
     *  //  register: {}
     *  //}
     */
    const setOpts = function ( newOpts ) {
        Object.assign( options, newOpts );
    };

    /**
     *  @public
     *  @function setOptsDefault
     *
     *  @description  Resets the options object to its default state.
     *
     *  @see  {options}
     */
    const setOptsDefault = function () {
        options = Object.assign( {}, DEFAULT_OPTS );
    };

    /**
     *  @public
     *  @function publish
     *  @param    {string}    topic   The topic of the message.
     *  @param    {...Object} params  The parameters that the subscribed
     *                                functions will receive.
     *  @returns  {Promise}
     *
     *  @description
     *  Publishes the given topic with the given list of variable parameters.
     *  Resolves if it succeeds, rejects otherwise.
     *
     *  @example <caption>Publish a topic:</caption>
     *  //Assuming we are already connected
     *  crossbar.publish("add", 1, 2)
     *      .then(() => console.log("Published!"))
     *      .catch(console.log);
     */
    const publish = function ( topic, ...params ) {
        //autobahn-js only returns promise under specific circumstances. We
        // fix that here.
        let res;
        try {
            res = getSession().publish( topic, params, {}, options.publish );
        } catch ( error ) {
            return Promise.reject( error );
        }

        return options.publish.acknowledge !== undefined ? res : Promise.resolve();
    };

    /**
     *  @public
     *  @function subscribe
     *  @param    {string}    topic     The topic to wich we want to subscribe.
     *  @param    {function}  callback  The function to execute every time we
     *                                  receive a message.
     *  @returns  {Promise}
     *
     *  @description
     *  Subscribes to the given topic, executing the function every time crossbar receives a message.
     *  Resolves if the subscription was successful, rejects otherwise.
     *
     *  @example <caption>Subscribe to the topic "add". See <code>publish</code>:</caption>
     *  //Assuming we are already connected
     *  const myAdd = (n1, n2) => n1 + n2;
     *
     *  crossbar.subscribe("add", myAdd);
     *      .then(() => console.log("Subscribed!"))
     *      .catch(console.log);
     */
    const subscribe = function ( topic, callback ) {

        if ( subscritionMap.has( topic ) ) {
            return Promise.reject( new Error( `Already subscribed to ${topic}` ) );
        }

        return add(
            "subscribe",
            topic,
            deCrossbarify( callback ),
            options.subscribe,
            subscritionMap
        );
    };

    /**
     * @private
     * @function  add
     * @param     {string}    action    A session's function name to execute. In
     *                                  theory it should have been the function
     *                                  itself, but since there were some
     *                                  context issues, I decided to pass the
     *                                  function's name and then execute it.
     * @param     {string}    id        The id of the thing we will be adding.
     * @param     {function}  callback  The function we associate with the given
     *                                  id.
     * @param     {Object}    options   Options object for the action.
     * @param     {Map}       map       The map that will save the association
     *                                  betwwen the id and the result of the
     *                                  action.
     * @returns   {Promise}
     *
     * @description
     * Introduced after codeclimate code quality analysis as a means to remove duplication betwwen regiterOne and subscribe, since they both have the same structure.
     * */
    const add = ( action, id, callback, options, map ) => {
        return getSession()[ action ]( id, callback, options )
            .then( result => {
                map.set( id, {
                    cb: callback,
                    opResult: result
                } );
            } );
    };

    /**
     * @private
     * @function  add
     * @param     {string}    action    A session's function name to execute. In
     *                                  theory it should have been the function
     *                                  itself, but since there were some
     *                                  context issues, I decided to pass the
     *                                  function's name and then execute it.
     * @param     {string}    id        The id of the thing we will be removing.
     * @param     {Map}       map       The map containing the id.
     * @returns   {Promise}
     *
     * @description
     * Introduced after codeclimate code quality analysis as a means to remove duplication betwwen regiterOne and subscribe, since they both have the same structure.
     * */
    const remove = ( action, id, map ) => {
        return getSession()[ action ]( map.get( id ).opResult )
            .then( () => {
                map.delete( id );
            } );
    };

    /**
     * @private
     * @function  deCrossbarify
     * @param     {function}  callback  The function with the actual parameters.
     * @returns   {function}
     *
     * @description
     *  <p>
     *    To register and subscribe to crossbar events, you either need to have all arguments in an array, or in a object.
     *    This approach is counter intuitive and cumbersome, and many beginners have issues with it.
     *  </p>
     *  <p>
     *    This function takes the array argument, and spreads it to the given function.
     *    This way people can have functions with all the arguments listed as subscribers and RPCs.
     *    The code is thus cleaner and easier to reason about.
     *  </p>
     */
    const deCrossbarify = callback => args => callback.call( null, ...args );

    /**
     *  @public
     *  @function unsubscribe
     *  @param    {string}  topic The topic to which we want to unsubscribe.
     *  @returns  {Promise}
     *
     *  @description  Unsubscribes from the given topic. Resolves if successful,
     *                rejects otherwise.
     *
     *  @example <caption>Unsubscribe to the topic "add". See <code>subscribe</code>:</caption>
     *  //Assuming we are already connected
     *  crossbar.unsubscribe("add");
     *      .then(() => console.log("Unsubscribed!"))
     *      .catch(console.log);
     */
    const unsubscribe = function ( topic ) {
        if ( !subscritionMap.has( topic ) ) {
            return Promise.reject( new Error( `Not subscribed to ${topic}` ) );
        }

        return remove(
            "unsubscribe",
            topic,
            subscritionMap
        );
    };

    return Object.freeze( {
        connect,
        disconnect,
        getSession,
        getConnection,
        register,
        unregister,
        call,
        setOpts,
        getOpts,
        setOptsDefault,
        publish,
        subscribe,
        unsubscribe,
        onOpen,
        onClose,
        onRecover,
        onError
    } );
};

module.exports = crossbarFactory;