Building Workflows
This guide covers the complete code-first API for defining workflows using ductape.workflows.define().
Workflow Definition
Define a workflow by passing a configuration object with a handler function:
const workflow = await ductape.workflows.define({
product: 'my-product', // Target product
tag: 'workflow-tag', // Unique identifier
name: 'Workflow Name', // Display name
description: 'Description', // Optional
handler: async (ctx) => {
// Your workflow logic
return { result: 'data' };
},
});
Definition Options
Basic Properties
await ductape.workflows.define({
product: 'my-product',
tag: 'order-process',
name: 'Order Processing',
description: 'Handles complete order fulfillment',
// Workflow options
options: {
timeout: 300000, // 5 minutes max
rollback_strategy: 'reverse_all', // How to handle failures
},
// Environment configurations
envs: [
{ slug: 'dev', active: true },
{ slug: 'staging', active: true },
{ slug: 'prd', active: true },
],
handler: async (ctx) => {
// ...
},
});
Recording options (conditionals, loops, switch)
When your handler uses if/early return, loops, or switch, the compiler needs help to record all branches. See Conditionals, loops, and switch for full details.
| Option | Purpose |
|---|---|
| branchOverrides | Step result overrides so the handler continues past an if (result.x) return … and records later steps. Those steps get a condition at runtime (e.g. $Step{validate}{valid} == true). |
| recordInput | Sample input used during recording so loops (e.g. for (const item of ctx.input.items)) run and record steps. Use unique step tags per iteration. |
| recordScenarios | Array of input scenarios; the handler runs once per scenario. Each step gets a condition (e.g. $Input{type} == 'a') so only the matching branch runs at runtime. Use for switch-style branches. |
Signals
Define signals that external systems can send to the workflow:
await ductape.workflows.define({
// ...
signals: {
'approve': { input: { approver_id: 'string', comments: 'string' } },
'cancel': { input: { reason: 'string' } },
},
handler: async (ctx) => {
// Wait for approval signal
const approval = await ctx.waitForSignal('approve', { timeout: '24h' });
console.log('Approved by:', approval.approver_id);
},
});
Queries
Define queries to check workflow state:
await ductape.workflows.define({
// ...
queries: {
'getProgress': {
handler: (ctx) => ({
completed_steps: ctx.completed_steps,
current_step: ctx.current_step,
}),
},
},
handler: async (ctx) => {
// ...
},
});
Conditionals, loops, and switch
The handler runs once at define time to record steps. If you use if, early return, loops, or switch, only the path taken during that run is recorded. Use the options below so all branches are recorded and the right step conditions are applied at runtime.
| You want to… | Use | Runtime behavior |
|---|---|---|
| If / else on a step result (e.g. "only run payment if validation passed") | branchOverrides | Steps after the branch get a condition like $Step{validate}{valid} == true; executor skips them when the condition is false. |
| Loop over input (e.g. process each item in ctx.input.items) | recordInput | Supply sample input so the loop runs during recording; steps are recorded per iteration. |
| If / else or switch on workflow input (e.g. different steps for type: 'a' vs type: 'b') | recordScenarios | Handler runs once per scenario; each step gets a condition like $Input{type} == 'a' and only runs when input matches. |
If / else (early return) – branchOverrides
When the handler branches on a step result and returns early, later steps are never recorded. Use branchOverrides so the compiler "pretends" a step returned a value that keeps the handler running and records those steps. At runtime, those steps get a condition so they only run when the real step output matches.
await ductape.workflows.define({
product: 'my-product',
tag: 'order-fulfillment',
name: 'Order Fulfillment',
// So the handler continues past the if and records process-payment
branchOverrides: {
validate: { valid: true },
},
handler: async (ctx) => {
const validation = await ctx.step('validate', async () => {
return ctx.action.run({
app: 'orders-api',
event: 'validate-order',
input: { body: { orderId: ctx.input.orderId } },
});
});
if (!validation.valid) return { success: false, error: validation.reason };
const payment = await ctx.step('process-payment', async () =>
ctx.action.run({
app: 'stripe',
event: 'create-charge',
input: { body: { amount: ctx.input.amount } },
})
);
return { success: true, orderId: ctx.input.orderId, chargeId: payment.id };
},
});
- Compile: With validate: { valid: true }, the handler does not early-return, so process-payment is recorded. The step gets the condition $Step{validate}{valid} == true.
- Runtime: The executor runs validate; if the real result has valid === false, process-payment is skipped; if valid === true, it runs.
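The compile-time behavior described above can be pictured with a small stand-alone recorder. This is only a sketch under stated assumptions: record, RecordingCtx, and RecordedStep are hypothetical names that are not part of the Ductape SDK, the handler is synchronous for brevity, and the way conditions are attached is inferred from the description above rather than taken from the real compiler.

```typescript
// Hypothetical recorder: none of these names come from the Ductape SDK.
type StepResult = Record<string, unknown>;

interface RecordedStep {
  tag: string;
  condition?: string; // e.g. "$Step{validate}{valid} == true"
}

interface RecordingCtx {
  step(tag: string, run: () => StepResult): StepResult;
}

function record(
  handler: (ctx: RecordingCtx) => unknown,
  branchOverrides: Record<string, StepResult> = {},
): RecordedStep[] {
  const recorded: RecordedStep[] = [];
  const conditions: string[] = [];

  const ctx: RecordingCtx = {
    step(tag, _run) {
      // Steps are recorded, not executed: the step body is never called.
      recorded.push({
        tag,
        ...(conditions.length > 0 ? { condition: conditions.join(' && ') } : {}),
      });
      const override = branchOverrides[tag];
      if (override === undefined) return {};
      // Steps recorded after this point only run when the real output matches.
      for (const field of Object.keys(override)) {
        conditions.push(`$Step{${tag}}{${field}} == ${JSON.stringify(override[field])}`);
      }
      return override;
    },
  };

  handler(ctx);
  return recorded;
}

const handler = (ctx: RecordingCtx) => {
  const validation = ctx.step('validate', () => ({}));
  if (!validation.valid) return { success: false };
  ctx.step('process-payment', () => ({}));
  return { success: true };
};

// Without an override the handler returns early and only 'validate' is recorded.
const withoutOverride = record(handler);
// With the override both steps are recorded, and 'process-payment' carries a condition.
const withOverride = record(handler, { validate: { valid: true } });
```

In this model the override does two jobs: it keeps the handler running past the early return, and it supplies the values that the later step's condition is compared against.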
Loops (for) – recordInput
By default, recording uses empty input, so a loop like for (const item of ctx.input.items) runs zero times and no steps are recorded. Use recordInput to supply sample input during recording so the loop runs and each iteration’s steps are recorded.
Use unique step tags per iteration (e.g. `process-${item.id}`).
await ductape.workflows.define({
product: 'my-product',
tag: 'process-orders',
name: 'Process Orders',
recordInput: {
items: [{ id: '1' }, { id: '2' }],
},
handler: async (ctx) => {
for (const item of ctx.input.items) {
await ctx.step(`process-${item.id}`, async () =>
ctx.action.run({
app: 'orders-api',
event: 'process-item',
input: { body: { id: item.id } },
})
);
}
},
});
Recording runs with ctx.input.items = [{ id: '1' }, { id: '2' }], so steps process-1 and process-2 are recorded. Keys not in recordInput still compile to $Input{...} in the schema.
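The effect of recordInput on a loop can be illustrated with a toy recorder. This is a sketch, not the SDK: recordTags and LoopCtx are hypothetical names, empty recording input is modeled as an empty items array, and rejecting duplicate tags is an assumption made here only to make the unique-tag requirement visible.

```typescript
// Hypothetical recorder; not part of the Ductape SDK.
interface Item {
  id: string;
}

interface LoopCtx {
  input: { items: Item[] };
  step(tag: string): void;
}

function recordTags(
  handler: (ctx: LoopCtx) => void,
  recordInput?: { items: Item[] },
): string[] {
  const tags: string[] = [];
  const ctx: LoopCtx = {
    // Assumption: "empty input" iterates like an empty list.
    input: recordInput ?? { items: [] },
    step(tag) {
      // Assumption: a repeated tag is treated as an error in this sketch.
      if (tags.indexOf(tag) !== -1) throw new Error(`Duplicate step tag: ${tag}`);
      tags.push(tag);
    },
  };
  handler(ctx);
  return tags;
}

const loopHandler = (ctx: LoopCtx) => {
  for (const item of ctx.input.items) {
    ctx.step(`process-${item.id}`);
  }
};

// No sample input: the loop body never runs, so nothing is recorded.
const emptyRun = recordTags(loopHandler);
// Sample input: one step is recorded per item.
const sampleRun = recordTags(loopHandler, { items: [{ id: '1' }, { id: '2' }] });
```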
If/else or switch on input – recordScenarios
For if/else or switch on workflow input (e.g. different steps per type or status), use recordScenarios. The handler runs once per scenario (with that scenario as input). Every step recorded in that run gets a condition so it only runs when the real input matches (e.g. $Input{type} == 'a'). Steps from all runs are merged; if the same step appears in multiple scenarios, conditions are combined with ||. You can use either if (ctx.input.type === 'a') { ... } else if (ctx.input.type === 'b') { ... } or switch (ctx.input.type) { case 'a': ... } in the handler.
await ductape.workflows.define({
product: 'my-product',
tag: 'handle-by-type',
name: 'Handle by type',
recordScenarios: [
{ type: 'a' },
{ type: 'b' },
],
handler: async (ctx) => {
switch (ctx.input.type) {
case 'a':
await ctx.step('handle-a', async () =>
ctx.action.run({ app: 'api', event: 'handle-a', input: {} })
);
break;
case 'b':
await ctx.step('handle-b', async () =>
ctx.action.run({ app: 'api', event: 'handle-b', input: {} })
);
break;
}
},
});
- Compile: Two runs: one with { type: 'a' }, one with { type: 'b' }. Steps handle-a (condition $Input{type} == 'a') and handle-b (condition $Input{type} == 'b') are recorded.
- Runtime: The executor evaluates the condition and runs only the matching step.
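To make the merge rule concrete, the following sketch runs a simplified handler once per scenario and combines the conditions of any step that appears in more than one run with ||. The names scenarioCondition and mergeScenarios, the synchronous handler shape, and the extra audit step are all hypothetical, and the exact condition strings the real compiler emits may differ.

```typescript
// Hypothetical helpers; the real compiler's internals are not documented here.
type Scenario = Record<string, string | number | boolean>;

// Build one "$Input{key} == value" clause per scenario key, joined with &&.
function scenarioCondition(scenario: Scenario): string {
  return Object.keys(scenario)
    .map((key) => {
      const value = scenario[key];
      const literal = typeof value === 'string' ? `'${value}'` : String(value);
      return `$Input{${key}} == ${literal}`;
    })
    .join(' && ');
}

// Run the handler once per scenario and merge conditions per step tag.
function mergeScenarios(
  handler: (input: Scenario, step: (tag: string) => void) => void,
  scenarios: Scenario[],
): Record<string, string> {
  const byTag: Record<string, string[]> = {};
  for (const scenario of scenarios) {
    const condition = scenarioCondition(scenario);
    handler(scenario, (tag) => {
      const list = byTag[tag] ?? (byTag[tag] = []);
      if (list.indexOf(condition) === -1) list.push(condition);
    });
  }
  const merged: Record<string, string> = {};
  for (const tag of Object.keys(byTag)) {
    const list = byTag[tag] ?? [];
    // A step seen in several scenarios runs when any of them matches.
    merged[tag] = list.length > 1 ? list.map((c) => `(${c})`).join(' || ') : (list[0] ?? '');
  }
  return merged;
}

const conditions = mergeScenarios(
  (input, step) => {
    switch (input.type) {
      case 'a':
        step('handle-a');
        break;
      case 'b':
        step('handle-b');
        break;
    }
    step('audit'); // hypothetical step shared by both scenarios
  },
  [{ type: 'a' }, { type: 'b' }],
);
// conditions['handle-a'] is "$Input{type} == 'a'"
// conditions['audit'] is "($Input{type} == 'a') || ($Input{type} == 'b')"
```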
Step condition syntax
Conditions are evaluated at runtime by the workflow executor. You can combine conditions with && (and) and || (or). Each part can use these operators:
| Operator | Meaning | Example |
|---|---|---|
| ==, === | Equal | $Step{validate}{valid} == true |
| !=, !== | Not equal | $Input{status} != 'draft' |
| >, >= | Greater than (or equal) | $Step{score}{value} >= 5 |
| <, <= | Less than (or equal) | $Input{count} <= 100 |
Left and right sides can be:
- Step output: $Step{stepTag}{field} or $Step{stepTag}{nested.field}
- Workflow input: $Input{field} or $Input{nested.field}
- Literals: true, false, numbers, or quoted strings ('a', "draft")
Examples:
$Step{validate}{available} == true
$Input{type} == 'premium'
$Step{score}{value} >= 80 && $Input{role} != 'guest'
($Input{region} == 'eu') || ($Input{region} == 'uk')
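For readers who want to reason about how such expressions resolve, here is a minimal evaluator for the syntax described above. It is an illustrative sketch, not the executor's implementation: evaluateCondition and Scope are hypothetical names, == and === are both treated as strict equality (an assumption), and && is given higher precedence than ||, as in JavaScript.

```typescript
// Hypothetical evaluator for the condition syntax; not the Ductape executor.
type Scope = { input: Record<string, unknown>; steps: Record<string, unknown> };

const TOKEN =
  /\$Step\{[^}]+\}\{[^}]+\}|\$Input\{[^}]+\}|===|!==|==|!=|>=|<=|&&|\|\||[()<>]|'[^']*'|"[^"]*"|-?\d+(?:\.\d+)?|true|false/g;

// Follow a dotted path such as "nested.field" through an object.
function getPath(root: unknown, path: string): unknown {
  let current: unknown = root;
  for (const key of path.split('.')) {
    if (current === null || typeof current !== 'object') return undefined;
    current = (current as Record<string, unknown>)[key];
  }
  return current;
}

// Turn a reference or literal token into a runtime value.
function resolve(token: string, scope: Scope): unknown {
  const step = /^\$Step\{([^}]+)\}\{([^}]+)\}$/.exec(token);
  if (step) return getPath(scope.steps[step[1]], step[2]);
  const input = /^\$Input\{([^}]+)\}$/.exec(token);
  if (input) return getPath(scope.input, input[1]);
  if (token === 'true') return true;
  if (token === 'false') return false;
  if (/^['"]/.test(token)) return token.slice(1, -1);
  return Number(token);
}

function compare(left: unknown, op: string, right: unknown): boolean {
  switch (op) {
    case '==':
    case '===':
      return left === right; // assumption: no loose coercion
    case '!=':
    case '!==':
      return left !== right;
    case '>':
      return (left as number) > (right as number);
    case '>=':
      return (left as number) >= (right as number);
    case '<':
      return (left as number) < (right as number);
    case '<=':
      return (left as number) <= (right as number);
    default:
      throw new Error(`Unknown operator: ${op}`);
  }
}

function evaluateCondition(expr: string, scope: Scope): boolean {
  const tokens: string[] = expr.match(TOKEN) ?? [];
  // Reject input containing characters the tokenizer did not recognize.
  if (tokens.join('').replace(/\s/g, '') !== expr.replace(/\s/g, '')) {
    throw new Error(`Cannot parse condition: ${expr}`);
  }
  let pos = 0;
  const parseOr = (): boolean => {
    let value = parseAnd();
    while (tokens[pos] === '||') {
      pos++;
      const right = parseAnd();
      value = value || right;
    }
    return value;
  };
  const parseAnd = (): boolean => {
    let value = parsePrimary();
    while (tokens[pos] === '&&') {
      pos++;
      const right = parsePrimary();
      value = value && right;
    }
    return value;
  };
  const parsePrimary = (): boolean => {
    if (tokens[pos] === '(') {
      pos++;
      const value = parseOr();
      if (tokens[pos++] !== ')') throw new Error('Expected ")"');
      return value;
    }
    const left = tokens[pos++];
    const op = tokens[pos++];
    const right = tokens[pos++];
    if (left === undefined || op === undefined || right === undefined) {
      throw new Error('Incomplete comparison');
    }
    return compare(resolve(left, scope), op, resolve(right, scope));
  };
  const result = parseOr();
  if (pos !== tokens.length) throw new Error('Unexpected trailing tokens');
  return result;
}

const scope: Scope = {
  input: { type: 'premium', role: 'member', region: 'uk' },
  steps: { validate: { available: true }, score: { value: 85 } },
};
const scoreAndRole = evaluateCondition(
  "$Step{score}{value} >= 80 && $Input{role} != 'guest'",
  scope,
);
const euOrUk = evaluateCondition("($Input{region} == 'eu') || ($Input{region} == 'uk')", scope);
```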
Working with Steps
The ctx.step() Function
Steps are the building blocks of workflows:
const result = await ctx.step(
'step-tag', // Unique identifier
handler, // Async function
rollback?, // Optional rollback function
options? // Optional step options
);
Basic Step
const user = await ctx.step('create-user', async () => {
return ctx.database.insert({
database: 'users-db',
event: 'create-user',
data: { email: ctx.input.email },
});
});
// Use the result in subsequent steps
console.log('Created user:', user.id);
Step with Rollback
const charge = await ctx.step(
'charge-card',
// Handler
async () => {
return ctx.action.run({
app: 'stripe',
event: 'create-charge',
input: { body: { amount: ctx.input.amount } },
});
},
// Rollback - called if a later step fails
async (result) => {
await ctx.action.run({
app: 'stripe',
event: 'refund',
input: { body: { chargeId: result.id } },
});
}
);
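The rollback contract (a later failure triggers the rollback of earlier steps, each receiving its own step result) resembles the saga pattern. The sketch below models it outside the SDK: MiniWorkflow and demo are hypothetical names, and running rollbacks in reverse completion order is an assumption suggested by the reverse_all strategy name rather than something this guide specifies.

```typescript
// Hypothetical model of step rollbacks; not the Ductape runtime.
type Rollback<T> = (result: T) => Promise<void>;

class MiniWorkflow {
  private undo: Array<{ tag: string; run: () => Promise<void> }> = [];

  async step<T>(tag: string, handler: () => Promise<T>, rollback?: Rollback<T>): Promise<T> {
    const result = await handler();
    // Register the rollback only after the step has succeeded.
    if (rollback) this.undo.push({ tag, run: () => rollback(result) });
    return result;
  }

  // Undo completed steps, most recent first (assumed meaning of 'reverse_all').
  async rollbackAll(): Promise<string[]> {
    const rolledBack: string[] = [];
    for (const entry of [...this.undo].reverse()) {
      await entry.run();
      rolledBack.push(entry.tag);
    }
    this.undo = [];
    return rolledBack;
  }
}

async function demo(): Promise<string[]> {
  const wf = new MiniWorkflow();
  const log: string[] = [];
  try {
    const charge = await wf.step(
      'charge-card',
      async () => ({ id: 'ch_1' }),
      async (result) => {
        log.push(`refund ${result.id}`);
      },
    );
    await wf.step(
      'reserve-inventory',
      async () => ({ reserved: true }),
      async () => {
        log.push('release inventory');
      },
    );
    // This step fails, so the two completed steps above are rolled back.
    await wf.step('create-order', async () => {
      throw new Error(`database unavailable after ${charge.id}`);
    });
  } catch {
    await wf.rollbackAll();
  }
  return log;
}

const rollbackLog = demo();
```

In this model the inventory release runs before the refund, because the inventory step completed last.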
Step Options
await ctx.step(
'send-notification',
async () => {
await ctx.notification.email({ /* ... */ });
},
null, // No rollback
{
allow_fail: true, // Continue on failure
retries: 3, // Retry count
retry_delay: 1000, // Delay between retries (ms)
timeout: 30000, // Step timeout (ms)
critical: false, // Mark as critical for rollback
}
);
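One way to picture how retries, retry_delay, and allow_fail interact is the stand-alone helper below. It is a sketch under assumptions: runWithOptions is a hypothetical function, retries is read as the number of additional attempts after the first, allow_fail is modeled by resolving to undefined, and timeout and critical are left out.

```typescript
// Hypothetical helper modeling three of the step options; not SDK code.
interface StepOptions {
  retries?: number; // assumed: extra attempts after the first one
  retry_delay?: number; // milliseconds between attempts
  allow_fail?: boolean; // continue the workflow even if every attempt fails
}

const wait = (ms: number) => new Promise<void>((resolve) => setTimeout(resolve, ms));

async function runWithOptions<T>(
  run: () => Promise<T>,
  options: StepOptions = {},
): Promise<T | undefined> {
  const { retries = 0, retry_delay = 0, allow_fail = false } = options;
  let lastError: unknown;
  for (let attempt = 0; attempt <= retries; attempt++) {
    try {
      return await run();
    } catch (error) {
      lastError = error;
      // Pause before the next attempt, but not after the final one.
      if (attempt < retries) await wait(retry_delay);
    }
  }
  if (allow_fail) return undefined; // swallow the failure and continue
  throw lastError;
}

// A flaky operation that succeeds on its third call.
let calls = 0;
const flaky = async () => {
  calls++;
  if (calls < 3) throw new Error('temporary failure');
  return 'sent';
};

const outcome = runWithOptions(flaky, { retries: 3, retry_delay: 1 });
```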
Using Ductape Components
ctx.action - API Calls
Call external APIs through connected apps:
// Basic call
const result = await ctx.action.run({
app: 'stripe',
event: 'create-charge',
input: {
body: { amount: 1000, currency: 'usd' },
},
});
// With retries and timeout
const result = await ctx.action.run({
app: 'external-api',
event: 'fetch-data',
input: {
body: { query: ctx.input.query },
headers: { 'X-Request-ID': ctx.workflow_id },
},
retries: 3,
timeout: 10000,
});
ctx.database - Database Operations
// Insert
const record = await ctx.database.insert({
database: 'orders-db',
event: 'create-order',
data: {
customer_id: ctx.input.customerId,
total: ctx.input.amount,
items: ctx.input.items,
},
});
// Query
const orders = await ctx.database.query({
database: 'orders-db',
event: 'find-by-customer',
params: { customerId: ctx.input.customerId },
});
// Execute (for custom operations)
const result = await ctx.database.execute({
database: 'analytics-db',
event: 'aggregate-sales',
input: { startDate: ctx.input.from, endDate: ctx.input.to },
});
// Update
await ctx.database.update({
database: 'orders-db',
event: 'update-status',
where: { id: ctx.input.orderId },
data: { status: 'shipped' },
});
// Delete
await ctx.database.delete({
database: 'orders-db',
event: 'remove-order',
where: { id: ctx.input.orderId },
});
ctx.graph - Graph Database
// Create node
const node = await ctx.graph.createNode({
graph: 'social-graph',
labels: ['User'],
properties: { name: ctx.input.name, email: ctx.input.email },
});
// Update node
await ctx.graph.updateNode({
graph: 'social-graph',
id: node.id,
properties: { verified: true },
});
// Create relationship
await ctx.graph.createRelationship({
graph: 'social-graph',
from: ctx.input.userId,
to: ctx.input.friendId,
type: 'FOLLOWS',
properties: { since: new Date().toISOString() },
});
// Query
const connections = await ctx.graph.query({
graph: 'social-graph',
action: 'find-connections',
params: { userId: ctx.input.userId, depth: 2 },
});
// Execute custom action
const result = await ctx.graph.execute({
graph: 'social-graph',
action: 'compute-influence',
input: { userId: ctx.input.userId },
});
// Delete node
await ctx.graph.deleteNode({
graph: 'social-graph',
id: ctx.input.nodeId,
});
// Delete relationship
await ctx.graph.deleteRelationship({
graph: 'social-graph',
id: ctx.input.relationshipId,
});
ctx.notification - Send Notifications
// Email
await ctx.notification.email({
notification: 'transactional',
event: 'order-confirmation',
recipients: [ctx.input.email],
subject: { orderId: ctx.input.orderId },
template: {
orderId: ctx.input.orderId,
items: ctx.input.items,
total: ctx.input.total,
},
});
// SMS
await ctx.notification.sms({
notification: 'alerts',
event: 'verification-code',
phones: [ctx.input.phone],
message: { code: ctx.input.code },
});
// Push notification
await ctx.notification.push({
notification: 'mobile-app',
event: 'order-shipped',
tokens: [ctx.input.deviceToken],
title: { status: 'Shipped' },
body: { trackingNumber: ctx.input.trackingNumber },
data: { orderId: ctx.input.orderId },
});
// Generic send
await ctx.notification.send({
notification: 'multi-channel',
event: 'urgent-alert',
input: {
email: { recipients: [ctx.input.email] },
sms: { phones: [ctx.input.phone] },
push: { tokens: [ctx.input.token] },
},
retries: 3,
});
ctx.storage - File Operations
// Upload file
const uploaded = await ctx.storage.upload({
storage: 'documents',
event: 'upload-invoice',
input: {
buffer: ctx.input.fileData,
fileName: `invoice-${ctx.input.orderId}.pdf`,
mimeType: 'application/pdf',
},
retries: 2,
});
console.log('File URL:', uploaded.url);
console.log('File key:', uploaded.file_key);
// Download file
const file = await ctx.storage.download({
storage: 'templates',
event: 'get-template',
input: { file_key: 'invoice-template.pdf' },
});
console.log('Content:', file.content);
console.log('Size:', file.size);
// Delete file
await ctx.storage.delete({
storage: 'temp-files',
event: 'cleanup',
input: { file_key: ctx.input.tempFileKey },
});
ctx.messaging - Message Broker (Ductape primitive)
// Publish message (event = "broker-tag:topic-tag")
await ctx.messaging.produce({
event: 'order-events:order-created',
message: {
orderId: ctx.input.orderId,
customerId: ctx.input.customerId,
items: ctx.input.items,
timestamp: Date.now(),
},
});
ctx.quota - Rate Limiting
// Check quota before proceeding
const quotaResult = await ctx.quota.execute({
quota: 'api-rate-limit',
input: { key: ctx.input.userId },
timeout: 5000,
});
if (!quotaResult.allowed) {
throw new Error('Rate limit exceeded');
}
ctx.fallback - Failure Handling
// Use fallback for resilient operations
const result = await ctx.fallback.execute({
fallback: 'payment-fallback',
input: {
amount: ctx.input.amount,
method: ctx.input.paymentMethod,
},
timeout: 30000,
});
Control Flow
ctx.sleep() - Pause Execution
// Wait 5 seconds
await ctx.sleep(5000);
// Wait using duration string
await ctx.sleep('5m'); // 5 minutes
await ctx.sleep('1h'); // 1 hour
await ctx.sleep('1d'); // 1 day
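The duration strings above map to milliseconds in the obvious way. The helper below is a hypothetical sketch of that conversion, useful for checking timeout arithmetic; toMilliseconds is not an SDK function, and it accepts only the m, h, and d suffixes that appear in this guide, since other units are not shown here.

```typescript
// Hypothetical converter for the duration formats shown in this guide.
const UNIT_MS: Record<string, number> = {
  m: 60 * 1000,
  h: 60 * 60 * 1000,
  d: 24 * 60 * 60 * 1000,
};

function toMilliseconds(duration: number | string): number {
  // Plain numbers are already milliseconds.
  if (typeof duration === 'number') return duration;
  const match = /^(\d+)(m|h|d)$/.exec(duration.trim());
  if (!match) throw new Error(`Unsupported duration: ${duration}`);
  return Number(match[1]) * UNIT_MS[match[2]];
}

const fiveMinutes = toMilliseconds('5m'); // 300000
const oneDay = toMilliseconds('1d'); // 86400000
const passthrough = toMilliseconds(5000); // 5000
```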
ctx.waitForSignal() - Wait for External Event
// Wait for a single signal
const approval = await ctx.waitForSignal('order-approved', {
timeout: '24h',
});
// Wait for any of multiple signals
const result = await ctx.waitForSignal(['approved', 'rejected'], {
timeout: '48h',
});
// Access signal payload
console.log('Approved by:', result.approvedBy);
ctx.checkpoint() - Save Progress
// Save state for recovery
await ctx.checkpoint('payment-complete', {
chargeId: payment.id,
amount: ctx.input.amount,
timestamp: Date.now(),
});
ctx.triggerRollback() - Manual Rollback
// Manually trigger rollback
if (fraudCheck.score > 0.8) {
const result = await ctx.triggerRollback('High fraud risk detected');
console.log('Rolled back steps:', result.rolled_back_steps);
return { success: false, reason: 'fraud_detected' };
}
ctx.workflow() - Child Workflows
// Run another workflow as a child
const paymentResult = await ctx.workflow(
'child-payment-123', // Child workflow ID
'payment-processing', // Workflow tag
{ amount: ctx.input.amount }, // Input
{
timeout: '5m',
retries: 2,
parent_close_policy: 'terminate',
}
);
State Management
ctx.setState() and ctx.getState()
// Save state
ctx.setState('retryCount', 0);
ctx.setState('lastError', null);
// Later, retrieve state
const retryCount = ctx.getState<number>('retryCount') || 0;
if (retryCount < 3) {
ctx.setState('retryCount', retryCount + 1);
// Retry logic...
}
// Access all state
console.log('Current state:', ctx.state);
ctx.steps - Access Step Results
// Access results from previous steps
const userResult = ctx.steps['create-user'];
console.log('User ID:', userResult.id);
// Check completed steps
console.log('Completed:', ctx.completed_steps);
console.log('Current:', ctx.current_step);
Data Access
ctx.variable() and ctx.constant()
// Access app variables
const apiVersion = ctx.variable('stripe', 'api_version');
// Access app constants
const defaultCurrency = ctx.constant('stripe', 'default_currency');
ctx.default() - Fallback Values
// Provide default values
const currency = ctx.default(ctx.input.currency, 'usd');
const retries = ctx.default(ctx.input.retries, 3);
ctx.transform - Data Transformations
// String transformations
const upper = ctx.transform.upper(ctx.input.name);
const lower = ctx.transform.lower(ctx.input.email);
const trimmed = ctx.transform.trim(ctx.input.text);
// JSON operations
const parsed = ctx.transform.parseJson<MyType>(ctx.input.jsonString);
const stringified = ctx.transform.stringify(ctx.input.data);
// Date operations
const now = ctx.transform.now();
const formatted = ctx.transform.formatDate(Date.now(), 'YYYY-MM-DD');
// Array/object operations
const length = ctx.transform.length(ctx.input.items);
const size = ctx.transform.size(ctx.input.metadata);
const parts = ctx.transform.split(ctx.input.csv, ',');
const joined = ctx.transform.join(ctx.input.tags, ', ');
Logging
// Log messages at different levels
ctx.log.debug('Processing step', { step: 'create-user' });
ctx.log.info('User created', { userId: user.id });
ctx.log.warn('Retry attempt', { attempt: 2, maxRetries: 3 });
ctx.log.error('Step failed', { error: err.message });
Complete Example
This example uses branchOverrides so the compiler records the payment step even though the handler can early-return when validation fails. At runtime, the payment step runs only when $Step{validate}{available} is true.
await ductape.workflows.define({
product: 'my-product',
tag: 'order-fulfillment',
name: 'Order Fulfillment',
description: 'Complete order processing workflow',
options: {
timeout: 300000,
rollback_strategy: 'reverse_all',
},
signals: {
'cancel-order': { input: { reason: 'string' } },
},
// Record the "continue" branch so payment and later steps are in the schema
branchOverrides: { validate: { available: true } },
handler: async (ctx) => {
ctx.log.info('Starting order fulfillment', { orderId: ctx.input.orderId });
// Step 1: Validate order
const validation = await ctx.step('validate', async () => {
return ctx.action.run({
app: 'inventory-api',
event: 'check-availability',
input: { body: { items: ctx.input.items } },
});
});
if (!validation.available) {
return { success: false, reason: 'items_unavailable' };
}
// Step 2: Process payment (with rollback)
const payment = await ctx.step(
'payment',
async () => {
return ctx.action.run({
app: 'stripe',
event: 'create-charge',
input: {
body: {
amount: ctx.input.amount,
customer: ctx.input.customerId,
},
},
retries: 3,
});
},
async (result) => {
await ctx.action.run({
app: 'stripe',
event: 'refund',
input: { body: { chargeId: result.id } },
});
}
);
// Checkpoint after payment
await ctx.checkpoint('payment-complete', { chargeId: payment.id });
// Step 3: Reserve inventory (with rollback)
await ctx.step(
'reserve-inventory',
async () => {
return ctx.action.run({
app: 'inventory-api',
event: 'reserve',
input: { body: { items: ctx.input.items } },
});
},
async () => {
await ctx.action.run({
app: 'inventory-api',
event: 'release',
input: { body: { items: ctx.input.items } },
});
}
);
// Step 4: Create order record
const order = await ctx.step('create-order', async () => {
return ctx.database.insert({
database: 'orders-db',
event: 'create',
data: {
customer_id: ctx.input.customerId,
items: ctx.input.items,
total: ctx.input.amount,
payment_id: payment.id,
status: 'processing',
},
});
});
// Step 5: Send confirmation (non-critical)
await ctx.step(
'send-confirmation',
async () => {
await ctx.notification.email({
notification: 'transactional',
event: 'order-confirmed',
recipients: [ctx.input.email],
template: {
orderId: order.id,
items: ctx.input.items,
total: ctx.input.amount,
},
});
},
null,
{ allow_fail: true }
);
// Step 6: Queue for fulfillment
await ctx.step('queue-fulfillment', async () => {
await ctx.messaging.produce({
event: 'warehouse:new-order',
message: {
orderId: order.id,
items: ctx.input.items,
},
});
});
ctx.log.info('Order fulfilled successfully', { orderId: order.id });
return {
success: true,
orderId: order.id,
chargeId: payment.id,
};
},
});
Next Steps
- Step Types - Detailed reference for all ctx methods
- Execution & Rollbacks - How execution works
- Examples - Real-world patterns