Skip to main content

Context API Reference

Complete reference for all methods available on the workflow context (ctx) object.

ctx.step()

Define a workflow step with optional rollback.

const result = await ctx.step<T>(
tag: string,
handler: () => Promise<T>,
rollback?: ((result: T) => Promise<void>) | null,
options?: IWorkflowStepOptions
): Promise<T>;

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| `tag` | `string` | Unique step identifier |
| `handler` | `function` | Async function that performs the step's work |
| `rollback` | `function \| null` | Optional function called during rollback |
| `options` | `object` | Step configuration options |

Options

| Option | Type | Default | Description |
| --- | --- | --- | --- |
| `allow_fail` | `boolean` | `false` | Continue workflow if step fails |
| `retries` | `number` | `0` | Number of retry attempts |
| `retry_delay` | `number` | `1000` | Milliseconds between retries |
| `timeout` | `number` | - | Step timeout in milliseconds |
| `critical` | `boolean` | `false` | Always roll back this step on failure |

Examples

// Basic step
const user = await ctx.step('create-user', async () => {
return ctx.database.insert({ database: 'users', event: 'create', data: {} });
});

// With rollback
const charge = await ctx.step(
'charge',
async () => ctx.action.run({ app: 'stripe', event: 'charge', input: {} }),
async (result) => ctx.action.run({ app: 'stripe', event: 'refund', input: { body: { id: result.id } } })
);

// With options
await ctx.step(
'notify',
async () => ctx.notification.email({ /* ... */ }),
null,
{ allow_fail: true, retries: 3 }
);

ctx.action

Call external APIs through connected apps.

ctx.action.run()

const result = await ctx.action.run<T>({
app: string,
event: string,
input: {
body?: Record<string, unknown>,
headers?: Record<string, string>,
query?: Record<string, string>,
params?: Record<string, string>,
},
retries?: number,
timeout?: number,
}): Promise<T>;

Example

const charge = await ctx.action.run({
app: 'stripe',
event: 'create-charge',
input: {
body: { amount: 1000, currency: 'usd' },
headers: { 'Idempotency-Key': ctx.workflow_id },
},
retries: 3,
timeout: 10000,
});

ctx.database

Database operations.

ctx.database.insert()

const record = await ctx.database.insert<T>({
database: string,
event: string,
data: Record<string, unknown>,
}): Promise<T>;

ctx.database.query()

const records = await ctx.database.query<T>({
database: string,
event: string,
params?: Record<string, unknown>,
}): Promise<T[]>;

ctx.database.update()

const updated = await ctx.database.update<T>({
database: string,
event: string,
where: Record<string, unknown>,
data: Record<string, unknown>,
}): Promise<T>;

ctx.database.delete()

const success = await ctx.database.delete({
database: string,
event: string,
where: Record<string, unknown>,
}): Promise<boolean>;

ctx.database.execute()

const result = await ctx.database.execute<T>({
database: string,
event: string,
input: IDbActionRequest,
retries?: number,
timeout?: number,
}): Promise<T>;

ctx.graph

Graph database operations.

ctx.graph.createNode()

const node = await ctx.graph.createNode<T>({
graph: string,
labels: string[],
properties: Record<string, unknown>,
}): Promise<T>;

ctx.graph.updateNode()

const node = await ctx.graph.updateNode<T>({
graph: string,
id: string | number,
properties: Record<string, unknown>,
}): Promise<T>;

ctx.graph.deleteNode()

await ctx.graph.deleteNode({
graph: string,
id: string | number,
}): Promise<void>;

ctx.graph.createRelationship()

const rel = await ctx.graph.createRelationship<T>({
graph: string,
from: string,
to: string,
type: string,
properties?: Record<string, unknown>,
}): Promise<T>;

ctx.graph.deleteRelationship()

await ctx.graph.deleteRelationship({
graph: string,
id: string | number,
}): Promise<void>;

ctx.graph.query()

const results = await ctx.graph.query<T>({
graph: string,
action: string,
params?: Record<string, unknown>,
}): Promise<T[]>;

ctx.graph.execute()

const result = await ctx.graph.execute<T>({
graph: string,
action: string,
input: Record<string, unknown>,
}): Promise<T>;

ctx.notification

Send notifications.

ctx.notification.email()

await ctx.notification.email({
notification: string,
event: string,
recipients: string[],
subject: Record<string, unknown>,
template: Record<string, unknown>,
}): Promise<void>;

ctx.notification.sms()

await ctx.notification.sms({
notification: string,
event: string,
phones: string[],
message: Record<string, unknown>,
}): Promise<void>;

ctx.notification.push()

await ctx.notification.push({
notification: string,
event: string,
tokens: string[],
title: Record<string, unknown>,
body: Record<string, unknown>,
data?: Record<string, unknown>,
}): Promise<void>;

ctx.notification.send()

await ctx.notification.send({
notification: string,
event: string,
input: INotificationRequest,
retries?: number,
}): Promise<void>;

ctx.storage

File operations.

ctx.storage.upload()

const result = await ctx.storage.upload({
storage: string,
event: string,
input: {
buffer: Buffer | string,
fileName: string,
mimeType: string,
},
retries?: number,
}): Promise<{
file_key: string,
url?: string,
size?: number,
content_type?: string,
}>;

ctx.storage.download()

const file = await ctx.storage.download({
storage: string,
event: string,
input: { file_key: string },
}): Promise<{
content: Buffer | string,
content_type?: string,
size?: number,
}>;

ctx.storage.delete()

await ctx.storage.delete({
storage: string,
event: string,
input: { file_key: string },
}): Promise<void>;

ctx.publish

Message broker operations.

ctx.publish.send()

await ctx.publish.send({
broker: string,
event: string,
input: {
message: Record<string, unknown>,
},
retries?: number,
}): Promise<void>;

ctx.quota

Rate limiting.

ctx.quota.execute()

const result = await ctx.quota.execute<T>({
quota: string,
input: Record<string, unknown>,
timeout?: number,
}): Promise<T>;

ctx.fallback

Failure handling with alternatives.

ctx.fallback.execute()

const result = await ctx.fallback.execute<T>({
fallback: string,
input: Record<string, unknown>,
timeout?: number,
}): Promise<T>;

ctx.healthcheck

Check service availability.

ctx.healthcheck.getStatus()

const status = await ctx.healthcheck.getStatus(tag: string): Promise<{
status: 'available' | 'unavailable',
lastChecked?: string,
lastLatency?: number,
}>;

Control Flow

ctx.sleep()

Pause workflow execution.

await ctx.sleep(duration: number | string): Promise<void>;
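| Format | Example | Description |
| --- | --- | --- |
| `number` | `5000` | Milliseconds |
| `string` | `'5s'` | Seconds |
| `string` | `'5m'` | Minutes |
| `string` | `'1h'` | Hours |
| `string` | `'1d'` | Days |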

ctx.waitForSignal()

Wait for an external signal.

const payload = await ctx.waitForSignal<T>(
signal: string | string[],
options?: { timeout?: number | string }
): Promise<T>;

ctx.checkpoint()

Save state for recovery.

await ctx.checkpoint(
name: string,
metadata?: Record<string, unknown>
): Promise<void>;

ctx.triggerRollback()

Manually trigger rollback.

const result = await ctx.triggerRollback(reason: string): Promise<{
success: boolean,
rolled_back_steps: string[],
failed_steps?: Array<{ tag: string, error: string }>,
reason: string,
}>;

ctx.workflow()

Run a child workflow.

const result = await ctx.workflow<TInput, TOutput>(
childId: string,
tag: string,
input: TInput,
options?: {
timeout?: number | string,
retries?: number,
parent_close_policy?: 'terminate' | 'abandon' | 'request_cancel',
idempotency_key?: string,
}
): Promise<TOutput>;

State Management

ctx.setState()

ctx.setState(key: string, value: unknown): void;

ctx.getState()

const value = ctx.getState<T>(key: string): T | undefined;

Read-Only Properties

| Property | Type | Description |
| --- | --- | --- |
| `ctx.state` | `object` | All workflow state |
| `ctx.steps` | `object` | Results from completed steps |
| `ctx.completed_steps` | `string[]` | Tags of completed steps |
| `ctx.current_step` | `string \| null` | Currently executing step |

Data Access

ctx.input

Read-only workflow input data.

const email = ctx.input.email;
const items = ctx.input.order.items;

ctx.variable()

Get app variable value.

const value = ctx.variable(app: string, key: string): unknown;

ctx.constant()

Get app constant value.

const value = ctx.constant(app: string, key: string): unknown;

ctx.token()

Get token value.

const token = ctx.token(key: string): string;

ctx.default()

Provide fallback for undefined values.

const value = ctx.default<T>(value: T | undefined, fallback: T): T;

ctx.transform

Data transformation utilities.

| Method | Description |
| --- | --- |
| `size(obj)` | Number of keys in object |
| `length(arr)` | Length of array or string |
| `parseJson<T>(str)` | Parse JSON string |
| `stringify(obj)` | Convert to JSON string |
| `upper(str)` | Uppercase string |
| `lower(str)` | Lowercase string |
| `trim(str)` | Trim whitespace |
| `split(str, sep)` | Split string |
| `join(arr, sep)` | Join array |
| `now()` | Current timestamp |
| `formatDate(date, fmt)` | Format date |

ctx.log

Logging methods.

ctx.log.debug(message: string, data?: Record<string, unknown>): void;
ctx.log.info(message: string, data?: Record<string, unknown>): void;
ctx.log.warn(message: string, data?: Record<string, unknown>): void;
ctx.log.error(message: string, data?: Record<string, unknown>): void;

Metadata Properties

| Property | Type | Description |
| --- | --- | --- |
| `ctx.workflow_id` | `string` | Unique execution ID |
| `ctx.workflow_tag` | `string` | Workflow tag |
| `ctx.env` | `string` | Environment (dev, staging, prd) |
| `ctx.product` | `string` | Product tag |
| `ctx.context` | `object` | Additional metadata |
| `ctx.session` | `object` | Session info (if provided) |
| `ctx.auth` | `object` | Authentication data |

Replay, Restart & Resume Properties

Properties for detecting and handling replayed, restarted, or resumed workflows:

| Property | Type | Description |
| --- | --- | --- |
| `ctx.is_replay` | `boolean` | Whether this is a replay execution |
| `ctx.is_restart` | `boolean` | Whether this is a restart execution |
| `ctx.is_restored` | `boolean` | Whether this is a resumed execution |
| `ctx.replayed_from` | `string \| undefined` | Original workflow ID (if replay) |
| `ctx.restarted_from` | `string \| undefined` | Original workflow ID (if restart) |
| `ctx.resumed_from` | `string \| undefined` | Original workflow ID (if resume) |
| `ctx.replay_reason` | `string \| undefined` | Reason for replay |
| `ctx.restart_reason` | `string \| undefined` | Reason for restart |
| `ctx.restored_checkpoint` | `object \| undefined` | Checkpoint info (if resumed) |
| `ctx.original_input` | `object \| undefined` | Original input (for restart comparison) |

Example: Detecting Replay/Restart/Resume

handler: async (ctx) => {
// Log execution context
if (ctx.is_replay) {
ctx.log.info('Replaying workflow', {
original_id: ctx.replayed_from,
reason: ctx.replay_reason,
});
}

if (ctx.is_restart) {
ctx.log.info('Restarted workflow', {
original_id: ctx.restarted_from,
reason: ctx.restart_reason,
original_input: ctx.original_input,
});
}

if (ctx.is_restored) {
ctx.log.info('Resuming workflow', {
original_id: ctx.resumed_from,
checkpoint: ctx.restored_checkpoint?.name,
completed_steps: ctx.completed_steps,
});

// Skip already completed steps
if (ctx.completed_steps.includes('validate-order')) {
ctx.log.info('Skipping validate-order (already completed)');
}
}

// Workflow logic...
}

SDK Workflow Methods

Methods available on ductape.workflows for managing workflows outside of the handler context.

ductape.workflows.define()

Define and register a workflow:

const workflow = await ductape.workflows.define({
product: string,
tag: string,
name: string,
description?: string,
options?: IWorkflowOptions,
signals?: Record<string, { input: Record<string, string> }>,
queries?: Record<string, { handler: (ctx) => unknown }>,
envs?: Array<{ slug: string; active: boolean }>,
handler: (ctx: IWorkflowContext) => Promise<TOutput>,
});

ductape.workflows.execute()

Execute a workflow immediately:

const result = await ductape.workflows.execute({
product: string,
env: string,
tag: string,
input: Record<string, unknown>,
workflow_id?: string,
idempotency_key?: string,
timeout?: number,
context?: Record<string, unknown>,
});

ductape.workflows.dispatch()

Schedule a workflow for later execution:

const result = await ductape.workflows.dispatch({
product: string,
env: string,
workflow: string,
input: Record<string, unknown>,
schedule?: {
start_at?: number | string, // When to start
cron?: string, // Cron expression
every?: number, // Interval in ms
timezone?: string, // Timezone
limit?: number, // Max executions
endDate?: string, // End date
},
options?: {
retries?: number,
timeout?: number,
},
});

// Returns
interface IWorkflowDispatchResult {
job_id: string;
status: 'scheduled' | 'queued';
scheduled_at: number;
recurring: boolean;
next_run_at?: number;
}

ductape.workflows.replay()

Re-execute a workflow with the same input:

const result = await ductape.workflows.replay({
product: string,
env: string,
workflow_id: string, // Original workflow ID
options?: {
retries?: number,
timeout?: number,
debug?: boolean,
},
reason?: string, // Audit reason
idempotency_key?: string,
});

// Returns
interface IWorkflowReplayResult extends IWorkflowResult {
replayed_from: string; // Original workflow ID
}

ductape.workflows.restart()

Re-execute a workflow with new or modified input:

const result = await ductape.workflows.restart({
product: string,
env: string,
workflow_id: string, // Original workflow ID
input?: Record<string, unknown>, // New input (replaces original)
input_override?: Record<string, unknown>, // Partial override
merge_input?: boolean, // Merge with original input
reason?: string,
options?: IWorkflowOptions,
});

// Returns
interface IWorkflowRestartResult extends IWorkflowResult {
restarted_from: string; // Original workflow ID
}

ductape.workflows.resume()

Continue a paused or failed workflow:

const result = await ductape.workflows.resume({
product: string,
env: string,
workflow_id: string,
from_checkpoint?: string, // Resume from checkpoint
from_step?: string, // Resume from step
skip_steps?: string[], // Steps to skip
input?: Record<string, unknown>, // Additional input
});

// Returns
interface IWorkflowResumeResult extends IWorkflowResult {
resumed_from: string; // Original workflow ID
resumed_checkpoint?: string;
}

ductape.workflows.replayFromStep()

Re-execute starting from a specific step:

const result = await ductape.workflows.replayFromStep({
product: string,
env: string,
workflow_id: string,
from_step: string, // Step to start from
step_outputs?: Record<string, unknown>, // Override previous step outputs
debug?: {
enabled: boolean,
pause_after_step?: boolean,
log_level?: 'info' | 'verbose' | 'debug',
},
});

ductape.workflows.signal()

Send a signal to a running workflow:

await ductape.workflows.signal({
product: string,
env: string,
workflow_id: string,
signal: string, // Signal name
payload: Record<string, unknown>, // Signal data
});

ductape.workflows.query()

Query a running workflow's state:

const result = await ductape.workflows.query<T>({
product: string,
env: string,
workflow_id: string,
query: string, // Query handler name
});

ductape.workflows.status()

Get workflow execution status:

const status = await ductape.workflows.status({
product: string,
env: string,
workflow_id: string,
});

// Returns
interface IWorkflowStatus {
workflow_id: string;
workflow_tag: string;
status: 'running' | 'completed' | 'failed' | 'rolled_back' | 'cancelled';
current_step?: string;
completed_steps: string[];
state: Record<string, unknown>;
started_at: number;
updated_at: number;
}

ductape.workflows.cancel()

Cancel a running workflow:

const result = await ductape.workflows.cancel({
product: string,
env: string,
workflow_id: string,
reason?: string,
});

// Returns
interface IWorkflowCancelResult {
cancelled: boolean;
rolled_back_steps: string[];
}

ductape.workflows.history()

Get workflow execution history:

const history = await ductape.workflows.history({
product: string,
env: string,
workflow_id: string,
include_step_details?: boolean,
include_rollback_details?: boolean,
});

// Returns
interface IWorkflowHistory {
workflow_id: string;
workflow_tag: string;
status: WorkflowStatus;
events: IWorkflowEvent[];
checkpoints: ICheckpoint[];
replays?: string[];
restarts?: string[];
}

ductape.workflows.stepDetail()

Get detailed information about a step:

const detail = await ductape.workflows.stepDetail({
product: string,
env: string,
workflow_id: string,
step_tag: string,
});

// Returns
interface IStepDetail {
tag: string;
name: string;
status: StepStatus;
input?: Record<string, unknown>;
output?: Record<string, unknown>;
error?: IStepError;
attempts: number;
start_time?: number;
end_time?: number;
duration?: number;
rollback_status?: RollbackStatus;
}

ductape.workflows.relatedExecutions()

List all related executions (replays, restarts, resumes):

const related = await ductape.workflows.relatedExecutions({
product: string,
env: string,
workflow_id: string,
});

// Returns
interface IRelatedExecutions {
original: string;
executions: Array<{
workflow_id: string;
type: 'original' | 'replay' | 'restart' | 'resume';
status: WorkflowStatus;
created_at: number;
replayed_from?: string;
restarted_from?: string;
resumed_from?: string;
}>;
}

ductape.workflows.compare()

Compare two workflow executions:

const comparison = await ductape.workflows.compare({
product: string,
env: string,
workflow_ids: string[], // Two workflow IDs to compare
});

// Returns
interface IExecutionComparison {
workflows: string[];
input_diff: Record<string, unknown[]>;
step_diffs: Array<{
step: string;
[workflow_id: string]: {
status: StepStatus;
output?: Record<string, unknown>;
error?: string;
};
}>;
outcome_diff: Record<string, { status: WorkflowStatus; output?: unknown }>;
}