The package @axinom/mosaic-message-bus encapsulates the RabbitMQ setup and initialization logic. It offers ways to easily and consistently manage reliable messaging between backend processes and across service boundaries

@axinom/mosaic-message-bus

The mosaic-message-bus package implements the Mosaic messaging pattern in a robust and reliable way. Please read up on the messaging concept to familiarize yourself with this approach. The library includes the functionality to set up the RabbitMQ connection, queues, bindings, etc. You use it to send messages, handle incoming messages, and have automatic retries and redeliveries.

The messages that you are sending and receiving are defined in messaging packages. These packages include also message routing instructions, queue bindings, and other messaging settings.

Axinom provides the @axinom/mosaic-messages which contains all the events and commands that the Axinom Mosaic services are sending and receiving. Each of your projects should have its own customizable messages package with your events, commands, and messaging settings. You can find more information on how to implement or extend your messaging in the messaging usage how-to guide.

Setup

The library offers one central entry point which is the setupMessagingBroker function. It is used to initialize the messaging broker and the bindings for all the message handlers.

You supply it with the required configuration parameters on how to connect to the RabbitMQ server, builders, and optional middleware. All message handlers and the messages that are sent are configured via the RascalConfigBuilder.

A basic usage example of the library:

// general setup (plus database pools and other setup-related work)
const app = express();
const config = getFullConfig();
const counter = initMessagingCounter(getOwnerPgPool(app));

// register all your builders to send or receive events/commands:
const builders = [new RascalConfigBuilder(MediaServiceMessagingSettings.StartIngest, config)
  .sendCommand()
  .subscribeForCommand<StartIngestCommand>(
    (broker: Broker) =>
      new IngestHandler(broker, getLoginPgPool(app), config),
  ),
  // ...
  ];

const broker = await setupMessagingBroker({
  app,
  config,
  builders,
  components: { counters: { postgresCounter: counter } },
  // ...
});
Table 1. Input object definitions
Property Description

app

A MessagingRegistry object to store and share objects by keys e.g. the express app.

config

The MessagingConfig object with settings on how to connect to the RabbitMQ server, vhost, and other similar settings.

builders

A list of RascalConfigBuilder configuration builder objects. They are responsible to configure the message bindings for sending and receiving events and commands in your code.

components

Extension possibility for the broker initialization. In this example, a message counter is added to protect from "poisonous" messages.

onMessageMiddleware

An array of OnMessageMiddleware middleware objects. Each of them will be called before a message is given to the corresponding message handler.

logger

A Mosaic logger instance used during setup and for error/debug logging during message processing.

shutdownActions

Used to nicely shut down the message broker and RabbitMQ connections when your application is being shut down.

rascalConfigExportPath

A relative path from your project root to a folder where the generated config JSON will be saved to.

Sending Messages

Messages can be sent via the Broker instance from anywhere in your code. This can be either events or commands.

Publishing an event or command message:
await this.broker.publish(
  MediaServiceMessagingSettings.IngestStarted.messageType,
  {
      entity_id: 123,
      entity_type: 'movie'
  },
  {
      auth_token: my_auth_token,
  },
);
Table 2. Publish method parameters
Property Description

key

The RabbitMQ routing key. If you follow the Mosaic conventions this key is defined in your AsyncAPI document and exposed in your custom messaging settings.

payload

The business-relevant part of the message. This will be added as the payload property when sending the RabbitMQ message.

envelopeOverrides

Explicitly defined message envelope values, e.g. an auth token or message context information. (optional)

options

Extra message options, e.g. if additional header values should be passed in the RabbitMQ message.

Receiving Messages

Incoming messages are handled by a message handler. It is strongly advised to inherit from the GuardedMessageHandler that is part of the @axinom/mosaic-id-guard library. This adds support for secure permission checks.

Message Handler

Message Handler structure
export class StartIngestHandler extends GuardedMessageHandler<StartIngestCommand> {
  constructor(
    private readonly broker: Broker,
    private readonly loginPool: LoginPgPool,
    config: Config,
  ) {
    super(
      MediaServiceMessagingSettings.DeleteEntity.messageType,
      [
        'ADMIN',
        'COLLECTIONS_EDIT',
        'MOVIES_EDIT',
        'SETTINGS_EDIT',
        'TVSHOWS_EDIT',
      ],
      config.serviceId,
      {
        tenantId: config.tenantId,
        environmentId: config.environmentId,
        authEndpoint: config.idServiceAuthBaseUrl,
      },
    );
  }


  public async onMessage(
    content: StartIngestCommand,
    message: MessageInfo,
  ): Promise<void> {
    // handler logic
  }

  public async onMessageFailure(
    content: StartIngestCommand,
    message: MessageInfo,
    error: Error,
  ): Promise<void> {
    // final error handling logic
  }
}

Initialize a Message Handler

Table 3. Constructor parameters
Property Description

messageType

The type of message that this message handler handles.

permissions

A set of permissions that allow the usage of this message handler. The message sender authentication token must include at least one of them in order for the message to be processed.

serviceId

The ID of the current service. This is used to check the permissions of the message sender.

authEndpoint

The authorization endpoint URL to get the JWT signing public key from to verify the authentication tokens.

overrides

Custom adjustments to overwrite Rascal subscription configuration settings.

middleware

Middleware functions that should be executed for this specific message handler in addition to the global ones.

Handle a Message

The "onMessage" function is the core part of a message handler. You add the message processing logic into this function. By default, a message will be acknowledged when the "onMessage" function does not throw an error. Otherwise it will not be acknowledged and be put back into the RabbitMQ queue.

Table 4. onMessage parameters
Property Description

payload

The business-relevant payload of the message

message

The full message envelope with all related message metadata.

ackOrNack

Acknowledgment or rejection callback that can be called to forcefully complete the message processing.

Messaging Error Handling.

The Mosaic Message Bus library takes care of the redelivery of messages if the handler fails to process them. It will redeliver the message for up to ten attempts in case of a message handling failure. When the final attempt should fail, the onMessageFailure function is called and the message is (by default) moved to the dead-letter queue.

Table 5. onMessageFailure parameters
Property Description

payload

The business-relevant payload of the message

message

The full message envelope with all related message metadata.

error

The last error that was thrown when trying to execute the "onMessage" function.