"use strict";
const Datastore = require("nedb");
const Promise = require("bluebird");
const fs = require("fs");
const util = require("util");
const yaml = require("js-yaml");
const AppServiceRegistration = require("matrix-appservice").AppServiceRegistration;
const AppService = require("matrix-appservice").AppService;
const MatrixScheduler = require("matrix-js-sdk").MatrixScheduler;
const BridgeContext = require("./components/bridge-context");
const ClientFactory = require("./components/client-factory");
const AppServiceBot = require("./components/app-service-bot");
const RequestFactory = require("./components/request-factory");
const Intent = require("./components/intent");
const RoomBridgeStore = require("./components/room-bridge-store");
const UserBridgeStore = require("./components/user-bridge-store");
const EventBridgeStore = require("./components/event-bridge-store");
const MatrixUser = require("./models/users/matrix");
const MatrixRoom = require("./models/rooms/matrix");
const PrometheusMetrics = require("./components/prometheusmetrics");
const MembershipCache = require("./components/membership-cache");
const RoomLinkValidator = require("./components/room-link-validator").RoomLinkValidator;
const RLVStatus = require("./components/room-link-validator").validationStatuses;
const RoomUpgradeHandler = require("./components/room-upgrade-handler");
const EventNotHandledError = require("./errors").EventNotHandledError;
const InternalError = require("./errors").InternalError;
const EventQueue = require("./components/event-queue").EventQueue;
const log = require("./components/logging").get("bridge");
// The frequency at which we will check the list of accumulated Intent objects.
const INTENT_CULL_CHECK_PERIOD_MS = 1000 * 60; // once per minute
// How long a given Intent object can hang around unused for.
const INTENT_CULL_EVICT_AFTER_MS = 1000 * 60 * 15; // 15 minutes
/**
* @constructor
* @param {Object} opts Options to pass to the bridge
* @param {AppServiceRegistration|string} opts.registration Application service
* registration object or path to the registration file.
* @param {string} opts.homeserverUrl The base HS url
* @param {string} opts.domain The domain part for user_ids and room aliases
* e.g. "bar" in "@foo:bar".
* @param {string} opts.networkName A human readable string that will be used when
* the bridge signals errors to the client. Will not include in error events if ommited.
* @param {Object} opts.controller The controller logic for the bridge.
* @param {Bridge~onEvent} opts.controller.onEvent Function. Called when
* an event has been received from the HS.
* @param {Bridge~onUserQuery=} opts.controller.onUserQuery Function. If supplied,
* the bridge will invoke this function when queried via onUserQuery. If
* not supplied, no users will be provisioned on user queries. Provisioned users
* will automatically be stored in the associated <code>userStore</code>.
* @param {Bridge~onAliasQuery=} opts.controller.onAliasQuery Function. If supplied,
* the bridge will invoke this function when queried via onAliasQuery. If
* not supplied, no rooms will be provisioned on alias queries. Provisioned rooms
* will automatically be stored in the associated <code>roomStore</code>.
* @param {Bridge~onAliasQueried=} opts.controller.onAliasQueried Function.
* If supplied, the bridge will invoke this function when a room has been created
* via onAliasQuery.
* @param {Bridge~onLog=} opts.controller.onLog Function. Invoked when
* logging. Defaults to a function which logs to the console.
* @param {Bridge~thirdPartyLookup=} opts.controller.thirdPartyLookup Object. If
* supplied, the bridge will respond to third-party entity lookups using the
* contained helper functions.
* @param {Bridge~onRoomUpgrade=} opts.controller.onRoomUpgrade Function. If
* supplied, the bridge will invoke this function when it sees an upgrade event
* for a room.
* @param {(RoomBridgeStore|string)=} opts.roomStore The room store instance to
* use, or the path to the room .db file to load. A database will be created if
* this is not specified.
* @param {(UserBridgeStore|string)=} opts.userStore The user store instance to
* use, or the path to the user .db file to load. A database will be created if
* this is not specified.
* @param {(EventBridgeStore|string)=} opts.eventStore The event store instance to
* use, or the path to the event .db file to load. This will NOT be created if it
* isn't specified.
* @param {MembershipCache=} opts.membershipCache The membership cache instance
* to use, which can be manually created by a bridge for greater control over
* caching. By default a membership cache will be created internally.
* @param {boolean=} opts.suppressEcho True to stop receiving onEvent callbacks
* for events which were sent by a bridge user. Default: true.
* @param {ClientFactory=} opts.clientFactory The client factory instance to
* use. If not supplied, one will be created.
* @param {boolean} opts.logRequestOutcome True to enable SUCCESS/FAILED log lines
* to be sent to onLog. Default: true.
* @param {Object=} opts.intentOptions Options to supply to created Intent instances.
* @param {Object=} opts.intentOptions.bot Options to supply to the bot intent.
* @param {Object=} opts.intentOptions.clients Options to supply to the client intents.
* @param {Object=} opts.escapeUserIds Escape userIds for non-bot intents with
* {@link MatrixUser~escapeUserId}
* Default: true
* @param {Object=} opts.queue Options for the onEvent queue. When the bridge
* receives an incoming transaction, it needs to asyncly query the data store for
* contextual info before calling onEvent. A queue is used to keep the onEvent
* calls consistent with the arrival order from the incoming transactions.
* @param {string=} opts.queue.type The type of queue to use when feeding through
* to {@link Bridge~onEvent}. One of: "none", single", "per_room". If "none",
* events are fed through as soon as contextual info is obtained, which may result
* in out of order events but stops HOL blocking. If "single", onEvent calls will
* be in order but may be slower due to HOL blocking. If "per_room", a queue per
* room ID is made which reduces the impact of HOL blocking to be scoped to a room.
* Default: "single".
* @param {boolean=} opts.queue.perRequest True to only feed through the next
* event after the request object in the previous call succeeds or fails. It is
* <b>vital</b> that you consistently resolve/reject the request if this is 'true',
* else you will not get any further events from this queue. To aid debugging this,
* consider setting a delayed listener on the request factory. If false, the mere
* invockation of onEvent is enough to trigger the next event in the queue.
* You probably want to set this to 'true' if your {@link Bridge~onEvent} is
* performing async operations where ordering matters (e.g. messages). Default: false.
* @param {boolean=} opts.disableContext True to disable {@link Bridge~BridgeContext}
* parameters in {@link Bridge~onEvent}. Disabling the context makes the
* bridge do fewer database lookups, but prevents there from being a
* <code>context</code> parameter. Default: false.
* @param {Object=} opts.roomLinkValidation Options to supply to the room link
* validator. If not defined then all room links are accepted.
* @param {string} opts.roomLinkValidation.ruleFile A file containing rules
* on which matrix rooms can be bridged.
* @param {Object=} opts.roomLinkValidation.rules A object containing rules
* on which matrix rooms can be bridged. This is used if ruleFile is undefined.
* @param {boolean=} opts.roomLinkValidation.triggerEndpoint Enable the endpoint
* to trigger a reload of the rules file.
* Default: false
* @param {RoomUpgradeHandler~Options} opts.roomUpgradeOpts Options to supply to
* the room upgrade handler. If not defined then upgrades are NOT handled by the bridge.
*/
function Bridge(opts) {
if (typeof opts !== "object") {
throw new Error("opts must be supplied.");
}
var required = [
"homeserverUrl", "registration", "domain", "controller"
];
required.forEach(function(key) {
if (!opts[key]) {
throw new Error("Missing '" + key + "' in opts.");
}
});
if (typeof opts.controller.onEvent !== "function") {
throw new Error("controller.onEvent is a required function");
}
opts.userStore = opts.userStore || "user-store.db";
opts.roomStore = opts.roomStore || "room-store.db";
opts.eventStore = opts.eventStore || null; // Must be enabled
opts.queue = opts.queue || {};
opts.intentOptions = opts.intentOptions || {};
opts.queue.type = opts.queue.type || "single";
if (opts.queue.perRequest === undefined) {
opts.queue.perRequest = false;
}
if (opts.logRequestOutcome === undefined) {
opts.logRequestOutcome = true;
}
// Default: logger -> log to console
opts.controller.onLog = opts.controller.onLog || function(text, isError) {
if (isError) {
log.error(text);
return;
}
log.info(text);
};
// Default: suppress echo -> True
if (opts.suppressEcho === undefined) {
opts.suppressEcho = true;
}
if (opts.disableContext === undefined) {
opts.disableContext = false;
}
// we'll init these at runtime
this.appService = null;
this.opts = opts;
this._clientFactory = null;
this._botClient = null;
this._appServiceBot = null;
this._requestFactory = null;
this._botIntent = null;
this._intents = {
// user_id + request_id : Intent
};
this._intentLastAccessed = Object.create(null); // user_id + request_id : timestamp
this._intentLastAccessedTimeout = null;
this._powerLevelMap = {
// room_id: event.content
};
this._membershipCache = opts.membershipCache || new MembershipCache();
this._intentBackingStore = {
setMembership: this._membershipCache.setMemberEntry.bind(this._membershipCache),
setPowerLevelContent: this._setPowerLevelEntry.bind(this),
getMembership: this._membershipCache.getMemberEntry.bind(this._membershipCache),
getPowerLevelContent: this._getPowerLevelEntry.bind(this)
};
this._queue = EventQueue.create(this.opts.queue, this._onConsume.bind(this));
this._prevRequestPromise = Promise.resolve();
this._metrics = null; // an optional PrometheusMetrics instance
this._roomLinkValidator = null;
if (opts.roomUpgradeOpts) {
this.opts.roomUpgradeOpts.consumeEvent = opts.roomUpgradeOpts.consumeEvent !== false ? true : false;
this._roomUpgradeHandler = new RoomUpgradeHandler(opts.roomUpgradeOpts, this);
}
else {
this._roomUpgradeHandler = null;
}
}
/**
* Load the user and room databases. Access them via getUserStore() and getRoomStore().
* @return {Promise} Resolved/rejected when the user/room databases have been loaded.
*/
Bridge.prototype.loadDatabases = function() {
var self = this;
// Load up the databases if they provided file paths to them (or defaults)
if (typeof self.opts.userStore === "string") {
self.opts.userStore = loadDatabase(self.opts.userStore, UserBridgeStore);
}
if (typeof self.opts.roomStore === "string") {
self.opts.roomStore = loadDatabase(self.opts.roomStore, RoomBridgeStore);
}
if (typeof self.opts.eventStore === "string") {
self.opts.eventStore = loadDatabase(self.opts.eventStore, EventBridgeStore);
}
// This works because if they provided a string we converted it to a Promise
// which will be resolved when we have the db instance. If they provided a
// db instance then this will resolve immediately.
return Promise.all([
Promise.resolve(self.opts.userStore).then(function(db) {
self._userStore = db;
}),
Promise.resolve(self.opts.roomStore).then(function(db) {
self._roomStore = db;
}),
Promise.resolve(self.opts.eventStore).then(function(db) {
self._eventStore = db;
})
]);
};
/**
* Run the bridge (start listening)
* @param {Number} port The port to listen on.
* @param {Object} config Configuration options
* @param {AppService=} appServiceInstance The AppService instance to attach to.
* If not provided, one will be created.
*/
Bridge.prototype.run = function(port, config, appServiceInstance) {
var self = this;
// Load the registration file into an AppServiceRegistration object.
if (typeof self.opts.registration === "string") {
var regObj = yaml.safeLoad(fs.readFileSync(self.opts.registration, 'utf8'));
self.opts.registration = AppServiceRegistration.fromObject(regObj);
if (self.opts.registration === null) {
throw new Error("Failed to parse registration file");
}
}
this._clientFactory = self.opts.clientFactory || new ClientFactory({
url: self.opts.homeserverUrl,
token: self.opts.registration.as_token,
appServiceUserId: (
"@" + self.opts.registration.sender_localpart + ":" + self.opts.domain
),
clientSchedulerBuilder: function() {
return new MatrixScheduler(retryAlgorithm, queueAlgorithm);
},
});
this._clientFactory.setLogFunction(function(text, isErr) {
if (!self.opts.controller.onLog) {
return;
}
self.opts.controller.onLog(text, isErr);
});
this._botClient = this._clientFactory.getClientAs();
this._appServiceBot = new AppServiceBot(
this._botClient, self.opts.registration, this._membershipCache
);
if (this.opts.roomLinkValidation !== undefined) {
this._roomLinkValidator = new RoomLinkValidator(
this.opts.roomLinkValidation,
this._appServiceBot
);
}
this._requestFactory = new RequestFactory();
if (this.opts.controller.onLog && this.opts.logRequestOutcome) {
this._requestFactory.addDefaultResolveCallback(function(req, res) {
self.opts.controller.onLog(
"[" + req.getId() + "] SUCCESS (" + req.getDuration() + "ms)"
);
});
this._requestFactory.addDefaultRejectCallback(function(req, err) {
self.opts.controller.onLog(
"[" + req.getId() + "] FAILED (" + req.getDuration() + "ms) " +
(err ? util.inspect(err) : "")
);
});
}
var botIntentOpts = {
registered: true,
backingStore: this._intentBackingStore,
};
if (this.opts.intentOptions.bot) { // copy across opts
Object.keys(this.opts.intentOptions.bot).forEach(function(k) {
botIntentOpts[k] = self.opts.intentOptions.bot[k];
});
}
this._botIntent = new Intent(this._botClient, this._botClient, botIntentOpts);
this._intents = {
// user_id + request_id : Intent
};
this.appService = appServiceInstance || new AppService({
homeserverToken: this.opts.registration.getHomeserverToken()
});
this.appService.onUserQuery = this._onUserQuery.bind(this);
this.appService.onAliasQuery = this._onAliasQuery.bind(this);
this.appService.on("event", this._onEvent.bind(this));
this.appService.on("http-log", function(line) {
if (!self.opts.controller.onLog) {
return;
}
self.opts.controller.onLog(line, false);
});
this._customiseAppservice();
this._setupIntentCulling();
if (this._metrics) {
this._metrics.addAppServicePath(this);
}
this.appService.listen(port);
return this.loadDatabases();
};
/**
* Apply any customisations required on the appService object.
*/
Bridge.prototype._customiseAppservice = function() {
if (this.opts.controller.thirdPartyLookup) {
this._customiseAppserviceThirdPartyLookup(this.opts.controller.thirdPartyLookup);
}
if (this.opts.roomLinkValidation && this.opts.roomLinkValidation.triggerEndpoint) {
this.addAppServicePath({
method: "POST",
path: "/_bridge/roomLinkValidator/reload",
handler: (req, res) => {
if (!this._requestCheckToken(req, res)) {
return;
}
try {
// Will use filename if provided, or the config
// one otherwised.
this._roomLinkValidator.readRuleFile(req.query.filename);
res.status(200).send("Success");
}
catch (e) {
res.status(500).send("Failed: " + e);
}
},
});
}
};
// Set a timer going which will periodically remove Intent objects to prevent
// them from accumulating too much. Removal is based on access time (calls to
// getIntent). Intents expire after INTENT_CULL_EVICT_AFTER_MS of not being called.
Bridge.prototype._setupIntentCulling = function() {
if (this._intentLastAccessedTimeout) {
clearTimeout(this._intentLastAccessedTimeout);
}
var self = this;
this._intentLastAccessedTimeout = setTimeout(function() {
var now = Date.now();
Object.keys(self._intentLastAccessed).forEach(function(key) {
if ((self._intentLastAccessed[key] + INTENT_CULL_EVICT_AFTER_MS) < now) {
delete self._intentLastAccessed[key];
delete self._intents[key];
}
});
self._intentLastAccessedTimeout = null;
// repeat forever. We have no cancellation mechanism but we don't expect
// Bridge objects to be continually recycled so this is fine.
self._setupIntentCulling();
}, INTENT_CULL_CHECK_PERIOD_MS);
}
Bridge.prototype._customiseAppserviceThirdPartyLookup = function(lookupController) {
var protocols = lookupController.protocols || [];
var _respondErr = function(e, res) {
if (typeof e === "object" && e.code && e.err) {
res.status(e.code).json({error: e.err});
}
else {
res.status(500).send("Failed: " + e);
}
}
if (lookupController.getProtocol) {
var getProtocolFunc = lookupController.getProtocol;
this.addAppServicePath({
method: "GET",
path: "/_matrix/app/:ver/thirdparty/protocol/:protocol",
handler: function(req, res) {
if (req.params.ver !== "unstable") {
res.status(404).json(
{err: "Unrecognised API version " + req.params.ver}
);
return;
}
var protocol = req.params.protocol;
if (protocols.length && protocols.indexOf(protocol) === -1) {
res.status(404).json({err: "Unknown 3PN protocol " + protocol});
return;
}
getProtocolFunc(protocol).then(
function(result) { res.status(200).json(result) },
function(e) { _respondErr(e, res) }
);
},
});
}
if (lookupController.getLocation) {
var getLocationFunc = lookupController.getLocation;
this.addAppServicePath({
method: "GET",
path: "/_matrix/app/:ver/thirdparty/location/:protocol",
handler: function(req, res) {
if (req.params.ver !== "unstable") {
res.status(404).json(
{err: "Unrecognised API version " + req.params.ver}
);
return;
}
var protocol = req.params.protocol;
if (protocols.length && protocols.indexOf(protocol) === -1) {
res.status(404).json({err: "Unknown 3PN protocol " + protocol});
return;
}
getLocationFunc(protocol, req.query).then(
function(result) { res.status(200).json(result) },
function(e) { _respondErr(e, res) }
);
},
});
}
if (lookupController.parseLocation) {
var parseLocationFunc = lookupController.parseLocation;
this.addAppServicePath({
method: "GET",
path: "/_matrix/app/:ver/thirdparty/location",
handler: function(req, res) {
if (req.params.ver !== "unstable") {
res.status(404).json(
{err: "Unrecognised API version " + req.params.ver}
);
return;
}
var alias = req.query.alias;
if (!alias) {
res.status(400).send({err: "Missing 'alias' parameter"});
return;
}
parseLocationFunc(alias).then(
function(result) { res.status(200).json(result) },
function(e) { _respondErr(e, res) }
);
},
});
}
if (lookupController.getUser) {
var getUserFunc = lookupController.getUser;
this.addAppServicePath({
method: "GET",
path: "/_matrix/app/:ver/thirdparty/user/:protocol",
handler: function(req, res) {
if (req.params.ver !== "unstable") {
res.status(404).json(
{err: "Unrecognised API version " + req.params.ver}
);
return;
}
var protocol = req.params.protocol;
if (protocols.length && protocols.indexOf(protocol) === -1) {
res.status(404).json({err: "Unknown 3PN protocol " + protocol});
return;
}
getUserFunc(protocol, req.query).then(
function(result) { res.status(200).json(result) },
function(e) { _respondErr(e, res) }
);
}
});
}
if (lookupController.parseUser) {
var parseUserFunc = lookupController.parseUser;
this.addAppServicePath({
method: "GET",
path: "/_matrix/app/:ver/thirdparty/user",
handler: function(req, res) {
if (req.params.ver !== "unstable") {
res.status(404).json(
{err: "Unrecognised API version " + req.params.ver}
);
return;
}
var userid = req.query.userid;
if (!userid) {
res.status(400).send({err: "Missing 'userid' parameter"});
return;
}
parseUserFunc(userid).then(
function(result) { res.status(200).json(result) },
function(e) { _respondErr(e, res) }
);
},
});
}
};
/**
* Install a custom handler for an incoming HTTP API request. This allows
* callers to add extra functionality, implement new APIs, etc...
* @param {Object} opts Named options
* @param {string} opts.method The HTTP method name.
* @param {string} opts.path Path to the endpoint.
* @param {Bridge~appServicePathHandler} opts.handler Function to handle requests
* to this endpoint.
*/
Bridge.prototype.addAppServicePath = function(opts) {
// TODO(paul): This is gut-wrenching into the AppService instance itself.
// Maybe an API on that object would be good?
var app = this.appService.app;
// TODO(paul): Consider more options:
// opts.versions - automatic version filtering and rejecting of
// unrecognised API versions
// Consider automatic "/_matrix/app/:version" path prefix
app[opts.method.toLowerCase()](opts.path, opts.handler);
};
/**
* Retrieve the connected room store instance.
* @return {?RoomBridgeStore} The connected instance ready for querying.
*/
Bridge.prototype.getRoomStore = function() {
return this._roomStore;
};
/**
* Retrieve the connected user store instance.
* @return {?UserBridgeStore} The connected instance ready for querying.
*/
Bridge.prototype.getUserStore = function() {
return this._userStore;
};
/**
* Retrieve the connected event store instance, if one was configured.
* @return {?EventBridgeStore} The connected instance ready for querying.
*/
Bridge.prototype.getEventStore = function() {
return this._eventStore;
};
/**
* Retrieve the request factory used to create incoming requests.
* @return {RequestFactory}
*/
Bridge.prototype.getRequestFactory = function() {
return this._requestFactory;
};
/**
* Retrieve the matrix client factory used when sending matrix requests.
* @return {ClientFactory}
*/
Bridge.prototype.getClientFactory = function() {
return this._clientFactory;
};
/**
* Get the AS bot instance.
* @return {AppServiceBot}
*/
Bridge.prototype.getBot = function() {
return this._appServiceBot;
};
/**
* Determines whether a room should be provisoned based on
* user provided rules and the room state. Will default to true
* if no rules have been provided.
* @param {string} roomId The room to check.
* @param {boolean} cache Should the validator check it's cache.
* @returns {Promise} resolves if can and rejects if it cannot.
* A status code is returned on both.
*/
Bridge.prototype.canProvisionRoom = function(roomId, cache=true) {
if (this._roomLinkValidator === null) {
return Promise.resolve(RLVStatus.PASSED);
}
return this._roomLinkValidator.validateRoom(roomId, cache);
}
Bridge.prototype.getRoomLinkValidator = function() {
return this._roomLinkValidator;
}
/**
* Retrieve an Intent instance for the specified user ID. If no ID is given, an
* instance for the bot itself is returned.
* @param {?string} userId The user ID to get an Intent for.
* @param {Request=} request Optional. The request instance to tie the MatrixClient
* instance to. Useful for logging contextual request IDs.
* @return {Intent} The intent instance
*/
Bridge.prototype.getIntent = function(userId, request) {
if (!userId) {
return this._botIntent;
}
if (this.opts.escapeUserIds === undefined || this.opts.escapeUserIds) {
userId = new MatrixUser(userId).getId(); // Escape the ID
}
const key = userId + (request ? request.getId() : "");
if (!this._intents[key]) {
const client = this._clientFactory.getClientAs(userId, request);
const clientIntentOpts = {
backingStore: this._intentBackingStore
};
if (this.opts.intentOptions.clients) {
Object.keys(this.opts.intentOptions.clients).forEach((k) => {
clientIntentOpts[k] = this.opts.intentOptions.clients[k];
});
}
clientIntentOpts.registered = this._membershipCache.isUserRegistered(userId);
this._intents[key] = new Intent(client, this._botClient, clientIntentOpts);
}
this._intentLastAccessed[key] = Date.now();
return this._intents[key];
};
/**
* Retrieve an Intent instance for the specified user ID localpart. This <i>must
* be the complete user localpart</i>.
* @param {?string} localpart The user ID localpart to get an Intent for.
* @param {Request=} request Optional. The request instance to tie the MatrixClient
* instance to. Useful for logging contextual request IDs.
* @return {Intent} The intent instance
*/
Bridge.prototype.getIntentFromLocalpart = function(localpart, request) {
return this.getIntent(
"@" + localpart + ":" + this.opts.domain
);
};
/**
* Provision a user on the homeserver.
* @param {MatrixUser} matrixUser The virtual user to be provisioned.
* @param {Bridge~ProvisionedUser} provisionedUser Provisioning information.
* @return {Promise} Resolved when provisioned.
*/
Bridge.prototype.provisionUser = function(matrixUser, provisionedUser) {
var self = this;
var promise = self._botClient.register(matrixUser.localpart).then(function() {
return self._userStore.setMatrixUser(matrixUser);
});
// storage promise chain
if (provisionedUser.remote) {
promise = promise.then(function() {
return self._userStore.linkUsers(
matrixUser, provisionedUser.remote
);
});
}
// HTTP promise chain
var newUser = self._clientFactory.getClientAs(matrixUser.getId());
if (provisionedUser.name) {
promise = promise.then(function() {
return newUser.setDisplayName(provisionedUser.name);
});
}
if (provisionedUser.url) {
promise = promise.then(function() {
return newUser.setAvatarUrl(provisionedUser.url);
});
}
return promise;
};
Bridge.prototype._onUserQuery = function(userId) {
var self = this;
if (self.opts.controller.onUserQuery) {
var matrixUser = new MatrixUser(userId);
return Promise.resolve(
self.opts.controller.onUserQuery(matrixUser)
).then(function(provisionedUser) {
if (!provisionedUser) {
throw new Error("Not provisioning user for this ID");
}
return self.provisionUser(matrixUser, provisionedUser);
});
}
return Promise.resolve();
};
Bridge.prototype._onAliasQuery = function(alias) {
var self = this;
var remoteRoom = null;
var roomId;
if (self.opts.controller.onAliasQuery) {
return Promise.resolve(
self.opts.controller.onAliasQuery(alias, alias.split(":")[0].substring(1))
).then(function(provisionedRoom) {
if (!provisionedRoom) {
throw new Error("Not provisioning room for this alias");
}
// do the HTTP hit
remoteRoom = provisionedRoom.remote;
return self._botClient.createRoom(
provisionedRoom.creationOpts
);
}).then(function(createRoomResponse) {
// persist the mapping in the store
roomId = createRoomResponse.room_id;
var matrixRoom = new MatrixRoom(roomId);
if (remoteRoom) {
return self._roomStore.linkRooms(matrixRoom, remoteRoom);
}
// store the matrix room only
return self._roomStore.setMatrixRoom(matrixRoom);
}).then(function() {
if (self.opts.controller.onAliasQueried) {
self.opts.controller.onAliasQueried(alias, roomId);
}
});
}
return Promise.resolve();
};
// returns a Promise for the request linked to this event for testing.
Bridge.prototype._onEvent = async function(event) {
const isCanonicalState = event.state_key === "";
this._updateIntents(event);
if (this.opts.suppressEcho &&
this.opts.registration.isUserMatch(event.user_id, true)) {
return null;
}
if (this._roomUpgradeHandler) {
// m.room.tombstone is the event that signals a room upgrade.
if (event.type === "m.room.tombstone" && isCanonicalState && this._roomUpgradeHandler) {
this._roomUpgradeHandler.onTombstone(event);
if (this.opts.roomUpgradeOpts.consumeEvent) {
return null;
}
}
else if (event.type === "m.room.member" &&
event.state_key === this._appServiceBot.getUserId() &&
event.content.membership === "invite") {
// A invite-only room that has been upgraded won't have been joinable,
// so we are listening for any invites to the new room.
const isUpgradeInvite = this._roomUpgradeHandler.onInvite(event);
if (isUpgradeInvite &&
this.opts.roomUpgradeOpts.consumeEvent) {
return null;
}
}
}
const request = this._requestFactory.newRequest({ data: event });
const contextReady = this._getBridgeContext(event);
const dataReady = contextReady.then(context => ({ request, context }));
const dataReadyLimited = this._limited(dataReady, request);
this._queue.push(event, dataReadyLimited);
this._queue.consume();
dataReady.catch(e => this._handleEventError(event, e));
return (
request
.getPromise()
.catch(EventNotHandledError, e => this._handleEventError(event, e))
.catch(e =>
this.opts.controller.onLog(
`Dropping event ${event.event_id} (${e.name})`,
false
)
)
);
};
/**
* Restricts the promise according to the bridges `perRequest` setting.
*
* `perRequest` enabled:
* Returns a promise similar to `promise`, with the addition of it only
* resolving after `request`.
* `perRequest` disabled:
* Returns the promise unchanged.
*/
Bridge.prototype._limited = async function(promise, request) {
// queue.perRequest controls whether multiple request can be processed by
// the bridge at once.
if (this.opts.queue.perRequest) {
const promiseLimited = this._prevRequestPromise.reflect().return(promise);
this._prevRequestPromise = request.getPromise();
return promiseLimited;
}
return promise;
}
Bridge.prototype._onConsume = function(err, data) {
if (err) {
// The data for the event could not be retrieved.
this.opts.controller.onLog("onEvent failure: " + err, true);
return;
}
this.opts.controller.onEvent(data.request, data.context);
};
Bridge.prototype._getBridgeContext = async function(event) {
if (this.opts.disableContext) {
return null;
}
const context = new BridgeContext({
sender: event.user_id,
target: event.type === "m.room.member" ? event.state_key : null,
room: event.room_id
});
return context.get(this._roomStore, this._userStore);
}
Bridge.prototype._handleEventError = function(event, error) {
if (!(error instanceof EventNotHandledError)) {
error = wrap(error, InternalError);
}
// TODO[V02460@gmail.com]: Send via different means when the bridge bot is
// unavailable. _MSC2162: Signaling Errors at Bridges_ will have details on
// how this should be done.
this._botIntent.unstableSignalBridgeError(
event.room_id,
event.event_id,
this.opts.networkName,
error.reason,
this._getUserRegex(),
);
};
/**
* Returns a regex matching all users of the bridge.
*
* @return {string} Super regex composed of all user regexes.
*/
Bridge.prototype._getUserRegex = function() {
const reg = this.opts.registration;
return reg.namespaces["users"].map(o => o.regex);
};
Bridge.prototype._updateIntents = function(event) {
if (event.type === "m.room.member") {
this._membershipCache.setMemberEntry(
event.room_id,
event.state_key,
event.content ? event.content.membership : null
);
}
else if (event.type === "m.room.power_levels") {
this._setPowerLevelEntry(event.room_id, event.content);
}
};
Bridge.prototype._setPowerLevelEntry = function(roomId, content) {
this._powerLevelMap[roomId] = content;
};
Bridge.prototype._getPowerLevelEntry = function(roomId) {
return this._powerLevelMap[roomId];
};
/**
* Returns a PrometheusMetrics instance stored on the bridge, creating it first
* if required. The instance will be registered with the HTTP server so it can
* serve the "/metrics" page in the usual way.
* The instance will automatically register the Matrix SDK metrics by calling
* {PrometheusMetrics~registerMatrixSdkMetrics}.
*/
Bridge.prototype.getPrometheusMetrics = function() {
if (this._metrics) {
return this._metrics;
}
var metrics = this._metrics = new PrometheusMetrics();
metrics.registerMatrixSdkMetrics();
// TODO(paul): register some bridge-wide standard ones here
// In case we're called after .run()
if (this.appService) {
metrics.addAppServicePath(this);
}
return metrics;
};
/**
* A convenient shortcut to calling registerBridgeGauges() on the
* PrometheusMetrics instance directly. This version will supply the value of
* the matrixGhosts field if the counter function did not return it, for
* convenience.
* @param {PrometheusMetrics~BridgeGaugesCallback} counterFunc A function that
* when invoked returns the current counts of various items in the bridge.
*
* @example
* bridge.registerBridgeGauges(() => {
* return {
* matrixRoomConfigs: Object.keys(this.matrixRooms).length,
* remoteRoomConfigs: Object.keys(this.remoteRooms).length,
*
* remoteGhosts: Object.keys(this.remoteGhosts).length,
*
* ...
* }
* })
*/
Bridge.prototype.registerBridgeGauges = function(counterFunc) {
var self = this;
this.getPrometheusMetrics().registerBridgeGauges(function() {
var counts = counterFunc();
if (!("matrixGhosts" in counts)) {
counts.matrixGhosts = Object.keys(self._intents).length;
}
return counts;
});
};
Bridge.prototype._requestCheckToken = function(req, res) {
if (req.query.access_token !== this.opts.registration.hs_token) {
res.status(403).send({
errcode: "M_FORBIDDEN",
error: "Bad token supplied,"
});
return false;
}
return true;
}
function loadDatabase(path, Cls) {
var defer = Promise.defer();
var db = new Datastore({
filename: path,
autoload: true,
onload: function(err) {
if (err) {
defer.reject(err);
}
else {
defer.resolve(new Cls(db));
}
}
});
return defer.promise;
}
function retryAlgorithm(event, attempts, err) {
if (err.httpStatus === 400 || err.httpStatus === 403 || err.httpStatus === 401) {
// client error; no amount of retrying with save you now.
return -1;
}
// we ship with browser-request which returns { cors: rejected } when trying
// with no connection, so if we match that, give up since they have no conn.
if (err.cors === "rejected") {
return -1;
}
if (err.name === "M_LIMIT_EXCEEDED") {
var waitTime = err.data.retry_after_ms;
if (waitTime) {
return waitTime;
}
}
if (attempts > 4) {
return -1; // give up
}
return 1000 + (1000 * attempts);
}
function queueAlgorithm(event) {
if (event.getType() === "m.room.message") {
// use a separate queue for each room ID
return "message_" + event.getRoomId();
}
// allow all other events continue concurrently.
return null;
}
module.exports = Bridge;
/**
* @typedef Bridge~ProvisionedUser
* @type {Object}
* @property {string=} name The display name to set for the provisioned user.
* @property {string=} url The avatar URL to set for the provisioned user.
* @property {RemoteUser=} remote The remote user to link to the provisioned user.
*/
/**
* @typedef Bridge~ProvisionedRoom
* @type {Object}
* @property {Object} creationOpts Room creation options to use when creating the
* room. Required.
* @property {RemoteRoom=} remote The remote room to link to the provisioned room.
*/
/**
* Invoked when the bridge receives a user query from the homeserver. Supports
* both sync return values and async return values via promises.
* @callback Bridge~onUserQuery
* @param {MatrixUser} matrixUser The matrix user queried. Use <code>getId()</code>
* to get the user ID.
* @return {?Bridge~ProvisionedUser|Promise<Bridge~ProvisionedUser, Error>}
* Reject the promise / return null to not provision the user. Resolve the
* promise / return a {@link Bridge~ProvisionedUser} object to provision the user.
* @example
* new Bridge({
* controller: {
* onUserQuery: function(matrixUser) {
* var remoteUser = new RemoteUser("some_remote_id");
* return {
* name: matrixUser.localpart + " (Bridged)",
* url: "http://someurl.com/pic.jpg",
* user: remoteUser
* };
* }
* }
* });
*/
/**
* Invoked when the bridge receives an alias query from the homeserver. Supports
* both sync return values and async return values via promises.
* @callback Bridge~onAliasQuery
* @param {string} alias The alias queried.
* @param {string} aliasLocalpart The parsed localpart of the alias.
* @return {?Bridge~ProvisionedRoom|Promise<Bridge~ProvisionedRoom, Error>}
* Reject the promise / return null to not provision the room. Resolve the
* promise / return a {@link Bridge~ProvisionedRoom} object to provision the room.
* @example
* new Bridge({
* controller: {
* onAliasQuery: function(alias, aliasLocalpart) {
* return {
* creationOpts: {
* room_alias_name: aliasLocalpart, // IMPORTANT: must be set to make the link
* name: aliasLocalpart,
* topic: "Auto-generated bridged room"
* }
* };
* }
* }
* });
*/
/**
* Invoked when a response is returned from onAliasQuery. Supports
* both sync return values and async return values via promises.
* @callback Bridge~onAliasQueried
* @param {string} alias The alias queried.
* @param {string} roomId The parsed localpart of the alias.
*/
/**
* @callback Bridge~onRoomUpgrade
* @param {string} oldRoomId The roomId of the old room.
* @param {string} newRoomId The roomId of the new room.
* @param {string} newVersion The new room version.
* @param {Bridge~BridgeContext} context Context for the upgrade event.
*/
/**
* Invoked when the bridge receives an event from the homeserver.
* @callback Bridge~onEvent
* @param {Request} request The request to resolve or reject depending on the
* outcome of this request. The 'data' attached to this Request is the raw event
* JSON received (accessed via <code>request.getData()</code>)
* @param {Bridge~BridgeContext} context Context for this event, including
* instantiated client instances.
*/
/**
* Invoked when the bridge is attempting to log something.
* @callback Bridge~onLog
* @param {string} line The text to be logged.
* @param {boolean} isError True if this line should be treated as an error msg.
*/
/**
* Handler function for custom applied HTTP API request paths. This is invoked
* as defined by expressjs.
* @callback Bridge~appServicePathHandler
* @param {Request} req An expressjs Request object the handler can use to
* inspect the incoming request.
* @param {Response} res An expressjs Response object the handler can use to
* send the outgoing response.
*/
/**
* @typedef Bridge~thirdPartyLookup
* @type {Object}
* @property {string[]} protocols Optional list of recognised protocol names.
* If present, lookups for unrecognised protocols will be automatically
* rejected.
* @property {Bridge~getProtocol} getProtocol Function. Called for requests
* for 3PE query metadata.
* @property {Bridge~getLocation} getLocation Function. Called for requests
* for 3PLs.
* @property {Bridge~parseLocation} parseLocation Function. Called for reverse
* parse requests on 3PL aliases.
* @property {Bridge~getUser} getUser Function. Called for requests for 3PUs.
* @property {Bridge~parseUser} parseUser Function. Called for reverse parse
* requests on 3PU user IDs.
*/
/**
* Invoked on requests for 3PE query metadata
* @callback Bridge~getProtocol
* @param {string} protocol The name of the 3PE protocol to query
* @return {Promise<Bridge~thirdPartyProtocolResult>} A Promise of metadata
* about 3PE queries that can be made for this protocol.
*/
/**
* Returned by getProtocol third-party query metadata requests
* @typedef Bridge~thirdPartyProtocolResult
* @type {Object}
* @property {string[]} [location_fields] Names of the fields required for
* location lookups if location queries are supported.
* @property {string[]} [user_fields] Names of the fields required for user
* lookups if user queries are supported.
/**
* Invoked on requests for 3PLs
* @callback Bridge~getLocation
* @param {string} protocol The name of the 3PE protocol
* @param {Object} fields The location query field data as specified by the
* specific protocol.
* @return {Promise<Bridge~thirdPartyLocationResult[]>} A Promise of a list of
* 3PL lookup results.
*/
/**
* Invoked on requests to parse 3PL aliases
* @callback Bridge~parseLocation
* @param {string} alias The room alias to parse.
* @return {Promise<Bridge~thirdPartyLocationResult[]>} A Promise of a list of
* 3PL lookup results.
*/
/**
* Returned by getLocation and parseLocation third-party location lookups
* @typedef Bridge~thirdPartyLocationResult
* @type {Object}
* @property {string} alias The Matrix room alias to the portal room
* representing this 3PL
* @property {string} protocol The name of the 3PE protocol
* @property {object} fields The normalised values of the location query field
* data.
*/
/**
* Invoked on requests for 3PUs
* @callback Bridge~getUser
* @param {string} protocol The name of the 3PE protocol
* @param {Object} fields The user query field data as specified by the
* specific protocol.
* @return {Promise<Bridge~thirdPartyUserResult[]>} A Promise of a list of 3PU
* lookup results.
*/
/**
* Invoked on requests to parse 3PU user IDs
* @callback Bridge~parseUser
* @param {string} userid The user ID to parse.
* @return {Promise<Bridge~thirdPartyUserResult[]>} A Promise of a list of 3PU
* lookup results.
*/
/**
* Returned by getUser and parseUser third-party user lookups
* @typedef Bridge~thirdPartyUserResult
* @type {Object}
* @property {string} userid The Matrix user ID for the ghost representing
* this 3PU
* @property {string} protocol The name of the 3PE protocol
* @property {object} fields The normalised values of the user query field
* data.
*/