The Axinom Mosaic platform uses RabbitMQ as the message broker to deliver messages between its different parts. Read on to learn more about the messaging concepts.

Messaging

Overview

Messaging describes how different parts of a service, or entirely different services, connect and communicate with each other. The Mosaic platform uses RabbitMQ as the message broker, which implements all the options required to manage and distribute messages.

This document explains the messaging concepts and what is going on "under the hood". In your daily work, most of this is encapsulated by the Mosaic message bus, so you don’t have to worry about it.

[Figure: your application code contains a Publisher (sends messages), Consumer A (subscribes to a queue), and Consumer B (fetches/pulls from a queue). They reach the Broker (the RabbitMQ application) through a Connection (TCP-based, long-lived) and its Channels (lightweight connections on a single real TCP connection), which connect to a Virtual Host (vhost: logical grouping, context for permissions). Inside the vhost, an Exchange (distributes messages) routes published messages to a Queue (keeps messages), from which messages are pushed to or pulled by the consumers.]
Figure 1. Different parts of a RabbitMQ setup

At the heart of RabbitMQ are exchanges and queues. An exchange is the endpoint to which messages are sent. A queue is the place where a message is stored until consumers read it. Exchanges route messages to queues, and the routing defines which messages go to which queue(s). A message can be routed to zero queues if nobody is interested in it, to a single queue, or to many queues.

There are different exchange types that route messages in different ways to the queues:

  • A fanout exchange forwards a message to all queues that are bound to this exchange. This is the fastest exchange as it does not look into messages at all to decide where to route a message.

  • There is also the direct exchange. It forwards messages based on the routing key that is defined in the message to the matching queues.

  • Finally, there is the topic exchange. It is similar to the direct exchange, but it uses pattern matching on the message routing key to route messages.

There is always a default exchange with the empty string as its name. All queues are automatically bound to it, and a message is routed to a queue if the message routing key matches the queue name. This way, a message can effectively be sent directly to a queue.
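The pattern matching of a topic exchange can be illustrated with a small sketch. It assumes the standard AMQP 0-9-1 topic rules: routing keys and binding patterns are dot-separated words, "*" matches exactly one word, and "#" matches zero or more words. The function name topicMatches is hypothetical and not part of RabbitMQ or Mosaic; the broker performs this matching itself.

```typescript
// Sketch of AMQP topic matching, for illustration only.
// "*" matches exactly one word, "#" matches zero or more words.
function topicMatches(pattern: string, routingKey: string): boolean {
  const p = pattern.split('.');
  const k = routingKey.split('.');

  const match = (pi: number, ki: number): boolean => {
    if (pi === p.length) {
      // Pattern is used up: it matches only if the key is used up too.
      return ki === k.length;
    }
    if (p[pi] === '#') {
      // "#" either consumes no word, or consumes one word and stays active.
      return match(pi + 1, ki) || (ki < k.length && match(pi, ki + 1));
    }
    if (ki === k.length) {
      return false;
    }
    return (p[pi] === '*' || p[pi] === k[ki]) && match(pi + 1, ki + 1);
  };

  return match(0, 0);
}
```

With these rules, a binding of media_service.movie.* receives a message with the routing key media_service.movie.published, while a binding of media_service.* does not, because "*" stands for exactly one word.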

A publisher is the part of your application that sends a message to an exchange in RabbitMQ. On the other side are the consumers that receive messages. They can use a push model (preferred), where RabbitMQ actively sends them new messages, or a pull model, where a consumer regularly asks a queue whether there are new items.

When a message is retrieved from a queue, it is processed by a consumer, which is most often called a message handler. Once the consumer has finished processing the message, it sends an "ack" for success (the message gets deleted) or a "nack" if it could not process the message. If there are errors ("nack"), retry strategies are needed. This needs some care, as RabbitMQ offers only limited built-in support for retries and redeliveries.

To be able to read or send a message, an application must first connect to RabbitMQ. The RabbitMQ instance is called the Broker. It has virtual hosts (vhosts) that offer a logical grouping of the exchanges and queues and include a permission concept. This is comparable to a database server like PostgreSQL (~Broker) that has different databases (~vhosts), which in turn have tables (~queues). A connection is always established to a specific vhost.

RabbitMQ supports different connection protocols. The most common one, also used by Mosaic, is AMQP 0.9.1, which is based on a long-lived TCP connection. This is the "physical" connection from an application to a RabbitMQ vhost. On top of that, there are channels, which are "virtual" connections that share the "physical" connection. A best practice is to use one connection for publishers and another for consumers.

Messaging Patterns

Reliable messaging is hard. RabbitMQ offers many ways to handle exchanges and messages which can be quite confusing. There are many places where a message might be delivered but something might fail during consumption. This can be caused by connection errors and many more reasons.

We try to make it as simple as possible to achieve reliable messaging. For this purpose, we implement RabbitMQ best practices and use the Rascal library together with the Mosaic message bus implementation.

Messages

A message is a general term that is used for sending some data to RabbitMQ that ends up in a queue. In Mosaic, messages are further differentiated into commands and events.

Commands are messages that are sent to a known recipient with the intention that this recipient performs some action. Commands should be named in the imperative form: "StartIngest" or "CreateUser". The first tells the consumer to start the ingest process based on the details given in the message; the second tells it to create a new user. The owner of a command is the consumer: there is only one consumer of a given command. A command is "sent", never "published".

Events are messages that inform about something that happened. The sender does not know (or at least does not need to know) who is interested in the message. It could be nobody, a single subscriber, or many subscribers. Events are named in the past tense (something has been performed). Examples are "IngestFinished" and "UserCreated". The owner of an event is the producer: there is only one producer of a given event. An event is "published", never "sent".

Exchanges

The Mosaic message bus uses two exchanges to send messages: the command exchange and the event exchange, to which the corresponding messages are sent or published. Both of them are topic exchanges that allow for flexible message routing. Queues bind to these exchanges to subscribe to commands and events.

In addition, there are delay and retry exchanges that are used for delayed retries if a message fails to be processed. There is also the dead_letter exchange, which receives messages that failed too many times.

Every Mosaic service must make sure that those exchanges exist when the application starts.

Queues and Bindings

Queues are used to receive commands and events. When consumers want to receive a specific message (command or event), they create a queue in which the messages should end up. The queue is then bound to one of the exchanges to receive the desired messages.

As mentioned in the message patterns section, a command is always handled by a single consumer. A logical consumer (with potentially multiple scaled application instances) has one queue from where it reads one type of command message. Command messages have the routing key with a naming convention of consumer.entity.command and are sent to the command exchange. That exchange forwards the message to the bound queue.

Events are owned by a single publisher. The routing key of the message is formed with a naming convention of producer.entity.event. Unlike commands, the routing key for events is formed around the producer. Every interested consumer can now subscribe to this routing key. An example routing key would be media_service.movie.published.

The catalog service subscribes to such a message but there could be other interested consumers (a reporting service or a monetization service) that are also subscribed to this event. Each consumer has its own message queue to which the exchange adds all such event messages.

As the publisher of an event does not know the consumers, it is the responsibility of the consumer to create the queue and bind it to the event exchange.
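The routing key conventions and the one-queue-per-consumer rule can be sketched as follows. This is a simplified in-memory illustration, not the Mosaic implementation: the helper names, the queue names, and the exact-match lookup are assumptions made for this example only.

```typescript
// Hypothetical helpers that follow the naming conventions from the text.
function commandRoutingKey(consumer: string, entity: string, command: string): string {
  return [consumer, entity, command].join('.');
}

function eventRoutingKey(producer: string, entity: string, event: string): string {
  return [producer, entity, event].join('.');
}

// In-memory stand-in for the bindings on the event exchange.
// Every interested consumer creates and binds its own queue (names are made up).
const eventBindings: Array<{ queue: string; routingKey: string }> = [
  { queue: 'catalog_service:movie_published', routingKey: 'media_service.movie.published' },
  { queue: 'reporting_service:movie_published', routingKey: 'media_service.movie.published' },
];

// Returns the queues that would receive a copy of an event with this routing key.
function queuesForEvent(routingKey: string): string[] {
  return eventBindings
    .filter((binding) => binding.routingKey === routingKey)
    .map((binding) => binding.queue);
}
```

Publishing with eventRoutingKey('media_service', 'movie', 'published') would place one copy of the event into each of the two bound queues, whereas an event that nobody bound a queue for ends up in no queue at all.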

Summary: every queue receives exactly one type of message. For every command, there is exactly one queue. For events, there is exactly one queue for every consumer that is interested in the event. In total, this can be zero, one, or many queues per event type. If there are multiple consumers in the same service that want to act on the same event, they still each have their own queue. Only the dead letter queues hold messages of different types.

Retries

If a message is delivered to a consumer (push or pull), the consumer tries to handle it. If the consumer is not able to handle it correctly (a database is currently down, some data is missing, or any other exception occurs), the message should be retried. Retry means that our application noticed that the message could not be handled. It then negatively acknowledges the delivery with a "nack".

The goal would then be to take the message, wait for a while, and put it back to the end of the queue. Then we would want to retry the message a defined number of times. Maybe the database would be then up again, some required data would exist, or some other issue would be resolved.

Mosaic uses prolonged retries: a message that produced an error is kept in a delay queue before it is retried. The first few times a message fails, it is put into the 10-second delay queue. If it fails a few more times, it is put into the 30-second delay queue. When all attempts fail, the message is forwarded to the dead_letter queue.
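The staged retry logic could look roughly like the sketch below. The text only speaks of "a few" attempts per stage, so the thresholds of three attempts each are assumptions, as are the function and type names; the actual values are defined by the Mosaic message bus configuration.

```typescript
// Where a failed message goes next (names are illustrative).
type RetryTarget = 'delay_10s' | 'delay_30s' | 'dead_letter';

// Assumed thresholds: the real number of attempts per stage may differ.
const SHORT_DELAY_ATTEMPTS = 3;
const LONG_DELAY_ATTEMPTS = 3;

// failedAttempts is the number of failed handling attempts so far
// (1 after the first failure).
function nextRetryTarget(failedAttempts: number): RetryTarget {
  if (failedAttempts <= SHORT_DELAY_ATTEMPTS) {
    return 'delay_10s';
  }
  if (failedAttempts <= SHORT_DELAY_ATTEMPTS + LONG_DELAY_ATTEMPTS) {
    return 'delay_30s';
  }
  return 'dead_letter';
}
```

Under these assumed thresholds, failures one to three wait 10 seconds, failures four to six wait 30 seconds, and the seventh failure sends the message to the dead_letter queue.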

Redeliveries

Redeliveries are related to retries. These are needed if a message was delivered to a consumer but the consumer crashed. That could be caused by some development bug, incorrect error handling, out of memory exception, or any other reason that caused the application to crash while the message was delivered but not acknowledged.

RabbitMQ notices that it delivered the message but the channel closed and no ack/nack answer was received. It puts the message back at the front of the queue. The next consumer takes the message. If the error persists, the consuming application crashes again. The message would thus be a "poisonous message" that brings down the system until it is removed.

RabbitMQ marks a message that was already delivered but put back with the redelivered flag. The problem is that this is just a flag, not a counter. This means we would only have the option to process every message once and move flagged messages directly to the dead_letter queue. Another option would be to find some other way to know how often this message was already consumed.

To solve this, we need to count the number of times a message was already delivered in our application. If that count is exceeded, we move it to the dead_letter queue. This information cannot be stored with the message: the service crashes before we can handle it. We cannot store it in memory either as it would be lost when the application crashes. Therefore, we need to store this in some external service like PostgreSQL. The Mosaic libraries include the functionality to create such a redeliveries counter.
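The idea of an external redelivery counter can be sketched as shown below. The interface, the class, and the limit of three redeliveries are hypothetical and do not reflect the API of the Mosaic libraries. The in-memory class only stands in for a persistent store such as PostgreSQL and is synchronous for brevity; a database-backed counter would be asynchronous and, unlike this stand-in, would survive a crash.

```typescript
// Hypothetical counter contract; a real implementation persists the counts
// outside the process so that they survive an application crash.
interface RedeliveryCounter {
  incrementAndGet(messageId: string): number;
  reset(messageId: string): void;
}

// In-memory stand-in for illustration only.
class InMemoryRedeliveryCounter implements RedeliveryCounter {
  private readonly counts = new Map<string, number>();

  incrementAndGet(messageId: string): number {
    const next = (this.counts.get(messageId) ?? 0) + 1;
    this.counts.set(messageId, next);
    return next;
  }

  reset(messageId: string): void {
    this.counts.delete(messageId);
  }
}

// Assumed limit for this example.
const MAX_REDELIVERIES = 3;

// Called before the handler runs, so the attempt is recorded even if the
// handler crashes the process afterwards.
function shouldDeadLetter(
  counter: RedeliveryCounter,
  messageId: string,
  redelivered: boolean,
): boolean {
  if (!redelivered) {
    return false; // first delivery, nothing to count
  }
  return counter.incrementAndGet(messageId) > MAX_REDELIVERIES;
}
```

A consumer would call shouldDeadLetter with the message_id and the redelivered flag of each incoming message, move the message to the dead_letter queue when it returns true, and call reset once the message has been handled successfully.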

Message Structure

RabbitMQ allows sending messages with textual or binary message bodies. Based on the content-type, the consumer can decide how to deserialize the message.

Mosaic-based messages use JSON as the message format. All messages share a common structure that contains the message payload (custom-defined) as well as the metadata for exchanging messages. The metadata provides a common way to log and trace messages, define the message type, carry the authentication token, share context, etc. The business-relevant data is contained in the payload field.

Example message
{
   "payload":{
      "video_location":"featured/Avatar",
      "video_profile":"DEFAULT"
   },
   "message_context":{
      "videoId":"9b9824d7-5af1-45c2-9f5d-32423f0d6d39"
   },
   "message_id":"3eb8257f-1341-49fb-bf77-027549cd4761",
   "timestamp":"2021-07-02T08:56:22.555Z",
   "message_type":"EnsureVideoExistsStart",
   "message_version":"1.0",
   "auth_token":"eyJhbGcixxx.eyJ0ZW5hbnRJZCxxx.VYqnlcLWxxx"
}
Table 1. Message field definitions
| Property | Description |
| --- | --- |
| payload | The actual message payload/content. |
| message_context | An object that provides contextual information. This object can be attached to messages. When the initial message contains a message context, it should be included in every event that is published when handling that message. This is especially useful when sending a command where the resulting event should be mapped back to the starting entity. |
| message_id | The unique message identifier as UUID. |
| timestamp | Date and time (UTC) when the message was first published. |
| message_type | Message type, e.g. EnsureVideoExistsStart. |
| message_version | The version string of the message format. Can be increased if a new message format is created, so both the old and new message format can be correctly handled. |
| auth_token | A JWT of the user/actor who triggered the message publication. |
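The common message structure can be expressed as a TypeScript type with a runtime check, as in the sketch below. The names MessageEnvelope and isMessageEnvelope are hypothetical, and treating message_context and auth_token as optional is an assumption; the authoritative definition is the one shipped with the Mosaic message bus library.

```typescript
// Assumed shape of the common message structure described in Table 1.
interface MessageEnvelope<TPayload = unknown> {
  payload: TPayload;
  message_context?: Record<string, unknown>; // assumed optional
  message_id: string;
  timestamp: string; // ISO 8601 date and time in UTC
  message_type: string;
  message_version: string;
  auth_token?: string; // assumed optional
}

// Checks that a parsed JSON value has the expected metadata fields.
function isMessageEnvelope(value: unknown): value is MessageEnvelope {
  if (typeof value !== 'object' || value === null) {
    return false;
  }
  const candidate = value as Record<string, unknown>;
  const timestamp = candidate['timestamp'];
  const context = candidate['message_context'];
  const token = candidate['auth_token'];
  return (
    'payload' in candidate &&
    typeof candidate['message_id'] === 'string' &&
    typeof timestamp === 'string' &&
    !Number.isNaN(Date.parse(timestamp)) &&
    typeof candidate['message_type'] === 'string' &&
    typeof candidate['message_version'] === 'string' &&
    (context === undefined || (typeof context === 'object' && context !== null)) &&
    (token === undefined || typeof token === 'string')
  );
}
```

A consumer could run such a check right after JSON.parse on the message body and reject anything that does not pass before looking at the payload.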

Development

Mosaic uses the client library amqplib which provides all the logic to connect to RabbitMQ through the AMQP 0-9-1 protocol. On top of that, we also use the rascal library. Rascal is a rich pub/sub wrapper for the above amqplib library which offers a nice "reliable by default" setup, connection retries, message retry attempts, poisonous message handling, and much more.

On top of those libraries, Mosaic offers the message bus library to encapsulate the specific messaging patterns described in the section above.

Message Definition

Messages define the contract regarding how services interact through commands and events.

Mosaic uses a schema-first approach: the message payload is defined as a JSON schema from which the TypeScript definitions are generated. The schema definitions can also be exchanged with third parties for integration into Mosaic messaging. This approach has several benefits:

  • Single source of truth with a clear process about how to manage messages.

  • Serves as documentation.

  • Easier to extend/integrate: define once, generate code for multiple technologies and e2e tests.

Messages are defined using JSON schema because of its powerful expressive features and mature (and abundant) tooling.

Every project that is based on Mosaic should have a single "messages" library. The directory layout should be based on the following structure:

Solution Structure
<root>
└── schemas
    └── payloads
        ├── <Context A>
        │   ├── commands
        │   └── events
        └── <Context B>
            └── ...

Payloads are grouped together by their origin (e.g. media-service) or context (e.g. publishing). It is suggested to further group the message schemas into events and commands subdirectories if there are quite a few of them.

All schemas that represent either an event or a command should be suffixed with -event or -command, respectively. This tells the code generator which files to convert to TypeScript. You can use other files (e.g. common.json) to store reusable definitions. It is preferable to scope each message definition narrowly and keep it as self-contained as possible.
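The suffix convention can be pictured with the following sketch. Both functions are hypothetical and only illustrate the convention; the kebab-case to PascalCase mapping is an assumption inferred from the example names used in this document (create-example-command.json and CreateExampleCommand), not a description of how the Mosaic code generator is implemented.

```typescript
// True for schema files that represent a command or an event
// according to the suffix convention.
function isMessageSchemaFile(fileName: string): boolean {
  return /-(command|event)\.json$/.test(fileName);
}

// Assumed mapping from a kebab-case file name to a PascalCase type name.
function toTypeName(fileName: string): string {
  return fileName
    .replace(/\.json$/, '')
    .split('-')
    .map((word) => word.charAt(0).toUpperCase() + word.slice(1))
    .join('');
}
```

With these helpers, create-example-command.json is recognized as a message schema and maps to CreateExampleCommand, while common.json is skipped.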

A list of best practices and conventions on how to define the schemas:

  • Use lower snake case for names (properties, definitions, etc.), e.g. video_stream.

  • All top level schemas, properties, and definitions should have description set to something useful.

  • All top level schemas must have the title property set. It should match the file name without the file extension.

  • Definitions can optionally have the title property set. It tells the code generator to declare these definitions as interfaces. Otherwise, they are inlined. If title is set, it should match the definition key.

  • Try to avoid allOf until better tooling is available.

The template projects come with tooling support out of the box that lints the written JSON schema files and generates TypeScript definitions from them. All output is created under src/generated. It mirrors the structure of the root schemas directory. You can find two subdirectories there: types for generated TS interfaces and schemas for bundled JSON schemas. The latter makes it possible to easily validate payloads against a schema later on.

The following two message payload definitions define a CreateExampleCommand as the command message payload and an ExampleCreatedEvent as the event message payload.

Message Definition: create-example-command.json
{
  "$schema": "http://json-schema.org/draft-04/schema#",
  "title": "create_example_command",
  "description": "Create example command schema.",
  "additionalProperties": false,
  "required": ["title", "count"],
  "properties": {
    "title": {
      "type": "string",
      "description": "The title of the example entity."
    },
    "count": {
      "type": "integer",
      "description": "Some example count."
    }
  }
}
Message Definition: example-created-event.json
{
  "$schema": "http://json-schema.org/draft-04/schema#",
  "title": "example_created_event",
  "description": "Example created event schema.",
  "additionalProperties": false,
  "required": ["example_id"],
  "properties": {
    "example_id": {
      "type": "integer",
      "description": "The entity ID of the example stored in the database."
    },
    "message": {
      "type": "string",
      "description": "The creation message."
    }
  }
}

Generating the TypeScript definitions from these schemas results in the following types:

/**
 * Create example command schema.
 */
export interface CreateExampleCommand {
  /**
   * The title of the example entity.
   */
  title: string;

  /**
   * Some example count.
   */
  count: number;
}

/**
 * Example created event schema.
 */
export interface ExampleCreatedEvent {
  /**
   * The entity ID of the example stored in the database.
   */
  example_id: number;

  /**
   * The creation message.
   */
  message?: string;
}
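The generated interfaces only exist at compile time, so an incoming payload still needs a runtime check. The sketch below is a hand-written check that mirrors the create_example_command schema (required title and count, integer count, no additional properties). It is an illustration only: the function name is hypothetical, and in a real project the bundled JSON schemas under src/generated would normally be used with a JSON schema validator instead.

```typescript
// Repeated here so that the example is self-contained.
interface CreateExampleCommand {
  title: string;
  count: number;
}

// Hand-written runtime check that mirrors the JSON schema rules.
function isCreateExampleCommand(value: unknown): value is CreateExampleCommand {
  if (typeof value !== 'object' || value === null || Array.isArray(value)) {
    return false;
  }
  const candidate = value as Record<string, unknown>;
  const allowedKeys = ['title', 'count'];
  // "additionalProperties": false
  const hasOnlyAllowedKeys = Object.keys(candidate).every(
    (key) => allowedKeys.indexOf(key) !== -1,
  );
  return (
    hasOnlyAllowedKeys &&
    typeof candidate['title'] === 'string' && // "type": "string", required
    Number.isInteger(candidate['count']) // "type": "integer", required
  );
}
```

The check accepts { title: 'Avatar', count: 2 } and rejects payloads with a missing title, a non-integer count, or unknown properties.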

Messaging Usage

To use Mosaic messaging, you need to set up the messaging broker and register all messaging handlers. This should be done once during application startup. An example setup is given below.

Messaging Setup
// import [... snip ...]

// general application setup
const app = express();
const config = getFullConfig();
const logger = new Logger('bootstrap');
const shutdownActions = setupShutdownActions(app, logger);

// Define example messaging settings (likely the messages library)
export class ExampleMessagingSettings implements MessagingSettings {
  public static readonly CreateExample = new ExampleMessagingSettings(
    'CreateExample',
    'example:create',
    'example.create',
  );
  public static readonly ExampleCreated = new ExampleMessagingSettings(
    'ExampleCreated',
    'example:created',
    'example.created',
  );

  private constructor(
    public readonly messageType: string,
    public readonly queue: string,
    public readonly routingKey: string,
  ) {}

  public toString = (): string => {
    return this.messageType;
  };
}

// register the configuration builders
const builders = [
  // Register a message handler
  new RascalConfigBuilder(
    ExampleMessagingSettings.CreateExample,
    config,
  ).subscribeForCommand(
    (broker: Broker) => new CreateExampleHandler(broker, config),
  ),
  // Register an event publisher
  new RascalConfigBuilder(
    ExampleMessagingSettings.ExampleCreated,
    config,
  ).publishEvent(),
];

// Init the redeliveries message counter based on the PostgreSQL DB pool
const counter = initMessagingCounter(getOwnerPgPool(app));

// Register the messaging broker and bind all message handlers
await setupMessagingBroker(
  app,
  config,
  builders,
  logger,
  shutdownActions,
  undefined,
  { counters: { postgresCounter: counter } },
);

The following code shows an example message handler that listens to the CreateExampleCommand message, handles it in the onMessage method, and publishes an ExampleCreatedEvent once the operation is finished.

Message Handler and Message Publishing
// import [... snip ...]

export class CreateExampleHandler extends GuardedMessageHandler<
  CreateExampleCommand
> {
  constructor(
    private broker: Broker,
    config: Config,
  ) {
    super(
      ExampleMessagingSettings.CreateExample.messageType, // type: CreateExample
      ['EXAMPLES_EDIT', 'ADMIN'], // Required permissions
      config,
    );
  }

  async onMessage(
    content: CreateExampleCommand,
    message: MessageInfo<CreateExampleCommand>,
  ): Promise<void> {

    // Execute some business-specific work...
    const exampleId = 1;
    // Named creationMessage so that it does not shadow the `message` parameter,
    // whose envelope is needed below.
    const creationMessage = 'Example item created';

    // Publishing an Event
    await this.broker.publish<ExampleCreatedEvent>(
      ExampleMessagingSettings.ExampleCreated.messageType,
      {
        example_id: exampleId,
        message: creationMessage,
      },
      {
        auth_token: message.envelope.auth_token,
        message_context: message.envelope.message_context,
      },
    );
  }
}