Source: components/event-queue.js

  1. const Bluebird = require("bluebird");
  /**
   * Handles the processing order of incoming Matrix events.
   *
   * Events can be pushed to the queue and will be processed when their
   * corresponding data is ready and they are at the head of line.
   * Different types of queues can be chosen for the processing order of events.
   *
   * Abstract Base Class. Use the factory method `create` to create new instances.
   */
  class EventQueue {
      /**
       * Private constructor.
       *
       * @constructor
       * @param {"none"|"single"|"per_room"} type The type of event queue to create.
       * @param {consumeCallback} consumeFn Function which is called when an event
       * is consumed.
       */
      constructor(type, consumeFn) {
          this.type = type;
          // Dictionary of queues, keyed by queue identifier:
          //   $identifier: {
          //       events: [ {dataReady: Promise} ],
          //       consuming: true|false
          //   }
          // It is created without a prototype so that an identifier such as
          // "constructor" or "__proto__" can never resolve to an inherited member.
          // NOTE: entries are never removed once created, so a "per_room" queue
          // keeps one (possibly empty) entry per room it has seen.
          this._queues = Object.create(null);
          this.consumeFn = consumeFn;
      }

      /**
       * Push the event and its related data to the queue.
       *
       * The event is only used to select the queue; the stored entry holds
       * `dataReady` alone. Pushing does not start consumption, see `consume`.
       *
       * @param {IMatrixEvent} event The event to enqueue.
       * @param {Promise<object>} dataReady Promise containing data related to the event.
       */
      push(event, dataReady) {
          const queue = this._getQueue(event);
          queue.events.push({ dataReady });
      }

      /**
       * Look up the queue an event belongs to, creating it on first use.
       *
       * "per_room" queues are keyed by `event.room_id`; every other type shares
       * the single queue stored under the identifier "none".
       *
       * @param {IMatrixEvent} event The event whose queue is wanted.
       * @return {{events: Array<{dataReady: Promise<object>}>, consuming: boolean}}
       * The queue record for the event.
       */
      _getQueue(event) {
          const identifier = this.type === "per_room" ? event.room_id : "none";
          if (!this._queues[identifier]) {
              this._queues[identifier] = {
                  events: [],
                  consuming: false,
              };
          }
          return this._queues[identifier];
      }

      /**
       * Starts consuming the queue.
       *
       * As long as events are enqueued they will continue to be consumed.
       * A queue that runs empty stops consuming, so `consume` has to be called
       * again after later pushes. Queues that are already consuming are skipped.
       */
      consume() {
          Object.keys(this._queues).forEach((identifier) => {
              const queue = this._queues[identifier];
              if (!queue.consuming) {
                  queue.consuming = true;
                  this._takeNext(identifier);
              }
          });
      }

      /**
       * Hand the head of one queue to `consumeFn` once its data has settled, then
       * move on to the following entry. Marks the queue as idle when it is empty.
       *
       * @param {string} identifier Key of the queue inside `this._queues`.
       */
      _takeNext(identifier) {
          const queue = this._queues[identifier];
          if (queue.events.length === 0) {
              queue.consuming = false;
              return;
          }
          const entry = queue.events.shift();
          // Wrap once so plain values and foreign thenables are accepted too, and
          // so the consumer callback and the "advance" step share one promise.
          const ready = Bluebird.resolve(entry.dataReady);
          ready.asCallback(this.consumeFn);
          // Advance on fulfilment and on rejection alike. Both branches are
          // handled here on purpose: `.finally()` would hand back a promise that
          // re-rejects with the same reason, and nobody would be listening to it.
          const advance = () => this._takeNext(identifier);
          ready.then(advance, advance);
      }

      /**
       * Factory for EventQueues.
       *
       * @param {Object} opts Options for the queue.
       * @param {"none"|"single"|"per_room"} opts.type Type of the queue to create.
       * @param {consumeCallback} consumeFn Function which is called when an event
       * is consumed.
       * @return {EventQueue} The newly created EventQueue.
       * @throws {Error} If `opts.type` is not one of the supported types.
       */
      static create(opts, consumeFn) {
          const type = opts.type;
          /* eslint-disable no-use-before-define */
          switch (type) {
              case "single":
                  return new EventQueueSingle(consumeFn);
              case "per_room":
                  return new EventQueuePerRoom(consumeFn);
              case "none":
                  return new EventQueueNone(consumeFn);
              default:
                  break;
          }
          /* eslint-enable no-use-before-define */
          throw new Error(`Invalid EventQueue type '${type}'.`);
      }
  }
  /**
   * EventQueue variant that keeps all events in one shared line, in the order
   * they were pushed.
   *
   * Only the entry at the front of that line is consumed, once its data has
   * settled.
   */
  class EventQueueSingle extends EventQueue {
      /**
       * @param {consumeCallback} consumeFn Function which is called when an event
       * is consumed.
       */
      constructor(consumeFn) {
          const queueType = "single";
          super(queueType, consumeFn);
      }
  }
  /**
   * EventQueue variant that maintains a separate line of events for each room.
   *
   * Within every room the entry at the front is consumed once its data has
   * settled.
   */
  class EventQueuePerRoom extends EventQueue {
      /**
       * @param {consumeCallback} consumeFn Function which is called when an event
       * is consumed.
       */
      constructor(consumeFn) {
          const queueType = "per_room";
          super(queueType, consumeFn);
      }
  }
  /**
   * Pass-through EventQueue that does not buffer anything.
   *
   * Each pushed event is handed to the consumer as soon as its own data has
   * settled, independently of any other event.
   */
  class EventQueueNone extends EventQueue {
      /**
       * @param {consumeCallback} consumeFn Function which is called when an event
       * is consumed.
       */
      constructor(consumeFn) {
          const queueType = "none";
          super(queueType, consumeFn);
      }

      /**
       * Forward the event's data straight to the consumer; nothing is stored.
       *
       * @param {IMatrixEvent} event The event being pushed (not used here).
       * @param {Promise<object>} dataReady Promise containing data related to the event.
       */
      push(event, dataReady) {
          const pending = Bluebird.resolve(dataReady);
          pending.asCallback(this.consumeFn);
      }

      /**
       * Nothing to start: `push` already delivers every event on its own.
       */
      consume() {
          // deliberately left empty
      }
  }
  136. /**
  137. * @callback consumeCallback
  138. * @param {Error} [err] The error in case the data could not be retrieved.
  139. * @param {object} data The data associated with the consumed event.
  140. */
  141. module.exports = {
  142. EventQueue,
  143. EventQueueSingle,
  144. EventQueuePerRoom,
  145. EventQueueNone,
  146. };