Context API Reference
Complete reference for all methods available on the workflow context (ctx) object.
ctx.step()
Define a workflow step with optional rollback.
const result = await ctx.step<T>(
tag: string,
handler: () => Promise<T>,
rollback?: ((result: T) => Promise<void>) | null,
options?: IWorkflowStepOptions
): Promise<T>;
Parameters
| Parameter | Type | Description |
|---|---|---|
tag | string | Unique step identifier |
handler | function | Async function that performs the step's work |
rollback | function | null | Optional function called during rollback |
options | object | Step configuration options |
Options
| Option | Type | Default | Description |
|---|---|---|---|
allow_fail | boolean | false | Continue workflow if step fails |
retries | number | 0 | Number of retry attempts |
retry_delay | number | 1000 | Milliseconds between retries |
timeout | number | - | Step timeout in milliseconds |
critical | boolean | false | Always rollback this step on failure |
Examples
// Basic step
const user = await ctx.step('create-user', async () => {
return ctx.database.insert({ database: 'users', event: 'create', data: {} });
});
// With rollback
const charge = await ctx.step(
'charge',
async () => ctx.action.run({ app: 'stripe', event: 'charge', input: {} }),
async (result) => ctx.action.run({ app: 'stripe', event: 'refund', input: { body: { id: result.id } } })
);
// With options
await ctx.step(
'notify',
async () => ctx.notification.email({ /* ... */ }),
null,
{ allow_fail: true, retries: 3 }
);
ctx.action
Call external APIs through connected apps.
ctx.action.run()
const result = await ctx.action.run<T>({
app: string,
event: string,
input: {
body?: Record<string, unknown>,
headers?: Record<string, string>,
query?: Record<string, string>,
params?: Record<string, string>,
},
retries?: number,
timeout?: number,
}): Promise<T>;
Example
const charge = await ctx.action.run({
app: 'stripe',
event: 'create-charge',
input: {
body: { amount: 1000, currency: 'usd' },
headers: { 'Idempotency-Key': ctx.workflow_id },
},
retries: 3,
timeout: 10000,
});
ctx.database
Database operations.
ctx.database.insert()
const record = await ctx.database.insert<T>({
database: string,
event: string,
data: Record<string, unknown>,
}): Promise<T>;
ctx.database.query()
const records = await ctx.database.query<T>({
database: string,
event: string,
params?: Record<string, unknown>,
}): Promise<T[]>;
ctx.database.update()
const updated = await ctx.database.update<T>({
database: string,
event: string,
where: Record<string, unknown>,
data: Record<string, unknown>,
}): Promise<T>;
ctx.database.delete()
const success = await ctx.database.delete({
database: string,
event: string,
where: Record<string, unknown>,
}): Promise<boolean>;
ctx.database.execute()
const result = await ctx.database.execute<T>({
database: string,
event: string,
input: IDbActionRequest,
retries?: number,
timeout?: number,
}): Promise<T>;
ctx.graph
Graph database operations.
ctx.graph.createNode()
const node = await ctx.graph.createNode<T>({
graph: string,
labels: string[],
properties: Record<string, unknown>,
}): Promise<T>;
ctx.graph.updateNode()
const node = await ctx.graph.updateNode<T>({
graph: string,
id: string | number,
properties: Record<string, unknown>,
}): Promise<T>;
ctx.graph.deleteNode()
await ctx.graph.deleteNode({
graph: string,
id: string | number,
}): Promise<void>;
ctx.graph.createRelationship()
const rel = await ctx.graph.createRelationship<T>({
graph: string,
from: string,
to: string,
type: string,
properties?: Record<string, unknown>,
}): Promise<T>;
ctx.graph.deleteRelationship()
await ctx.graph.deleteRelationship({
graph: string,
id: string | number,
}): Promise<void>;
ctx.graph.query()
const results = await ctx.graph.query<T>({
graph: string,
action: string,
params?: Record<string, unknown>,
}): Promise<T[]>;
ctx.graph.execute()
const result = await ctx.graph.execute<T>({
graph: string,
action: string,
input: Record<string, unknown>,
}): Promise<T>;
ctx.notification
Send notifications.
ctx.notification.email()
await ctx.notification.email({
notification: string,
event: string,
recipients: string[],
subject: Record<string, unknown>,
template: Record<string, unknown>,
}): Promise<void>;
ctx.notification.sms()
await ctx.notification.sms({
notification: string,
event: string,
phones: string[],
message: Record<string, unknown>,
}): Promise<void>;
ctx.notification.push()
await ctx.notification.push({
notification: string,
event: string,
tokens: string[],
title: Record<string, unknown>,
body: Record<string, unknown>,
data?: Record<string, unknown>,
}): Promise<void>;
ctx.notification.send()
await ctx.notification.send({
notification: string,
event: string,
input: INotificationRequest,
retries?: number,
}): Promise<void>;
ctx.storage
File operations.
ctx.storage.upload()
const result = await ctx.storage.upload({
storage: string,
event: string,
input: {
buffer: Buffer | string,
fileName: string,
mimeType: string,
},
retries?: number,
}): Promise<{
file_key: string,
url?: string,
size?: number,
content_type?: string,
}>;
ctx.storage.download()
const file = await ctx.storage.download({
storage: string,
event: string,
input: { file_key: string },
}): Promise<{
content: Buffer | string,
content_type?: string,
size?: number,
}>;
ctx.storage.delete()
await ctx.storage.delete({
storage: string,
event: string,
input: { file_key: string },
}): Promise<void>;
ctx.publish
Message broker operations.
ctx.publish.send()
await ctx.publish.send({
broker: string,
event: string,
input: {
message: Record<string, unknown>,
},
retries?: number,
}): Promise<void>;
ctx.quota
Rate limiting.
ctx.quota.execute()
const result = await ctx.quota.execute<T>({
quota: string,
input: Record<string, unknown>,
timeout?: number,
}): Promise<T>;
ctx.fallback
Failure handling with alternatives.
ctx.fallback.execute()
const result = await ctx.fallback.execute<T>({
fallback: string,
input: Record<string, unknown>,
timeout?: number,
}): Promise<T>;
ctx.healthcheck
Check service availability.
ctx.healthcheck.getStatus()
const status = await ctx.healthcheck.getStatus(tag: string): Promise<{
status: 'available' | 'unavailable',
lastChecked?: string,
lastLatency?: number,
}>;
Control Flow
ctx.sleep()
Pause workflow execution.
await ctx.sleep(duration: number | string): Promise<void>;
| Format | Example | Description |
|---|---|---|
| number | 5000 | Milliseconds |
| string | '5s' | Seconds |
| string | '5m' | Minutes |
| string | '1h' | Hours |
| string | '1d' | Days |
ctx.waitForSignal()
Wait for an external signal.
const payload = await ctx.waitForSignal<T>(
signal: string | string[],
options?: { timeout?: number | string }
): Promise<T>;
ctx.checkpoint()
Save state for recovery.
await ctx.checkpoint(
name: string,
metadata?: Record<string, unknown>
): Promise<void>;
ctx.triggerRollback()
Manually trigger rollback.
const result = await ctx.triggerRollback(reason: string): Promise<{
success: boolean,
rolled_back_steps: string[],
failed_steps?: Array<{ tag: string, error: string }>,
reason: string,
}>;
ctx.workflow()
Run a child workflow.
const result = await ctx.workflow<TInput, TOutput>(
childId: string,
tag: string,
input: TInput,
options?: {
timeout?: number | string,
retries?: number,
parent_close_policy?: 'terminate' | 'abandon' | 'request_cancel',
idempotency_key?: string,
}
): Promise<TOutput>;
State Management
ctx.setState()
ctx.setState(key: string, value: unknown): void;
ctx.getState()
const value = ctx.getState<T>(key: string): T | undefined;
Read-Only Properties
| Property | Type | Description |
|---|---|---|
ctx.state | object | All workflow state |
ctx.steps | object | Results from completed steps |
ctx.completed_steps | string[] | Tags of completed steps |
ctx.current_step | string | null | Currently executing step |
Data Access
ctx.input
Read-only workflow input data.
const email = ctx.input.email;
const items = ctx.input.order.items;
ctx.variable()
Get app variable value.
const value = ctx.variable(app: string, key: string): unknown;
ctx.constant()
Get app constant value.
const value = ctx.constant(app: string, key: string): unknown;
ctx.token()
Get token value.
const token = ctx.token(key: string): string;
ctx.default()
Provide fallback for undefined values.
const value = ctx.default<T>(value: T | undefined, fallback: T): T;
ctx.transform
Data transformation utilities.
| Method | Description |
|---|---|
size(obj) | Number of keys in object |
length(arr) | Length of array or string |
parseJson<T>(str) | Parse JSON string |
stringify(obj) | Convert to JSON string |
upper(str) | Uppercase string |
lower(str) | Lowercase string |
trim(str) | Trim whitespace |
split(str, sep) | Split string |
join(arr, sep) | Join array |
now() | Current timestamp |
formatDate(date, fmt) | Format date |
ctx.log
Logging methods.
ctx.log.debug(message: string, data?: Record<string, unknown>): void;
ctx.log.info(message: string, data?: Record<string, unknown>): void;
ctx.log.warn(message: string, data?: Record<string, unknown>): void;
ctx.log.error(message: string, data?: Record<string, unknown>): void;
Metadata Properties
| Property | Type | Description |
|---|---|---|
ctx.workflow_id | string | Unique execution ID |
ctx.workflow_tag | string | Workflow tag |
ctx.env | string | Environment (dev, staging, prd) |
ctx.product | string | Product tag |
ctx.context | object | Additional metadata |
ctx.session | object | Session info (if provided) |
ctx.auth | object | Authentication data |
Replay, Restart & Resume Properties
Properties for detecting and handling replayed, restarted, or resumed workflows:
| Property | Type | Description |
|---|---|---|
ctx.is_replay | boolean | Whether this is a replay execution |
ctx.is_restart | boolean | Whether this is a restart execution |
ctx.is_restored | boolean | Whether this is a resumed execution |
ctx.replayed_from | string | undefined | Original workflow ID (if replay) |
ctx.restarted_from | string | undefined | Original workflow ID (if restart) |
ctx.resumed_from | string | undefined | Original workflow ID (if resume) |
ctx.replay_reason | string | undefined | Reason for replay |
ctx.restart_reason | string | undefined | Reason for restart |
ctx.restored_checkpoint | object | undefined | Checkpoint info (if resumed) |
ctx.original_input | object | undefined | Original input (for restart comparison) |
Example: Detecting Replay/Restart/Resume
handler: async (ctx) => {
// Log execution context
if (ctx.is_replay) {
ctx.log.info('Replaying workflow', {
original_id: ctx.replayed_from,
reason: ctx.replay_reason,
});
}
if (ctx.is_restart) {
ctx.log.info('Restarted workflow', {
original_id: ctx.restarted_from,
reason: ctx.restart_reason,
original_input: ctx.original_input,
});
}
if (ctx.is_restored) {
ctx.log.info('Resuming workflow', {
original_id: ctx.resumed_from,
checkpoint: ctx.restored_checkpoint?.name,
completed_steps: ctx.completed_steps,
});
// Skip already completed steps
if (ctx.completed_steps.includes('validate-order')) {
ctx.log.info('Skipping validate-order (already completed)');
}
}
// Workflow logic...
}
SDK Workflow Methods
Methods available on ductape.workflows for managing workflows outside of the handler context.
ductape.workflows.define()
Define and register a workflow:
const workflow = await ductape.workflows.define({
product: string,
tag: string,
name: string,
description?: string,
options?: IWorkflowOptions,
signals?: Record<string, { input: Record<string, string> }>,
queries?: Record<string, { handler: (ctx) => unknown }>,
envs?: Array<{ slug: string; active: boolean }>,
handler: (ctx: IWorkflowContext) => Promise<TOutput>,
});
ductape.workflows.execute()
Execute a workflow immediately:
const result = await ductape.workflows.execute({
product: string,
env: string,
tag: string,
input: Record<string, unknown>,
workflow_id?: string,
idempotency_key?: string,
timeout?: number,
context?: Record<string, unknown>,
});
ductape.workflows.dispatch()
Schedule a workflow for later execution:
const result = await ductape.workflows.dispatch({
product: string,
env: string,
workflow: string,
input: Record<string, unknown>,
schedule?: {
start_at?: number | string, // When to start
cron?: string, // Cron expression
every?: number, // Interval in ms
timezone?: string, // Timezone
limit?: number, // Max executions
endDate?: string, // End date
},
options?: {
retries?: number,
timeout?: number,
},
});
// Returns
interface IWorkflowDispatchResult {
job_id: string;
status: 'scheduled' | 'queued';
scheduled_at: number;
recurring: boolean;
next_run_at?: number;
}
ductape.workflows.replay()
Re-execute a workflow with the same input:
const result = await ductape.workflows.replay({
product: string,
env: string,
workflow_id: string, // Original workflow ID
options?: {
retries?: number,
timeout?: number,
debug?: boolean,
},
reason?: string, // Audit reason
idempotency_key?: string,
});
// Returns
interface IWorkflowReplayResult extends IWorkflowResult {
replayed_from: string; // Original workflow ID
}
ductape.workflows.restart()
Re-execute a workflow with new or modified input:
const result = await ductape.workflows.restart({
product: string,
env: string,
workflow_id: string, // Original workflow ID
input?: Record<string, unknown>, // New input (replaces original)
input_override?: Record<string, unknown>, // Partial override
merge_input?: boolean, // Merge with original input
reason?: string,
options?: IWorkflowOptions,
});
// Returns
interface IWorkflowRestartResult extends IWorkflowResult {
restarted_from: string; // Original workflow ID
}
ductape.workflows.resume()
Continue a paused or failed workflow:
const result = await ductape.workflows.resume({
product: string,
env: string,
workflow_id: string,
from_checkpoint?: string, // Resume from checkpoint
from_step?: string, // Resume from step
skip_steps?: string[], // Steps to skip
input?: Record<string, unknown>, // Additional input
});
// Returns
interface IWorkflowResumeResult extends IWorkflowResult {
resumed_from: string; // Original workflow ID
resumed_checkpoint?: string;
}
ductape.workflows.replayFromStep()
Re-execute starting from a specific step:
const result = await ductape.workflows.replayFromStep({
product: string,
env: string,
workflow_id: string,
from_step: string, // Step to start from
step_outputs?: Record<string, unknown>, // Override previous step outputs
debug?: {
enabled: boolean,
pause_after_step?: boolean,
log_level?: 'info' | 'verbose' | 'debug',
},
});
ductape.workflows.signal()
Send a signal to a running workflow:
await ductape.workflows.signal({
product: string,
env: string,
workflow_id: string,
signal: string, // Signal name
payload: Record<string, unknown>, // Signal data
});
ductape.workflows.query()
Query a running workflow's state:
const result = await ductape.workflows.query<T>({
product: string,
env: string,
workflow_id: string,
query: string, // Query handler name
});
ductape.workflows.status()
Get workflow execution status:
const status = await ductape.workflows.status({
product: string,
env: string,
workflow_id: string,
});
// Returns
interface IWorkflowStatus {
workflow_id: string;
workflow_tag: string;
status: 'running' | 'completed' | 'failed' | 'rolled_back' | 'cancelled';
current_step?: string;
completed_steps: string[];
state: Record<string, unknown>;
started_at: number;
updated_at: number;
}
ductape.workflows.cancel()
Cancel a running workflow:
const result = await ductape.workflows.cancel({
product: string,
env: string,
workflow_id: string,
reason?: string,
});
// Returns
interface IWorkflowCancelResult {
cancelled: boolean;
rolled_back_steps: string[];
}
ductape.workflows.history()
Get workflow execution history:
const history = await ductape.workflows.history({
product: string,
env: string,
workflow_id: string,
include_step_details?: boolean,
include_rollback_details?: boolean,
});
// Returns
interface IWorkflowHistory {
workflow_id: string;
workflow_tag: string;
status: WorkflowStatus;
events: IWorkflowEvent[];
checkpoints: ICheckpoint[];
replays?: string[];
restarts?: string[];
}
ductape.workflows.stepDetail()
Get detailed information about a step:
const detail = await ductape.workflows.stepDetail({
product: string,
env: string,
workflow_id: string,
step_tag: string,
});
// Returns
interface IStepDetail {
tag: string;
name: string;
status: StepStatus;
input?: Record<string, unknown>;
output?: Record<string, unknown>;
error?: IStepError;
attempts: number;
start_time?: number;
end_time?: number;
duration?: number;
rollback_status?: RollbackStatus;
}
ductape.workflows.relatedExecutions()
List all related executions (replays, restarts, resumes):
const related = await ductape.workflows.relatedExecutions({
product: string,
env: string,
workflow_id: string,
});
// Returns
interface IRelatedExecutions {
original: string;
executions: Array<{
workflow_id: string;
type: 'original' | 'replay' | 'restart' | 'resume';
status: WorkflowStatus;
created_at: number;
replayed_from?: string;
restarted_from?: string;
resumed_from?: string;
}>;
}
ductape.workflows.compare()
Compare two workflow executions:
const comparison = await ductape.workflows.compare({
product: string,
env: string,
workflow_ids: string[], // Two workflow IDs to compare
});
// Returns
interface IExecutionComparison {
workflows: string[];
input_diff: Record<string, unknown[]>;
step_diffs: Array<{
step: string;
[workflow_id: string]: {
status: StepStatus;
output?: Record<string, unknown>;
error?: string;
};
}>;
outcome_diff: Record<string, { status: WorkflowStatus; output?: unknown }>;
}