Skip to main content

Building a Real-time Chat Backend

Learn how to build a scalable real-time chat backend with message brokers, workflows, presence detection, and notification systems using Ductape SDK.

What You'll Build

  • Real-time messaging with message brokers
  • User presence and typing indicators
  • Message delivery and read receipts
  • File upload handling
  • Group chat management
  • Message search and history
  • Automated moderation workflows
  • Push notifications
  • Message encryption

Prerequisites

  • Node.js and npm installed
  • Ductape account and API credentials
  • Basic understanding of TypeScript/JavaScript
  • WebSocket knowledge (helpful)

Setup

npm install @ductape/sdk express socket.io dotenv
npm install --save-dev @types/express @types/node typescript

Create .env:

DUCTAPE_API_KEY=your_api_key
PORT=3000

Initialize Ductape and Express

import { Ductape } from '@ductape/sdk';
import express from 'express';
import { createServer } from 'http';
import { Server as SocketIOServer } from 'socket.io';

const ductape = new Ductape({
apiKey: process.env.DUCTAPE_API_KEY!
});

const app = express();
const httpServer = createServer(app);
const io = new SocketIOServer(httpServer, {
cors: {
origin: '*'
}
});

app.use(express.json());

Database Schema

// Users table
await ductape.databases.schema.create('users', {
username: { type: 'String', unique: true, required: true },
email: { type: 'String', unique: true, required: true },
display_name: { type: 'String', required: true },
avatar_url: { type: 'String' },
status: { type: 'String', default: 'offline' }, // online, offline, away
last_seen: { type: 'Date' },
created_at: { type: 'Date', default: 'now' }
});

// Conversations table
await ductape.databases.schema.create('conversations', {
type: { type: 'String', required: true }, // direct, group, channel
name: { type: 'String' },
avatar_url: { type: 'String' },
participant_ids: { type: 'Array', required: true },
admin_ids: { type: 'Array' },
last_message_id: { type: 'String' },
last_message_at: { type: 'Date' },
created_by: { type: 'String', required: true },
settings: { type: 'JSON', default: {} },
created_at: { type: 'Date', default: 'now' },
updated_at: { type: 'Date', default: 'now' }
});

// Messages table
await ductape.databases.schema.create('messages', {
conversation_id: { type: 'String', required: true },
sender_id: { type: 'String', required: true },
content: { type: 'String' },
type: { type: 'String', default: 'text' }, // text, image, file, system, call
encrypted_content: { type: 'String' },
file_url: { type: 'String' },
file_name: { type: 'String' },
file_size: { type: 'Number' },
metadata: { type: 'JSON' },
reactions: { type: 'JSON', default: {} },
reply_to_id: { type: 'String' },
is_edited: { type: 'Boolean', default: false },
is_deleted: { type: 'Boolean', default: false },
is_flagged: { type: 'Boolean', default: false },
edited_at: { type: 'Date' },
deleted_at: { type: 'Date' },
created_at: { type: 'Date', default: 'now' }
});

// Message status table
await ductape.databases.schema.create('message_status', {
message_id: { type: 'String', required: true },
user_id: { type: 'String', required: true },
status: { type: 'String', required: true }, // sent, delivered, read
delivered_at: { type: 'Date' },
read_at: { type: 'Date' },
created_at: { type: 'Date', default: 'now' }
});

// Presence table
await ductape.databases.schema.create('presence', {
user_id: { type: 'String', unique: true, required: true },
status: { type: 'String', required: true }, // online, offline, away
conversation_id: { type: 'String' },
is_typing: { type: 'Boolean', default: false },
last_activity: { type: 'Date', default: 'now' },
device_info: { type: 'JSON' }
});

// Create indexes
await ductape.databases.schema.createIndex('conversations', ['participant_ids']);
await ductape.databases.schema.createIndex('messages', ['conversation_id', 'created_at']);
await ductape.databases.schema.createIndex('messages', ['sender_id']);
await ductape.databases.schema.createIndex('message_status', ['message_id', 'user_id']);
await ductape.databases.schema.createIndex('presence', ['user_id']);

Message Broker Setup

// Configure message broker topics
const MESSAGE_TOPICS = {
NEW_MESSAGE: 'chat.message.new',
MESSAGE_UPDATED: 'chat.message.updated',
MESSAGE_DELETED: 'chat.message.deleted',
TYPING: 'chat.typing',
PRESENCE: 'chat.presence',
READ_RECEIPT: 'chat.receipt.read'
};

// Publish message to broker
async function publishMessage(topic: string, data: any) {
await ductape.messageBrokers.publish({
topic,
message: {
...data,
timestamp: new Date().toISOString()
}
});
}

// Subscribe to message events
async function subscribeToMessages(conversationId: string, callback: (message: any) => void) {
await ductape.messageBrokers.subscribe({
topic: MESSAGE_TOPICS.NEW_MESSAGE,
filter: { conversation_id: conversationId },
handler: async (message) => {
callback(message);
}
});
}

Sending Messages with Workflows

// Send message function
async function sendMessage(data: {
conversation_id: string;
sender_id: string;
content: string;
type?: string;
reply_to_id?: string;
}) {
// Start message workflow
const workflow = await ductape.workflows.execute({
workflow: 'send-message',
input: {
conversation_id: data.conversation_id,
sender_id: data.sender_id,
content: data.content,
type: data.type || 'text',
reply_to_id: data.reply_to_id
}
});

return workflow;
}

Message Sending Workflow

Create workflows/send-message.ts:

export const sendMessageWorkflow = {
name: 'send-message',
version: '1.0.0',
steps: [
// Step 1: Validate conversation access
{
name: 'validate-access',
type: 'function',
handler: async (context: any) => {
const conversation = await ductape.databases.findOne({
table: 'conversations',
where: { id: context.input.conversation_id }
});

if (!conversation.row) {
throw new Error('Conversation not found');
}

// Check if user is participant
if (!conversation.row.participant_ids.includes(context.input.sender_id)) {
throw new Error('User not authorized');
}

return { conversation: conversation.row };
}
},

// Step 2: Check content moderation
{
name: 'content-moderation',
type: 'function',
handler: async (context: any) => {
// Simple profanity check (in production, use AI moderation)
const profanityWords = ['spam', 'abuse']; // Add more
const content = context.input.content.toLowerCase();

const hasProfanity = profanityWords.some(word => content.includes(word));

if (hasProfanity) {
return {
flagged: true,
reason: 'Content contains inappropriate language'
};
}

return { flagged: false };
}
},

// Step 3: Create message
{
name: 'create-message',
type: 'function',
handler: async (context: any) => {
const message = await ductape.databases.insert({
table: 'messages',
data: {
conversation_id: context.input.conversation_id,
sender_id: context.input.sender_id,
content: context.input.content,
type: context.input.type,
reply_to_id: context.input.reply_to_id,
is_flagged: context.steps['content-moderation'].flagged
}
});

return { message: message.rows[0] };
}
},

// Step 4: Update conversation
{
name: 'update-conversation',
type: 'function',
handler: async (context: any) => {
const message = context.steps['create-message'].message;

await ductape.databases.update({
table: 'conversations',
where: { id: context.input.conversation_id },
data: {
last_message_id: message.id,
last_message_at: new Date(),
updated_at: new Date()
}
});

return { updated: true };
}
},

// Step 5: Create message status for all participants
{
name: 'create-status',
type: 'function',
handler: async (context: any) => {
const conversation = context.steps['validate-access'].conversation;
const message = context.steps['create-message'].message;

for (const participantId of conversation.participant_ids) {
await ductape.databases.insert({
table: 'message_status',
data: {
message_id: message.id,
user_id: participantId,
status: participantId === context.input.sender_id ? 'read' : 'sent',
read_at: participantId === context.input.sender_id ? new Date() : null
}
});
}

return { created: true };
}
},

// Step 6: Publish to message broker
{
name: 'publish-message',
type: 'function',
handler: async (context: any) => {
const message = context.steps['create-message'].message;

await ductape.messageBrokers.publish({
topic: MESSAGE_TOPICS.NEW_MESSAGE,
message: {
conversation_id: context.input.conversation_id,
message: message
}
});

return { published: true };
}
},

// Step 7: Send push notifications
{
name: 'send-notifications',
type: 'function',
handler: async (context: any) => {
const conversation = context.steps['validate-access'].conversation;
const message = context.steps['create-message'].message;

// Get sender info
const sender = await ductape.databases.findOne({
table: 'users',
where: { id: context.input.sender_id }
});

// Send to all participants except sender
for (const participantId of conversation.participant_ids) {
if (participantId !== context.input.sender_id) {
// Check if user is online
const presence = await ductape.databases.findOne({
table: 'presence',
where: { user_id: participantId }
});

// Only send push if user is offline or away
if (!presence.row || presence.row.status !== 'online') {
await ductape.notifications.send({
channel: 'push',
to: participantId,
data: {
title: sender.row.display_name,
body: message.content.substring(0, 100),
conversation_id: context.input.conversation_id,
message_id: message.id
}
});
}
}
}

return { sent: true };
}
}
],

errorHandlers: [
{
step: 'validate-access',
handler: async (context: any, error: any) => {
console.error('Access validation failed:', error.message);
throw error;
}
},

{
step: 'content-moderation',
handler: async (context: any, error: any) => {
// Log moderation failure but don't block message
console.error('Moderation failed:', error.message);
return { flagged: false };
}
}
]
};

WebSocket Server with Socket.IO

// Socket.IO connection handling
io.on('connection', (socket) => {
console.log('User connected:', socket.id);

let currentUserId: string;

// Authenticate user
socket.on('authenticate', async (data: { user_id: string; token: string }) => {
// Verify token (implement your auth logic)
currentUserId = data.user_id;

// Update presence
await updatePresence(currentUserId, 'online', socket.id);

// Join user's conversations
const conversations = await getUserConversations(currentUserId);
conversations.forEach((conv: any) => {
socket.join(`conversation:${conv.id}`);
});

socket.emit('authenticated', { user_id: currentUserId });
});

// Send message
socket.on('send-message', async (data: {
conversation_id: string;
content: string;
type?: string;
reply_to_id?: string;
}) => {
try {
await sendMessage({
conversation_id: data.conversation_id,
sender_id: currentUserId,
content: data.content,
type: data.type,
reply_to_id: data.reply_to_id
});
} catch (error: any) {
socket.emit('error', { message: error.message });
}
});

// Typing indicator
socket.on('typing', async (data: { conversation_id: string; is_typing: boolean }) => {
await updateTypingStatus(currentUserId, data.conversation_id, data.is_typing);

// Publish to message broker
await publishMessage(MESSAGE_TOPICS.TYPING, {
user_id: currentUserId,
conversation_id: data.conversation_id,
is_typing: data.is_typing
});

// Broadcast to conversation
socket.to(`conversation:${data.conversation_id}`).emit('user-typing', {
user_id: currentUserId,
is_typing: data.is_typing
});
});

// Mark message as read
socket.on('mark-read', async (data: { message_id: string }) => {
await markMessageAsRead(data.message_id, currentUserId);

// Publish read receipt
await publishMessage(MESSAGE_TOPICS.READ_RECEIPT, {
message_id: data.message_id,
user_id: currentUserId,
read_at: new Date()
});
});

// Join conversation
socket.on('join-conversation', (data: { conversation_id: string }) => {
socket.join(`conversation:${data.conversation_id}`);
});

// Leave conversation
socket.on('leave-conversation', (data: { conversation_id: string }) => {
socket.leave(`conversation:${data.conversation_id}`);
});

// Disconnect
socket.on('disconnect', async () => {
if (currentUserId) {
await updatePresence(currentUserId, 'offline', null);
}
console.log('User disconnected:', socket.id);
});
});

Presence Management

// Update user presence
async function updatePresence(
userId: string,
status: 'online' | 'offline' | 'away',
socketId: string | null
) {
const existing = await ductape.databases.findOne({
table: 'presence',
where: { user_id: userId }
});

if (existing.row) {
await ductape.databases.update({
table: 'presence',
where: { user_id: userId },
data: {
status,
last_activity: new Date(),
device_info: socketId ? { socket_id: socketId } : existing.row.device_info
}
});
} else {
await ductape.databases.insert({
table: 'presence',
data: {
user_id: userId,
status,
device_info: socketId ? { socket_id: socketId } : {}
}
});
}

// Update user record
await ductape.databases.update({
table: 'users',
where: { id: userId },
data: {
status,
last_seen: new Date()
}
});

// Publish presence update
await publishMessage(MESSAGE_TOPICS.PRESENCE, {
user_id: userId,
status,
timestamp: new Date()
});
}

// Update typing status
async function updateTypingStatus(
userId: string,
conversationId: string,
isTyping: boolean
) {
await ductape.databases.update({
table: 'presence',
where: { user_id: userId },
data: {
conversation_id: isTyping ? conversationId : null,
is_typing: isTyping,
last_activity: new Date()
}
});
}

// Auto-clear inactive typing indicators
await ductape.jobs.schedule({
name: 'clear-stale-typing',
schedule: '*/30 * * * * *', // Every 30 seconds
handler: async () => {
const thirtySecondsAgo = new Date(Date.now() - 30000);

await ductape.databases.update({
table: 'presence',
where: {
is_typing: true,
last_activity: { $lt: thirtySecondsAgo }
},
data: {
is_typing: false,
conversation_id: null
}
});
}
});

File Upload with Storage

// Upload file for message
app.post('/upload', async (req, res) => {
try {
const { conversation_id, user_id } = req.body;
const file = req.files?.file; // Using express-fileupload middleware

if (!file) {
return res.status(400).json({ error: 'No file provided' });
}

// Upload to storage
const upload = await ductape.storage.upload({
file: file as any,
path: `chat/${conversation_id}/${Date.now()}`,
metadata: {
conversation_id,
user_id,
original_name: file.name
}
});

res.json({
url: upload.url,
file_name: file.name,
file_size: file.size,
file_type: file.mimetype
});
} catch (error: any) {
res.status(500).json({ error: error.message });
}
});

// Get file with signed URL
app.get('/files/:fileId', async (req, res) => {
try {
const signedUrl = await ductape.storage.getSignedUrl({
path: `chat/files/${req.params.fileId}`,
expiresIn: 3600 // 1 hour
});

res.json({ url: signedUrl });
} catch (error: any) {
res.status(404).json({ error: 'File not found' });
}
});
// Search messages in conversation
app.get('/conversations/:conversationId/search', async (req, res) => {
const { q, limit = 20, offset = 0 } = req.query;

const messages = await ductape.databases.find({
table: 'messages',
where: {
conversation_id: req.params.conversationId,
content: { $regex: q as string, $options: 'i' },
is_deleted: false
},
limit: Number(limit),
offset: Number(offset),
orderBy: { created_at: 'desc' }
});

res.json(messages.rows);
});

// Global search across all user's conversations
app.get('/search', async (req, res) => {
const { q, user_id, limit = 20 } = req.query;

// Get user's conversations
const conversations = await getUserConversations(user_id as string);
const conversationIds = conversations.map((c: any) => c.id);

// Search messages
const messages = await ductape.databases.find({
table: 'messages',
where: {
conversation_id: { $in: conversationIds },
content: { $regex: q as string, $options: 'i' },
is_deleted: false
},
limit: Number(limit),
orderBy: { created_at: 'desc' }
});

res.json(messages.rows);
});

Rate Limiting with Quotas

// Configure quotas
await ductape.quotas.configure({
'chat:send-message': {
limit: 100,
window: '1m',
per: 'user_id'
},
'chat:create-group': {
limit: 5,
window: '1h',
per: 'user_id'
},
'chat:upload-file': {
limit: 20,
window: '1h',
per: 'user_id'
}
});

// Check quota before sending message
async function checkSendMessageQuota(userId: string): Promise<boolean> {
const check = await ductape.quotas.check({
key: 'chat:send-message',
identifier: userId
});

if (!check.allowed) {
throw new Error('Rate limit exceeded. Please slow down.');
}

// Increment quota
await ductape.quotas.increment({
key: 'chat:send-message',
identifier: userId
});

return true;
}

Message Broker Consumers

// Subscribe to new messages for real-time delivery
await ductape.messageBrokers.subscribe({
topic: MESSAGE_TOPICS.NEW_MESSAGE,
handler: async (message) => {
const { conversation_id, message: msg } = message;

// Broadcast to all clients in conversation room
io.to(`conversation:${conversation_id}`).emit('new-message', msg);
}
});

// Subscribe to typing events
await ductape.messageBrokers.subscribe({
topic: MESSAGE_TOPICS.TYPING,
handler: async (message) => {
const { user_id, conversation_id, is_typing } = message;

io.to(`conversation:${conversation_id}`).emit('user-typing', {
user_id,
is_typing
});
}
});

// Subscribe to presence updates
await ductape.messageBrokers.subscribe({
topic: MESSAGE_TOPICS.PRESENCE,
handler: async (message) => {
const { user_id, status } = message;

// Broadcast to all users who have conversations with this user
const conversations = await getUserConversations(user_id);

conversations.forEach((conv: any) => {
io.to(`conversation:${conv.id}`).emit('user-presence', {
user_id,
status
});
});
}
});

// Subscribe to read receipts
await ductape.messageBrokers.subscribe({
topic: MESSAGE_TOPICS.READ_RECEIPT,
handler: async (message) => {
const { message_id, user_id, read_at } = message;

// Get message to find conversation
const msg = await ductape.databases.findOne({
table: 'messages',
where: { id: message_id }
});

if (msg.row) {
io.to(`conversation:${msg.row.conversation_id}`).emit('message-read', {
message_id,
user_id,
read_at
});
}
}
});

Automated Cleanup Jobs

// Delete old messages (data retention)
await ductape.jobs.schedule({
name: 'cleanup-old-messages',
schedule: '0 2 * * *', // Every day at 2 AM
handler: async () => {
const sixMonthsAgo = new Date();
sixMonthsAgo.setMonth(sixMonthsAgo.getMonth() - 6);

// Soft delete old messages
await ductape.databases.update({
table: 'messages',
where: {
created_at: { $lt: sixMonthsAgo },
is_deleted: false
},
data: {
is_deleted: true,
deleted_at: new Date(),
content: '[Message deleted]'
}
});
}
});

// Clean up offline users
await ductape.jobs.schedule({
name: 'cleanup-offline-presence',
schedule: '*/5 * * * *', // Every 5 minutes
handler: async () => {
const fiveMinutesAgo = new Date(Date.now() - 5 * 60 * 1000);

await ductape.databases.update({
table: 'presence',
where: {
status: 'online',
last_activity: { $lt: fiveMinutesAgo }
},
data: {
status: 'away'
}
});
}
});

Helper Functions

// Get user conversations
async function getUserConversations(userId: string) {
const conversations = await ductape.databases.find({
table: 'conversations',
where: {
participant_ids: { $in: [userId] }
},
orderBy: { last_message_at: 'desc' }
});

return conversations.rows;
}

// Mark message as read
async function markMessageAsRead(messageId: string, userId: string) {
await ductape.databases.update({
table: 'message_status',
where: {
message_id: messageId,
user_id: userId
},
data: {
status: 'read',
read_at: new Date()
}
});
}

// Get conversation messages
async function getConversationMessages(
conversationId: string,
limit: number = 50,
before?: Date
) {
const where: any = {
conversation_id: conversationId,
is_deleted: false
};

if (before) {
where.created_at = { $lt: before };
}

const messages = await ductape.databases.find({
table: 'messages',
where,
limit,
orderBy: { created_at: 'desc' }
});

return messages.rows.reverse();
}

Start Server

const PORT = process.env.PORT || 3000;

httpServer.listen(PORT, () => {
console.log(`Chat server running on port ${PORT}`);
console.log(`WebSocket server ready`);
});

Next Steps

  • Implement end-to-end encryption
  • Add voice and video calling
  • Create message threading
  • Implement reactions and emoji support
  • Add message forwarding
  • Create channel broadcasting
  • Set up analytics and monitoring
  • Implement AI-powered moderation
  • Add message translation