Source: bridge.js

"use strict";

var AppServiceRegistration = require("matrix-appservice").AppServiceRegistration;
var AppService = require("matrix-appservice").AppService;
var ClientFactory = require("./components/client-factory");
var AppServiceBot = require("./components/app-service-bot");
var RequestFactory = require("./components/request-factory");
var Intent = require("./components/intent");
var RoomBridgeStore = require("./components/room-bridge-store");
var UserBridgeStore = require("./components/user-bridge-store");
var MatrixUser = require("./models/users/matrix");
var MatrixRoom = require("./models/rooms/matrix");
var fs = require("fs");
var yaml = require("js-yaml");
var Promise = require("bluebird");
var Datastore = require("nedb");
var util = require("util");

/**
 * @constructor
 * @param {Object} opts Options to pass to the bridge
 * @param {AppServiceRegistration|string} opts.registration Application service
 * registration object or path to the registration file.
 * @param {string} opts.homeserverUrl The base HS url
 * @param {string} opts.domain The domain part for user_ids and room aliases
 * e.g. "bar" in "@foo:bar".
 * @param {Object} opts.controller The controller logic for the bridge.
 * @param {Bridge~onEvent} opts.controller.onEvent Function. Called when
 * an event has been received from the HS.
 * @param {Bridge~onUserQuery=} opts.controller.onUserQuery Function. If supplied,
 * the bridge will invoke this function when queried via onUserQuery. If
 * not supplied, no users will be provisioned on user queries. Provisioned users
 * will automatically be stored in the associated <code>userStore</code>.
 * @param {Bridge~onAliasQuery=} opts.controller.onAliasQuery Function. If supplied,
 * the bridge will invoke this function when queried via onAliasQuery. If
 * not supplied, no rooms will be provisioned on alias queries. Provisioned rooms
 * will automatically be stored in the associated <code>roomStore</code>.
 * @param {Bridge~onLog=} opts.controller.onLog Function. Invoked when
 * logging. Defaults to a function which logs to the console.
 * @param {(RoomBridgeStore|string)=} opts.roomStore The room store instance to
 * use, or the path to the room .db file to load. A database will be created if
 * this is not specified.
 * @param {(UserBridgeStore|string)=} opts.userStore The user store instance to
 * use, or the path to the user .db file to load. A database will be created if
 * this is not specified.
 * @param {boolean=} opts.suppressEcho True to stop receiving onEvent callbacks
 * for events which were sent by a bridge user. Default: true.
 * @param {ClientFactory=} opts.clientFactory The client factory instance to
 * use. If not supplied, one will be created.
 * @param {Object=} opts.intentOptions Options to supply to created Intent instances.
 * @param {Object=} opts.intentOptions.bot Options to supply to the bot intent.
 * @param {Object=} opts.intentOptions.clients Options to supply to the client intents.
 * @param {Object=} opts.queue Options for the onEvent queue. When the bridge
 * receives an incoming transaction, it needs to asyncly query the data store for
 * contextual info before calling onEvent. A queue is used to keep the onEvent
 * calls consistent with the arrival order from the incoming transactions.
 * @param {string=} opts.queue.type The type of queue to use when feeding through
 * to {@link Bridge~onEvent}. One of: "none", "single", "per_room". If "none",
 * events are fed through as soon as contextual info is obtained, which may result
 * in out of order events but stops HOL blocking. If "single", onEvent calls will
 * be in order but may be slower due to HOL blocking. If "per_room", a queue per
 * room ID is made which reduces the impact of HOL blocking to be scoped to a room.
 * Default: "single".
 * @param {boolean=} opts.queue.perRequest True to only feed through the next
 * event after the request object in the previous call succeeds or fails. It is
 * <b>vital</b> that you consistently resolve/reject the request if this is 'true',
 * else you will not get any further events from this queue. To aid debugging this,
 * consider setting a delayed listener on the request factory. If false, the mere
 * invocation of onEvent is enough to trigger the next event in the queue.
 * You probably want to set this to 'true' if your {@link Bridge~onEvent} is
 * performing async operations where ordering matters (e.g. messages). Default: false.
 */
function Bridge(opts) {
    // typeof null is also "object", so null has to be rejected explicitly;
    // otherwise the required-key check below dies with an unhelpful TypeError.
    if (opts === null || typeof opts !== "object") {
        throw new Error("opts must be supplied.");
    }
    var required = [
        "homeserverUrl", "registration", "domain", "controller"
    ];
    required.forEach(function(key) {
        if (!opts[key]) {
            throw new Error("Missing '" + key + "' in opts.");
        }
    });
    if (typeof opts.controller.onEvent !== "function") {
        throw new Error("controller.onEvent is a required function");
    }

    // Fill in defaults directly on the supplied opts object, which is then
    // kept as this.opts.
    opts.userStore = opts.userStore || "user-store.db";
    opts.roomStore = opts.roomStore || "room-store.db";
    opts.queue = opts.queue || {};
    opts.intentOptions = opts.intentOptions || {};
    opts.queue.type = opts.queue.type || "single";
    if (opts.queue.perRequest === undefined) {
        opts.queue.perRequest = false;
    }

    // Default: logger -> log to console
    opts.controller.onLog = opts.controller.onLog || function(text, isError) {
        if (isError) {
            console.error(text);
            return;
        }
        console.log(text);
    };
    // Default: suppress echo -> True. Compared against undefined so that an
    // explicit false is respected.
    if (opts.suppressEcho === undefined) {
        opts.suppressEcho = true;
    }

    // we'll init these at runtime (see run() and loadDatabases())
    this.appService = null;
    this.opts = opts;
    this._clientFactory = null;
    this._botClient = null;
    this._appServiceBot = null;
    this._requestFactory = null;
    this._botIntent = null;
    this._intents = {
        // user_id + request_id : Intent
    };
    this._intents["bot"] = null;
    // Queued events are handed to _onConsume once their promise settles.
    this._queue = new EventQueue(this.opts.queue, this._onConsume.bind(this));
    // Only used when opts.queue.perRequest is true: the promise of the most
    // recently received request.
    this._prevRequestPromise = Promise.resolve();
}

/**
 * Prepare the user and room stores. Any store option that is still a string is
 * treated as a file path and opened; anything else is used as given. Once
 * loaded, the stores are available via getUserStore() and getRoomStore().
 * @return {Promise} Resolved once both stores have been assigned, rejected if
 * either fails to load.
 */
Bridge.prototype.loadDatabases = function() {
    var bridge = this;
    var options = bridge.opts;

    // Paths are swapped for the pending load, so the option now holds a promise.
    if (typeof options.userStore === "string") {
        options.userStore = loadDatabase(options.userStore, UserBridgeStore);
    }
    if (typeof options.roomStore === "string") {
        options.roomStore = loadDatabase(options.roomStore, RoomBridgeStore);
    }

    // Promise.resolve() copes with both shapes: a pending load from above or a
    // ready-made store instance supplied by the caller.
    var userStoreReady = Promise.resolve(options.userStore).then(function(store) {
        bridge._userStore = store;
    });
    var roomStoreReady = Promise.resolve(options.roomStore).then(function(store) {
        bridge._roomStore = store;
    });
    return Promise.all([userStoreReady, roomStoreReady]);
};

/**
 * Start the bridge: build the client factory, bot, request factory and bot
 * intent, attach the controller to an AppService, start listening and then
 * load the databases.
 * @param {Number} port The port to listen on.
 * @param {Object} config Configuration options. Not read by this method.
 * @param {AppService=} appServiceInstance The AppService instance to attach to.
 * If not provided, one will be created.
 * @return {Promise} The promise returned by loadDatabases().
 * @throws {Error} If the registration loaded from a file path parses to null.
 */
Bridge.prototype.run = function(port, config, appServiceInstance) {
    var bridge = this;
    var opts = bridge.opts;

    // Pass a log line on to the controller's logger, if it has one.
    function forwardLog(text, isErr) {
        if (!bridge.opts.controller.onLog) {
            return;
        }
        bridge.opts.controller.onLog(text, isErr);
    }

    // A string registration is a path to a YAML file; replace it with the
    // parsed AppServiceRegistration object.
    if (typeof opts.registration === "string") {
        var rawYaml = fs.readFileSync(opts.registration, 'utf8');
        opts.registration = AppServiceRegistration.fromObject(
            yaml.safeLoad(rawYaml)
        );
        if (opts.registration === null) {
            throw new Error("Failed to parse registration file");
        }
    }
    var registration = opts.registration;

    bridge._clientFactory = opts.clientFactory || new ClientFactory({
        url: opts.homeserverUrl,
        token: registration.as_token,
        appServiceUserId: "@" + registration.sender_localpart + ":" + opts.domain
    });
    bridge._clientFactory.setLogFunction(forwardLog);
    bridge._botClient = bridge._clientFactory.getClientAs();
    bridge._appServiceBot = new AppServiceBot(bridge._botClient, registration);

    bridge._requestFactory = new RequestFactory();
    if (opts.controller.onLog) {
        // Log the outcome and duration of every request by default.
        bridge._requestFactory.addDefaultResolveCallback(function(req, res) {
            var line = "[" + req.getId() + "] SUCCESS (" + req.getDuration() + "ms)";
            bridge.opts.controller.onLog(line);
        });
        bridge._requestFactory.addDefaultRejectCallback(function(req, err) {
            var line = "[" + req.getId() + "] FAILED (" + req.getDuration() + "ms) " +
                (err ? util.inspect(err) : "");
            bridge.opts.controller.onLog(line);
        });
    }

    // The bot intent starts from { registered: true }; user-supplied bot
    // options are copied over the top.
    var botIntentOpts = { registered: true };
    var botOverrides = opts.intentOptions.bot;
    if (botOverrides) {
        var names = Object.keys(botOverrides);
        for (var i = 0; i < names.length; i++) {
            botIntentOpts[names[i]] = botOverrides[names[i]];
        }
    }
    bridge._botIntent = new Intent(
        bridge._botClient, bridge._botClient, botIntentOpts
    );
    // Reset the intent cache (user_id + request_id : Intent), seeded with the bot.
    bridge._intents = { bot: bridge._botIntent };

    bridge.appService = appServiceInstance || new AppService({
        homeserverToken: registration.getHomeserverToken()
    });
    bridge.appService.onUserQuery = bridge._onUserQuery.bind(bridge);
    bridge.appService.onAliasQuery = bridge._onAliasQuery.bind(bridge);
    bridge.appService.on("event", bridge._onEvent.bind(bridge));
    bridge.appService.on("http-log", function(line) {
        forwardLog(line, false);
    });
    bridge.appService.listen(port);
    return bridge.loadDatabases();
};

/**
 * Retrieve the room store instance.
 * @return {?RoomBridgeStore} The store assigned by loadDatabases(). Nothing is
 * assigned before that promise resolves, so earlier calls return undefined.
 */
Bridge.prototype.getRoomStore = function() {
    return this._roomStore;
};

/**
 * Retrieve the user store instance.
 * @return {?UserBridgeStore} The store assigned by loadDatabases(). Nothing is
 * assigned before that promise resolves, so earlier calls return undefined.
 */
Bridge.prototype.getUserStore = function() {
    return this._userStore;
};

/**
 * Retrieve the request factory used to create incoming requests.
 * @return {?RequestFactory} The factory created by run(), or null if run() has
 * not been called yet.
 */
Bridge.prototype.getRequestFactory = function() {
    return this._requestFactory;
};

/**
 * Retrieve the matrix client factory used when sending matrix requests.
 * @return {?ClientFactory} The factory set up by run() (either
 * opts.clientFactory or a newly created one), or null if run() has not been
 * called yet.
 */
Bridge.prototype.getClientFactory = function() {
    return this._clientFactory;
};

/**
 * Get the AS bot instance.
 * @return {?AppServiceBot} The bot created by run(), or null if run() has not
 * been called yet.
 */
Bridge.prototype.getBot = function() {
    return this._appServiceBot;
};

/**
 * Retrieve an Intent instance for the specified user ID. If no ID is given, an
 * instance for the bot itself is returned. Intents are cached per user ID and
 * request ID, so repeated calls with the same pair return the same instance.
 * @param {?string} userId The user ID to get an Intent for.
 * @param {Request=} request Optional. The request instance to tie the MatrixClient
 * instance to. Useful for logging contextual request IDs.
 * @return {Intent} The intent instance
 */
Bridge.prototype.getIntent = function(userId, request) {
    if (!userId) {
        return this._botIntent;
    }

    var cacheKey = userId + (request ? request.getId() : "");
    var cached = this._intents[cacheKey];
    if (cached) {
        return cached;
    }

    var client = this._clientFactory.getClientAs(userId, request);

    // Give each Intent its own copy of the configured client options.
    var configured = this.opts.intentOptions.clients;
    var intentOpts = {};
    if (configured) {
        var names = Object.keys(configured);
        for (var i = 0; i < names.length; i++) {
            intentOpts[names[i]] = configured[names[i]];
        }
    }

    var intent = new Intent(client, this._botClient, intentOpts);
    this._intents[cacheKey] = intent;
    return intent;
};

/**
 * Retrieve an Intent instance for the specified user ID localpart. This <i>must
 * be the complete user localpart</i>. The full user ID is built as
 * "@" + localpart + ":" + opts.domain and handed to getIntent().
 * @param {?string} localpart The user ID localpart to get an Intent for.
 * @param {Request=} request Optional. The request instance to tie the MatrixClient
 * instance to. Useful for logging contextual request IDs.
 * @return {Intent} The intent instance
 */
Bridge.prototype.getIntentFromLocalpart = function(localpart, request) {
    // Forward the request too: it was previously dropped, so the returned
    // Intent was never tied to the caller's request.
    return this.getIntent(
        "@" + localpart + ":" + this.opts.domain, request
    );
};

/**
 * Provision a user on the homeserver. The user is registered via the bot
 * client and stored in the user store; then, depending on which fields of
 * provisionedUser are set, it is linked to the remote user and given a display
 * name and avatar. Each step waits for the previous one.
 * @param {MatrixUser} matrixUser The virtual user to be provisioned.
 * @param {Bridge~ProvisionedUser} provisionedUser Provisioning information.
 * @return {Promise} Resolved when provisioned.
 */
Bridge.prototype.provisionUser = function(matrixUser, provisionedUser) {
    var bridge = this;

    // Registration followed by storing the matrix user always happens.
    var registered = bridge._botClient.register(matrixUser.localpart).then(function() {
        return bridge._userStore.setMatrixUser(matrixUser);
    });

    // Optional follow-up steps, run strictly one after another.
    var steps = [];
    if (provisionedUser.remote) {
        steps.push(function() {
            return bridge._userStore.linkUsers(matrixUser, provisionedUser.remote);
        });
    }

    // Profile changes are made with a client acting as the new user.
    var userClient = bridge._clientFactory.getClientAs(matrixUser.getId());
    if (provisionedUser.name) {
        steps.push(function() {
            return userClient.setDisplayName(provisionedUser.name);
        });
    }
    if (provisionedUser.url) {
        steps.push(function() {
            return userClient.setAvatarUrl(provisionedUser.url);
        });
    }

    return steps.reduce(function(chain, step) {
        return chain.then(step);
    }, registered);
};

/**
 * Handle a user query: ask the controller whether the user should exist and,
 * if so, provision it.
 * @param {string} userId The user ID being queried.
 * @return {Promise} Resolved straight away when the controller has no
 * onUserQuery; otherwise resolved once the user is provisioned, or rejected if
 * the controller yields a falsy value.
 */
Bridge.prototype._onUserQuery = function(userId) {
    var bridge = this;
    var controller = bridge.opts.controller;
    if (!controller.onUserQuery) {
        return Promise.resolve();
    }

    var matrixUser = new MatrixUser(userId);
    // The controller may answer synchronously or with a promise.
    var answer = Promise.resolve(controller.onUserQuery(matrixUser));
    return answer.then(function(provisionedUser) {
        if (provisionedUser) {
            return bridge.provisionUser(matrixUser, provisionedUser);
        }
        throw new Error("Not provisioning user for this ID");
    });
};

/**
 * Handle an alias query: ask the controller whether the room should exist and,
 * if so, create it with the bot client and record it in the room store.
 * @param {string} alias The room alias being queried.
 * @return {Promise} Resolved straight away when the controller has no
 * onAliasQuery; otherwise resolved once the room is created and stored, or
 * rejected if the controller yields a falsy value.
 */
Bridge.prototype._onAliasQuery = function(alias) {
    var bridge = this;
    var controller = bridge.opts.controller;
    if (!controller.onAliasQuery) {
        return Promise.resolve();
    }

    // Remembered between the two steps below so the new room can be linked.
    var linkedRemote = null;
    // Text before the first ":" with the leading character removed.
    var aliasLocalpart = alias.split(":")[0].substring(1);

    return Promise.resolve(
        controller.onAliasQuery(alias, aliasLocalpart)
    ).then(function(provisionedRoom) {
        if (!provisionedRoom) {
            throw new Error("Not provisioning room for this alias");
        }
        linkedRemote = provisionedRoom.remote;
        // do the HTTP hit
        return bridge._botClient.createRoom(provisionedRoom.creationOpts);
    }).then(function(createRoomResponse) {
        // persist the mapping in the store, or just the matrix room when
        // there is no remote room to link it to
        var matrixRoom = new MatrixRoom(createRoomResponse.room_id);
        return linkedRemote ?
            bridge._roomStore.linkRooms(matrixRoom, linkedRemote) :
            bridge._roomStore.setMatrixRoom(matrixRoom);
    });
};

/**
 * Handle an incoming event: pass it to every cached Intent, drop it if it is
 * an echo from a bridge user, and otherwise wrap it in a request and feed it
 * to the controller according to the configured queue options.
 * @param {Object} event The raw event.
 * @return {Promise} The promise of the request created for this event (handy
 * for testing), or an already-resolved promise when the event was suppressed.
 */
Bridge.prototype._onEvent = function(event) {
    var bridge = this;
    bridge._updateIntents(event);

    var isEcho = bridge.opts.suppressEcho &&
        bridge.opts.registration.isUserMatch(event.user_id, true);
    if (isEcho) {
        return Promise.resolve();
    }

    var request = bridge._requestFactory.newRequest({ data: event });
    var context = new BridgeContext({
        sender: event.user_id,
        target: event.state_key,
        room: event.room_id
    });
    var payload = {
        request: request,
        context: context
    };
    var ready = context.get(bridge._roomStore, bridge._userStore);

    // No queueing: hand the event over as soon as its context is available.
    if (bridge.opts.queue.type === "none") {
        ready.done(function() {
            bridge._onConsume(null, payload);
        }, function(err) {
            bridge._onConsume(err);
        });
        return request.getPromise();
    }

    // Per-request ordering: additionally hold this event back until the
    // previously received request has settled.
    if (bridge.opts.queue.perRequest) {
        ready = Promise.settle([ready, bridge._prevRequestPromise]);
        bridge._prevRequestPromise = request.getPromise();
    }

    bridge._queue.push(event, payload, ready);
    bridge._queue.consume();
    return request.getPromise();
};

/**
 * Called for each event once it is ready to be handed to the controller.
 * @param {*} err Set when the event could not be prepared; the failure is
 * logged and the controller's onEvent is not invoked.
 * @param {Object=} data The request and context for the event. Only read when
 * err is falsy.
 * @param {Request} data.request The request wrapping the event.
 * @param {Bridge~BridgeContext} data.context The context for the event.
 */
Bridge.prototype._onConsume = function(err, data) {
    if (err) {
        if (this.opts.controller.onLog) {
            // Flag the line as an error so loggers (including the default
            // console logger) route it to their error output.
            this.opts.controller.onLog("onEvent failure: " + err, true);
        }
        return;
    }
    this.opts.controller.onEvent(data.request, data.context);
};

/**
 * Pass an incoming event to every cached Intent (the bot's included) by
 * calling its onEvent method.
 * @param {Object} event The raw event.
 */
Bridge.prototype._updateIntents = function(event) {
    var intents = this._intents;
    var keys = Object.keys(intents);
    for (var i = 0; i < keys.length; i++) {
        intents[keys[i]].onEvent(event);
    }
};

// The Bridge constructor is the module's export; the functions defined after
// this line are not attached to it.
module.exports = Bridge;

/**
 * Open the datastore file at the given path and wrap it in a store class.
 * @param {string} path The path to the .db file.
 * @param {Function} Cls The store constructor; invoked as new Cls(db) with the
 * loaded Datastore.
 * @return {Promise} Resolves with the new Cls instance once the datastore
 * reports that it has loaded; rejects with the load error otherwise.
 */
function loadDatabase(path, Cls) {
    // Promise.defer() is deprecated in bluebird; the Promise constructor is the
    // supported way to adapt a callback API. It also means a synchronous throw
    // from the Datastore constructor surfaces as a rejection.
    return new Promise(function(resolve, reject) {
        var db = new Datastore({
            filename: path,
            autoload: true,
            onload: function(err) {
                if (err) {
                    reject(err);
                    return;
                }
                resolve(new Cls(db));
            }
        });
    });
}

/**
 * Keeps events in arrival order and hands each one to a consumer once the
 * promise queued alongside it has settled.
 * @constructor
 * @param {Object} opts Queue options.
 * @param {string} opts.type "per_room" keeps one queue per room ID; any other
 * value puts every event in a single shared queue.
 * @param {Function} consumeFn Called as consumeFn(null, data) when an entry's
 * promise fulfils, or consumeFn(err, null) when it rejects.
 */
function EventQueue(opts, consumeFn) {
    this.type = opts.type;
    // Keyed by queue identifier; each value is
    // { events: [ { data, promise } ], consuming: true|false }
    this._queues = {};
    this.consumeFn = consumeFn;
}

/**
 * Append an event to its queue, creating the queue on first use.
 * @param {Object} event The raw event; room_id picks the queue for "per_room".
 * @param {*} data The value passed to the consumer for this event.
 * @param {Promise} promise The entry is consumed once this settles.
 */
EventQueue.prototype.push = function(event, data, promise) {
    var queueId = "none";
    if (this.type === "per_room") {
        queueId = event.room_id;
    }

    var queue = this._queues[queueId];
    if (!queue) {
        queue = { events: [], consuming: false };
        this._queues[queueId] = queue;
    }
    queue.events.push({ data: data, promise: promise });
};

/**
 * Start draining every queue that is not already being drained.
 */
EventQueue.prototype.consume = function() {
    var ids = Object.keys(this._queues);
    for (var i = 0; i < ids.length; i++) {
        var queue = this._queues[ids[i]];
        if (queue.consuming) {
            continue;
        }
        queue.consuming = true;
        this._takeNext(ids[i]);
    }
};

/**
 * Consume the entry at the head of a queue, then move on to the next one. The
 * queue is marked idle once it is empty.
 * @param {string} identifier The identifier of the queue to drain.
 */
EventQueue.prototype._takeNext = function(identifier) {
    var owner = this;
    var queue = owner._queues[identifier];
    if (queue.events.length === 0) {
        queue.consuming = false;
        return;
    }

    var head = queue.events.shift();

    // Report the outcome, then carry on with whatever is queued behind it.
    function advance(err, data) {
        owner.consumeFn(err, data);
        owner._takeNext(identifier);
    }

    head.promise.done(function() {
        advance(null, head.data);
    }, function(e) {
        advance(e, null);
    });
};

/**
 * Contextual information about an event: who sent it, who it targets and the
 * room it is in. The remote entries start out empty and are filled in by get().
 * @constructor
 * @param {Object} ctx The identifiers taken from the event.
 * @param {string} ctx.sender The user ID of the sender.
 * @param {string=} ctx.target The user ID of the target, if there is one.
 * @param {string} ctx.room The room ID.
 */
function BridgeContext(ctx) {
    var targetUser = null;
    if (ctx.target) {
        targetUser = new MatrixUser(ctx.target);
    }

    this._ctx = ctx;
    this.senders = { matrix: new MatrixUser(ctx.sender), remote: null, remotes: [] };
    this.targets = { matrix: targetUser, remote: null, remotes: [] };
    this.rooms = { matrix: new MatrixRoom(ctx.room), remote: null, remotes: [] };
}

/**
 * Look up stored data for the sender, target and room and record it on this
 * context. Fields are only overwritten when the stores return something.
 * @param {RoomBridgeStore} roomStore The room store to query.
 * @param {UserBridgeStore} userStore The user store to query.
 * @return {Promise} Resolved once every lookup has completed and been applied.
 */
BridgeContext.prototype.get = function(roomStore, userStore) {
    var context = this;

    // Record a non-empty list of linked remote entities on a group.
    function adopt(group, linked) {
        if (linked && linked.length > 0) {
            group.remotes = linked;
            group.remote = linked[0];
        }
    }

    // Building the lookups inside Promise.try means a synchronous failure
    // (for example a missing store) becomes a rejection instead of a throw.
    return Promise.try(function() {
        var ids = context._ctx;
        var lookups = [];
        lookups.push(roomStore.getLinkedRemoteRooms(ids.room));
        lookups.push(userStore.getRemoteUsersFromMatrixId(ids.sender));
        lookups.push(
            ids.target ?
                userStore.getRemoteUsersFromMatrixId(ids.target) :
                Promise.resolve([])
        );
        lookups.push(roomStore.getMatrixRoom(ids.room));
        lookups.push(userStore.getMatrixUser(ids.sender));
        return lookups;
    }).spread(function(remoteRooms, remoteSenders, remoteTargets, mxRoom, mxSender) {
        adopt(context.rooms, remoteRooms);
        adopt(context.senders, remoteSenders);
        adopt(context.targets, remoteTargets);
        // Prefer the stored matrix models over the bare ones made in the
        // constructor.
        if (mxRoom) {
            context.rooms.matrix = mxRoom;
        }
        if (mxSender) {
            context.senders.matrix = mxSender;
        }
    });
};

/**
 * @typedef Bridge~BridgeContext
 * @type {Object}
 * @property {Object} senders Data models on senders of this event
 * @property {MatrixUser} senders.matrix The sender of this event
 * @property {?RemoteUser} senders.remote The first linked remote sender: remotes[0]
 * @property {RemoteUser[]} senders.remotes The linked remote senders
 * @property {Object} targets Data models on targets (e.g. state_key in
 * m.room.member) of this event.
 * @property {?MatrixUser} targets.matrix The target of this event if applicable.
 * @property {?RemoteUser} targets.remote The first linked remote target: remotes[0]
 * @property {RemoteUser[]} targets.remotes The linked remote targets
 * @property {Object} rooms Data models on rooms concerning this event.
 * @property {MatrixRoom} rooms.matrix The room for this event.
 * @property {?RemoteRoom} rooms.remote The first linked remote room: remotes[0]
 * @property {RemoteRoom[]} rooms.remotes The linked remote rooms for this event
 */

/**
 * @typedef Bridge~ProvisionedUser
 * @type {Object}
 * @property {string=} name The display name to set for the provisioned user.
 * @property {string=} url The avatar URL to set for the provisioned user.
 * @property {RemoteUser=} remote The remote user to link to the provisioned user.
 */

/**
 * @typedef Bridge~ProvisionedRoom
 * @type {Object}
 * @property {Object} creationOpts Room creation options to use when creating the
 * room. Required.
 * @property {RemoteRoom=} remote The remote room to link to the provisioned room.
 */

/**
 * Invoked when the bridge receives a user query from the homeserver. Supports
 * both sync return values and async return values via promises.
 * @callback Bridge~onUserQuery
 * @param {MatrixUser} matrixUser The matrix user queried. Use <code>getId()</code>
 * to get the user ID.
 * @return {?Bridge~ProvisionedUser|Promise<Bridge~ProvisionedUser, Error>}
 * Reject the promise / return null to not provision the user. Resolve the
 * promise / return a {@link Bridge~ProvisionedUser} object to provision the user.
 * @example
 * new Bridge({
 *   controller: {
 *     onUserQuery: function(matrixUser) {
 *       var remoteUser = new RemoteUser("some_remote_id");
 *       return {
 *         name: matrixUser.localpart + " (Bridged)",
 *         url: "http://someurl.com/pic.jpg",
 *         remote: remoteUser
 *       };
 *     }
 *   }
 * });
 */

/**
 * Invoked when the bridge receives an alias query from the homeserver. Supports
 * both sync return values and async return values via promises.
 * @callback Bridge~onAliasQuery
 * @param {string} alias The alias queried.
 * @param {string} aliasLocalpart The parsed localpart of the alias.
 * @return {?Bridge~ProvisionedRoom|Promise<Bridge~ProvisionedRoom, Error>}
 * Reject the promise / return null to not provision the room. Resolve the
 * promise / return a {@link Bridge~ProvisionedRoom} object to provision the room.
 * @example
 * new Bridge({
 *   controller: {
 *     onAliasQuery: function(alias, aliasLocalpart) {
 *       return {
 *         creationOpts: {
 *           room_alias_name: aliasLocalpart, // IMPORTANT: must be set to make the link
 *           name: aliasLocalpart,
 *           topic: "Auto-generated bridged room"
 *         }
 *       };
 *     }
 *   }
 * });
 */

 /**
 * Invoked when the bridge receives an event from the homeserver.
 * @callback Bridge~onEvent
 * @param {Request} request The request to resolve or reject depending on the
 * outcome of this request. The 'data' attached to this Request is the raw event
 * JSON received (accessed via <code>request.getData()</code>)
 * @param {Bridge~BridgeContext} context Context for this event: the matrix
 * sender, target and room, plus any remote users/rooms linked to them.
 */

 /**
 * Invoked when the bridge is attempting to log something.
 * @callback Bridge~onLog
 * @param {string} line The text to be logged.
 * @param {boolean} isError True if this line should be treated as an error msg.
 */