Preview Feature — This feature is currently in preview and under active development. APIs and functionality may change. We recommend testing thoroughly before using in production.

Message Brokers

Message brokers let you build event-driven applications by sending and receiving messages between services. Ductape supports multiple providers through a unified interface.

Quick Example

Use the messaging API on the Ductape instance. Events use the format brokerTag:topicTag (e.g. order-events:order-created).

import Ductape from '@ductape/sdk';

const ductape = new Ductape({
  accessKey: 'your-access-key',
});

const { messaging } = ductape;

// Publish a message
await messaging.produce({
  product: 'my-product',
  env: 'prd',
  event: 'order-events:order-created',
  message: { orderId: '123', amount: 99.99 },
});

// Subscribe to messages (consumer)
await messaging.consume({
  product: 'my-product',
  env: 'prd',
  event: 'order-events:order-created',
  callback: async (message) => {
    console.log('Received:', message);
  },
});

What You Can Do

  • Produce messages to topics with messaging.produce()
  • Consume messages from topics with messaging.consume()
  • Manage brokers and topics with messaging.create(), messaging.list(), messaging.fetch(), messaging.topics.create(), messaging.topics.list(), etc.
  • Attach optional session context when producing (e.g. a user token from sessions.start())
  • Switch providers per environment without changing your code
  • Track and query messages via messaging.messages.query(), getProducers, getConsumers, getDeadLetters, getStats, getDashboard
  • Use BrokersService for advanced features: event history, replay, DLQ reprocessing, and idempotent publish

Supported Providers

Provider       | Type          | Best For
Kafka          | KAFKA         | High-throughput distributed streaming
RabbitMQ       | RABBITMQ      | Flexible routing, reliable delivery
Redis          | REDIS         | Simple pub/sub, low latency
AWS SQS        | AWS_SQS       | Serverless managed queues
Google Pub/Sub | GOOGLE_PUBSUB | Google Cloud integration
NATS           | NATS          | Lightweight, high-performance messaging

Producing Messages

Send messages to a topic with messaging.produce(). The event must use the format brokerTag:topicTag.

const result = await ductape.messaging.produce({
  product: 'my-product',
  env: 'prd',
  event: 'order-events:order-created',
  message: {
    orderId: '12345',
    customerId: 'cust_789',
    total: 99.99,
    createdAt: new Date().toISOString(),
  },
});

// Returns { success: true, process_id: '...' }

With session (user context)

You can pass a session token (e.g. from sessions.start()) so the message is associated with a user context:

const session = await ductape.sessions.start({
  product: 'my-product',
  env: 'prd',
  tag: 'user-session',
  data: { userId: 'u1', email: 'user@example.com' },
});

await ductape.messaging.produce({
  product: 'my-product',
  env: 'prd',
  event: 'order-events:order-created',
  message: { orderId: '123', total: 99.99 },
  session: `${session.sessionId}:${session.token}`, // or tag:token format
});

Consuming Messages

Subscribe to a topic with messaging.consume(). Your callback runs for each message.

await ductape.messaging.consume({
  product: 'my-product',
  env: 'prd',
  event: 'order-events:order-created',
  callback: async (message) => {
    console.log('Received order:', message);
    // processNewOrder is a placeholder for your own handler
    await processNewOrder(message);
  },
});
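
The callback passed to messaging.consume() is ordinary async code, so transient failures can be retried before the error leaves your handler. The following is a minimal sketch that assumes nothing about how Ductape itself retries or records failures; withRetry, maxAttempts, and baseDelayMs are hypothetical names for illustration, not part of the SDK.

```typescript
// Hypothetical helper (not part of @ductape/sdk): wraps a consume callback
// so that a failing handler is retried with exponential backoff.
type Handler<T> = (message: T) => Promise<void>;

function withRetry<T>(
  handler: Handler<T>,
  maxAttempts = 3,
  baseDelayMs = 100,
): Handler<T> {
  return async (message: T): Promise<void> => {
    let lastError: unknown;
    for (let attempt = 1; attempt <= maxAttempts; attempt++) {
      try {
        await handler(message);
        return; // handled successfully, stop retrying
      } catch (error) {
        lastError = error;
        console.error(`Attempt ${attempt} of ${maxAttempts} failed:`, error);
        if (attempt < maxAttempts) {
          // Wait baseDelayMs, then 2x, 4x, ... before the next attempt
          const delay = baseDelayMs * 2 ** (attempt - 1);
          await new Promise<void>((resolve) => setTimeout(resolve, delay));
        }
      }
    }
    // Rethrow so the caller still sees the failure after the final attempt
    throw lastError;
  };
}
```

To use it, wrap the handler when subscribing, for example callback: withRetry(async (message) => { await processNewOrder(message); }). Rethrowing after the last attempt keeps the failure visible to whatever invokes the callback.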

Listing and Fetching Brokers and Topics

List all brokers for a product

const brokers = await ductape.messaging.list('my-product');

for (const b of brokers) {
  console.log('Broker:', b.tag, b.name);
}

Fetch a single broker

const broker = await ductape.messaging.fetch('my-product', 'order-events');

if (broker) {
  console.log('Name:', broker.name);
  console.log('Environments:', broker.envs?.map((e) => e.slug));
}

List topics for a broker

const topics = await ductape.messaging.topics.list('my-product', 'order-events');

for (const t of topics) {
  console.log('Topic:', t.tag, t.name);
}

Fetch a single topic

The topic tag is the full event string, brokerTag:topicTag.

const topic = await ductape.messaging.topics.fetch('my-product', 'order-events:order-created');

if (topic) {
  console.log('Name:', topic.name);
  console.log('Sample:', topic.sample);
}

Creating a Message Broker

Create a broker with environment-specific configurations. Pass product as a field of the object you pass to create().

import { MessageBrokerTypes } from '@ductape/sdk';

await ductape.messaging.create({
  product: 'my-product',
  name: 'Order Events',
  tag: 'order-events',
  description: 'Handles all order-related messages',
  envs: [
    {
      slug: 'prd',
      type: MessageBrokerTypes.KAFKA,
      config: {
        brokers: ['kafka-prod.example.com:9092'],
        clientId: 'order-service',
        groupId: 'order-consumers',
        topic: 'orders',
        ssl: true,
        sasl: {
          mechanism: 'scram-sha-256',
          username: 'prod-user',
          password: 'prod-password',
        },
      },
    },
    {
      slug: 'dev',
      type: MessageBrokerTypes.REDIS,
      config: {
        host: 'localhost',
        port: 6379,
      },
    },
  ],
});

Provider config shapes

Kafka

{
  brokers: string[]; // ['broker1:9092', 'broker2:9092']
  clientId: string;
  groupId: string;
  topic: string;
  ssl?: boolean;
  sasl?: {
    mechanism: 'plain' | 'scram-sha-256' | 'scram-sha-512';
    username: string;
    password: string;
  };
}

RabbitMQ

{
  url: string; // e.g. amqps://user:pass@host/vhost
  // or host, port, username, password, queue, routingKey
}

Redis

{
  host: string;
  port: number;
  password?: string;
}

AWS SQS

{
  region: string;
  accessKeyId: string;
  secretAccessKey: string;
}

Google Pub/Sub

{
  projectId: string;
  topicName: string;
  subscriptionName?: string;
  keyFilename?: string;
}

NATS

{
  servers: string | string[];
  user?: string;
  pass?: string;
  token?: string;
  name?: string;
  maxReconnectAttempts?: number;
  reconnectTimeWait?: number;
}
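
Because the shapes above are plain objects, a small local check can catch a misconfigured environment before it is passed to messaging.create(). The sketch below covers only the Kafka shape. KafkaConfig, KafkaSasl, and checkKafkaConfig are illustrative names assumed for this example, not exports of @ductape/sdk, and the SDK may well perform its own validation.

```typescript
// Illustrative types mirroring the Kafka shape listed above.
// These names are assumptions for this sketch, not exports of @ductape/sdk.
interface KafkaSasl {
  mechanism: 'plain' | 'scram-sha-256' | 'scram-sha-512';
  username: string;
  password: string;
}

interface KafkaConfig {
  brokers: string[];
  clientId: string;
  groupId: string;
  topic: string;
  ssl?: boolean;
  sasl?: KafkaSasl;
}

// Returns a list of human-readable problems; an empty list means every
// required field is present in a plausible form.
function checkKafkaConfig(config: Partial<KafkaConfig>): string[] {
  const problems: string[] = [];
  if (!config.brokers || config.brokers.length === 0) {
    problems.push('brokers must contain at least one host:port entry');
  } else {
    for (const broker of config.brokers) {
      // Expect "host:port" with a numeric port (IPv6 literals are not handled)
      if (!/^[^:\s]+:\d+$/.test(broker)) {
        problems.push(`broker "${broker}" is not in host:port form`);
      }
    }
  }
  for (const field of ['clientId', 'groupId', 'topic'] as const) {
    if (!config[field]) problems.push(`${field} is required`);
  }
  if (config.sasl && (!config.sasl.username || !config.sasl.password)) {
    problems.push('sasl requires both username and password');
  }
  return problems;
}
```

Returning a list rather than throwing on the first problem lets you report every issue in a config at once, which is convenient when several environments are defined together.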

Managing Topics

Create and update topics with messaging.topics. The topic tag must be the full identifier: brokerTag:topicTag.

Create a topic

await ductape.messaging.topics.create('my-product', {
  name: 'Order Created',
  tag: 'order-events:order-created',
  description: 'Emitted when an order is created',
  sample: {
    orderId: '12345',
    customerId: 'cust_789',
    total: 99.99,
    createdAt: '2024-01-15T10:30:00Z',
  },
});

For AWS SQS, you can provide queue URLs per environment via queueUrls:

await ductape.messaging.topics.create('my-product', {
  name: 'Order Created',
  tag: 'order-events:order-created',
  queueUrls: [
    { env_slug: 'prd', url: 'https://sqs.us-east-1.amazonaws.com/123/orders-prd' },
    { env_slug: 'dev', url: 'https://sqs.us-east-1.amazonaws.com/123/orders-dev' },
  ],
  sample: { orderId: '12345' },
});

Update a topic

await ductape.messaging.topics.update('my-product', 'order-events:order-created', {
  description: 'Updated description',
  sample: {
    orderId: '12345',
    customerId: 'cust_789',
    total: 99.99,
    status: 'pending',
  },
});

Event Format

Events always use brokerTag:topicTag:

Event                          | Broker tag    | Topic tag
order-events:order-created     | order-events  | order-created
order-events:payment-processed | order-events  | payment-processed
notifications:user-alerts      | notifications | user-alerts
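
Since the same string is used for produce, consume, and topic calls, it can help to build and validate it in one place. The helpers below are a hypothetical sketch, not SDK functions, and they assume that neither tag contains a colon itself.

```typescript
// Hypothetical helpers for the brokerTag:topicTag convention described above.
interface EventParts {
  brokerTag: string;
  topicTag: string;
}

// Split "order-events:order-created" into its two tags.
// Throws unless the string has exactly one colon with text on both sides.
function parseEvent(event: string): EventParts {
  const parts = event.split(':');
  const [brokerTag = '', topicTag = ''] = parts;
  if (parts.length !== 2 || brokerTag === '' || topicTag === '') {
    throw new Error(`Invalid event "${event}": expected brokerTag:topicTag`);
  }
  return { brokerTag, topicTag };
}

// Join the two tags back into the event string.
function formatEvent({ brokerTag, topicTag }: EventParts): string {
  return `${brokerTag}:${topicTag}`;
}
```

For example, parseEvent('order-events:order-created') yields the broker tag order-events and the topic tag order-created, matching the first row of the table.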

Message tracking (messaging.messages)

You can query produced/consumed messages and dashboard stats via ductape.messaging.messages:

// Query messages with filters
const { messages, total, page, hasMore } = await ductape.messaging.messages.query({
  product: 'my-product',
  env: 'prd',
  brokerTag: 'order-events',
  topicTag: 'order-created',
  page: 1,
  limit: 20,
});

// Producers / consumers
const { producers } = await ductape.messaging.messages.getProducers({
  product: 'my-product',
  env: 'prd',
  brokerTag: 'order-events',
});

const { consumers } = await ductape.messaging.messages.getConsumers({
  product: 'my-product',
  env: 'prd',
  brokerTag: 'order-events',
});

// Dead letters and stats
const { deadLetters } = await ductape.messaging.messages.getDeadLetters({
  product: 'my-product',
  env: 'prd',
  brokerTag: 'order-events',
});

const stats = await ductape.messaging.messages.getStats({
  product: 'my-product',
  env: 'prd',
  brokerTag: 'order-events',
});

const dashboard = await ductape.messaging.messages.getDashboard({
  product: 'my-product',
  env: 'prd',
  brokerTag: 'order-events',
});
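
The query result includes page and hasMore, which suggests a simple loop for reading more than one page. The sketch below is written against a generic query function so it does not depend on the SDK. QueryPage, QueryFn, and collectAll are hypothetical names, and the result shape is assumed from the destructuring in the query example; the real SDK types may differ.

```typescript
// Shapes assumed from the query example above; the real SDK types may differ.
interface QueryPage<M> {
  messages: M[];
  total: number;
  page: number;
  hasMore: boolean;
}

type QueryFn<M> = (opts: { page: number; limit: number }) => Promise<QueryPage<M>>;

// Collect messages page by page until hasMore is false or maxPages is reached.
// maxPages is a safety cap so a misbehaving backend cannot loop forever.
async function collectAll<M>(
  query: QueryFn<M>,
  limit = 20,
  maxPages = 50,
): Promise<M[]> {
  const all: M[] = [];
  for (let page = 1; page <= maxPages; page++) {
    const result = await query({ page, limit });
    all.push(...result.messages);
    if (!result.hasMore) break;
  }
  return all;
}
```

One way to use it is to pass a small adapter, such as (p) => ductape.messaging.messages.query({ product: 'my-product', env: 'prd', brokerTag: 'order-events', ...p }), so the filters stay fixed while page and limit vary.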

Advanced: BrokersService (events, replay, DLQ, idempotency)

For event history, replay, Dead Letter Queue reprocessing, and idempotent publish, use BrokersService directly. Initialize it with the same credentials as Ductape (typically derived from your access key).

BrokersService method         | Description
publish(options)              | Same as messaging.produce()
subscribe(options)            | Same as messaging.consume()
getBrokers(product)           | Same as messaging.list(product)
getBroker(product, brokerTag) | Same as messaging.fetch(product, brokerTag)
getTopics(product, brokerTag) | Same as messaging.topics.list(product, brokerTag)
getTopic(product, event)      | Same as messaging.topics.fetch(product, event)
getEvents(options)            | Get broker events with filters (status, category, topic, dates)
getEvent({ product, eventId }) | Get a single event by ID
getEventStats(options)        | Aggregated stats for a broker
replayEvent(options)          | Replay a failed or successful event
reprocessDLQ(options)         | Reprocess messages from the Dead Letter Queue
checkIdempotency(options)     | Check if an idempotency key was already processed
publishIdempotent(options)    | Publish with idempotency key
testConnection(options)       | Test broker connectivity

Example: event stats and replay

import { BrokersService } from '@ductape/sdk';

const brokers = new BrokersService({
  access_key: 'your-access-key',
  env_type: 'prd',
});

const stats = await brokers.getEventStats({
  product: 'my-product',
  env: 'prd',
  brokerTag: 'order-events',
});

const result = await brokers.replayEvent({
  product: 'my-product',
  env: 'prd',
  eventId: 'event-123',
  force: true,
});
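
publishIdempotent() and checkIdempotency() both rely on an idempotency key, and this page does not say how to choose one. One common approach is to derive the key deterministically from the event name and payload, so that a retried publish produces the same key. The sketch below shows only that derivation. stableStringify and idempotencyKey are hypothetical helpers, and the option name under which the key is passed is not shown here, so check the SDK reference before wiring it in.

```typescript
// Serialize a JSON-like value with object keys sorted, so that two payloads
// with the same content but different key order produce the same string.
function stableStringify(value: unknown): string {
  if (value === undefined) return 'null';
  if (value === null || typeof value !== 'object') {
    return JSON.stringify(value);
  }
  if (Array.isArray(value)) {
    return `[${value.map((item) => stableStringify(item)).join(',')}]`;
  }
  const record = value as Record<string, unknown>;
  const entries = Object.keys(record)
    .sort()
    .map((key) => `${JSON.stringify(key)}:${stableStringify(record[key])}`);
  return `{${entries.join(',')}}`;
}

// Hypothetical key format: the event string, a separator, then the payload.
function idempotencyKey(event: string, message: unknown): string {
  return `${event}|${stableStringify(message)}`;
}
```

If the payload carries a natural business identifier such as orderId, a key built from the event and that identifier alone is shorter and usually sufficient. Payloads that include timestamps generated at publish time will produce a different key on every call, so exclude such fields from the key.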

Error handling

import { BrokerError } from '@ductape/sdk';

try {
  await ductape.messaging.produce({ product: '...', env: 'prd', event: '...', message: {} });
} catch (error) {
  if (error instanceof BrokerError) {
    console.log('Code:', error.code);
    console.log('Message:', error.message);
  }
}

Common codes: BROKER_NOT_FOUND, BROKER_ENV_NOT_FOUND, TOPIC_NOT_FOUND, PUBLISH_FAILED, SUBSCRIBE_FAILED, SESSION_INVALID, and others.
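
When deciding what to do with a caught error, it can be useful to separate codes that point to a configuration problem from codes that may succeed on a second attempt. The sketch below uses only codes from the list above. Which of them are worth retrying is an assumption of this example, not documented SDK behaviour, and isRetryable is a hypothetical helper.

```typescript
// Assumption: publish and subscribe failures may be transient, while
// "not found" and invalid-session codes need a fix before retrying.
function isRetryableCode(code: string): boolean {
  switch (code) {
    case 'PUBLISH_FAILED':
    case 'SUBSCRIBE_FAILED':
      return true;
    default:
      return false;
  }
}

// Accepts any caught value and inspects its code property, if it has one.
function isRetryable(error: unknown): boolean {
  if (typeof error !== 'object' || error === null) return false;
  const code = (error as { code?: unknown }).code;
  return typeof code === 'string' && isRetryableCode(code);
}
```

Inside a catch block, isRetryable(error) can gate a retry, while any other code is logged and surfaced immediately.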


Best practices

  1. Use the event format brokerTag:topicTag everywhere (produce, consume, topics).
  2. Create brokers and topics via messaging.create() and messaging.topics.create() so they exist before producing/consuming.
  3. Use environment-specific configs (e.g. different credentials per slug in envs).
  4. Document topic payloads with the sample field when creating topics.
  5. Handle failures in consume callbacks (retries, logging) so the broker can track success/failure.
  6. Use session when producing on behalf of a user so messages are tied to user context.
  7. Use BrokersService for replay, DLQ, and idempotent publish when needed.

See also

  • Workflows – Use message brokers in workflow steps
  • Jobs – Schedule recurring produce operations