Workflows
Workflows are durable, multi-step processes that execute a series of operations with automatic state persistence, retries, and rollbacks. They are designed for complex business processes that need to survive failures and maintain consistency.
When to Use Workflows
Use Workflows when you need:
- Durability - Steps that must complete even if your server restarts
- Rollbacks - Automatic compensation when something fails mid-process
- Long-running processes - Operations that span minutes, hours, or days
- Complex dependencies - Steps that depend on multiple previous steps
- State checkpoints - Save progress and resume from where you left off
- Multi-step business processes - Any operation with multiple steps that should succeed or fail together
Quick Example
Here's a simple order fulfillment workflow using the code-first API:
const orderWorkflow = await ductape.workflows.define({
  product: 'my-product',
  tag: 'order-fulfillment',
  name: 'Order Fulfillment',
  handler: async (ctx) => {
    // Step 1: Validate the order
    const validation = await ctx.step('validate', async () => {
      return ctx.action.run({
        app: 'orders-api',
        event: 'validate-order',
        input: { body: { orderId: ctx.input.orderId } },
      });
    });

    if (!validation.valid) {
      return { success: false, error: validation.reason };
    }

    // Step 2: Process payment (with rollback)
    const payment = await ctx.step(
      'process-payment',
      async () => {
        return ctx.action.run({
          app: 'stripe',
          event: 'create-charge',
          input: { body: { amount: ctx.input.amount } },
        });
      },
      // Rollback handler - refund if later steps fail
      async (result) => {
        await ctx.action.run({
          app: 'stripe',
          event: 'refund-charge',
          input: { body: { chargeId: result.id } },
        });
      }
    );

    // Step 3: Reserve inventory
    await ctx.step('reserve-inventory', async () => {
      return ctx.database.insert({
        database: 'inventory-db',
        event: 'reserve-items',
        data: { items: ctx.input.items, orderId: ctx.input.orderId },
      });
    });

    // Step 4: Send confirmation email (non-critical)
    await ctx.step(
      'send-confirmation',
      async () => {
        await ctx.notification.email({
          notification: 'order-notifications',
          event: 'order-confirmed',
          recipients: [ctx.input.email],
          subject: { orderId: ctx.input.orderId },
          template: { orderId: ctx.input.orderId, total: ctx.input.amount },
        });
      },
      null, // No rollback needed
      { allow_fail: true } // Continue even if email fails
    );

    return { success: true, orderId: ctx.input.orderId, chargeId: payment.id };
  },
});
How Workflows Work
Every workflow has:
- A handler function - Contains your business logic with steps
- Steps - Individual operations that can be retried and rolled back
- Context (ctx) - Provides access to Ductape components and workflow state
- Rollback handlers - Optional functions to undo completed work on failure
Input → ctx.step('A') → ctx.step('B') → ctx.step('C') → Output
              ↓               ↓               ↓
          Rollback        Rollback        Rollback
                        (if C fails)
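The flow above can be pictured as a small saga-style runner. The sketch below is not Ductape's implementation: `MiniWorkflow` and its methods are hypothetical names, and it assumes that completed steps are compensated newest-first when a later step throws, an ordering this page does not state.

```typescript
// Hypothetical in-memory model of steps with rollbacks; not the Ductape runtime.
type Rollback = () => Promise<void>;

class MiniWorkflow {
  private rollbacks: Rollback[] = [];
  readonly log: string[] = [];

  // Run a step; if it succeeds, remember how to undo it.
  async step<T>(
    tag: string,
    handler: () => Promise<T>,
    rollback?: (result: T) => Promise<void>,
  ): Promise<T> {
    const result = await handler();
    this.log.push(`done:${tag}`);
    if (rollback) {
      this.rollbacks.push(async () => {
        await rollback(result);
        this.log.push(`undone:${tag}`);
      });
    }
    return result;
  }

  // Undo completed steps, newest first (assumed order).
  async rollbackAll(): Promise<void> {
    while (this.rollbacks.length > 0) {
      const undo = this.rollbacks.pop()!;
      await undo();
    }
  }
}

// Steps A and B complete, C throws, so B and then A are undone.
async function runDemo(): Promise<string[]> {
  const wf = new MiniWorkflow();
  try {
    await wf.step('A', async () => 'a', async () => {});
    await wf.step('B', async () => 'b', async () => {});
    await wf.step('C', async () => {
      throw new Error('C failed');
    });
  } catch {
    await wf.rollbackAll();
  }
  return wf.log;
}
```

In this model the failing step itself has nothing to undo, because its rollback is only registered after its handler succeeds.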
The Workflow Context
The ctx object provides everything you need inside a workflow:
Input & Metadata
ctx.input // Workflow input data
ctx.workflow_id // Unique execution ID
ctx.workflow_tag // Workflow tag
ctx.env // Environment (dev, staging, prd)
ctx.product // Product tag
Ductape Components
| Component | Description |
|---|---|
| ctx.action | Call external APIs |
| ctx.database | Database operations |
| ctx.graph | Graph database queries |
| ctx.notification | Send emails, SMS, push |
| ctx.storage | File upload/download |
| ctx.publish | Message broker publishing |
| ctx.quota | Rate limiting |
| ctx.fallback | Handle failures with alternatives |
Control Flow
| Method | Description |
|---|---|
| ctx.step() | Define a step with optional rollback |
| ctx.sleep() | Pause execution |
| ctx.waitForSignal() | Wait for external event |
| ctx.checkpoint() | Save state for recovery |
| ctx.triggerRollback() | Manually trigger rollback |
State Management
ctx.setState('key', value); // Save state
ctx.getState('key'); // Retrieve state
ctx.state // All state
ctx.steps // Step results
ctx.completed_steps // List of completed step tags
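To see why recorded step results matter for durability, here is a minimal sketch of resuming a run. It illustrates the general technique (replaying a handler and skipping steps whose results are already stored) and is an assumption, not a description of Ductape internals; `StepStore` and `makeStep` are hypothetical names.

```typescript
// Hypothetical sketch: completed step results live in a store that outlives
// a single run, so a re-run skips work that already finished.
type StepStore = Map<string, unknown>;

function makeStep(store: StepStore) {
  return async function step<T>(tag: string, handler: () => Promise<T>): Promise<T> {
    if (store.has(tag)) {
      return store.get(tag) as T; // already completed in an earlier run
    }
    const result = await handler();
    store.set(tag, result); // record the result before moving on
    return result;
  };
}

async function resumeDemo(): Promise<{ calls: number; completed: string[] }> {
  const store: StepStore = new Map();
  let calls = 0;
  let crashOnce = true;

  const handler = async (): Promise<string> => {
    const step = makeStep(store);
    const id = await step('create-user', async () => {
      calls += 1; // counts how often the real work runs
      return 'user-1';
    });
    await step('send-welcome', async () => {
      if (crashOnce) {
        crashOnce = false;
        throw new Error('simulated crash');
      }
      return 'sent';
    });
    return id;
  };

  try {
    await handler(); // first run fails in the second step
  } catch {
    await handler(); // second run reuses the stored 'create-user' result
  }
  return { calls, completed: Array.from(store.keys()) };
}
```

The `create-user` work runs once even though the handler runs twice, which is the property that makes resuming safe for steps with side effects.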
Defining Steps
Steps are defined with ctx.step():
const result = await ctx.step(
  'step-tag',   // Unique step identifier
  handler,      // Async function that does the work
  rollback?,    // Optional rollback function
  options?      // Optional step options
);
Basic Step
const user = await ctx.step('create-user', async () => {
  return ctx.database.insert({
    database: 'users-db',
    event: 'create-user',
    data: { email: ctx.input.email, name: ctx.input.name },
  });
});
Step with Rollback
const charge = await ctx.step(
  'charge-card',
  async () => {
    return ctx.action.run({
      app: 'stripe',
      event: 'create-charge',
      input: { body: { amount: ctx.input.amount } },
    });
  },
  async (result) => {
    // Called if a later step fails
    await ctx.action.run({
      app: 'stripe',
      event: 'refund',
      input: { body: { chargeId: result.id } },
    });
  }
);
Step with Options
await ctx.step(
  'send-analytics',
  async () => {
    await ctx.action.run({ app: 'analytics', event: 'track', input: {} });
  },
  null, // No rollback
  {
    allow_fail: true, // Continue on failure
    retries: 3,       // Retry count
    timeout: 5000,    // Timeout in ms
  }
);
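The three options can interact in more than one way, so it may help to see one possible reading spelled out. This sketch assumes that `retries` counts extra attempts after the first, that `timeout` applies to each attempt, and that `allow_fail` turns a final failure into `undefined`; none of these semantics are confirmed by this page, and `runWithOptions` and `withTimeout` are hypothetical helpers.

```typescript
// Hypothetical helper showing one way the three step options could combine.
interface StepOptions {
  allow_fail?: boolean; // swallow the final error instead of throwing
  retries?: number;     // assumed: extra attempts after the first one
  timeout?: number;     // assumed: per-attempt limit in milliseconds
}

// Reject if the work does not settle within `ms`; always clear the timer.
function withTimeout<T>(work: Promise<T>, ms: number): Promise<T> {
  return new Promise<T>((resolve, reject) => {
    const timer = setTimeout(() => reject(new Error(`timed out after ${ms} ms`)), ms);
    work.then(
      (value) => { clearTimeout(timer); resolve(value); },
      (error) => { clearTimeout(timer); reject(error); },
    );
  });
}

async function runWithOptions<T>(
  handler: () => Promise<T>,
  options: StepOptions = {},
): Promise<T | undefined> {
  const attempts = 1 + (options.retries ?? 0);
  let lastError: unknown;
  for (let i = 0; i < attempts; i++) {
    try {
      const work = handler();
      return options.timeout === undefined
        ? await work
        : await withTimeout(work, options.timeout);
    } catch (error) {
      lastError = error; // remember the failure and try again
    }
  }
  if (options.allow_fail) return undefined;
  throw lastError;
}

async function optionsDemo() {
  let attempts = 0;
  // Fails twice, then succeeds on the third attempt.
  const value = await runWithOptions(async () => {
    attempts += 1;
    if (attempts < 3) throw new Error('flaky');
    return 'ok';
  }, { retries: 3, timeout: 5000 });
  // Always fails, but allow_fail keeps the caller going.
  const skipped = await runWithOptions<string>(async () => {
    throw new Error('always fails');
  }, { allow_fail: true });
  return { value, attempts, skipped };
}
```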
Component Usage
Actions (API Calls)
const result = await ctx.action.run({
  app: 'stripe',
  event: 'create-charge',
  input: {
    body: { amount: 1000, currency: 'usd' },
    headers: { 'Idempotency-Key': ctx.input.orderId },
  },
  retries: 3,
  timeout: 10000,
});
Database Operations
// Insert
const user = await ctx.database.insert({
  database: 'users-db',
  event: 'create-user',
  data: { email: ctx.input.email },
});

// Query
const orders = await ctx.database.query({
  database: 'orders-db',
  event: 'find-orders',
  params: { customerId: ctx.input.customerId },
});

// Update
await ctx.database.update({
  database: 'users-db',
  event: 'update-user',
  where: { id: user.id },
  data: { status: 'active' },
});

// Delete
await ctx.database.delete({
  database: 'orders-db',
  event: 'delete-order',
  where: { id: ctx.input.orderId },
});
Graph Database
// Create node
const userNode = await ctx.graph.createNode({
  graph: 'social-graph',
  labels: ['User'],
  properties: { name: ctx.input.name },
});

// Create relationship
await ctx.graph.createRelationship({
  graph: 'social-graph',
  from: userNode.id,
  to: ctx.input.friendId,
  type: 'FRIENDS_WITH',
});

// Query
const friends = await ctx.graph.query({
  graph: 'social-graph',
  action: 'find-friends',
  params: { userId: ctx.input.userId },
});
Notifications
// Email
await ctx.notification.email({
  notification: 'transactional',
  event: 'welcome-email',
  recipients: [ctx.input.email],
  subject: { name: ctx.input.name },
  template: { name: ctx.input.name, link: ctx.input.activationLink },
});

// SMS
await ctx.notification.sms({
  notification: 'alerts',
  event: 'verification-code',
  phones: [ctx.input.phone],
  message: { code: ctx.input.verificationCode },
});

// Push
await ctx.notification.push({
  notification: 'mobile',
  event: 'order-update',
  tokens: [ctx.input.deviceToken],
  title: { status: 'Shipped' },
  body: { orderId: ctx.input.orderId },
  data: { orderId: ctx.input.orderId },
});
Storage
// Upload
const file = await ctx.storage.upload({
  storage: 'documents',
  event: 'upload-receipt',
  input: {
    buffer: ctx.input.fileData,
    fileName: `receipt-${ctx.input.orderId}.pdf`,
    mimeType: 'application/pdf',
  },
});

// Download
const download = await ctx.storage.download({
  storage: 'templates',
  event: 'get-template',
  input: { file_key: 'invoice-template.pdf' },
});

// Delete
await ctx.storage.delete({
  storage: 'temp-files',
  event: 'cleanup',
  input: { file_key: ctx.input.tempFileKey },
});
Message Publishing
await ctx.publish.send({
  broker: 'order-events',
  event: 'new-order',
  input: {
    message: {
      orderId: ctx.input.orderId,
      items: ctx.input.items,
    },
  },
  retries: 2,
});
Control Flow
Sleep
Pause workflow execution:
await ctx.sleep(5000); // 5 seconds
await ctx.sleep('5m'); // 5 minutes
await ctx.sleep('1h'); // 1 hour
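The calls above mix a millisecond number with shorthand strings. As a rough illustration of how such arguments could be normalized, the sketch below converts both forms to milliseconds. `toMilliseconds` is a hypothetical helper, and the supported units (`s`, `m`, `h`, `d`) are an assumption; this page only shows `m` and `h`.

```typescript
// Hypothetical helper: convert a sleep argument to milliseconds.
// Assumes the units s, m, h and d; the examples above only show 'm' and 'h'.
const UNIT_MS: Record<string, number> = {
  s: 1000,
  m: 60 * 1000,
  h: 60 * 60 * 1000,
  d: 24 * 60 * 60 * 1000,
};

function toMilliseconds(duration: number | string): number {
  if (typeof duration === 'number') return duration; // already in ms
  const match = /^(\d+)([smhd])$/.exec(duration.trim());
  if (!match) throw new Error(`Unsupported duration: ${duration}`);
  return Number(match[1]) * UNIT_MS[match[2]];
}
```

Rejecting unknown formats loudly, rather than defaulting to zero, avoids a workflow that silently skips a pause it was meant to take.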
Wait for Signal
Pause until an external signal arrives:
const approval = await ctx.waitForSignal('order-approved', {
  timeout: '24h',
});
console.log('Approved by:', approval.approvedBy);
Send the signal from outside the workflow:
await ductape.workflows.signal({
  product: 'my-product',
  env: 'prd',
  workflow_id: 'wf-123',
  signal: 'order-approved',
  payload: { approvedBy: 'manager@company.com' },
});
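The wait/signal pairing can be modeled in memory to make the handoff concrete. This is a sketch under assumptions, not the Ductape mechanism: `SignalHub` is a hypothetical class, it keeps only one waiter per signal name, and it assumes a timed-out wait rejects, which this page does not specify.

```typescript
// Hypothetical in-memory signal hub; a real runtime would persist waiters.
type Waiter = {
  resolve: (payload: unknown) => void;
  timer: ReturnType<typeof setTimeout>;
};

class SignalHub {
  private waiters = new Map<string, Waiter>();

  // Resolve with the payload, or reject if the timeout elapses first (assumed).
  waitForSignal<T>(name: string, timeoutMs: number): Promise<T> {
    return new Promise<T>((resolve, reject) => {
      const timer = setTimeout(() => {
        this.waiters.delete(name);
        reject(new Error(`Timed out waiting for signal '${name}'`));
      }, timeoutMs);
      this.waiters.set(name, { resolve: (payload) => resolve(payload as T), timer });
    });
  }

  // Deliver a payload; returns false when nothing is waiting on that name.
  signal(name: string, payload: unknown): boolean {
    const waiter = this.waiters.get(name);
    if (!waiter) return false;
    clearTimeout(waiter.timer);
    this.waiters.delete(name);
    waiter.resolve(payload);
    return true;
  }
}

async function signalDemo(): Promise<string> {
  const hub = new SignalHub();
  const pending = hub.waitForSignal<{ approvedBy: string }>('order-approved', 1000);
  hub.signal('order-approved', { approvedBy: 'manager@company.com' });
  const approval = await pending;
  return approval.approvedBy;
}
```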
Checkpoint
Save state for recovery:
await ctx.checkpoint('payment-complete', {
  chargeId: payment.id,
  timestamp: Date.now(),
});
Manual Rollback
Trigger rollback programmatically:
if (fraudCheck.risk > 0.8) {
  await ctx.triggerRollback('High fraud risk detected');
}
Executing Workflows
const result = await ductape.workflows.execute({
  product: 'my-product',
  env: 'prd',
  tag: 'order-fulfillment',
  input: {
    orderId: 'ORD-12345',
    amount: 99.99,
    email: 'customer@example.com',
    items: ['SKU-001', 'SKU-002'],
  },
});
console.log('Status:', result.status); // 'completed' | 'failed' | 'rolled_back'
console.log('Output:', result.output);
console.log('Duration:', result.execution_time, 'ms');
console.log('Completed Steps:', result.completed_steps);
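Because the result carries one of three statuses, callers will usually branch on it. The sketch below shows one way to do that. The field names and status values come from the example above, but the `WorkflowResult` type itself and the `summarize` helper are assumptions made for illustration.

```typescript
// Field names and status values are taken from the example above;
// the exact shape of the result type is an assumption.
type WorkflowStatus = 'completed' | 'failed' | 'rolled_back';

interface WorkflowResult {
  status: WorkflowStatus;
  output?: unknown;
  execution_time: number;
  completed_steps: string[];
}

// Turn a result into a one-line summary, handling every status explicitly.
function summarize(result: WorkflowResult): string {
  switch (result.status) {
    case 'completed':
      return `completed in ${result.execution_time} ms`;
    case 'rolled_back':
      return `rolled back after: ${result.completed_steps.join(', ') || 'no steps'}`;
    case 'failed':
      return `failed after ${result.completed_steps.length} step(s)`;
  }
}
```

Typing the status as a union lets the compiler flag a missing branch if a new status is added later.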
Next Steps
- Getting Started - Set up your first workflow
- Step Types - Complete reference for all component operations
- Execution & Rollbacks - How execution and rollbacks work
- Examples - Real-world workflow patterns
See Also
- Jobs - Schedule workflows to run automatically