Workflows

Workflows are durable, multi-step processes that execute a series of operations with automatic state persistence, retries, and rollbacks. They are designed for complex business processes that need to survive failures and maintain consistency.

When to Use Workflows

Use Workflows when you need:

  • Durability - Steps that must complete even if your server restarts
  • Rollbacks - Automatic compensation when something fails mid-process
  • Long-running processes - Operations that span minutes, hours, or days
  • Complex dependencies - Steps that depend on multiple previous steps
  • State checkpoints - Save progress and resume from where you left off
  • Multi-step business processes - Any operation with multiple steps that should succeed or fail together

Quick Example

Here's a simple order fulfillment workflow using the code-first API:

const orderWorkflow = await ductape.workflows.define({
  product: 'my-product',
  tag: 'order-fulfillment',
  name: 'Order Fulfillment',
  handler: async (ctx) => {
    // Step 1: Validate the order
    const validation = await ctx.step('validate', async () => {
      return ctx.action.run({
        app: 'orders-api',
        event: 'validate-order',
        input: { body: { orderId: ctx.input.orderId } },
      });
    });

    if (!validation.valid) {
      return { success: false, error: validation.reason };
    }

    // Step 2: Process payment (with rollback)
    const payment = await ctx.step(
      'process-payment',
      async () => {
        return ctx.action.run({
          app: 'stripe',
          event: 'create-charge',
          input: { body: { amount: ctx.input.amount } },
        });
      },
      // Rollback handler - refund if later steps fail
      async (result) => {
        await ctx.action.run({
          app: 'stripe',
          event: 'refund-charge',
          input: { body: { chargeId: result.id } },
        });
      }
    );

    // Step 3: Reserve inventory
    await ctx.step('reserve-inventory', async () => {
      return ctx.database.insert({
        database: 'inventory-db',
        event: 'reserve-items',
        data: { items: ctx.input.items, orderId: ctx.input.orderId },
      });
    });

    // Step 4: Send confirmation email (non-critical)
    await ctx.step(
      'send-confirmation',
      async () => {
        await ctx.notification.email({
          notification: 'order-notifications',
          event: 'order-confirmed',
          recipients: [ctx.input.email],
          subject: { orderId: ctx.input.orderId },
          template: { orderId: ctx.input.orderId, total: ctx.input.amount },
        });
      },
      null, // No rollback needed
      { allow_fail: true } // Continue even if email fails
    );

    return { success: true, orderId: ctx.input.orderId, chargeId: payment.id };
  },
});

How Workflows Work

Every workflow has:

  1. A handler function - Contains your business logic with steps
  2. Steps - Individual operations that can be retried and rolled back
  3. Context (ctx) - Provides access to Ductape components and workflow state
  4. Rollback handlers - Optional functions to undo completed work on failure

Input → ctx.step('A') → ctx.step('B') → ctx.step('C') → Output
              ↓               ↓               ↓
          Rollback        Rollback        Rollback
                        (if C fails)
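
To make the diagram concrete, here is a minimal, self-contained sketch of the step-and-rollback pattern. It is not Ductape's implementation: MiniWorkflow, rollbackAll, and runDemo are hypothetical names, and the sketch assumes that rollbacks run in reverse order of completion, which is the usual convention for compensating actions.

```typescript
// Hypothetical, simplified model of step/rollback ordering (not the Ductape runtime).
type Rollback = () => Promise<void>;

class MiniWorkflow {
  private rollbacks: Rollback[] = [];
  readonly completedSteps: string[] = [];

  // Run a step; on success, remember its rollback bound to the step's result.
  async step<T>(
    tag: string,
    handler: () => Promise<T>,
    rollback?: (result: T) => Promise<void>,
  ): Promise<T> {
    const result = await handler();
    this.completedSteps.push(tag);
    if (rollback) this.rollbacks.push(() => rollback(result));
    return result;
  }

  // Undo completed steps, most recent first (assumed ordering).
  async rollbackAll(): Promise<void> {
    while (this.rollbacks.length > 0) {
      const undo = this.rollbacks.pop()!;
      await undo();
    }
  }
}

async function runDemo(): Promise<string[]> {
  const log: string[] = [];
  const wf = new MiniWorkflow();
  try {
    await wf.step('A', async () => { log.push('run A'); return 1; }, async () => { log.push('undo A'); });
    await wf.step('B', async () => { log.push('run B'); return 2; }, async () => { log.push('undo B'); });
    await wf.step('C', async () => { throw new Error('C failed'); });
  } catch {
    // Step C failed, so compensate for A and B.
    await wf.rollbackAll();
  }
  return log;
}
```

In this sketch, step C never completes, so only the rollbacks registered by A and B run.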

The Workflow Context

The ctx object provides everything you need inside a workflow:

Input & Metadata

ctx.input         // Workflow input data
ctx.workflow_id   // Unique execution ID
ctx.workflow_tag  // Workflow tag
ctx.env           // Environment (dev, staging, prd)
ctx.product       // Product tag

Ductape Components

Component          Description
ctx.action         Call external APIs
ctx.database       Database operations
ctx.graph          Graph database queries
ctx.notification   Send emails, SMS, push
ctx.storage        File upload/download
ctx.publish        Message broker publishing
ctx.quota          Rate limiting
ctx.fallback       Handle failures with alternatives

Control Flow

Method                  Description
ctx.step()              Define a step with optional rollback
ctx.sleep()             Pause execution
ctx.waitForSignal()     Wait for external event
ctx.checkpoint()        Save state for recovery
ctx.triggerRollback()   Manually trigger rollback

State Management

ctx.setState('key', value);  // Save state
ctx.getState('key');         // Retrieve state
ctx.state                    // All state
ctx.steps                    // Step results
ctx.completed_steps          // List of completed step tags
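
Persisted step results are what let a workflow resume instead of starting over. The sketch below illustrates the general idea under stated assumptions: StepStore (a Map standing in for durable storage), durableStep, and resumeDemo are hypothetical names, and Ductape's actual persistence mechanism may work differently.

```typescript
// Hypothetical persistence layer: a Map standing in for durable storage.
type StepStore = Map<string, unknown>;

async function durableStep<T>(
  store: StepStore,
  tag: string,
  handler: () => Promise<T>,
): Promise<T> {
  if (store.has(tag)) {
    // The step finished in an earlier run: reuse the saved result.
    return store.get(tag) as T;
  }
  const result = await handler();
  store.set(tag, result);
  return result;
}

async function resumeDemo(): Promise<{ chargeCalls: number; total: number }> {
  const store: StepStore = new Map();
  let chargeCalls = 0;

  const runOrder = async (failAtShip: boolean): Promise<number> => {
    const charge = await durableStep(store, 'charge', async () => {
      chargeCalls += 1;
      return { id: 'ch_1', amount: 50 };
    });
    await durableStep(store, 'ship', async () => {
      if (failAtShip) throw new Error('simulated crash');
      return 'shipped';
    });
    return charge.amount;
  };

  try {
    await runOrder(true); // first run crashes after 'charge' completes
  } catch {
    // In a real system the process would restart here.
  }
  const total = await runOrder(false); // second run skips 'charge' and finishes 'ship'
  return { chargeCalls, total };
}
```

The charge handler runs only once across both attempts, which is why step tags need to be unique within a workflow.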

Defining Steps

Steps are defined with ctx.step():

const result = await ctx.step(
  'step-tag',  // Unique step identifier
  handler,     // Async function that does the work
  rollback?,   // Optional rollback function
  options?     // Optional step options
);

Basic Step

const user = await ctx.step('create-user', async () => {
  return ctx.database.insert({
    database: 'users-db',
    event: 'create-user',
    data: { email: ctx.input.email, name: ctx.input.name },
  });
});

Step with Rollback

const charge = await ctx.step(
  'charge-card',
  async () => {
    return ctx.action.run({
      app: 'stripe',
      event: 'create-charge',
      input: { body: { amount: ctx.input.amount } },
    });
  },
  async (result) => {
    // Called if a later step fails; receives the result of the step being undone
    await ctx.action.run({
      app: 'stripe',
      event: 'refund-charge',
      input: { body: { chargeId: result.id } },
    });
  }
);

Step with Options

await ctx.step(
  'send-analytics',
  async () => {
    await ctx.action.run({ app: 'analytics', event: 'track', input: {} });
  },
  null, // No rollback
  {
    allow_fail: true, // Continue on failure
    retries: 3,       // Retry count
    timeout: 5000,    // Timeout in ms
  }
);
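
The sketch below shows one plausible way these three options could interact. It is an illustration, not Ductape's implementation: StepOptions, withTimeout, and runStep are hypothetical, and it assumes that retries counts extra attempts after the first and that timeout applies to each attempt separately.

```typescript
interface StepOptions {
  allow_fail?: boolean; // continue the workflow even if every attempt fails
  retries?: number;     // assumed: extra attempts after the first
  timeout?: number;     // assumed: per-attempt limit in ms
}

// Reject if `work` does not settle within `ms`. The underlying work is not
// cancelled; it simply stops being awaited.
function withTimeout<T>(work: Promise<T>, ms: number): Promise<T> {
  return new Promise<T>((resolve, reject) => {
    const timer = setTimeout(() => reject(new Error(`timed out after ${ms} ms`)), ms);
    work.then(
      (value) => { clearTimeout(timer); resolve(value); },
      (err) => { clearTimeout(timer); reject(err); },
    );
  });
}

async function runStep<T>(
  handler: () => Promise<T>,
  options: StepOptions = {},
): Promise<T | undefined> {
  const attempts = 1 + (options.retries ?? 0);
  let lastError: unknown;
  for (let i = 0; i < attempts; i++) {
    try {
      const work = handler();
      return options.timeout === undefined
        ? await work
        : await withTimeout(work, options.timeout);
    } catch (err) {
      lastError = err; // remember the failure and try again if attempts remain
    }
  }
  if (options.allow_fail) return undefined; // swallow the failure
  throw lastError;
}
```

Under these assumptions, a step with allow_fail: true resolves to undefined after its last failed attempt, so downstream code should not rely on its result.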

Component Usage

Actions (API Calls)

const result = await ctx.action.run({
  app: 'stripe',
  event: 'create-charge',
  input: {
    body: { amount: 1000, currency: 'usd' },
    headers: { 'Idempotency-Key': ctx.input.orderId },
  },
  retries: 3,
  timeout: 10000,
});
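
The Idempotency-Key header matters because of retries: if a request succeeds but its response is lost, a retry could otherwise charge the customer twice. The sketch below uses a hypothetical in-memory FakePaymentApi (not Stripe's or Ductape's code) to show how a server that honours idempotency keys treats a repeated request.

```typescript
interface Charge {
  id: string;
  amount: number;
}

// Hypothetical stand-in for a payment API that deduplicates by idempotency key.
class FakePaymentApi {
  private byKey = new Map<string, Charge>();
  private nextId = 1;

  createCharge(amount: number, idempotencyKey: string): Charge {
    const existing = this.byKey.get(idempotencyKey);
    if (existing) return existing; // replayed request: return the original charge
    const charge: Charge = { id: `ch_${this.nextId++}`, amount };
    this.byKey.set(idempotencyKey, charge);
    return charge;
  }

  get chargeCount(): number {
    return this.byKey.size;
  }
}

function idempotencyDemo(): { sameCharge: boolean; chargeCount: number } {
  const api = new FakePaymentApi();
  const first = api.createCharge(1000, 'ORD-12345');
  const retry = api.createCharge(1000, 'ORD-12345'); // same key: no second charge
  api.createCharge(1000, 'ORD-67890');               // different key: new charge
  return { sameCharge: first.id === retry.id, chargeCount: api.chargeCount };
}
```

Using the order ID as the key, as in the example above, ties the charge to the order rather than to a single request attempt.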

Database Operations

// Insert
const user = await ctx.database.insert({
  database: 'users-db',
  event: 'create-user',
  data: { email: ctx.input.email },
});

// Query
const orders = await ctx.database.query({
  database: 'orders-db',
  event: 'find-orders',
  params: { customerId: ctx.input.customerId },
});

// Update
await ctx.database.update({
  database: 'users-db',
  event: 'update-user',
  where: { id: user.id },
  data: { status: 'active' },
});

// Delete
await ctx.database.delete({
  database: 'orders-db',
  event: 'delete-order',
  where: { id: ctx.input.orderId },
});

Graph Database

// Create node
const userNode = await ctx.graph.createNode({
  graph: 'social-graph',
  labels: ['User'],
  properties: { name: ctx.input.name },
});

// Create relationship
await ctx.graph.createRelationship({
  graph: 'social-graph',
  from: userNode.id,
  to: ctx.input.friendId,
  type: 'FRIENDS_WITH',
});

// Query
const friends = await ctx.graph.query({
  graph: 'social-graph',
  action: 'find-friends',
  params: { userId: ctx.input.userId },
});

Notifications

// Email
await ctx.notification.email({
  notification: 'transactional',
  event: 'welcome-email',
  recipients: [ctx.input.email],
  subject: { name: ctx.input.name },
  template: { name: ctx.input.name, link: ctx.input.activationLink },
});

// SMS
await ctx.notification.sms({
  notification: 'alerts',
  event: 'verification-code',
  phones: [ctx.input.phone],
  message: { code: ctx.input.verificationCode },
});

// Push
await ctx.notification.push({
  notification: 'mobile',
  event: 'order-update',
  tokens: [ctx.input.deviceToken],
  title: { status: 'Shipped' },
  body: { orderId: ctx.input.orderId },
  data: { orderId: ctx.input.orderId },
});

Storage

// Upload
const file = await ctx.storage.upload({
  storage: 'documents',
  event: 'upload-receipt',
  input: {
    buffer: ctx.input.fileData,
    fileName: `receipt-${ctx.input.orderId}.pdf`,
    mimeType: 'application/pdf',
  },
});

// Download
const download = await ctx.storage.download({
  storage: 'templates',
  event: 'get-template',
  input: { file_key: 'invoice-template.pdf' },
});

// Delete
await ctx.storage.delete({
  storage: 'temp-files',
  event: 'cleanup',
  input: { file_key: ctx.input.tempFileKey },
});

Message Publishing

await ctx.publish.send({
  broker: 'order-events',
  event: 'new-order',
  input: {
    message: {
      orderId: ctx.input.orderId,
      items: ctx.input.items,
    },
  },
  retries: 2,
});

Control Flow

Sleep

Pause workflow execution:

await ctx.sleep(5000);  // 5 seconds
await ctx.sleep('5m');  // 5 minutes
await ctx.sleep('1h');  // 1 hour
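
Numbers are milliseconds, while strings combine an amount with a unit suffix. As a rough sketch of how such values could be normalised, the hypothetical toMilliseconds helper below converts both forms to milliseconds. Only the m and h suffixes appear in the examples above; the ms, s, and d units are assumptions, and the helper is not part of Ductape.

```typescript
// Assumed unit table; only 'm' and 'h' are confirmed by the examples above.
const UNIT_MS: Record<string, number> = {
  ms: 1,
  s: 1_000,
  m: 60_000,
  h: 3_600_000,
  d: 86_400_000,
};

// Convert a duration such as 5000, '5m', or '1h' to milliseconds.
function toMilliseconds(duration: number | string): number {
  if (typeof duration === 'number') return duration;
  const match = /^(\d+)(ms|s|m|h|d)$/.exec(duration.trim());
  if (!match) throw new Error(`Unsupported duration: ${duration}`);
  return Number(match[1]) * UNIT_MS[match[2]];
}
```

For example, toMilliseconds('5m') yields 300000 and toMilliseconds('1h') yields 3600000.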

Wait for Signal

Pause until an external signal arrives or the timeout expires:

const approval = await ctx.waitForSignal('order-approved', {
  timeout: '24h',
});

console.log('Approved by:', approval.approvedBy);

Send a signal from outside:

await ductape.workflows.signal({
  product: 'my-product',
  env: 'prd',
  workflow_id: 'wf-123',
  signal: 'order-approved',
  payload: { approvedBy: 'manager@company.com' },
});
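
Conceptually, waiting for a signal is a promise that resolves when a matching signal arrives and rejects when the timeout expires. The in-memory SignalBus below is a hypothetical sketch of that idea, not how Ductape delivers signals. In particular, it assumes a timeout rejects rather than returning an empty value, and unlike a durable system it drops signals sent before anyone is waiting.

```typescript
type Resolver = (payload: unknown) => void;

// Hypothetical in-memory signal bus (a durable system would persist waiters).
class SignalBus {
  private waiters = new Map<string, Resolver[]>();

  // Resolve with the signal payload, or reject after timeoutMs.
  waitFor<T>(signal: string, timeoutMs: number): Promise<T> {
    return new Promise<T>((resolve, reject) => {
      const timer = setTimeout(() => {
        this.remove(signal, onSignal); // stop listening once timed out
        reject(new Error(`Timed out waiting for '${signal}'`));
      }, timeoutMs);
      const onSignal: Resolver = (payload) => {
        clearTimeout(timer);
        resolve(payload as T);
      };
      const list = this.waiters.get(signal) ?? [];
      list.push(onSignal);
      this.waiters.set(signal, list);
    });
  }

  // Deliver a payload to every current waiter; returns how many were notified.
  send(signal: string, payload: unknown): number {
    const list = this.waiters.get(signal) ?? [];
    this.waiters.delete(signal);
    list.forEach((resolve) => resolve(payload));
    return list.length;
  }

  private remove(signal: string, resolver: Resolver): void {
    const remaining = (this.waiters.get(signal) ?? []).filter((r) => r !== resolver);
    if (remaining.length > 0) this.waiters.set(signal, remaining);
    else this.waiters.delete(signal);
  }
}
```

Whatever the real timeout behaviour is, workflow code should decide explicitly what happens when no approval arrives in time.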

Checkpoint

Save state for recovery:

await ctx.checkpoint('payment-complete', {
  chargeId: payment.id,
  timestamp: Date.now(),
});

Manual Rollback

Trigger rollback programmatically:

if (fraudCheck.risk > 0.8) {
  await ctx.triggerRollback('High fraud risk detected');
}

Executing Workflows

const result = await ductape.workflows.execute({
  product: 'my-product',
  env: 'prd',
  tag: 'order-fulfillment',
  input: {
    orderId: 'ORD-12345',
    amount: 99.99,
    email: 'customer@example.com',
    items: ['SKU-001', 'SKU-002'],
  },
});

console.log('Status:', result.status); // 'completed' | 'failed' | 'rolled_back'
console.log('Output:', result.output);
console.log('Duration:', result.execution_time, 'ms');
console.log('Completed Steps:', result.completed_steps);
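
Because status can take three values, callers usually want to branch on it rather than just log it. The sketch below shows one way to do that. The WorkflowResult interface is an assumption inferred from the fields logged above, not an official type, and describeResult is a hypothetical helper.

```typescript
type WorkflowStatus = 'completed' | 'failed' | 'rolled_back';

// Assumed result shape, inferred from the fields used in the example above.
interface WorkflowResult {
  status: WorkflowStatus;
  output?: unknown;
  execution_time: number; // ms
  completed_steps: string[];
}

// Turn a result into a one-line summary, handling every status explicitly.
function describeResult(result: WorkflowResult): string {
  const steps = result.completed_steps.join(', ') || 'none';
  switch (result.status) {
    case 'completed':
      return `Completed in ${result.execution_time} ms (steps: ${steps})`;
    case 'rolled_back':
      return `Rolled back after steps: ${steps}`;
    case 'failed':
      return `Failed after steps: ${steps}`;
  }
}
```

Typing the status as a union lets the compiler flag any status value the switch does not handle.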

See Also

  • Jobs - Schedule workflows to run automatically