Skip to main content

Processing Message Brokers

Ductape provides a Message Broker interface to handle event-driven communication between your system components. It supports two main operations:

  • Publishing Messages – Send messages to a specific event/topic.
  • Subscribing to Events – Listen for messages on a specific event/topic and process them with a callback.

Publishing Messages

Publishing allows you to send structured data (messages) to a broker, which can then be consumed by other services or components subscribed to that event.

await ductape.processor.messageBroker.publish(data: IMessageBrokerPublishInput)

IMessageBrokerPublishInput

FieldTypeRequiredDescription
envstringYesEnvironment where the message should be published.
eventstringYesEvent identifier in the format brokerTag:topicTag.
product_tagstringYesUnique identifier for the product sending the message.
inputobjectYesPayload containing the message data to publish.
cachestringNoCache tag to use if request caching is needed.
sessionISessionNoSession object with token and tag.

Note: Optional fields can be omitted or passed as empty {}.

Subscribing to Events

Subscribing allows you to listen for incoming messages on a specified event/topic and process them with a callback function.

await ductape.processor.messageBroker.subscribe(data: IMessageBrokerSubscribeInput)

IMessageBrokerSubscribeInput

FieldTypeRequiredDescription
envstringYesEnvironment where the subscription should be active.
eventstringYesEvent identifier in the format brokerTag:topicTag.
product_tagstringYesUnique identifier for the product receiving the message.
inputobjectYesSubscription details, including the callback function.
sessionISessionNoSession object with token and tag.

The input object must include:

FieldTypeRequiredDescription
callbackFunctionYesAsync function to handle received messages.

ISession Schema

The session field enables optional session tracking for any message broker operation.

interface ISession {
tag: string; // session tag
token: string; // session token (e.g. signed JWT)
}
FieldTypeRequiredDescription
tagstringYesTag identifying the session type.
tokenstringYesToken generated when the session was created.

Returns

Both publish and subscribe return a Promise<unknown> resolving with the result of the operation. The structure depends on the broker and event implementation.

Example

Publishing a message:

await ductape.processor.messageBroker.publish({
env: "prd",
event: "sqsbroker:new-orders",
product_tag: "my_product",
session: {
token: "abc123xyz",
tag: "session-tag-1"
},
input: {
message: {
orderId: "12345",
status: "pending",
customer: {
name: "John Doe",
email: "john@example.com"
}
}
}
});

Subscribing to an event:

await ductape.processor.messageBroker.subscribe({
env: "prd",
event: "sqsbroker:new-orders",
product_tag: "my_product",
session: {
token: "abc123xyz",
tag: "session-tag-1"
},
input: {
callback: async (message) => {
console.log("Received message:", message);
// Implement processing logic here (e.g., update order status)
}
}
});

See Also