Messaging
General
This guide shows how to use the Mosaic messaging approach in your applications. The example extends the Media Service ingest process to update a movie with details from the MovieDB database.
Make sure your development environment is set up correctly, then start each of the following development scripts in a separate console:
- run yarn dev:libs
- run yarn dev:services
- run yarn dev:workflows
Define and register a message
- Create a command message that takes the movie_id to update. A TS file is generated from it.
- Update libs\media-messages\src\media\media-messaging-settings.ts to add the new command:
    public static readonly UpdateDataFromMovieDb = new MediaMessagingSettings(
      MediaCommandsTypes.UpdateDataFromMovieDbCommand,
      'movie:update_from_movie_db',
      'movie.update_from_movie_db',
    );
- Add the sendCommand to services\media\service\src\messaging\register-messaging.ts:
    new RascalConfigBuilder(
      MediaMessagingSettings.UpdateDataFromMovieDb,
      config,
    ).sendCommand(),
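The three constructor arguments above appear to follow a naming convention: a message type, a colon-separated name and a dot-separated name. The sketch below illustrates that convention only. The `MessagingSettingsSketch` interface, the `commandSettings` helper, the `queue` and `routingKey` field names and the literal message type string are all assumptions made for illustration; they are not the real `MediaMessagingSettings` API.

```typescript
// Hypothetical shape; the real MediaMessagingSettings class may expose other members.
interface MessagingSettingsSketch {
  messageType: string;
  queue: string; // assumed meaning of the second constructor argument
  routingKey: string; // assumed meaning of the third constructor argument
}

// Derives both names from one aggregate/action pair so they cannot drift apart.
function commandSettings(
  messageType: string,
  aggregate: string,
  action: string,
): MessagingSettingsSketch {
  return {
    messageType,
    queue: `${aggregate}:${action}`,
    routingKey: `${aggregate}.${action}`,
  };
}

// The message type string stands in for MediaCommandsTypes.UpdateDataFromMovieDbCommand.
const updateDataFromMovieDb = commandSettings(
  'UpdateDataFromMovieDbCommand',
  'movie',
  'update_from_movie_db',
);
```

Deriving both names from the same pair is a design choice of this sketch; the guide itself writes the two strings out by hand.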
Plugin
- Create the plugin: copy an existing one, e.g. start-ingest-endpoint-plugin.ts, as start-movie-db-update-plugin.ts and adjust it.
- Register the plugin via postgraphile-middleware.ts.
- Check the service console log output and notice that the endpoint is not allowed.
- Add the endpoint to the Movies: Edit permission in permission-definition.ts.
Message Handler
- Create a Movie DB account.
- Get an API key and add it to the .env file as MOVIE_DB_API_KEY=xxxxxxxxxxxxx
- Adjust the config-definitions.ts file and add a new property:
    movieDbApiKey: () => env.get('MOVIE_DB_API_KEY').asString(),
- Copy an existing handler, e.g. publish-entity-command-handler.ts, as update-data-from-movie-db-handler.ts.
- Adjust the handler to request the data from the Movie DB and update our own movie data.
- Register the handler and the command in services\media\service\src\messaging\register-messaging.ts.
- Update the Rascal config in services\media\service\src\messaging\register-messaging.ts to include the subscription:
    new RascalConfigBuilder(
      MediaMessagingSettings.UpdateDataFromMovieDb,
      config,
    )
      .sendCommand()
      .subscribeForCommand<UpdateDataFromMovieDbCommand>(
        () => new UpdateDataFromMovieDbHandler(loginPool, config),
      ),
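The API key steps above can be illustrated with a small fail-fast reader. This is a minimal sketch in plain TypeScript, not the env helper used in config-definitions.ts: `Env`, `readRequiredString` and `getConfigDefinitionsSketch` are hypothetical names, and the environment is passed in as a plain object so the example stays self-contained.

```typescript
// A plain key/value view of the environment, e.g. the parsed content of the .env file.
type Env = Record<string, string | undefined>;

// Returns the trimmed value, or throws if the variable is missing or blank.
function readRequiredString(env: Env, name: string): string {
  const value = env[name]?.trim();
  if (!value) {
    throw new Error(`Environment variable ${name} is not set.`);
  }
  return value;
}

// Hypothetical counterpart of the movieDbApiKey property added in the guide.
function getConfigDefinitionsSketch(env: Env) {
  return {
    movieDbApiKey: (): string => readRequiredString(env, 'MOVIE_DB_API_KEY'),
  };
}
```

Failing at startup when the key is missing may be easier to diagnose than a rejected request inside the handler later on; whether the service should behave this way is a design decision the guide leaves open.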
Example files
update-data-from-movie-db-command.json
{
  "$schema": "http://json-schema.org/draft-04/schema#",
  "title": "update_data_from_movie_db_command",
  "description": "Update movie data from the movie DB schema.",
  "additionalProperties": false,
  "required": ["movie_id"],
  "properties": {
    "movie_id": {
      "type": "integer",
      "description": "Id of the movie to update."
    }
  }
}
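For orientation, the TypeScript type generated from this schema would presumably look like the interface below. The exact generator output is not shown in this guide, so treat the interface as an assumption. The `isUpdateDataFromMovieDbCommand` type guard is a hypothetical helper that mirrors the schema rules by hand (one required integer property, no additional properties).

```typescript
// Assumed shape of the generated command type.
interface UpdateDataFromMovieDbCommand {
  /** Id of the movie to update. */
  movie_id: number;
}

// Hypothetical runtime check mirroring "required" and "additionalProperties": false.
function isUpdateDataFromMovieDbCommand(
  value: unknown,
): value is UpdateDataFromMovieDbCommand {
  if (typeof value !== 'object' || value === null || Array.isArray(value)) {
    return false;
  }
  const record = value as Record<string, unknown>;
  const keys = Object.keys(record);
  const id = record['movie_id'];
  return (
    keys.length === 1 &&
    keys[0] === 'movie_id' &&
    typeof id === 'number' &&
    Number.isInteger(id)
  );
}
```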
start-movie-db-update-plugin.ts
import { ApiError } from '@axinom/mosaic-service-common';
import { gql, makeExtendSchemaPlugin } from 'graphile-utils';
import {
  MediaMessagingSettings,
  UpdateDataFromMovieDbCommand,
} from 'media-messages';
import { ExtendedGraphQLContext } from '../middleware';
import { getLongLivedToken } from '../security';

export const StartMovieDbUpdatePlugin = makeExtendSchemaPlugin((build) => {
  return {
    typeDefs: gql`
      input StartMovieDbUpdateInput {
        movieId: Int!
      }
      type StartMovieDbUpdatePayload {
        message: String!
        query: Query
      }
      extend type Mutation {
        startMovieDbUpdate(
          input: StartMovieDbUpdateInput!
        ): StartMovieDbUpdatePayload
      }
    `,
    resolvers: {
      Mutation: {
        startMovieDbUpdate: async (
          _query,
          args,
          context: ExtendedGraphQLContext,
        ) => {
          try {
            const movieId = args.input.movieId;
            // The handler runs later, so it needs a token that outlives this request.
            const token = await getLongLivedToken(
              context.jwtToken ?? '',
              context.config,
            );
            // Only the movie ID is sent; the handler loads the movie from the database.
            // Note that passing only a database ID between detached services is an
            // anti-pattern: ideally a message is self-contained.
            await context.messagingBroker.publish<UpdateDataFromMovieDbCommand>(
              MediaMessagingSettings.UpdateDataFromMovieDb.messageType,
              { movie_id: movieId },
              { auth_token: token },
            );
            return {
              message: 'The import from The Movie DB was started.',
              query: build.$$isQuery,
            };
          } catch (error) {
            // Pass known API errors through unchanged and wrap everything else.
            if (error instanceof ApiError) {
              throw error;
            }
            throw new ApiError(
              error,
              'The attempt to start the movie DB import failed.',
            );
          }
        },
      },
    },
  };
});
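Once the plugin is registered and the permission is granted, the new mutation can be called like any other GraphQL mutation. The sketch below only builds the request body from the type definitions in the plugin; `GraphQLRequestSketch` and `buildStartMovieDbUpdateRequest` are hypothetical names, and the endpoint URL and authorization header needed to actually send the request are left out because they depend on your setup.

```typescript
// Minimal shape of a GraphQL-over-HTTP request body for this mutation.
interface GraphQLRequestSketch {
  query: string;
  variables: { input: { movieId: number } };
}

// Builds the body for the startMovieDbUpdate mutation defined by the plugin.
function buildStartMovieDbUpdateRequest(movieId: number): GraphQLRequestSketch {
  return {
    query: `
      mutation StartMovieDbUpdate($input: StartMovieDbUpdateInput!) {
        startMovieDbUpdate(input: $input) {
          message
        }
      }
    `,
    variables: { input: { movieId } },
  };
}

// The serialized body would be sent via POST with a JSON content type.
const requestBody = JSON.stringify(buildStartMovieDbUpdateRequest(42));
```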
update-data-from-movie-db-handler.ts
import { transactionWithContext } from '@axinom/mosaic-db-common';
import { MessageInfo } from '@axinom/mosaic-message-bus';
import { Logger } from '@axinom/mosaic-service-common';
import fetch from 'cross-fetch';
import {
  MediaMessagingSettings,
  UpdateDataFromMovieDbCommand,
} from 'media-messages';
import { Pool } from 'pg';
import * as db from 'zapatos/db';
import { selectExactlyOne, update } from 'zapatos/db';
import { MediaGuardedMessageHandler } from '../../common';
import { Config } from '../../common/config';

export class UpdateDataFromMovieDbHandler extends MediaGuardedMessageHandler<
  UpdateDataFromMovieDbCommand
> {
  private readonly logger: Logger;

  constructor(private readonly dbPool: Pool, config: Config) {
    super(
      MediaMessagingSettings.UpdateDataFromMovieDb.messageType,
      ['ADMIN', 'MOVIES_EDIT'],
      config,
    );
    this.logger = new Logger(config, 'UpdateDataFromMovieDbHandler');
  }

  async onMessage(
    payload: UpdateDataFromMovieDbCommand,
    messageInfo: MessageInfo<UpdateDataFromMovieDbCommand>,
  ): Promise<void> {
    await transactionWithContext(
      this.dbPool,
      db.IsolationLevel.Serializable,
      await this.getPgSettings(messageInfo),
      async (tnxClient) => {
        const movieId = payload.movie_id;
        this.logger.debug({
          message: 'Update movie from The Movie DB',
          details: {
            movieId,
          },
        });
        // Load our own movie record to get the title to search for.
        const movie = await selectExactlyOne('movies', { id: movieId }).run(
          tnxClient,
        );
        // Encode the title so spaces and characters like "&" do not break the query string.
        const searchMovieUrl = `https://api.themoviedb.org/3/search/movie?api_key=${
          this.config.movieDbApiKey
        }&language=en-US&query=${encodeURIComponent(movie.title)}&page=1`;
        const searchMovieResponse = await fetch(searchMovieUrl);
        if (searchMovieResponse.ok) {
          const searchMovieData = await searchMovieResponse.json();
          // Take the first search hit, if there is one.
          const firstMovie = searchMovieData['results'][0];
          if (firstMovie) {
            this.logger.debug(JSON.stringify(firstMovie));
            await update(
              'movies',
              {
                original_title: firstMovie.original_title,
                description: firstMovie.overview,
                released: firstMovie.release_date,
              },
              {
                id: movieId,
              },
            ).run(tnxClient);
          }
        }
      },
    );
  }
}
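The URL construction and the field mapping in the handler can also be pulled out into small pure functions, which makes them testable without a database or network access. This is a minimal sketch, not part of the Mosaic code base: `MovieDbSearchResultSketch`, `buildSearchMovieUrl` and `toMovieUpdate` are hypothetical names. Mapping an empty release date to null is an assumption, based on search results sometimes carrying an empty string for unknown dates and on the `released` column accepting null.

```typescript
// Only the search result fields the handler reads; all are treated as optional here.
interface MovieDbSearchResultSketch {
  original_title?: string;
  overview?: string;
  release_date?: string;
}

// Builds the search URL with every parameter URL-encoded.
function buildSearchMovieUrl(apiKey: string, title: string): string {
  const params: Array<[string, string]> = [
    ['api_key', apiKey],
    ['language', 'en-US'],
    ['query', title],
    ['page', '1'],
  ];
  const query = params
    .map(([key, value]) => `${encodeURIComponent(key)}=${encodeURIComponent(value)}`)
    .join('&');
  return `https://api.themoviedb.org/3/search/movie?${query}`;
}

// Maps a search result to the movie columns updated by the handler.
function toMovieUpdate(result: MovieDbSearchResultSketch) {
  return {
    original_title: result.original_title ?? null,
    description: result.overview ?? null,
    // Assumption: an empty or missing date is stored as null.
    released: result.release_date ? result.release_date : null,
  };
}
```

With helpers like these, the handler body would shrink to loading the movie, fetching `buildSearchMovieUrl(...)` and passing `toMovieUpdate(firstMovie)` to the update call.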