Skip to main content
Preview
Preview Feature — This feature is currently in preview and under active development. APIs and functionality may change. We recommend testing thoroughly before using in production.

Building Workflows

This guide covers the complete code-first API for defining workflows using ductape.workflows.define().

Workflow Definition

Define a workflow by passing a configuration object with a handler function:

const workflow = await ductape.workflows.define({
product: 'my-product', // Target product
tag: 'workflow-tag', // Unique identifier
name: 'Workflow Name', // Display name
description: 'Description', // Optional
handler: async (ctx) => {
// Your workflow logic
return { result: 'data' };
},
});

Definition Options

Basic Properties

await ductape.workflows.define({
product: 'my-product',
tag: 'order-process',
name: 'Order Processing',
description: 'Handles complete order fulfillment',

// Workflow options
options: {
timeout: 300000, // 5 minutes max
rollback_strategy: 'reverse_all', // How to handle failures
},

// Environment configurations
envs: [
{ slug: 'dev', active: true },
{ slug: 'staging', active: true },
{ slug: 'prd', active: true },
],

handler: async (ctx) => {
// ...
},
});

Signals

Define signals that external systems can send to the workflow:

await ductape.workflows.define({
// ...
signals: {
'approve': { input: { approver_id: 'string', comments: 'string' } },
'cancel': { input: { reason: 'string' } },
},

handler: async (ctx) => {
// Wait for approval signal
const approval = await ctx.waitForSignal('approve', { timeout: '24h' });
console.log('Approved by:', approval.approver_id);
},
});

Queries

Define queries to check workflow state:

await ductape.workflows.define({
// ...
queries: {
'getProgress': {
handler: (ctx) => ({
completed_steps: ctx.completed_steps,
current_step: ctx.current_step,
}),
},
},
handler: async (ctx) => {
// ...
},
});

Working with Steps

The ctx.step() Function

Steps are the building blocks of workflows:

const result = await ctx.step(
'step-tag', // Unique identifier
handler, // Async function
rollback?, // Optional rollback function
options? // Optional step options
);

Basic Step

const user = await ctx.step('create-user', async () => {
return ctx.database.insert({
database: 'users-db',
event: 'create-user',
data: { email: ctx.input.email },
});
});

// Use the result in subsequent steps
console.log('Created user:', user.id);

Step with Rollback

const charge = await ctx.step(
'charge-card',
// Handler
async () => {
return ctx.action.run({
app: 'stripe',
event: 'create-charge',
input: { body: { amount: ctx.input.amount } },
});
},
// Rollback - called if a later step fails
async (result) => {
await ctx.action.run({
app: 'stripe',
event: 'refund',
input: { body: { chargeId: result.id } },
});
}
);

Step Options

await ctx.step(
'send-notification',
async () => {
await ctx.notification.email({ /* ... */ });
},
null, // No rollback
{
allow_fail: true, // Continue on failure
retries: 3, // Retry count
retry_delay: 1000, // Delay between retries (ms)
timeout: 30000, // Step timeout (ms)
critical: false, // Mark as critical for rollback
}
);

Using Ductape Components

ctx.action - API Calls

Call external APIs through connected apps:

// Basic call
const result = await ctx.action.run({
app: 'stripe',
event: 'create-charge',
input: {
body: { amount: 1000, currency: 'usd' },
},
});

// With retries and timeout
const result = await ctx.action.run({
app: 'external-api',
event: 'fetch-data',
input: {
body: { query: ctx.input.query },
headers: { 'X-Request-ID': ctx.workflow_id },
},
retries: 3,
timeout: 10000,
});

ctx.database - Database Operations

// Insert
const record = await ctx.database.insert({
database: 'orders-db',
event: 'create-order',
data: {
customer_id: ctx.input.customerId,
total: ctx.input.amount,
items: ctx.input.items,
},
});

// Query
const orders = await ctx.database.query({
database: 'orders-db',
event: 'find-by-customer',
params: { customerId: ctx.input.customerId },
});

// Execute (for custom operations)
const result = await ctx.database.execute({
database: 'analytics-db',
event: 'aggregate-sales',
input: { startDate: ctx.input.from, endDate: ctx.input.to },
});

// Update
await ctx.database.update({
database: 'orders-db',
event: 'update-status',
where: { id: ctx.input.orderId },
data: { status: 'shipped' },
});

// Delete
await ctx.database.delete({
database: 'orders-db',
event: 'remove-order',
where: { id: ctx.input.orderId },
});

ctx.graph - Graph Database

// Create node
const node = await ctx.graph.createNode({
graph: 'social-graph',
labels: ['User'],
properties: { name: ctx.input.name, email: ctx.input.email },
});

// Update node
await ctx.graph.updateNode({
graph: 'social-graph',
id: node.id,
properties: { verified: true },
});

// Create relationship
await ctx.graph.createRelationship({
graph: 'social-graph',
from: ctx.input.userId,
to: ctx.input.friendId,
type: 'FOLLOWS',
properties: { since: new Date().toISOString() },
});

// Query
const connections = await ctx.graph.query({
graph: 'social-graph',
action: 'find-connections',
params: { userId: ctx.input.userId, depth: 2 },
});

// Execute custom action
const result = await ctx.graph.execute({
graph: 'social-graph',
action: 'compute-influence',
input: { userId: ctx.input.userId },
});

// Delete node
await ctx.graph.deleteNode({
graph: 'social-graph',
id: ctx.input.nodeId,
});

// Delete relationship
await ctx.graph.deleteRelationship({
graph: 'social-graph',
id: ctx.input.relationshipId,
});

ctx.notification - Send Notifications

// Email
await ctx.notification.email({
notification: 'transactional',
event: 'order-confirmation',
recipients: [ctx.input.email],
subject: { orderId: ctx.input.orderId },
template: {
orderId: ctx.input.orderId,
items: ctx.input.items,
total: ctx.input.total,
},
});

// SMS
await ctx.notification.sms({
notification: 'alerts',
event: 'verification-code',
phones: [ctx.input.phone],
message: { code: ctx.input.code },
});

// Push notification
await ctx.notification.push({
notification: 'mobile-app',
event: 'order-shipped',
tokens: [ctx.input.deviceToken],
title: { status: 'Shipped' },
body: { trackingNumber: ctx.input.trackingNumber },
data: { orderId: ctx.input.orderId },
});

// Generic send
await ctx.notification.send({
notification: 'multi-channel',
event: 'urgent-alert',
input: {
email: { recipients: [ctx.input.email] },
sms: { phones: [ctx.input.phone] },
push: { tokens: [ctx.input.token] },
},
retries: 3,
});

ctx.storage - File Operations

// Upload file
const uploaded = await ctx.storage.upload({
storage: 'documents',
event: 'upload-invoice',
input: {
buffer: ctx.input.fileData,
fileName: `invoice-${ctx.input.orderId}.pdf`,
mimeType: 'application/pdf',
},
retries: 2,
});

console.log('File URL:', uploaded.url);
console.log('File key:', uploaded.file_key);

// Download file
const file = await ctx.storage.download({
storage: 'templates',
event: 'get-template',
input: { file_key: 'invoice-template.pdf' },
});

console.log('Content:', file.content);
console.log('Size:', file.size);

// Delete file
await ctx.storage.delete({
storage: 'temp-files',
event: 'cleanup',
input: { file_key: ctx.input.tempFileKey },
});

ctx.publish - Message Broker

// Send message
await ctx.publish.send({
broker: 'order-events',
event: 'order-created',
input: {
message: {
orderId: ctx.input.orderId,
customerId: ctx.input.customerId,
items: ctx.input.items,
timestamp: Date.now(),
},
},
retries: 2,
});

ctx.quota - Rate Limiting

// Check quota before proceeding
const quotaResult = await ctx.quota.execute({
quota: 'api-rate-limit',
input: { key: ctx.input.userId },
timeout: 5000,
});

if (!quotaResult.allowed) {
throw new Error('Rate limit exceeded');
}

ctx.fallback - Failure Handling

// Use fallback for resilient operations
const result = await ctx.fallback.execute({
fallback: 'payment-fallback',
input: {
amount: ctx.input.amount,
method: ctx.input.paymentMethod,
},
timeout: 30000,
});

Control Flow

ctx.sleep() - Pause Execution

// Wait 5 seconds
await ctx.sleep(5000);

// Wait using duration string
await ctx.sleep('5m'); // 5 minutes
await ctx.sleep('1h'); // 1 hour
await ctx.sleep('1d'); // 1 day

ctx.waitForSignal() - Wait for External Event

// Wait for a single signal
const approval = await ctx.waitForSignal('order-approved', {
timeout: '24h',
});

// Wait for any of multiple signals
const result = await ctx.waitForSignal(['approved', 'rejected'], {
timeout: '48h',
});

// Access signal payload
console.log('Approved by:', result.approvedBy);

ctx.checkpoint() - Save Progress

// Save state for recovery
await ctx.checkpoint('payment-complete', {
chargeId: payment.id,
amount: ctx.input.amount,
timestamp: Date.now(),
});

ctx.triggerRollback() - Manual Rollback

// Manually trigger rollback
if (fraudCheck.score > 0.8) {
const result = await ctx.triggerRollback('High fraud risk detected');
console.log('Rolled back steps:', result.rolled_back_steps);
return { success: false, reason: 'fraud_detected' };
}

ctx.workflow() - Child Workflows

// Run another workflow as a child
const paymentResult = await ctx.workflow(
'child-payment-123', // Child workflow ID
'payment-processing', // Workflow tag
{ amount: ctx.input.amount }, // Input
{
timeout: '5m',
retries: 2,
parent_close_policy: 'terminate',
}
);

State Management

ctx.setState() and ctx.getState()

// Save state
ctx.setState('retryCount', 0);
ctx.setState('lastError', null);

// Later, retrieve state
const retryCount = ctx.getState<number>('retryCount') || 0;
if (retryCount < 3) {
ctx.setState('retryCount', retryCount + 1);
// Retry logic...
}

// Access all state
console.log('Current state:', ctx.state);

ctx.steps - Access Step Results

// Access results from previous steps
const userResult = ctx.steps['create-user'];
console.log('User ID:', userResult.id);

// Check completed steps
console.log('Completed:', ctx.completed_steps);
console.log('Current:', ctx.current_step);

Data Access

ctx.variable() and ctx.constant()

// Access app variables
const apiVersion = ctx.variable('stripe', 'api_version');

// Access app constants
const defaultCurrency = ctx.constant('stripe', 'default_currency');

ctx.default() - Fallback Values

// Provide default values
const currency = ctx.default(ctx.input.currency, 'usd');
const retries = ctx.default(ctx.input.retries, 3);

ctx.transform - Data Transformations

// String transformations
const upper = ctx.transform.upper(ctx.input.name);
const lower = ctx.transform.lower(ctx.input.email);
const trimmed = ctx.transform.trim(ctx.input.text);

// JSON operations
const parsed = ctx.transform.parseJson<MyType>(ctx.input.jsonString);
const stringified = ctx.transform.stringify(ctx.input.data);

// Date operations
const now = ctx.transform.now();
const formatted = ctx.transform.formatDate(Date.now(), 'YYYY-MM-DD');

// Array/object operations
const length = ctx.transform.length(ctx.input.items);
const size = ctx.transform.size(ctx.input.metadata);
const parts = ctx.transform.split(ctx.input.csv, ',');
const joined = ctx.transform.join(ctx.input.tags, ', ');

Logging

// Log messages at different levels
ctx.log.debug('Processing step', { step: 'create-user' });
ctx.log.info('User created', { userId: user.id });
ctx.log.warn('Retry attempt', { attempt: 2, maxRetries: 3 });
ctx.log.error('Step failed', { error: err.message });

Complete Example

await ductape.workflows.define({
product: 'my-product',
tag: 'order-fulfillment',
name: 'Order Fulfillment',
description: 'Complete order processing workflow',

options: {
timeout: 300000,
rollback_strategy: 'reverse_all',
},

signals: {
'cancel-order': { input: { reason: 'string' } },
},

handler: async (ctx) => {
ctx.log.info('Starting order fulfillment', { orderId: ctx.input.orderId });

// Step 1: Validate order
const validation = await ctx.step('validate', async () => {
return ctx.action.run({
app: 'inventory-api',
event: 'check-availability',
input: { body: { items: ctx.input.items } },
});
});

if (!validation.available) {
return { success: false, reason: 'items_unavailable' };
}

// Step 2: Process payment (with rollback)
const payment = await ctx.step(
'payment',
async () => {
return ctx.action.run({
app: 'stripe',
event: 'create-charge',
input: {
body: {
amount: ctx.input.amount,
customer: ctx.input.customerId,
},
},
retries: 3,
});
},
async (result) => {
await ctx.action.run({
app: 'stripe',
event: 'refund',
input: { body: { chargeId: result.id } },
});
}
);

// Checkpoint after payment
await ctx.checkpoint('payment-complete', { chargeId: payment.id });

// Step 3: Reserve inventory (with rollback)
await ctx.step(
'reserve-inventory',
async () => {
return ctx.action.run({
app: 'inventory-api',
event: 'reserve',
input: { body: { items: ctx.input.items } },
});
},
async () => {
await ctx.action.run({
app: 'inventory-api',
event: 'release',
input: { body: { items: ctx.input.items } },
});
}
);

// Step 4: Create order record
const order = await ctx.step('create-order', async () => {
return ctx.database.insert({
database: 'orders-db',
event: 'create',
data: {
customer_id: ctx.input.customerId,
items: ctx.input.items,
total: ctx.input.amount,
payment_id: payment.id,
status: 'processing',
},
});
});

// Step 5: Send confirmation (non-critical)
await ctx.step(
'send-confirmation',
async () => {
await ctx.notification.email({
notification: 'transactional',
event: 'order-confirmed',
recipients: [ctx.input.email],
template: {
orderId: order.id,
items: ctx.input.items,
total: ctx.input.amount,
},
});
},
null,
{ allow_fail: true }
);

// Step 6: Queue for fulfillment
await ctx.step('queue-fulfillment', async () => {
await ctx.publish.send({
broker: 'warehouse',
event: 'new-order',
input: {
message: {
orderId: order.id,
items: ctx.input.items,
},
},
});
});

ctx.log.info('Order fulfilled successfully', { orderId: order.id });

return {
success: true,
orderId: order.id,
chargeId: payment.id,
};
},
});

Next Steps