Message Brokers
Message brokers let you build event-driven applications by sending and receiving messages between services. Ductape supports multiple providers through a unified interface.
Quick Example
Use the messaging API on the Ductape instance. Events use the format brokerTag:topicTag (e.g. order-events:order-created).
import Ductape from '@ductape/sdk';
const ductape = new Ductape({
accessKey: 'your-access-key'
});
const { messaging } = ductape;
// Publish a message
await messaging.produce({
product: 'my-product',
env: 'prd',
event: 'order-events:order-created',
message: { orderId: '123', amount: 99.99 },
});
// Subscribe to messages (consumer)
await messaging.consume({
product: 'my-product',
env: 'prd',
event: 'order-events:order-created',
callback: async (message) => {
console.log('Received:', message);
},
});
What You Can Do
- Produce messages to topics with messaging.produce()
- Consume messages from topics with messaging.consume()
- Manage brokers and topics with messaging.create(), messaging.list(), messaging.fetch(), messaging.topics.create(), messaging.topics.list(), etc.
- Optional session context when producing (e.g. user token from sessions.start())
- Switch providers per environment without changing your code
- Track and query messages via messaging.messages.query(), getProducers, getConsumers, getDeadLetters, getStats, getDashboard
- Advanced (BrokersService): event history, replay, DLQ reprocess, idempotent publish
Supported Providers
| Provider | Type | Best For |
|---|---|---|
| Kafka | KAFKA | High-throughput distributed streaming |
| RabbitMQ | RABBITMQ | Flexible routing, reliable delivery |
| Redis | REDIS | Simple pub/sub, low latency |
| AWS SQS | AWS_SQS | Serverless managed queues |
| Google Pub/Sub | GOOGLE_PUBSUB | Google Cloud integration |
| NATS | NATS | Lightweight, high-performance messaging |
Producing Messages
Send messages to a topic with messaging.produce(). The event must use the brokerTag:topicTag format.
const result = await ductape.messaging.produce({
product: 'my-product',
env: 'prd',
event: 'order-events:order-created',
message: {
orderId: '12345',
customerId: 'cust_789',
total: 99.99,
createdAt: new Date().toISOString(),
},
});
// Returns { success: true, process_id: '...' }
With session (user context)
You can pass a session token (e.g. from sessions.start()) so the message is associated with a user context:
const session = await ductape.sessions.start({
product: 'my-product',
env: 'prd',
tag: 'user-session',
data: { userId: 'u1', email: 'user@example.com' },
});
await ductape.messaging.produce({
product: 'my-product',
env: 'prd',
event: 'order-events:order-created',
message: { orderId: '123', total: 99.99 },
session: `${session.sessionId}:${session.token}`, // or tag:token format
});
Consuming Messages
Subscribe to a topic with messaging.consume(). Your callback runs for each message.
await ductape.messaging.consume({
product: 'my-product',
env: 'prd',
event: 'order-events:order-created',
callback: async (message) => {
console.log('Received order:', message);
await processNewOrder(message);
},
});
Listing and Fetching Brokers and Topics
List all brokers for a product
const brokers = await ductape.messaging.list('my-product');
for (const b of brokers) {
console.log('Broker:', b.tag, b.name);
}
Fetch a single broker
const broker = await ductape.messaging.fetch('my-product', 'order-events');
if (broker) {
console.log('Name:', broker.name);
console.log('Environments:', broker.envs?.map((e) => e.slug));
}
List topics for a broker
const topics = await ductape.messaging.topics.list('my-product', 'order-events');
for (const t of topics) {
console.log('Topic:', t.tag, t.name);
}
Fetch a single topic
The topic tag is the full event string, brokerTag:topicTag.
const topic = await ductape.messaging.topics.fetch('my-product', 'order-events:order-created');
if (topic) {
console.log('Name:', topic.name);
console.log('Sample:', topic.sample);
}
Creating a Message Broker
Create a broker with environment-specific configurations. Pass product in the data object.
import { MessageBrokerTypes } from '@ductape/sdk';
await ductape.messaging.create({
product: 'my-product',
name: 'Order Events',
tag: 'order-events',
description: 'Handles all order-related messages',
envs: [
{
slug: 'prd',
type: MessageBrokerTypes.KAFKA,
config: {
brokers: ['kafka-prod.example.com:9092'],
clientId: 'order-service',
groupId: 'order-consumers',
topic: 'orders',
ssl: true,
sasl: {
mechanism: 'scram-sha-256',
username: 'prod-user',
password: 'prod-password',
},
},
},
{
slug: 'dev',
type: MessageBrokerTypes.REDIS,
config: {
host: 'localhost',
port: 6379,
},
},
],
});
Provider config shapes
Kafka
{
brokers: string[]; // ['broker1:9092', 'broker2:9092']
clientId: string;
groupId: string;
topic: string;
ssl?: boolean;
sasl?: {
mechanism: 'plain' | 'scram-sha-256' | 'scram-sha-512';
username: string;
password: string;
};
}
RabbitMQ
{
url: string; // e.g. amqps://user:pass@host/vhost
// or host, port, username, password, queue, routingKey
}
Redis
{
host: string;
port: number;
password?: string;
}
AWS SQS
{
region: string;
accessKeyId: string;
secretAccessKey: string;
}
Google Pub/Sub
{
projectId: string;
topicName: string;
subscriptionName?: string;
keyFilename?: string;
}
NATS
{
servers: string | string[];
user?: string;
pass?: string;
token?: string;
name?: string;
maxReconnectAttempts?: number;
reconnectTimeWait?: number;
}
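The config shapes above are plain objects, so a quick pre-flight check can catch typos before calling messaging.create(). The following is a minimal sketch for the Kafka shape only. validateKafkaConfig and its rules (host:port entries, non-empty IDs) are illustrative assumptions, not SDK behavior.

```typescript
// Mirrors the Kafka shape listed above.
interface KafkaConfig {
  brokers: string[];
  clientId: string;
  groupId: string;
  topic: string;
  ssl?: boolean;
  sasl?: {
    mechanism: 'plain' | 'scram-sha-256' | 'scram-sha-512';
    username: string;
    password: string;
  };
}

// Hypothetical helper: returns human-readable problems, or an empty array if none were found.
function validateKafkaConfig(config: KafkaConfig): string[] {
  const problems: string[] = [];
  if (config.brokers.length === 0) {
    problems.push('brokers must list at least one host:port entry');
  }
  for (const broker of config.brokers) {
    // Assumed rule: each entry looks like "host:port" with a numeric port.
    if (!/^[^:\s]+:\d+$/.test(broker)) {
      problems.push(`broker "${broker}" is not in host:port form`);
    }
  }
  for (const field of ['clientId', 'groupId', 'topic'] as const) {
    if (config[field].trim() === '') {
      problems.push(`${field} must not be empty`);
    }
  }
  if (config.sasl && (config.sasl.username === '' || config.sasl.password === '')) {
    problems.push('sasl requires both username and password');
  }
  return problems;
}
```

An empty result means only that these assumed rules passed. It does not confirm that the broker is reachable.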
Managing Topics
Create and update topics with messaging.topics. The topic tag must be the full identifier: brokerTag:topicTag.
Create a topic
await ductape.messaging.topics.create('my-product', {
name: 'Order Created',
tag: 'order-events:order-created',
description: 'Emitted when an order is created',
sample: {
orderId: '12345',
customerId: 'cust_789',
total: 99.99,
createdAt: '2024-01-15T10:30:00Z',
},
});
For AWS SQS, you can provide queue URLs per environment via queueUrls:
await ductape.messaging.topics.create('my-product', {
name: 'Order Created',
tag: 'order-events:order-created',
queueUrls: [
{ env_slug: 'prd', url: 'https://sqs.us-east-1.amazonaws.com/123/orders-prd' },
{ env_slug: 'dev', url: 'https://sqs.us-east-1.amazonaws.com/123/orders-dev' },
],
sample: { orderId: '12345' },
});
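Because queueUrls is listed per environment, it is easy to add an environment to the broker and forget its queue. The sketch below lists environment slugs that have no URL. missingQueueUrls is a hypothetical helper and 'stg' is a made-up slug. Only the env_slug and url field names come from the example above.

```typescript
interface QueueUrl {
  env_slug: string;
  url: string;
}

// Hypothetical helper: which broker environment slugs have no (non-empty) queue URL?
function missingQueueUrls(envSlugs: string[], queueUrls: QueueUrl[]): string[] {
  const covered = new Set(
    queueUrls.filter((entry) => entry.url !== '').map((entry) => entry.env_slug),
  );
  return envSlugs.filter((slug) => !covered.has(slug));
}

const missing = missingQueueUrls(
  ['prd', 'dev', 'stg'],
  [
    { env_slug: 'prd', url: 'https://sqs.us-east-1.amazonaws.com/123/orders-prd' },
    { env_slug: 'dev', url: 'https://sqs.us-east-1.amazonaws.com/123/orders-dev' },
  ],
);
console.log(missing); // ['stg']
```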
Update a topic
await ductape.messaging.topics.update('my-product', 'order-events:order-created', {
description: 'Updated description',
sample: {
orderId: '12345',
customerId: 'cust_789',
total: 99.99,
status: 'pending',
},
});
Event Format
Events always use brokerTag:topicTag:
| Event | Broker tag | Topic tag |
|---|---|---|
| order-events:order-created | order-events | order-created |
| order-events:payment-processed | order-events | payment-processed |
| notifications:user-alerts | notifications | user-alerts |
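Since every call takes the combined event string, it can help to split and validate it in one place. The sketch below uses hypothetical helpers (parseEvent and formatEvent are not part of the SDK). It assumes that neither tag contains a colon, which the format implies but this page does not state outright.

```typescript
// Hypothetical helpers, not part of @ductape/sdk.
interface ParsedEvent {
  brokerTag: string;
  topicTag: string;
}

// Splits "brokerTag:topicTag" and rejects strings with zero or several colons,
// or with an empty tag on either side.
function parseEvent(event: string): ParsedEvent {
  const index = event.indexOf(':');
  const hasSecondColon = index !== -1 && event.indexOf(':', index + 1) !== -1;
  if (index <= 0 || index === event.length - 1 || hasSecondColon) {
    throw new Error(`Expected "brokerTag:topicTag", got "${event}"`);
  }
  return { brokerTag: event.slice(0, index), topicTag: event.slice(index + 1) };
}

// Builds the event string from its two parts.
function formatEvent(brokerTag: string, topicTag: string): string {
  return `${brokerTag}:${topicTag}`;
}
```

For example, parseEvent('order-events:order-created') yields the broker tag order-events and the topic tag order-created, matching the first row of the table.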
Message tracking (messaging.messages)
You can query produced/consumed messages and dashboard stats via ductape.messaging.messages:
// Query messages with filters
const { messages, total, page, hasMore } = await ductape.messaging.messages.query({
product: 'my-product',
env: 'prd',
brokerTag: 'order-events',
topicTag: 'order-created',
page: 1,
limit: 20,
});
// Producers / consumers
const { producers } = await ductape.messaging.messages.getProducers({
product: 'my-product',
env: 'prd',
brokerTag: 'order-events',
});
const { consumers } = await ductape.messaging.messages.getConsumers({
product: 'my-product',
env: 'prd',
brokerTag: 'order-events',
});
// Dead letters and stats
const { deadLetters } = await ductape.messaging.messages.getDeadLetters({
product: 'my-product',
env: 'prd',
brokerTag: 'order-events',
});
const stats = await ductape.messaging.messages.getStats({
product: 'my-product',
env: 'prd',
brokerTag: 'order-events',
});
const dashboard = await ductape.messaging.messages.getDashboard({
product: 'my-product',
env: 'prd',
brokerTag: 'order-events',
});
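The query() result includes page and hasMore, which suggests paging until hasMore is false. The sketch below shows that loop against a generic fetchPage function, so it assumes nothing about the SDK beyond the result fields shown above. collectAll and maxPages are hypothetical names.

```typescript
// Result fields as destructured in the query() example above.
interface MessagePage<T> {
  messages: T[];
  total: number;
  page: number;
  hasMore: boolean;
}

// Hypothetical helper: calls fetchPage(1), fetchPage(2), ... until hasMore is false
// or maxPages is reached (a guard against an unbounded loop).
async function collectAll<T>(
  fetchPage: (page: number) => Promise<MessagePage<T>>,
  maxPages = 50,
): Promise<T[]> {
  const all: T[] = [];
  for (let page = 1; page <= maxPages; page++) {
    const result = await fetchPage(page);
    all.push(...result.messages);
    if (!result.hasMore) {
      break;
    }
  }
  return all;
}
```

With the SDK, fetchPage would presumably wrap ductape.messaging.messages.query(), passing the same filters and the page number on each call.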
Advanced: BrokersService (events, replay, DLQ, idempotency)
For event history, replay, Dead Letter Queue reprocessing, and idempotent publish, use BrokersService directly. Initialize it with the same credentials as Ductape (typically derived from your access key).
| BrokersService method | Description |
|---|---|
| publish(options) | Same as messaging.produce() |
| subscribe(options) | Same as messaging.consume() |
| getBrokers(product) | Same as messaging.list(product) |
| getBroker(product, brokerTag) | Same as messaging.fetch(product, brokerTag) |
| getTopics(product, brokerTag) | Same as messaging.topics.list(product, brokerTag) |
| getTopic(product, event) | Same as messaging.topics.fetch(product, event) |
| getEvents(options) | Get broker events with filters (status, category, topic, dates) |
| getEvent({ product, eventId }) | Get a single event by ID |
| getEventStats(options) | Aggregated stats for a broker |
| replayEvent(options) | Replay a failed or successful event |
| reprocessDLQ(options) | Reprocess messages from the Dead Letter Queue |
| checkIdempotency(options) | Check if an idempotency key was already processed |
| publishIdempotent(options) | Publish with idempotency key |
| testConnection(options) | Test broker connectivity |
Example: event stats and replay
import { BrokersService } from '@ductape/sdk';
const brokers = new BrokersService({
access_key: 'your-access-key',
env_type: 'prd',
});
const stats = await brokers.getEventStats({
product: 'my-product',
env: 'prd',
brokerTag: 'order-events',
});
const result = await brokers.replayEvent({
product: 'my-product',
env: 'prd',
eventId: 'event-123',
force: true,
});
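The idea behind publishIdempotent() and checkIdempotency() can be illustrated without the SDK: a publish is skipped when its key has already been seen. The sketch below is an in-memory stand-in for that concept, not a description of how BrokersService implements it. createIdempotentPublisher and the key format are hypothetical.

```typescript
// In-memory illustration only; a real service would persist keys across processes.
function createIdempotentPublisher<T>(publish: (message: T) => void) {
  const seen = new Set<string>();
  return {
    // Returns true if the message was published, false if the key was a duplicate.
    publish(key: string, message: T): boolean {
      if (seen.has(key)) {
        return false;
      }
      publish(message);
      seen.add(key); // recorded only after publish() returned without throwing
      return true;
    },
    wasProcessed(key: string): boolean {
      return seen.has(key);
    },
  };
}

const sent: string[] = [];
const publisher = createIdempotentPublisher<{ orderId: string }>((message) => {
  sent.push(message.orderId);
});

// Hypothetical key format: the event string plus a business identifier.
const key = 'order-events:order-created:12345';
const first = publisher.publish(key, { orderId: '12345' }); // published
const second = publisher.publish(key, { orderId: '12345' }); // skipped as a duplicate
```

Recording the key only after a successful publish means a failed attempt can be retried with the same key.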
Error handling
import { BrokerError } from '@ductape/sdk';
try {
await ductape.messaging.produce({ product: '...', env: 'prd', event: '...', message: {} });
} catch (error) {
if (error instanceof BrokerError) {
console.log('Code:', error.code);
console.log('Message:', error.message);
}
}
Common codes: BROKER_NOT_FOUND, BROKER_ENV_NOT_FOUND, TOPIC_NOT_FOUND, PUBLISH_FAILED, SUBSCRIBE_FAILED, SESSION_INVALID, and others.
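The listed codes fall into rough groups that usually call for different reactions: fix the configuration, retry delivery, or refresh the session. The grouping below is an assumption made for illustration, not something the SDK defines, and categorize is a hypothetical helper.

```typescript
type ErrorCategory = 'configuration' | 'delivery' | 'session' | 'unknown';

// Assumed grouping of the codes listed above.
const CATEGORY_BY_CODE = new Map<string, ErrorCategory>([
  ['BROKER_NOT_FOUND', 'configuration'],
  ['BROKER_ENV_NOT_FOUND', 'configuration'],
  ['TOPIC_NOT_FOUND', 'configuration'],
  ['PUBLISH_FAILED', 'delivery'],
  ['SUBSCRIBE_FAILED', 'delivery'],
  ['SESSION_INVALID', 'session'],
]);

// Codes not in the map (the "and others" above) fall back to 'unknown'.
function categorize(code: string): ErrorCategory {
  return CATEGORY_BY_CODE.get(code) ?? 'unknown';
}
```

Inside the catch block, categorize(error.code) could then decide whether to alert, retry, or start a new session.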
Best practices
- Use the event format brokerTag:topicTag everywhere (produce, consume, topics).
- Create brokers and topics via messaging.create() and messaging.topics.create() so they exist before producing/consuming.
- Use environment-specific configs (e.g. different credentials per slug in envs).
- Document topic payloads with the sample field when creating topics.
- Handle failures in consume callbacks (retries, logging) so the broker can track success/failure.
- Use session when producing on behalf of a user so messages are tied to user context.
- Use BrokersService for replay, DLQ, and idempotent publish when needed.
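One way to handle failures in a consume callback is to wrap the handler with bounded retries and rethrow once they are exhausted, so the failure still surfaces to the caller. The following is a minimal sketch. withRetry, its options, and the linear backoff are illustrative choices, and whether Ductape records a rethrown error as a failed consume is an assumption.

```typescript
type Handler<T> = (message: T) => Promise<void>;

interface RetryOptions {
  attempts: number; // total tries, including the first (values below 1 are treated as 1)
  delayMs: number;  // base delay; the wait grows linearly with the attempt number
}

const sleep = (ms: number): Promise<void> =>
  new Promise((resolve) => setTimeout(resolve, ms));

// Hypothetical wrapper: retries the handler, logs each failure, rethrows the last error.
function withRetry<T>(handler: Handler<T>, options: RetryOptions): Handler<T> {
  const attempts = Math.max(1, options.attempts);
  return async (message: T): Promise<void> => {
    let lastError: unknown;
    for (let attempt = 1; attempt <= attempts; attempt++) {
      try {
        await handler(message);
        return;
      } catch (error) {
        lastError = error;
        console.error(`Attempt ${attempt}/${attempts} failed`, error);
        if (attempt < attempts) {
          await sleep(options.delayMs * attempt);
        }
      }
    }
    throw lastError;
  };
}
```

The wrapped handler has the same signature as the original, so it can be passed as the callback to messaging.consume(), for example withRetry(async (message) => processNewOrder(message), { attempts: 3, delayMs: 500 }).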