Messaging

General

This how-to guide shows how to use the Mosaic messaging approach in your applications. The example extends the Media Service ingest process to update a movie with details from The Movie DB.

Make sure your development environment is set up correctly and start the following development scripts:

  • run yarn dev:libs in a separate console

  • run yarn dev:services in a separate console

  • run yarn dev:workflows in a separate console

Define and register a message

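  • create a command message schema that takes the movie_id of the movie to update (see update-data-from-movie-db-command.json under Example files); a TypeScript file is generated from it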

  • update libs\media-messages\src\media\media-messaging-settings.ts to add the new command

      public static readonly UpdateDataFromMovieDb = new MediaMessagingSettings(
        MediaCommandsTypes.UpdateDataFromMovieDbCommand,
        'movie:update_from_movie_db',
        'movie.update_from_movie_db',
      );
  • add the sendCommand() call to services\media\service\src\messaging\register-messaging.ts

        new RascalConfigBuilder(
          MediaMessagingSettings.UpdateDataFromMovieDb,
          config,
        )
          .sendCommand(),
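
For orientation, the type generated from the command schema could look roughly like the sketch below. The interface shape is inferred from the JSON schema listed under Example files; the actual generated file may differ in naming and comments. The isUpdateDataFromMovieDbCommand guard is a hypothetical helper for illustration and is not part of Mosaic.

```typescript
// Assumed shape of the type generated from update-data-from-movie-db-command.json.
// The real generated file may differ.
export interface UpdateDataFromMovieDbCommand {
  /** Id of the movie to update. */
  movie_id: number;
}

// Hypothetical helper (not part of Mosaic): checks a payload against the
// schema rules: movie_id is required, must be an integer, and no additional
// properties are allowed.
export function isUpdateDataFromMovieDbCommand(
  value: unknown,
): value is UpdateDataFromMovieDbCommand {
  if (typeof value !== 'object' || value === null) {
    return false;
  }
  const keys = Object.keys(value);
  const movieId = (value as Record<string, unknown>).movie_id;
  return (
    keys.length === 1 && keys[0] === 'movie_id' && Number.isInteger(movieId)
  );
}
```

A guard like this is only useful where payloads arrive from untyped sources; inside the service, the generated type is normally enough.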

Plugin

  • create the plugin: copy an existing one, e.g. start-ingest-endpoint-plugin.ts, to start-movie-db-update-plugin.ts and adjust it

  • register the plugin via postgraphile-middleware.ts

  • check the service console log output and notice that the new endpoint is not allowed yet

  • add the endpoint to the Movies: Edit permission in permission-definition.ts
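
Once the plugin is registered and the permission is granted, the new mutation can be called from a client. The sketch below only builds the GraphQL request body; the mutation name and input type come from the plugin's typeDefs under Example files, while the helper name buildStartMovieDbUpdateRequest and the GraphQLRequestBody type are hypothetical.

```typescript
// Hypothetical helper: builds the JSON body of a GraphQL POST request that
// calls the startMovieDbUpdate mutation defined by the plugin.
export interface GraphQLRequestBody {
  query: string;
  variables: { input: { movieId: number } };
}

export function buildStartMovieDbUpdateRequest(
  movieId: number,
): GraphQLRequestBody {
  // The schema declares movieId as Int!, so reject non-integer values early.
  if (!Number.isInteger(movieId)) {
    throw new Error(`movieId must be an integer, got ${movieId}`);
  }
  return {
    query: `
      mutation StartMovieDbUpdate($input: StartMovieDbUpdateInput!) {
        startMovieDbUpdate(input: $input) {
          message
        }
      }
    `,
    variables: { input: { movieId } },
  };
}
```

The body would be sent as JSON to the Media Service GraphQL endpoint together with the token of a user who holds the Movies: Edit permission; the endpoint URL depends on your setup.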

Message Handler

  • create a Movie DB account

  • get an API key and add it to the .env file as MOVIE_DB_API_KEY=xxxxxxxxxxxxx

  • adjust the config-definitions.ts file and add a new property: movieDbApiKey: () => env.get('MOVIE_DB_API_KEY').asString(),

  • copy an existing handler, e.g. publish-entity-command-handler.ts, to update-data-from-movie-db-handler.ts

  • adjust the handler to request the data from the movie DB and update our own movie data

  • register the handler and the command in the services\media\service\src\messaging\register-messaging.ts

  • Update the Rascal config in services\media\service\src\messaging\register-messaging.ts to include the subscription

        new RascalConfigBuilder(
          MediaMessagingSettings.UpdateDataFromMovieDb,
          config,
        )
          .sendCommand()
          .subscribeForCommand<UpdateDataFromMovieDbCommand>(
            () => new UpdateDataFromMovieDbHandler(loginPool, config),
          ),
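
The "request the data and update our own movie data" step can be kept easy to test by separating the pure parts from the database and network calls. The following is a minimal sketch under assumptions: buildSearchMovieUrl and toMovieUpdate are hypothetical helpers, the result fields are the ones the handler under Example files reads, and mapping empty values to null assumes the corresponding movies columns are nullable.

```typescript
// Subset of a search result that the handler reads.
export interface MovieDbSearchResult {
  original_title?: string;
  overview?: string;
  release_date?: string;
}

// Column values to write to the movies table (nullability is an assumption).
export interface MovieUpdate {
  original_title: string | null;
  description: string | null;
  released: string | null;
}

// Builds the search URL; both values are URL-encoded so that spaces or
// characters such as "&" in a title cannot break the query string.
export function buildSearchMovieUrl(apiKey: string, title: string): string {
  return (
    'https://api.themoviedb.org/3/search/movie' +
    `?api_key=${encodeURIComponent(apiKey)}` +
    '&language=en-US' +
    `&query=${encodeURIComponent(title)}` +
    '&page=1'
  );
}

// Maps the first search result to the update values, or returns undefined
// when the search produced no results.
export function toMovieUpdate(searchData: {
  results?: MovieDbSearchResult[];
}): MovieUpdate | undefined {
  const first = searchData.results?.[0];
  if (!first) {
    return undefined;
  }
  return {
    original_title: first.original_title || null,
    description: first.overview || null,
    // An empty release date becomes null instead of an invalid date value.
    released: first.release_date || null,
  };
}
```

In the handler, the result of toMovieUpdate could be passed to the update call when it is defined, and skipped otherwise.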

Example files

update-data-from-movie-db-command.json

{
  "$schema": "http://json-schema.org/draft-04/schema#",
  "title": "update_data_from_movie_db_command",
  "description": "Update movie data from the movie DB schema.",
  "additionalProperties": false,
  "required": ["movie_id"],
  "properties": {
    "movie_id": {
      "type": "integer",
      "description": "Id of the movie to update."
    }
  }
}

start-movie-db-update-plugin.ts

import { ApiError } from '@axinom/mosaic-service-common';
import { gql, makeExtendSchemaPlugin } from 'graphile-utils';
import {
  MediaMessagingSettings,
  UpdateDataFromMovieDbCommand,
} from 'media-messages';
import { ExtendedGraphQLContext } from '../middleware';
import { getLongLivedToken } from '../security';

export const StartMovieDbUpdatePlugin = makeExtendSchemaPlugin((build) => {
  return {
    typeDefs: gql`
      input StartMovieDbUpdateInput {
        movieId: Int!
      }
      type StartMovieDbUpdatePayload {
        message: String!
        query: Query
      }
      extend type Mutation {
        startMovieDbUpdate(
          input: StartMovieDbUpdateInput!
        ): StartMovieDbUpdatePayload
      }
    `,
    resolvers: {
      Mutation: {
        startMovieDbUpdate: async (
          _query,
          args,
          context: ExtendedGraphQLContext,
        ) => {
          try {
            const movieId = args.input.movieId;

            const token = await getLongLivedToken(
              context.jwtToken ?? '',
              context.config,
            );

            // The command carries only the movie ID; the handler loads the movie from the database.
            // Sending only a database ID between detached services is an anti-pattern:
            // ideally, the message would be self-contained.
            await context.messagingBroker.publish<UpdateDataFromMovieDbCommand>(
              MediaMessagingSettings.UpdateDataFromMovieDb.messageType,
              { movie_id: movieId },
              { auth_token: token },
            );

            return {
              message: 'The import from The Movie DB was started.',
              query: build.$$isQuery,
            };
          } catch (error) {
            if (error instanceof ApiError) {
              throw error;
            }
            throw new ApiError(
              error,
              'The attempt to start the movie DB import failed.',
            );
          }
        },
      },
    },
  };
});

update-data-from-movie-db-handler.ts

import { transactionWithContext } from '@axinom/mosaic-db-common';
import { MessageInfo } from '@axinom/mosaic-message-bus';
import { Logger } from '@axinom/mosaic-service-common';
import fetch from 'cross-fetch';
import {
  MediaMessagingSettings,
  UpdateDataFromMovieDbCommand,
} from 'media-messages';
import { Pool } from 'pg';
import * as db from 'zapatos/db';
import { selectExactlyOne, update } from 'zapatos/db';
import { MediaGuardedMessageHandler } from '../../common';
import { Config } from '../../common/config';

export class UpdateDataFromMovieDbHandler extends MediaGuardedMessageHandler<
  UpdateDataFromMovieDbCommand
> {
  private readonly logger: Logger;

  constructor(private readonly dbPool: Pool, config: Config) {
    super(
      MediaMessagingSettings.UpdateDataFromMovieDb.messageType,
      ['ADMIN', 'MOVIES_EDIT'],
      config,
    );

    this.logger = new Logger(config, 'UpdateDataFromMovieDbHandler');
  }

  async onMessage(
    payload: UpdateDataFromMovieDbCommand,
    messageInfo: MessageInfo<UpdateDataFromMovieDbCommand>,
  ): Promise<void> {
    await transactionWithContext(
      this.dbPool,
      db.IsolationLevel.Serializable,
      await this.getPgSettings(messageInfo),
      async (tnxClient) => {
        const movieId = payload.movie_id;

        this.logger.debug({
          message: 'Update movie from The Movie DB',
          details: {
            movieId,
          },
        });

        const movie = await selectExactlyOne('movies', { id: movieId }).run(
          tnxClient,
        );

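        // URL-encode the title so that spaces and characters such as "&" do not break the query string.
        const searchMovieUrl = `https://api.themoviedb.org/3/search/movie?api_key=${this.config.movieDbApiKey}&language=en-US&query=${encodeURIComponent(movie.title)}&page=1`;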

        const searchMovieResponse = await fetch(searchMovieUrl);
        if (searchMovieResponse.ok) {
          const searchMovieData = await searchMovieResponse.json();
          const firstMovie = searchMovieData['results'][0];
          if (firstMovie) {
            this.logger.debug(JSON.stringify(firstMovie));
            await update(
              'movies',
              {
                original_title: firstMovie.original_title,
                description: firstMovie.overview,
                released: firstMovie.release_date,
              },
              {
                id: movieId,
              },
            ).run(tnxClient);
          }
        }
      },
    );
  }
}