"use strict";
var AppServiceRegistration = require("matrix-appservice").AppServiceRegistration;
var AppService = require("matrix-appservice").AppService;
var ClientFactory = require("./components/client-factory");
var AppServiceBot = require("./components/app-service-bot");
var RequestFactory = require("./components/request-factory");
var Intent = require("./components/intent");
var RoomBridgeStore = require("./components/room-bridge-store");
var UserBridgeStore = require("./components/user-bridge-store");
var MatrixUser = require("./models/users/matrix");
var MatrixRoom = require("./models/rooms/matrix");
var fs = require("fs");
var yaml = require("js-yaml");
var Promise = require("bluebird");
var Datastore = require("nedb");
var util = require("util");
/**
* @constructor
* @param {Object} opts Options to pass to the bridge
* @param {AppServiceRegistration|string} opts.registration Application service
* registration object or path to the registration file.
* @param {string} opts.homeserverUrl The base HS url
* @param {string} opts.domain The domain part for user_ids and room aliases
* e.g. "bar" in "@foo:bar".
* @param {Object} opts.controller The controller logic for the bridge.
* @param {Bridge~onEvent} opts.controller.onEvent Function. Called when
* an event has been received from the HS.
* @param {Bridge~onUserQuery=} opts.controller.onUserQuery Function. If supplied,
* the bridge will invoke this function when queried via onUserQuery. If
* not supplied, no users will be provisioned on user queries. Provisioned users
* will automatically be stored in the associated <code>userStore</code>.
* @param {Bridge~onAliasQuery=} opts.controller.onAliasQuery Function. If supplied,
* the bridge will invoke this function when queried via onAliasQuery. If
* not supplied, no rooms will be provisioned on alias queries. Provisioned rooms
* will automatically be stored in the associated <code>roomStore</code>.
* @param {Bridge~onLog=} opts.controller.onLog Function. Invoked when
* logging. Defaults to a function which logs to the console.
* @param {(RoomBridgeStore|string)=} opts.roomStore The room store instance to
* use, or the path to the room .db file to load. A database will be created if
* this is not specified.
* @param {(UserBridgeStore|string)=} opts.userStore The user store instance to
* use, or the path to the user .db file to load. A database will be created if
* this is not specified.
* @param {boolean=} opts.suppressEcho True to stop receiving onEvent callbacks
* for events which were sent by a bridge user. Default: true.
* @param {ClientFactory=} opts.clientFactory The client factory instance to
* use. If not supplied, one will be created.
* @param {Object=} opts.intentOptions Options to supply to created Intent instances.
* @param {Object=} opts.intentOptions.bot Options to supply to the bot intent.
* @param {Object=} opts.intentOptions.clients Options to supply to the client intents.
* @param {Object=} opts.queue Options for the onEvent queue. When the bridge
* receives an incoming transaction, it needs to asynchronously query the data store for
* contextual info before calling onEvent. A queue is used to keep the onEvent
* calls consistent with the arrival order from the incoming transactions.
* @param {string=} opts.queue.type The type of queue to use when feeding through
* to {@link Bridge~onEvent}. One of: "none", "single", "per_room". If "none",
* events are fed through as soon as contextual info is obtained, which may result
* in out of order events but stops HOL blocking. If "single", onEvent calls will
* be in order but may be slower due to HOL blocking. If "per_room", a queue per
* room ID is made which reduces the impact of HOL blocking to be scoped to a room.
* Default: "single".
* @param {boolean=} opts.queue.perRequest True to only feed through the next
* event after the request object in the previous call succeeds or fails. It is
* <b>vital</b> that you consistently resolve/reject the request if this is 'true',
* else you will not get any further events from this queue. To aid debugging this,
* consider setting a delayed listener on the request factory. If false, the mere
* invocation of onEvent is enough to trigger the next event in the queue.
* You probably want to set this to 'true' if your {@link Bridge~onEvent} is
* performing async operations where ordering matters (e.g. messages). Default: false.
*/
function Bridge(opts) {
    // typeof null is "object", so null must be rejected explicitly; otherwise
    // the required-key check below fails with an unrelated TypeError.
    if (typeof opts !== "object" || opts === null) {
        throw new Error("opts must be supplied.");
    }
    var required = [
        "homeserverUrl", "registration", "domain", "controller"
    ];
    required.forEach(function(key) {
        if (!opts[key]) {
            throw new Error("Missing '" + key + "' in opts.");
        }
    });
    if (typeof opts.controller.onEvent !== "function") {
        throw new Error("controller.onEvent is a required function");
    }

    // Fill in defaults. NB: these are written onto the caller-supplied opts
    // object, which is then kept by reference as this.opts.
    opts.userStore = opts.userStore || "user-store.db";
    opts.roomStore = opts.roomStore || "room-store.db";
    opts.queue = opts.queue || {};
    opts.intentOptions = opts.intentOptions || {};
    opts.queue.type = opts.queue.type || "single";
    if (opts.queue.perRequest === undefined) {
        opts.queue.perRequest = false;
    }
    // Default: logger -> log to console
    opts.controller.onLog = opts.controller.onLog || function(text, isError) {
        if (isError) {
            console.error(text);
            return;
        }
        console.log(text);
    };
    // Default: suppress echo -> True
    if (opts.suppressEcho === undefined) {
        opts.suppressEcho = true;
    }

    // we'll init these at runtime (see run())
    this.appService = null;
    this.opts = opts;
    this._clientFactory = null;
    this._botClient = null;
    this._appServiceBot = null;
    this._requestFactory = null;
    this._botIntent = null;
    this._intents = {
        // user_id + request_id : Intent
    };
    this._intents["bot"] = null;
    // Events are fed to _onConsume through the queue in arrival order.
    this._queue = new EventQueue(this.opts.queue, this._onConsume.bind(this));
    // Only consulted when opts.queue.perRequest is true: the promise of the
    // most recently received request.
    this._prevRequestPromise = Promise.resolve();
}
/**
 * Prepare the user and room stores so that getUserStore() and getRoomStore()
 * return usable instances. A store option given as a string is treated as a
 * database path and handed to loadDatabase(); the option is replaced with the
 * resulting promise. Anything else is used as the store instance directly.
 * @return {Promise} Settles once both stores have been assigned to the bridge,
 * or rejects if either of them fails to load.
 */
Bridge.prototype.loadDatabases = function() {
    var bridge = this;
    var options = bridge.opts;

    // Paths (including the constructor defaults) become promises for stores.
    if (typeof options.userStore === "string") {
        options.userStore = loadDatabase(options.userStore, UserBridgeStore);
    }
    if (typeof options.roomStore === "string") {
        options.roomStore = loadDatabase(options.roomStore, RoomBridgeStore);
    }

    // Promise.resolve() flattens both cases: a pending load started above, or
    // an instance which was supplied up-front.
    var userStoreReady = Promise.resolve(options.userStore).then(function(userDb) {
        bridge._userStore = userDb;
    });
    var roomStoreReady = Promise.resolve(options.roomStore).then(function(roomDb) {
        bridge._roomStore = roomDb;
    });
    return Promise.all([userStoreReady, roomStoreReady]);
};
/**
 * Start the bridge: build the runtime components (client factory, bot client,
 * AS bot, request factory, bot intent), hook up the application service
 * callbacks, begin listening and finally load the databases.
 * @param {Number} port The port to listen on.
 * @param {Object} config Configuration options. Not read by this method.
 * @param {AppService=} appServiceInstance The AppService instance to attach to.
 * If not provided, one will be created.
 * @return {Promise} The promise returned by loadDatabases().
 * @throws {Error} If a registration file path was given and it could not be
 * turned into a registration object.
 */
Bridge.prototype.run = function(port, config, appServiceInstance) {
    var self = this;
    var opts = self.opts;

    // Forwards a line to the controller's logger, if there is one at call time.
    function relayLog(text, isErr) {
        if (self.opts.controller.onLog) {
            self.opts.controller.onLog(text, isErr);
        }
    }

    // A string registration is a path to a YAML registration file.
    if (typeof opts.registration === "string") {
        var rawRegistration = fs.readFileSync(opts.registration, 'utf8');
        opts.registration = AppServiceRegistration.fromObject(
            yaml.safeLoad(rawRegistration)
        );
        if (opts.registration === null) {
            throw new Error("Failed to parse registration file");
        }
    }

    self._clientFactory = opts.clientFactory || new ClientFactory({
        url: opts.homeserverUrl,
        token: opts.registration.as_token,
        appServiceUserId: (
            "@" + opts.registration.sender_localpart + ":" + opts.domain
        )
    });
    self._clientFactory.setLogFunction(relayLog);

    self._botClient = self._clientFactory.getClientAs();
    self._appServiceBot = new AppServiceBot(self._botClient, opts.registration);

    self._requestFactory = new RequestFactory();
    if (opts.controller.onLog) {
        // Log the outcome and duration of every request by default.
        self._requestFactory.addDefaultResolveCallback(function(req, res) {
            self.opts.controller.onLog(
                "[" + req.getId() + "] SUCCESS (" + req.getDuration() + "ms)"
            );
        });
        self._requestFactory.addDefaultRejectCallback(function(req, err) {
            self.opts.controller.onLog(
                "[" + req.getId() + "] FAILED (" + req.getDuration() + "ms) " +
                (err ? util.inspect(err) : "")
            );
        });
    }

    // Bot intent options: "registered" first, then any user-supplied overrides.
    var botIntentOpts = { registered: true };
    var botOverrides = opts.intentOptions.bot;
    if (botOverrides) {
        var overrideNames = Object.keys(botOverrides);
        for (var i = 0; i < overrideNames.length; i++) {
            botIntentOpts[overrideNames[i]] = botOverrides[overrideNames[i]];
        }
    }
    self._botIntent = new Intent(self._botClient, self._botClient, botIntentOpts);
    // Fresh cache (user_id + request_id : Intent), seeded with the bot intent.
    self._intents = { bot: self._botIntent };

    self.appService = appServiceInstance || new AppService({
        homeserverToken: opts.registration.getHomeserverToken()
    });
    self.appService.onUserQuery = self._onUserQuery.bind(self);
    self.appService.onAliasQuery = self._onAliasQuery.bind(self);
    self.appService.on("event", self._onEvent.bind(self));
    self.appService.on("http-log", function(line) {
        relayLog(line, false);
    });

    self.appService.listen(port);
    return self.loadDatabases();
};
/**
 * Get the room store which loadDatabases() attached to this bridge.
 * @return {?RoomBridgeStore} The room store; not set until loadDatabases()
 * has resolved.
 */
Bridge.prototype.getRoomStore = function() {
    var roomStore = this._roomStore;
    return roomStore;
};
/**
 * Get the user store which loadDatabases() attached to this bridge.
 * @return {?UserBridgeStore} The user store; not set until loadDatabases()
 * has resolved.
 */
Bridge.prototype.getUserStore = function() {
    var userStore = this._userStore;
    return userStore;
};
/**
 * Get the factory which creates the request objects for incoming events.
 * @return {RequestFactory} The factory created by run(); null before then.
 */
Bridge.prototype.getRequestFactory = function() {
    var requestFactory = this._requestFactory;
    return requestFactory;
};
/**
 * Get the factory which hands out matrix clients for this bridge.
 * @return {ClientFactory} The factory assigned by run() (either the supplied
 * opts.clientFactory or a newly created one); null before then.
 */
Bridge.prototype.getClientFactory = function() {
    var clientFactory = this._clientFactory;
    return clientFactory;
};
/**
 * Get the application service bot wrapper.
 * @return {AppServiceBot} The instance created by run(); null before then.
 */
Bridge.prototype.getBot = function() {
    var bot = this._appServiceBot;
    return bot;
};
/**
 * Look up (or lazily create) the Intent for a user. Without a user ID the
 * bot's own intent is returned. Created intents are cached under the key
 * "user ID + request ID" (just the user ID when no request is given).
 * @param {?string} userId The user ID to get an Intent for.
 * @param {Request=} request Optional. The request instance to tie the MatrixClient
 * instance to. Useful for logging contextual request IDs.
 * @return {Intent} The intent instance
 */
Bridge.prototype.getIntent = function(userId, request) {
    if (!userId) {
        return this._botIntent;
    }

    var cacheKey = userId + (request ? request.getId() : "");
    var cached = this._intents[cacheKey];
    if (cached) {
        return cached;
    }
    // NOTE(review): nothing in this file ever evicts entries from this cache,
    // so request-scoped intents accumulate - confirm whether that is intended.

    var client = this._clientFactory.getClientAs(userId, request);

    // Copy the configured client intent options so each Intent gets its own object.
    var intentOpts = {};
    var configured = this.opts.intentOptions.clients;
    if (configured) {
        var optionNames = Object.keys(configured);
        for (var i = 0; i < optionNames.length; i++) {
            intentOpts[optionNames[i]] = configured[optionNames[i]];
        }
    }

    var intent = new Intent(client, this._botClient, intentOpts);
    this._intents[cacheKey] = intent;
    return intent;
};
/**
 * Retrieve an Intent instance for the specified user ID localpart. This must
 * be the complete user localpart; the user ID is formed as
 * "@" + localpart + ":" + opts.domain and passed on to getIntent().
 * @param {?string} localpart The user ID localpart to get an Intent for.
 * @param {Request=} request Optional. The request instance to tie the MatrixClient
 * instance to. Useful for logging contextual request IDs.
 * @return {Intent} The intent instance
 */
Bridge.prototype.getIntentFromLocalpart = function(localpart, request) {
    // Forward the request as well: it was previously dropped here, so the
    // resulting intent was never tied to the request as documented.
    return this.getIntent(
        "@" + localpart + ":" + this.opts.domain, request
    );
};
/**
 * Provision a user on the homeserver. The localpart is registered via the bot
 * client, then these steps run one after another: store the matrix user, link
 * it to the remote user (if given), set the display name (if given) and set
 * the avatar URL (if given).
 * @param {MatrixUser} matrixUser The virtual user to be provisioned.
 * @param {Bridge~ProvisionedUser} provisionedUser Provisioning information.
 * @return {Promise} Resolved when provisioned.
 */
Bridge.prototype.provisionUser = function(matrixUser, provisionedUser) {
    var bridge = this;
    // Follow-up actions, executed in order once registration has succeeded.
    var steps = [];

    var registration = bridge._botClient.register(matrixUser.localpart);

    // storage steps
    steps.push(function() {
        return bridge._userStore.setMatrixUser(matrixUser);
    });
    if (provisionedUser.remote) {
        steps.push(function() {
            return bridge._userStore.linkUsers(matrixUser, provisionedUser.remote);
        });
    }

    // HTTP steps, performed as the new user
    var userClient = bridge._clientFactory.getClientAs(matrixUser.getId());
    if (provisionedUser.name) {
        steps.push(function() {
            return userClient.setDisplayName(provisionedUser.name);
        });
    }
    if (provisionedUser.url) {
        steps.push(function() {
            return userClient.setAvatarUrl(provisionedUser.url);
        });
    }

    return steps.reduce(function(chain, step) {
        return chain.then(step);
    }, registration);
};
/**
 * Handle a user query: ask the controller whether the user should be
 * provisioned and, if it returns provisioning info, provision the user.
 * Does nothing if the controller has no onUserQuery function.
 * @param {string} userId The queried user ID.
 * @return {Promise} Resolved when there is no onUserQuery handler or when the
 * user has been provisioned. Rejected if the controller returns a falsy
 * value, throws, or rejects, or if provisioning fails.
 */
Bridge.prototype._onUserQuery = function(userId) {
    var self = this;
    if (!self.opts.controller.onUserQuery) {
        return Promise.resolve();
    }
    var matrixUser;
    // Invoke the controller inside a promise executor so that a synchronous
    // throw becomes a rejection of the returned promise rather than an
    // exception thrown at the caller, which expects a promise.
    return new Promise(function(resolve) {
        matrixUser = new MatrixUser(userId);
        resolve(self.opts.controller.onUserQuery(matrixUser));
    }).then(function(provisionedUser) {
        if (!provisionedUser) {
            throw new Error("Not provisioning user for this ID");
        }
        return self.provisionUser(matrixUser, provisionedUser);
    });
};
/**
 * Handle an alias query: ask the controller whether a room should be
 * provisioned for the alias and, if so, create the room via the bot client and
 * record it in the room store (linked to the remote room when one is given).
 * Does nothing if the controller has no onAliasQuery function.
 * @param {string} alias The queried room alias.
 * @return {Promise} Resolved when there is no onAliasQuery handler or when the
 * room has been created and stored. Rejected if the controller returns a
 * falsy value, throws, or rejects, or if creating/storing the room fails.
 */
Bridge.prototype._onAliasQuery = function(alias) {
    var self = this;
    if (!self.opts.controller.onAliasQuery) {
        return Promise.resolve();
    }
    var remoteRoom = null;
    // Parse the alias and invoke the controller inside a promise executor so
    // that a synchronous throw from either becomes a rejection of the returned
    // promise rather than an exception thrown at the caller.
    return new Promise(function(resolve) {
        // "#localpart:domain" -> "localpart"
        var aliasLocalpart = alias.split(":")[0].substring(1);
        resolve(self.opts.controller.onAliasQuery(alias, aliasLocalpart));
    }).then(function(provisionedRoom) {
        if (!provisionedRoom) {
            throw new Error("Not provisioning room for this alias");
        }
        // do the HTTP hit
        remoteRoom = provisionedRoom.remote;
        return self._botClient.createRoom(
            provisionedRoom.creationOpts
        );
    }).then(function(createRoomResponse) {
        // persist the mapping in the store
        var roomId = createRoomResponse.room_id;
        var matrixRoom = new MatrixRoom(roomId);
        if (remoteRoom) {
            return self._roomStore.linkRooms(matrixRoom, remoteRoom);
        }
        // store the matrix room only
        return self._roomStore.setMatrixRoom(matrixRoom);
    });
};
/**
 * Handle an incoming event. Every intent is told about the event first. Unless
 * the event is a suppressed echo, a request and a BridgeContext are created
 * and handed to _onConsume, either directly (queue type "none") or through the
 * event queue.
 * @param {Object} event The raw event.
 * @return {Promise} The promise of the request linked to this event (handy for
 * testing), or a resolved promise if the event was suppressed as an echo.
 */
Bridge.prototype._onEvent = function(event) {
    var bridge = this;
    bridge._updateIntents(event);

    var options = bridge.opts;
    var isEcho = options.suppressEcho &&
        options.registration.isUserMatch(event.user_id, true);
    if (isEcho) {
        return Promise.resolve();
    }

    var request = bridge._requestFactory.newRequest({ data: event });
    var context = new BridgeContext({
        sender: event.user_id,
        target: event.state_key,
        room: event.room_id
    });
    var payload = {
        request: request,
        context: context
    };
    // Start the store look-ups which populate the context.
    var contextReady = context.get(bridge._roomStore, bridge._userStore);

    if (options.queue.type === "none") {
        // No ordering guarantees: consume as soon as we have context.
        contextReady.done(function() {
            bridge._onConsume(null, payload);
        }, function(err) {
            bridge._onConsume(err);
        });
    }
    else {
        if (options.queue.perRequest) {
            // Additionally wait for the previous request to settle before
            // this event may be consumed.
            contextReady = Promise.settle([
                contextReady,
                bridge._prevRequestPromise
            ]);
            bridge._prevRequestPromise = request.getPromise();
        }
        bridge._queue.push(event, payload, contextReady);
        bridge._queue.consume();
    }
    return request.getPromise();
};
/**
 * Consumer for processed events. On success the controller's onEvent is
 * invoked with the request and context; on failure the error is reported via
 * the controller's onLog (when one is set).
 * @param {?*} err The failure, or a falsy value if there was none.
 * @param {Object=} data Holds the "request" and "context" to pass to onEvent.
 * Only read when err is falsy.
 */
Bridge.prototype._onConsume = function(err, data) {
    var controller = this.opts.controller;
    if (err) {
        if (controller.onLog) {
            controller.onLog(
                "onEvent failure: " + err
            );
        }
        return;
    }
    controller.onEvent(data.request, data.context);
};
/**
 * Pass an incoming event to the onEvent() of every cached intent (the bot
 * intent included).
 * @param {Object} event The raw event.
 */
Bridge.prototype._updateIntents = function(event) {
    var intentKeys = Object.keys(this._intents);
    for (var i = 0; i < intentKeys.length; i++) {
        this._intents[intentKeys[i]].onEvent(event);
    }
};
// The Bridge constructor is this module's sole export.
module.exports = Bridge;
/**
 * Construct a Datastore for the given file (with autoload enabled) and wrap it
 * in a store class once the datastore's onload callback reports success.
 * @param {string} path The filename passed to the Datastore.
 * @param {Function} Cls The store constructor; invoked as new Cls(db).
 * @return {Promise} Resolves with the Cls instance. Rejects with the error
 * given to the onload callback, or with anything thrown while constructing the
 * Datastore.
 */
function loadDatabase(path, Cls) {
    // Promise constructor instead of the deprecated Promise.defer(); it also
    // turns a synchronous throw from the Datastore constructor into a rejection.
    return new Promise(function(resolve, reject) {
        var db = new Datastore({
            filename: path,
            autoload: true,
            onload: function(err) {
                if (err) {
                    reject(err);
                    return;
                }
                resolve(new Cls(db));
            }
        });
    });
}
/**
 * Feeds events to a consumer function in the order they were pushed, waiting
 * for each entry's promise before consuming it. With type "per_room" there is
 * one queue per room ID; otherwise a single shared queue is used.
 * @constructor
 * @param {Object} opts Queue options.
 * @param {string} opts.type The queue type.
 * @param {Function} consumeFn Invoked as consumeFn(null, data) when an entry's
 * promise succeeds, or consumeFn(err, null) when it fails.
 */
function EventQueue(opts, consumeFn) {
    this.type = opts.type;
    this._queues = {
        // $identifier: {
        //  events: [ {data: promise: } ],
        //  consuming: true|false
        // }
    };
    this.consumeFn = consumeFn;
}

/**
 * Append an entry to the queue the event belongs to. Does not start consuming;
 * call consume() for that.
 * @param {Object} event The raw event; its room_id picks the queue for
 * "per_room" queues.
 * @param {*} data The value to hand to consumeFn on success.
 * @param {Promise} promise Must settle before the entry is consumed; its
 * done() method is used to wait for it.
 */
EventQueue.prototype.push = function(event, data, promise) {
    var identifier = this.type === "per_room" ? event.room_id : "none";
    if (!this._queues[identifier]) {
        this._queues[identifier] = {
            events: [],
            consuming: false
        };
    }
    this._queues[identifier].events.push({
        data: data,
        promise: promise
    });
};

/**
 * Start draining every queue which is not already being drained.
 */
EventQueue.prototype.consume = function() {
    var self = this;
    Object.keys(this._queues).forEach(function(identifier) {
        if (!self._queues[identifier].consuming) {
            self._queues[identifier].consuming = true;
            self._takeNext(identifier);
        }
    });
};

/**
 * Consume the next entry of a queue, then move on to the one after it. Marks
 * the queue as idle once it is empty.
 * @param {string} identifier The key of the queue to advance.
 */
EventQueue.prototype._takeNext = function(identifier) {
    var self = this;
    var events = this._queues[identifier].events;
    if (events.length === 0) {
        this._queues[identifier].consuming = false;
        return;
    }
    var entry = events.shift();
    // try/finally: always advance, even if consumeFn throws. Otherwise the
    // queue would stay marked as consuming forever and no later event in it
    // would ever be delivered. The exception itself still propagates.
    entry.promise.done(function() {
        try {
            self.consumeFn(null, entry.data);
        }
        finally {
            self._takeNext(identifier);
        }
    }, function(e) {
        try {
            self.consumeFn(e, null);
        }
        finally {
            self._takeNext(identifier);
        }
    });
};
/**
 * Holds the data models relating to one event: its sender, its target (if
 * any) and its room. Only the matrix-side models are created here; call get()
 * to fill in the remote-side models from the stores.
 * @constructor
 * @param {Object} ctx The IDs taken from the event.
 * @param {string} ctx.sender The user ID of the sender.
 * @param {string=} ctx.target The user ID of the target, if any.
 * @param {string} ctx.room The room ID.
 */
function BridgeContext(ctx) {
    this._ctx = ctx;

    var senderUser = new MatrixUser(ctx.sender);
    var targetUser = null;
    if (ctx.target) {
        targetUser = new MatrixUser(ctx.target);
    }
    var room = new MatrixRoom(ctx.room);

    this.senders = { matrix: senderUser, remote: null, remotes: [] };
    this.targets = { matrix: targetUser, remote: null, remotes: [] };
    this.rooms = { matrix: room, remote: null, remotes: [] };
}

/**
 * Populate this context from the stores: the remote rooms linked to the room,
 * the remote users for the sender and for the target, plus the stored matrix
 * room and stored matrix sender when the stores return them.
 * @param {RoomBridgeStore} roomStore The store to query for rooms.
 * @param {UserBridgeStore} userStore The store to query for users.
 * @return {Promise} Resolved once the context has been populated; rejected if
 * any store look-up fails.
 */
BridgeContext.prototype.get = function(roomStore, userStore) {
    var self = this;

    // Record a non-empty list of remotes on a group, exposing the first entry
    // separately as "remote".
    function adoptRemotes(group, remotes) {
        if (remotes && remotes.length > 0) {
            group.remotes = remotes;
            group.remote = remotes[0];
        }
    }

    return Promise.try(function() {
        var ids = self._ctx;
        // The order matters: it is matched up with the spread() arguments.
        var lookups = [
            roomStore.getLinkedRemoteRooms(ids.room),
            userStore.getRemoteUsersFromMatrixId(ids.sender)
        ];
        if (ids.target) {
            lookups.push(userStore.getRemoteUsersFromMatrixId(ids.target));
        }
        else {
            lookups.push(Promise.resolve([]));
        }
        lookups.push(roomStore.getMatrixRoom(ids.room));
        lookups.push(userStore.getMatrixUser(ids.sender));
        return lookups;
    }).spread(function(linkedRooms, senderRemotes, targetRemotes, storedRoom, storedSender) {
        adoptRemotes(self.rooms, linkedRooms);
        adoptRemotes(self.senders, senderRemotes);
        adoptRemotes(self.targets, targetRemotes);
        // Prefer the stored models over the bare ones made by the constructor.
        if (storedRoom) {
            self.rooms.matrix = storedRoom;
        }
        if (storedSender) {
            self.senders.matrix = storedSender;
        }
    });
};
/**
* @typedef Bridge~BridgeContext
* @type {Object}
* @property {Object} senders Data models on senders of this event
* @property {MatrixUser} senders.matrix The sender of this event
* @property {?RemoteUser} senders.remote The first linked remote sender: remotes[0]
* @property {RemoteUser[]} senders.remotes The linked remote senders
* @property {Object} targets Data models on targets (e.g. state_key in
* m.room.member) of this event.
* @property {?MatrixUser} targets.matrix The target of this event if applicable.
* @property {?RemoteUser} targets.remote The first linked remote target: remotes[0]
* @property {RemoteUser[]} targets.remotes The linked remote targets
* @property {Object} rooms Data models on rooms concerning this event.
* @property {MatrixRoom} rooms.matrix The room for this event.
* @property {?RemoteRoom} rooms.remote The first linked remote room: remotes[0]
* @property {RemoteRoom[]} rooms.remotes The linked remote rooms for this event
*/
/**
* @typedef Bridge~ProvisionedUser
* @type {Object}
* @property {string=} name The display name to set for the provisioned user.
* @property {string=} url The avatar URL to set for the provisioned user.
* @property {RemoteUser=} remote The remote user to link to the provisioned user.
*/
/**
* @typedef Bridge~ProvisionedRoom
* @type {Object}
* @property {Object} creationOpts Room creation options to use when creating the
* room. Required.
* @property {RemoteRoom=} remote The remote room to link to the provisioned room.
*/
/**
* Invoked when the bridge receives a user query from the homeserver. Supports
* both sync return values and async return values via promises.
* @callback Bridge~onUserQuery
* @param {MatrixUser} matrixUser The matrix user queried. Use <code>getId()</code>
* to get the user ID.
* @return {?Bridge~ProvisionedUser|Promise<Bridge~ProvisionedUser, Error>}
* Reject the promise / return null to not provision the user. Resolve the
* promise / return a {@link Bridge~ProvisionedUser} object to provision the user.
* @example
* new Bridge({
* controller: {
* onUserQuery: function(matrixUser) {
* var remoteUser = new RemoteUser("some_remote_id");
* return {
* name: matrixUser.localpart + " (Bridged)",
* url: "http://someurl.com/pic.jpg",
* remote: remoteUser
* };
* }
* }
* });
*/
/**
* Invoked when the bridge receives an alias query from the homeserver. Supports
* both sync return values and async return values via promises.
* @callback Bridge~onAliasQuery
* @param {string} alias The alias queried.
* @param {string} aliasLocalpart The parsed localpart of the alias.
* @return {?Bridge~ProvisionedRoom|Promise<Bridge~ProvisionedRoom, Error>}
* Reject the promise / return null to not provision the room. Resolve the
* promise / return a {@link Bridge~ProvisionedRoom} object to provision the room.
* @example
* new Bridge({
* controller: {
* onAliasQuery: function(alias, aliasLocalpart) {
* return {
* creationOpts: {
* room_alias_name: aliasLocalpart, // IMPORTANT: must be set to make the link
* name: aliasLocalpart,
* topic: "Auto-generated bridged room"
* }
* };
* }
* }
* });
*/
/**
* Invoked when the bridge receives an event from the homeserver.
* @callback Bridge~onEvent
* @param {Request} request The request to resolve or reject depending on the
* outcome of this request. The 'data' attached to this Request is the raw event
* JSON received (accessed via <code>request.getData()</code>)
* @param {Bridge~BridgeContext} context Context for this event, including
* the matrix and remote data models for its senders, targets and rooms.
*/
/**
* Invoked when the bridge is attempting to log something.
* @callback Bridge~onLog
* @param {string} line The text to be logged.
* @param {boolean} isError True if this line should be treated as an error msg.
*/