Building Workflows
This guide covers the complete code-first API for defining workflows using ductape.workflows.define().
Workflow Definition
Define a workflow by passing a configuration object with a handler function:
const workflow = await ductape.workflows.define({
product: 'my-product', // Target product
tag: 'workflow-tag', // Unique identifier
name: 'Workflow Name', // Display name
description: 'Description', // Optional
handler: async (ctx) => {
// Your workflow logic
return { result: 'data' };
},
});
Definition Options
Basic Properties
await ductape.workflows.define({
product: 'my-product',
tag: 'order-process',
name: 'Order Processing',
description: 'Handles complete order fulfillment',
// Workflow options
options: {
timeout: 300000, // 5 minutes max (value is in ms)
rollback_strategy: 'reverse_all', // How to handle failures
},
// Environment configurations
envs: [
{ slug: 'dev', active: true },
{ slug: 'staging', active: true },
{ slug: 'prd', active: true },
],
handler: async (ctx) => {
// ...
},
});
Signals
Define signals that external systems can send to the workflow:
await ductape.workflows.define({
// ...
signals: {
'approve': { input: { approver_id: 'string', comments: 'string' } },
'cancel': { input: { reason: 'string' } },
},
handler: async (ctx) => {
// Wait for approval signal
const approval = await ctx.waitForSignal('approve', { timeout: '24h' });
console.log('Approved by:', approval.approver_id);
},
});
Queries
Define queries that report on the workflow's state. Each query's handler receives ctx and returns the data to expose:
await ductape.workflows.define({
// ...
queries: {
'getProgress': {
handler: (ctx) => ({
completed_steps: ctx.completed_steps,
current_step: ctx.current_step,
}),
},
},
handler: async (ctx) => {
// ...
},
});
Working with Steps
The ctx.step() Function
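Steps are the building blocks of workflows. Each call to ctx.step() takes a unique step tag and an async handler, plus an optional rollback function and an optional options object; await the call to use the step's result in later steps: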
const result = await ctx.step(
'step-tag', // Unique identifier
handler, // Async function
rollback?, // Optional rollback function
options? // Optional step options
);
Basic Step
const user = await ctx.step('create-user', async () => {
return ctx.database.insert({
database: 'users-db',
event: 'create-user',
data: { email: ctx.input.email },
});
});
// Use the result in subsequent steps
console.log('Created user:', user.id);
Step with Rollback
const charge = await ctx.step(
'charge-card',
// Handler
async () => {
return ctx.action.run({
app: 'stripe',
event: 'create-charge',
input: { body: { amount: ctx.input.amount } },
});
},
// Rollback - called if a later step fails
async (result) => {
await ctx.action.run({
app: 'stripe',
event: 'refund',
input: { body: { chargeId: result.id } },
});
}
);
Step Options
await ctx.step(
'send-notification',
async () => {
await ctx.notification.email({ /* ... */ });
},
null, // No rollback
{
allow_fail: true, // Continue on failure
retries: 3, // Retry count
retry_delay: 1000, // Delay between retries (ms)
timeout: 30000, // Step timeout (ms)
critical: false, // Whether this step is treated as critical for rollback
}
);
Using Ductape Components
ctx.action - API Calls
Call external APIs through connected apps:
// Basic call
const result = await ctx.action.run({
app: 'stripe',
event: 'create-charge',
input: {
body: { amount: 1000, currency: 'usd' },
},
});
// With retries and timeout
const result = await ctx.action.run({
app: 'external-api',
event: 'fetch-data',
input: {
body: { query: ctx.input.query },
headers: { 'X-Request-ID': ctx.workflow_id },
},
retries: 3,
timeout: 10000,
});
ctx.database - Database Operations
// Insert
const record = await ctx.database.insert({
database: 'orders-db',
event: 'create-order',
data: {
customer_id: ctx.input.customerId,
total: ctx.input.amount,
items: ctx.input.items,
},
});
// Query
const orders = await ctx.database.query({
database: 'orders-db',
event: 'find-by-customer',
params: { customerId: ctx.input.customerId },
});
// Execute (for custom operations)
const result = await ctx.database.execute({
database: 'analytics-db',
event: 'aggregate-sales',
input: { startDate: ctx.input.from, endDate: ctx.input.to },
});
// Update
await ctx.database.update({
database: 'orders-db',
event: 'update-status',
where: { id: ctx.input.orderId },
data: { status: 'shipped' },
});
// Delete
await ctx.database.delete({
database: 'orders-db',
event: 'remove-order',
where: { id: ctx.input.orderId },
});
ctx.graph - Graph Database
// Create node
const node = await ctx.graph.createNode({
graph: 'social-graph',
labels: ['User'],
properties: { name: ctx.input.name, email: ctx.input.email },
});
// Update node
await ctx.graph.updateNode({
graph: 'social-graph',
id: node.id,
properties: { verified: true },
});
// Create relationship
await ctx.graph.createRelationship({
graph: 'social-graph',
from: ctx.input.userId,
to: ctx.input.friendId,
type: 'FOLLOWS',
properties: { since: new Date().toISOString() },
});
// Query
const connections = await ctx.graph.query({
graph: 'social-graph',
action: 'find-connections',
params: { userId: ctx.input.userId, depth: 2 },
});
// Execute custom action
const result = await ctx.graph.execute({
graph: 'social-graph',
action: 'compute-influence',
input: { userId: ctx.input.userId },
});
// Delete node
await ctx.graph.deleteNode({
graph: 'social-graph',
id: ctx.input.nodeId,
});
// Delete relationship
await ctx.graph.deleteRelationship({
graph: 'social-graph',
id: ctx.input.relationshipId,
});
ctx.notification - Send Notifications
// Email
await ctx.notification.email({
notification: 'transactional',
event: 'order-confirmation',
recipients: [ctx.input.email],
subject: { orderId: ctx.input.orderId },
template: {
orderId: ctx.input.orderId,
items: ctx.input.items,
total: ctx.input.total,
},
});
// SMS
await ctx.notification.sms({
notification: 'alerts',
event: 'verification-code',
phones: [ctx.input.phone],
message: { code: ctx.input.code },
});
// Push notification
await ctx.notification.push({
notification: 'mobile-app',
event: 'order-shipped',
tokens: [ctx.input.deviceToken],
title: { status: 'Shipped' },
body: { trackingNumber: ctx.input.trackingNumber },
data: { orderId: ctx.input.orderId },
});
// Generic send
await ctx.notification.send({
notification: 'multi-channel',
event: 'urgent-alert',
input: {
email: { recipients: [ctx.input.email] },
sms: { phones: [ctx.input.phone] },
push: { tokens: [ctx.input.token] },
},
retries: 3,
});
ctx.storage - File Operations
// Upload file
const uploaded = await ctx.storage.upload({
storage: 'documents',
event: 'upload-invoice',
input: {
buffer: ctx.input.fileData,
fileName: `invoice-${ctx.input.orderId}.pdf`,
mimeType: 'application/pdf',
},
retries: 2,
});
console.log('File URL:', uploaded.url);
console.log('File key:', uploaded.file_key);
// Download file
const file = await ctx.storage.download({
storage: 'templates',
event: 'get-template',
input: { file_key: 'invoice-template.pdf' },
});
console.log('Content:', file.content);
console.log('Size:', file.size);
// Delete file
await ctx.storage.delete({
storage: 'temp-files',
event: 'cleanup',
input: { file_key: ctx.input.tempFileKey },
});
ctx.publish - Message Broker
// Send message
await ctx.publish.send({
broker: 'order-events',
event: 'order-created',
input: {
message: {
orderId: ctx.input.orderId,
customerId: ctx.input.customerId,
items: ctx.input.items,
timestamp: Date.now(),
},
},
retries: 2,
});
ctx.quota - Rate Limiting
// Check quota before proceeding
const quotaResult = await ctx.quota.execute({
quota: 'api-rate-limit',
input: { key: ctx.input.userId },
timeout: 5000,
});
if (!quotaResult.allowed) {
throw new Error('Rate limit exceeded');
}
ctx.fallback - Failure Handling
// Use fallback for resilient operations
const result = await ctx.fallback.execute({
fallback: 'payment-fallback',
input: {
amount: ctx.input.amount,
method: ctx.input.paymentMethod,
},
timeout: 30000,
});
Control Flow
ctx.sleep() - Pause Execution
// Wait 5 seconds
await ctx.sleep(5000);
// Wait using duration string
await ctx.sleep('5m'); // 5 minutes
await ctx.sleep('1h'); // 1 hour
await ctx.sleep('1d'); // 1 day
ctx.waitForSignal() - Wait for External Event
// Wait for a single signal
const approval = await ctx.waitForSignal('order-approved', {
timeout: '24h',
});
// Wait for any of multiple signals
const result = await ctx.waitForSignal(['approved', 'rejected'], {
timeout: '48h',
});
// Access signal payload
console.log('Approved by:', result.approvedBy);
ctx.checkpoint() - Save Progress
// Save state for recovery
await ctx.checkpoint('payment-complete', {
chargeId: payment.id,
amount: ctx.input.amount,
timestamp: Date.now(),
});
ctx.triggerRollback() - Manual Rollback
// Manually trigger rollback
if (fraudCheck.score > 0.8) {
const result = await ctx.triggerRollback('High fraud risk detected');
console.log('Rolled back steps:', result.rolled_back_steps);
return { success: false, reason: 'fraud_detected' };
}
ctx.workflow() - Child Workflows
// Run another workflow as a child
const paymentResult = await ctx.workflow(
'child-payment-123', // Child workflow ID
'payment-processing', // Workflow tag
{ amount: ctx.input.amount }, // Input
{
timeout: '5m',
retries: 2,
parent_close_policy: 'terminate',
}
);
State Management
ctx.setState() and ctx.getState()
// Save state
ctx.setState('retryCount', 0);
ctx.setState('lastError', null);
// Later, retrieve state (the <number> type argument is TypeScript syntax; omit it in plain JavaScript)
const retryCount = ctx.getState<number>('retryCount') || 0;
if (retryCount < 3) {
ctx.setState('retryCount', retryCount + 1);
// Retry logic...
}
// Access all state
console.log('Current state:', ctx.state);
ctx.steps - Access Step Results
// Access results from previous steps
const userResult = ctx.steps['create-user'];
console.log('User ID:', userResult.id);
// Check completed steps
console.log('Completed:', ctx.completed_steps);
console.log('Current:', ctx.current_step);
Data Access
ctx.variable() and ctx.constant()
// Access app variables
const apiVersion = ctx.variable('stripe', 'api_version');
// Access app constants
const defaultCurrency = ctx.constant('stripe', 'default_currency');
ctx.default() - Fallback Values
// Provide default values
const currency = ctx.default(ctx.input.currency, 'usd');
const retries = ctx.default(ctx.input.retries, 3);
ctx.transform - Data Transformations
// String transformations
const upper = ctx.transform.upper(ctx.input.name);
const lower = ctx.transform.lower(ctx.input.email);
const trimmed = ctx.transform.trim(ctx.input.text);
// JSON operations (parseJson<MyType> uses a TypeScript type argument; omit <MyType> in plain JavaScript)
const parsed = ctx.transform.parseJson<MyType>(ctx.input.jsonString);
const stringified = ctx.transform.stringify(ctx.input.data);
// Date operations
const now = ctx.transform.now();
const formatted = ctx.transform.formatDate(Date.now(), 'YYYY-MM-DD');
// Array/object operations
const length = ctx.transform.length(ctx.input.items);
const size = ctx.transform.size(ctx.input.metadata);
const parts = ctx.transform.split(ctx.input.csv, ',');
const joined = ctx.transform.join(ctx.input.tags, ', ');
Logging
// Log messages at different levels
ctx.log.debug('Processing step', { step: 'create-user' });
ctx.log.info('User created', { userId: user.id });
ctx.log.warn('Retry attempt', { attempt: 2, maxRetries: 3 });
ctx.log.error('Step failed', { error: err.message });
Complete Example
await ductape.workflows.define({
product: 'my-product',
tag: 'order-fulfillment',
name: 'Order Fulfillment',
description: 'Complete order processing workflow',
options: {
timeout: 300000,
rollback_strategy: 'reverse_all',
},
signals: {
'cancel-order': { input: { reason: 'string' } },
},
handler: async (ctx) => {
ctx.log.info('Starting order fulfillment', { orderId: ctx.input.orderId });
// Step 1: Validate order
const validation = await ctx.step('validate', async () => {
return ctx.action.run({
app: 'inventory-api',
event: 'check-availability',
input: { body: { items: ctx.input.items } },
});
});
if (!validation.available) {
return { success: false, reason: 'items_unavailable' };
}
// Step 2: Process payment (with rollback)
const payment = await ctx.step(
'payment',
async () => {
return ctx.action.run({
app: 'stripe',
event: 'create-charge',
input: {
body: {
amount: ctx.input.amount,
customer: ctx.input.customerId,
},
},
retries: 3,
});
},
async (result) => {
await ctx.action.run({
app: 'stripe',
event: 'refund',
input: { body: { chargeId: result.id } },
});
}
);
// Checkpoint after payment
await ctx.checkpoint('payment-complete', { chargeId: payment.id });
// Step 3: Reserve inventory (with rollback)
await ctx.step(
'reserve-inventory',
async () => {
return ctx.action.run({
app: 'inventory-api',
event: 'reserve',
input: { body: { items: ctx.input.items } },
});
},
async () => {
await ctx.action.run({
app: 'inventory-api',
event: 'release',
input: { body: { items: ctx.input.items } },
});
}
);
// Step 4: Create order record
const order = await ctx.step('create-order', async () => {
return ctx.database.insert({
database: 'orders-db',
event: 'create',
data: {
customer_id: ctx.input.customerId,
items: ctx.input.items,
total: ctx.input.amount,
payment_id: payment.id,
status: 'processing',
},
});
});
// Step 5: Send confirmation (non-critical)
await ctx.step(
'send-confirmation',
async () => {
await ctx.notification.email({
notification: 'transactional',
event: 'order-confirmed',
recipients: [ctx.input.email],
template: {
orderId: order.id,
items: ctx.input.items,
total: ctx.input.amount,
},
});
},
null,
{ allow_fail: true }
);
// Step 6: Queue for fulfillment
await ctx.step('queue-fulfillment', async () => {
await ctx.publish.send({
broker: 'warehouse',
event: 'new-order',
input: {
message: {
orderId: order.id,
items: ctx.input.items,
},
},
});
});
ctx.log.info('Order fulfilled successfully', { orderId: order.id });
return {
success: true,
orderId: order.id,
chargeId: payment.id,
};
},
});
Next Steps
- Step Types - Detailed reference for all ctx methods
- Execution & Rollbacks - How execution works
- Examples - Real-world patterns