Skip to main content
Preview
Preview Feature — This feature is currently in preview and under active development. APIs and functionality may change. We recommend testing thoroughly before using in production.

Context API Reference

Complete reference for all methods available on the workflow context (ctx) object.

ctx.step()

Define a workflow step with optional rollback.

const result = await ctx.step<T>(
tag: string,
handler: () => Promise<T>,
rollback?: ((result: T) => Promise<void>) | null,
options?: IWorkflowStepOptions
): Promise<T>;

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| `tag` | `string` | Unique step identifier |
| `handler` | `function` | Async function that performs the step's work |
| `rollback` | `function \| null` | Optional function called during rollback |
| `options` | `object` | Step configuration options |

Options

| Option | Type | Default | Description |
| --- | --- | --- | --- |
| `allow_fail` | `boolean` | `false` | Continue workflow if step fails |
| `retries` | `number` | `0` | Number of retry attempts |
| `retry_delay` | `number` | `1000` | Milliseconds between retries |
| `timeout` | `number` | - | Step timeout in milliseconds |
| `critical` | `boolean` | `false` | Always roll back this step on failure |

Examples

// Basic step
const user = await ctx.step('create-user', async () => {
return ctx.database.insert({ database: 'users', event: 'create', data: {} });
});

// With rollback
const charge = await ctx.step(
'charge',
async () => ctx.action.run({ app: 'stripe', event: 'charge', input: { amount: 1000 } }),
async (result) => ctx.action.run({ app: 'stripe', event: 'refund', input: { chargeId: result.id } })
);

// With options
await ctx.step(
'notify',
async () => ctx.notification.email({ /* ... */ }),
null,
{ allow_fail: true, retries: 3 }
);

ctx.action

Call external APIs through connected apps.

ctx.action.run()

const result = await ctx.action.run<T>({
app: string,
event: string,
input: Record<string, unknown>, // Flat input format
retries?: number,
timeout?: number,
}): Promise<T>;

Flat Input Format

Fields are automatically resolved to the correct location (body, params, query, or headers) based on the action's schema:

input: {
amount: 1000, // auto-resolves to body.amount
currency: 'usd', // auto-resolves to body.currency
userId: '123' // auto-resolves to params.userId
}

For conflicting keys, use prefix syntax:

| Prefix | Target Location | Example |
| --- | --- | --- |
| `body:` | Request body | `'body:id': 'item_456'` |
| `params:` | Route parameters | `'params:id': 'user_123'` |
| `query:` | Query parameters | `'query:limit': 10` |
| `headers:` | HTTP headers | `'headers:X-Custom': 'value'` |

Example

const charge = await ctx.action.run({
app: 'stripe',
event: 'create-charge',
input: {
amount: 1000,
currency: 'usd',
'headers:Idempotency-Key': ctx.workflow_id
},
retries: 3,
timeout: 10000,
});

ctx.database

Database operations.

ctx.database.insert()

const record = await ctx.database.insert<T>({
database: string,
event: string,
data: Record<string, unknown>,
}): Promise<T>;

ctx.database.query()

const records = await ctx.database.query<T>({
database: string,
event: string,
params?: Record<string, unknown>,
}): Promise<T[]>;

ctx.database.update()

const updated = await ctx.database.update<T>({
database: string,
event: string,
where: Record<string, unknown>,
data: Record<string, unknown>,
}): Promise<T>;

ctx.database.delete()

const success = await ctx.database.delete({
database: string,
event: string,
where: Record<string, unknown>,
}): Promise<boolean>;

ctx.database.execute()

const result = await ctx.database.execute<T>({
database: string,
event: string,
input: IDbActionRequest,
retries?: number,
timeout?: number,
}): Promise<T>;

ctx.graph

Graph database operations.

ctx.graph.createNode()

const node = await ctx.graph.createNode<T>({
graph: string,
labels: string[],
properties: Record<string, unknown>,
}): Promise<T>;

ctx.graph.updateNode()

const node = await ctx.graph.updateNode<T>({
graph: string,
id: string | number,
properties: Record<string, unknown>,
}): Promise<T>;

ctx.graph.deleteNode()

await ctx.graph.deleteNode({
graph: string,
id: string | number,
}): Promise<void>;

ctx.graph.createRelationship()

const rel = await ctx.graph.createRelationship<T>({
graph: string,
from: string,
to: string,
type: string,
properties?: Record<string, unknown>,
}): Promise<T>;

ctx.graph.deleteRelationship()

await ctx.graph.deleteRelationship({
graph: string,
id: string | number,
}): Promise<void>;

ctx.graph.query()

const results = await ctx.graph.query<T>({
graph: string,
action: string,
params?: Record<string, unknown>,
}): Promise<T[]>;

ctx.graph.execute()

const result = await ctx.graph.execute<T>({
graph: string,
action: string,
input: Record<string, unknown>,
}): Promise<T>;

ctx.notification

Send notifications.

ctx.notification.email()

await ctx.notification.email({
notification: string,
event: string,
recipients: string[],
subject: Record<string, unknown>,
template: Record<string, unknown>,
}): Promise<void>;

ctx.notification.sms()

await ctx.notification.sms({
notification: string,
event: string,
phones: string[],
message: Record<string, unknown>,
}): Promise<void>;

ctx.notification.push()

await ctx.notification.push({
notification: string,
event: string,
tokens: string[],
title: Record<string, unknown>,
body: Record<string, unknown>,
data?: Record<string, unknown>,
}): Promise<void>;

ctx.notification.send()

await ctx.notification.send({
notification: string,
event: string,
input: INotificationRequest,
retries?: number,
}): Promise<void>;

ctx.storage

File operations.

ctx.storage.upload()

const result = await ctx.storage.upload({
storage: string,
event: string,
input: {
buffer: Buffer | string,
fileName: string,
mimeType: string,
},
retries?: number,
}): Promise<{
file_key: string,
url?: string,
size?: number,
content_type?: string,
}>;

ctx.storage.download()

const file = await ctx.storage.download({
storage: string,
event: string,
input: { file_key: string },
}): Promise<{
content: Buffer | string,
content_type?: string,
size?: number,
}>;

ctx.storage.delete()

await ctx.storage.delete({
storage: string,
event: string,
input: { file_key: string },
}): Promise<void>;

ctx.publish

Message broker operations.

ctx.publish.send()

await ctx.publish.send({
broker: string,
event: string,
input: {
message: Record<string, unknown>,
},
retries?: number,
}): Promise<void>;

ctx.quota

Rate limiting.

ctx.quota.execute()

const result = await ctx.quota.execute<T>({
quota: string,
input: Record<string, unknown>,
timeout?: number,
}): Promise<T>;

ctx.fallback

Failure handling with alternatives.

ctx.fallback.execute()

const result = await ctx.fallback.execute<T>({
fallback: string,
input: Record<string, unknown>,
timeout?: number,
}): Promise<T>;

ctx.healthcheck

Check service availability.

ctx.healthcheck.getStatus()

const status = await ctx.healthcheck.getStatus(tag: string): Promise<{
status: 'available' | 'unavailable',
lastChecked?: string,
lastLatency?: number,
}>;

Control Flow

ctx.sleep()

Pause workflow execution.

await ctx.sleep(duration: number | string): Promise<void>;
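
Duration formats:

| Format | Example | Description |
| --- | --- | --- |
| `number` | `5000` | Milliseconds |
| `string` | `'5s'` | Seconds |
| `string` | `'5m'` | Minutes |
| `string` | `'1h'` | Hours |
| `string` | `'1d'` | Days |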

ctx.waitForSignal()

Wait for an external signal.

const payload = await ctx.waitForSignal<T>(
signal: string | string[],
options?: { timeout?: number | string }
): Promise<T>;

ctx.checkpoint()

Save state for recovery.

await ctx.checkpoint(
name: string,
metadata?: Record<string, unknown>
): Promise<void>;

ctx.triggerRollback()

Manually trigger rollback.

const result = await ctx.triggerRollback(reason: string): Promise<{
success: boolean,
rolled_back_steps: string[],
failed_steps?: Array<{ tag: string, error: string }>,
reason: string,
}>;

ctx.workflow()

Run a child workflow.

const result = await ctx.workflow<TInput, TOutput>(
childId: string,
tag: string,
input: TInput,
options?: {
timeout?: number | string,
retries?: number,
parent_close_policy?: 'terminate' | 'abandon' | 'request_cancel',
idempotency_key?: string,
}
): Promise<TOutput>;

State Management

ctx.setState()

ctx.setState(key: string, value: unknown): void;

ctx.getState()

const value = ctx.getState<T>(key: string): T | undefined;

Read-Only Properties

| Property | Type | Description |
| --- | --- | --- |
| `ctx.state` | `object` | All workflow state |
| `ctx.steps` | `object` | Results from completed steps |
| `ctx.completed_steps` | `string[]` | Tags of completed steps |
| `ctx.current_step` | `string \| null` | Currently executing step |

Data Access

ctx.input

Read-only workflow input data.

const email = ctx.input.email;
const items = ctx.input.order.items;

ctx.variable()

Get app variable value.

const value = ctx.variable(app: string, key: string): unknown;

ctx.constant()

Get app constant value.

const value = ctx.constant(app: string, key: string): unknown;

ctx.token()

Get token value.

const token = ctx.token(key: string): string;

ctx.default()

Provide fallback for undefined values.

const value = ctx.default<T>(value: T | undefined, fallback: T): T;

ctx.transform

Data transformation utilities.

| Method | Description |
| --- | --- |
| `size(obj)` | Number of keys in object |
| `length(arr)` | Length of array or string |
| `parseJson<T>(str)` | Parse JSON string |
| `stringify(obj)` | Convert to JSON string |
| `upper(str)` | Uppercase string |
| `lower(str)` | Lowercase string |
| `trim(str)` | Trim whitespace |
| `split(str, sep)` | Split string |
| `join(arr, sep)` | Join array |
| `now()` | Current timestamp |
| `formatDate(date, fmt)` | Format date |

ctx.log

Logging methods.

ctx.log.debug(message: string, data?: Record<string, unknown>): void;
ctx.log.info(message: string, data?: Record<string, unknown>): void;
ctx.log.warn(message: string, data?: Record<string, unknown>): void;
ctx.log.error(message: string, data?: Record<string, unknown>): void;

Metadata Properties

| Property | Type | Description |
| --- | --- | --- |
| `ctx.workflow_id` | `string` | Unique execution ID |
| `ctx.workflow_tag` | `string` | Workflow tag |
| `ctx.env` | `string` | Environment (dev, staging, prd) |
| `ctx.product` | `string` | Product tag |
| `ctx.context` | `object` | Additional metadata |
| `ctx.session` | `object` | Session info (if provided) |
| `ctx.auth` | `object` | Authentication data |

Replay, Restart & Resume Properties

Properties for detecting and handling replayed, restarted, or resumed workflows:

| Property | Type | Description |
| --- | --- | --- |
| `ctx.is_replay` | `boolean` | Whether this is a replay execution |
| `ctx.is_restart` | `boolean` | Whether this is a restart execution |
| `ctx.is_restored` | `boolean` | Whether this is a resumed execution |
| `ctx.replayed_from` | `string \| undefined` | Original workflow ID (if replay) |
| `ctx.restarted_from` | `string \| undefined` | Original workflow ID (if restart) |
| `ctx.resumed_from` | `string \| undefined` | Original workflow ID (if resume) |
| `ctx.replay_reason` | `string \| undefined` | Reason for replay |
| `ctx.restart_reason` | `string \| undefined` | Reason for restart |
| `ctx.restored_checkpoint` | `object \| undefined` | Checkpoint info (if resumed) |
| `ctx.original_input` | `object \| undefined` | Original input (for restart comparison) |

Example: Detecting Replay/Restart/Resume

handler: async (ctx) => {
// Log execution context
if (ctx.is_replay) {
ctx.log.info('Replaying workflow', {
original_id: ctx.replayed_from,
reason: ctx.replay_reason,
});
}

if (ctx.is_restart) {
ctx.log.info('Restarted workflow', {
original_id: ctx.restarted_from,
reason: ctx.restart_reason,
original_input: ctx.original_input,
});
}

if (ctx.is_restored) {
ctx.log.info('Resuming workflow', {
original_id: ctx.resumed_from,
checkpoint: ctx.restored_checkpoint?.name,
completed_steps: ctx.completed_steps,
});

// Skip already completed steps
if (ctx.completed_steps.includes('validate-order')) {
ctx.log.info('Skipping validate-order (already completed)');
}
}

// Workflow logic...
}

SDK Workflow Methods

Methods available on ductape.workflows for managing workflows outside of the handler context.

ductape.workflows.define()

Define and register a workflow:

const workflow = await ductape.workflows.define({
product: string,
tag: string,
name: string,
description?: string,
options?: IWorkflowOptions,
signals?: Record<string, { input: Record<string, string> }>,
queries?: Record<string, { handler: (ctx) => unknown }>,
envs?: Array<{ slug: string; active: boolean }>,
handler: (ctx: IWorkflowContext) => Promise<TOutput>,
});

ductape.workflows.execute()

Execute a workflow immediately:

const result = await ductape.workflows.execute({
product: string,
env: string,
tag: string,
input: Record<string, unknown>,
workflow_id?: string,
idempotency_key?: string,
timeout?: number,
context?: Record<string, unknown>,
});

ductape.workflows.dispatch()

Schedule a workflow for later execution:

const result = await ductape.workflows.dispatch({
product: string,
env: string,
workflow: string,
input: Record<string, unknown>,
schedule?: {
start_at?: number | string, // When to start
cron?: string, // Cron expression
every?: number, // Interval in ms
timezone?: string, // Timezone
limit?: number, // Max executions
endDate?: string, // End date
},
options?: {
retries?: number,
timeout?: number,
},
});

// Returns
interface IWorkflowDispatchResult {
job_id: string;
status: 'scheduled' | 'queued';
scheduled_at: number;
recurring: boolean;
next_run_at?: number;
}

ductape.workflows.replay()

Re-execute a workflow with the same input:

const result = await ductape.workflows.replay({
product: string,
env: string,
workflow_id: string, // Original workflow ID
options?: {
retries?: number,
timeout?: number,
debug?: boolean,
},
reason?: string, // Audit reason
idempotency_key?: string,
});

// Returns
interface IWorkflowReplayResult extends IWorkflowResult {
replayed_from: string; // Original workflow ID
}

ductape.workflows.restart()

Re-execute a workflow with new or modified input:

const result = await ductape.workflows.restart({
product: string,
env: string,
workflow_id: string, // Original workflow ID
input?: Record<string, unknown>, // New input (replaces original)
input_override?: Record<string, unknown>, // Partial override
merge_input?: boolean, // Merge with original input
reason?: string,
options?: IWorkflowOptions,
});

// Returns
interface IWorkflowRestartResult extends IWorkflowResult {
restarted_from: string; // Original workflow ID
}

ductape.workflows.resume()

Continue a paused or failed workflow:

const result = await ductape.workflows.resume({
product: string,
env: string,
workflow_id: string,
from_checkpoint?: string, // Resume from checkpoint
from_step?: string, // Resume from step
skip_steps?: string[], // Steps to skip
input?: Record<string, unknown>, // Additional input
});

// Returns
interface IWorkflowResumeResult extends IWorkflowResult {
resumed_from: string; // Original workflow ID
resumed_checkpoint?: string;
}

ductape.workflows.replayFromStep()

Re-execute starting from a specific step:

const result = await ductape.workflows.replayFromStep({
product: string,
env: string,
workflow_id: string,
from_step: string, // Step to start from
step_outputs?: Record<string, unknown>, // Override previous step outputs
debug?: {
enabled: boolean,
pause_after_step?: boolean,
log_level?: 'info' | 'verbose' | 'debug',
},
});

ductape.workflows.signal()

Send a signal to a running workflow:

await ductape.workflows.signal({
product: string,
env: string,
workflow_id: string,
signal: string, // Signal name
payload: Record<string, unknown>, // Signal data
});

ductape.workflows.query()

Query a running workflow's state:

const result = await ductape.workflows.query<T>({
product: string,
env: string,
workflow_id: string,
query: string, // Query handler name
});

ductape.workflows.status()

Get workflow execution status:

const status = await ductape.workflows.status({
product: string,
env: string,
workflow_id: string,
});

// Returns
interface IWorkflowStatus {
workflow_id: string;
workflow_tag: string;
status: 'running' | 'completed' | 'failed' | 'rolled_back' | 'cancelled';
current_step?: string;
completed_steps: string[];
state: Record<string, unknown>;
started_at: number;
updated_at: number;
}

ductape.workflows.cancel()

Cancel a running workflow:

const result = await ductape.workflows.cancel({
product: string,
env: string,
workflow_id: string,
reason?: string,
});

// Returns
interface IWorkflowCancelResult {
cancelled: boolean;
rolled_back_steps: string[];
}

ductape.workflows.history()

Get workflow execution history:

const history = await ductape.workflows.history({
product: string,
env: string,
workflow_id: string,
include_step_details?: boolean,
include_rollback_details?: boolean,
});

// Returns
interface IWorkflowHistory {
workflow_id: string;
workflow_tag: string;
status: WorkflowStatus;
events: IWorkflowEvent[];
checkpoints: ICheckpoint[];
replays?: string[];
restarts?: string[];
}

ductape.workflows.stepDetail()

Get detailed information about a step:

const detail = await ductape.workflows.stepDetail({
product: string,
env: string,
workflow_id: string,
step_tag: string,
});

// Returns
interface IStepDetail {
tag: string;
name: string;
status: StepStatus;
input?: Record<string, unknown>;
output?: Record<string, unknown>;
error?: IStepError;
attempts: number;
start_time?: number;
end_time?: number;
duration?: number;
rollback_status?: RollbackStatus;
}

ductape.workflows.relatedExecutions()

List all related executions (replays, restarts, resumes):

const related = await ductape.workflows.relatedExecutions({
product: string,
env: string,
workflow_id: string,
});

// Returns
interface IRelatedExecutions {
original: string;
executions: Array<{
workflow_id: string;
type: 'original' | 'replay' | 'restart' | 'resume';
status: WorkflowStatus;
created_at: number;
replayed_from?: string;
restarted_from?: string;
resumed_from?: string;
}>;
}

ductape.workflows.compare()

Compare two workflow executions:

const comparison = await ductape.workflows.compare({
product: string,
env: string,
workflow_ids: string[], // Two workflow IDs to compare
});

// Returns
interface IExecutionComparison {
workflows: string[];
input_diff: Record<string, unknown[]>;
step_diffs: Array<{
step: string;
[workflow_id: string]: {
status: StepStatus;
output?: Record<string, unknown>;
error?: string;
};
}>;
outcome_diff: Record<string, { status: WorkflowStatus; output?: unknown }>;
}