Skip to main content

Distributed Transactions

The Warehouse uses the Saga pattern to execute distributed transactions across multiple data sources. This ensures data consistency even when operations span different databases, graphs, and vector stores.

Overview

Traditional ACID transactions don't work across different database systems. The Saga pattern breaks a transaction into a sequence of local transactions, each with a compensating action (rollback) that can undo its effects if a later step fails.

[Step 1] → [Step 2] → [Step 3] → ✅ Success
↓ ↓ ↓
[Undo 1] ← [Undo 2] ← [Undo 3] ← ❌ Failure

Basic Transaction

Execute multiple operations as a single logical transaction:

const result = await ductape.warehouse.transaction([
// Step 1: Create order in PostgreSQL
{
operation: 'insert',
from: {
type: 'database',
tag: 'orders-postgres',
entity: 'orders'
},
data: {
userId: 'user_123',
total: 99.99,
status: 'pending'
}
},
// Step 2: Create event in Neo4j graph
{
operation: 'insert',
from: {
type: 'graph',
tag: 'activity-neo4j',
entity: 'OrderEvent'
},
data: {
type: 'ORDER_PLACED',
userId: 'user_123',
timestamp: new Date().toISOString()
}
},
// Step 3: Update user behavior vector
{
operation: 'upsert',
from: {
type: 'vector',
tag: 'user-behavior',
entity: 'behaviors'
},
data: {
id: 'user_123_purchase',
vector: purchaseBehaviorVector,
metadata: { action: 'purchase', amount: 99.99 }
}
}
]);

if (result.status === 'completed') {
console.log('Transaction successful');
} else if (result.status === 'compensated') {
console.log('Transaction rolled back:', result.error);
}

Transaction Result

interface ISagaResult {
status: 'completed' | 'compensated' | 'failed';
executionTime: number;
stepResults: Record<string, {
data?: any[];
affectedRows?: number;
error?: string;
}>;
compensatedSteps?: string[];
error?: string;
}

Transaction Options

Configure transaction behavior with options:

const result = await ductape.warehouse.transaction(
operations,
{
timeout: 30000, // Total timeout in ms
retryOnFailure: true, // Retry failed steps
maxRetries: 3, // Max retry attempts per step
compensateOnTimeout: true // Rollback if timeout
}
);
OptionTypeDefaultDescription
timeoutnumber30000Maximum execution time in ms
retryOnFailurebooleanfalseAuto-retry failed steps
maxRetriesnumber3Max retries per step
compensateOnTimeoutbooleantrueRollback on timeout

Multi-Database Write

A common pattern - write to multiple databases atomically:

// Create a user across multiple systems
const result = await ductape.warehouse.transaction([
// 1. Create user in main database
{
operation: 'insert',
from: {
type: 'database',
tag: 'users-postgres',
entity: 'users'
},
data: {
id: userId,
email: 'john@example.com',
name: 'John Doe',
createdAt: new Date()
}
},
// 2. Create user profile in MongoDB
{
operation: 'insert',
from: {
type: 'database',
tag: 'profiles-mongo',
entity: 'profiles'
},
data: {
userId,
bio: '',
avatar: null,
preferences: defaultPreferences
}
},
// 3. Create user node in graph
{
operation: 'insert',
from: {
type: 'graph',
tag: 'social-neo4j',
entity: 'User'
},
data: {
id: userId,
name: 'John Doe',
joinedAt: new Date().toISOString()
}
},
// 4. Index user for search
{
operation: 'upsert',
from: {
type: 'vector',
tag: 'user-search',
entity: 'users'
},
data: {
id: userId,
vector: await generateUserEmbedding({ name: 'John Doe', email: 'john@example.com' }),
metadata: { name: 'John Doe', email: 'john@example.com' }
}
}
]);

Order Processing Example

Complete order processing with inventory, payment, and notification:

async function processOrder(order: Order) {
const result = await ductape.warehouse.transaction([
// 1. Create order record
{
operation: 'insert',
from: { type: 'database', tag: 'orders-db', entity: 'orders' },
data: {
id: order.id,
userId: order.userId,
items: order.items,
total: order.total,
status: 'processing'
}
},
// 2. Decrement inventory for each item
...order.items.map(item => ({
operation: 'update' as const,
from: { type: 'database', tag: 'inventory-db', entity: 'inventory' },
data: { quantity: { $decrement: item.quantity } },
where: { productId: { $eq: item.productId } }
})),
// 3. Create payment record
{
operation: 'insert',
from: { type: 'database', tag: 'payments-db', entity: 'payments' },
data: {
orderId: order.id,
amount: order.total,
status: 'pending',
method: order.paymentMethod
}
},
// 4. Log order event in graph
{
operation: 'insert',
from: { type: 'graph', tag: 'events-neo4j', entity: 'OrderEvent' },
data: {
orderId: order.id,
userId: order.userId,
type: 'ORDER_CREATED',
timestamp: new Date().toISOString()
}
}
]);

if (result.status !== 'completed') {
throw new Error(`Order failed: ${result.error}`);
}

return result;
}

Handling Failures

When a step fails, the Warehouse automatically compensates by undoing completed steps:

const result = await ductape.warehouse.transaction([
{ operation: 'insert', from: { type: 'database', tag: 'db1', entity: 'table1' }, data: { id: 1 } },
{ operation: 'insert', from: { type: 'database', tag: 'db2', entity: 'table2' }, data: { id: 1 } },
{ operation: 'insert', from: { type: 'database', tag: 'db3', entity: 'table3' }, data: { invalidData: true } } // Fails
]);

if (result.status === 'compensated') {
console.log('Steps that were rolled back:', result.compensatedSteps);
// ['step_0', 'step_1'] - First two inserts were deleted
console.log('Error:', result.error);
}

Compensation Strategies

The Warehouse uses these compensation strategies by operation type:

OperationCompensation
insertdelete the inserted record
updateupdate with original values
deleteinsert the deleted record
upsertDelete or restore original

Error Handling Best Practices

try {
const result = await ductape.warehouse.transaction(operations);

switch (result.status) {
case 'completed':
// All steps succeeded
return { success: true, data: result.stepResults };

case 'compensated':
// Failed but rolled back successfully
console.error('Transaction rolled back:', result.error);
return { success: false, rolledBack: true, error: result.error };

case 'failed':
// Failed and could not fully rollback
console.error('Transaction failed with partial state:', result.error);
console.error('Compensated steps:', result.compensatedSteps);
// Manual intervention may be needed
await alertOpsTeam(result);
return { success: false, rolledBack: false, error: result.error };
}
} catch (error) {
// Network or system error
console.error('System error during transaction:', error);
throw error;
}

Idempotency

For reliable retries, design operations to be idempotent:

// Good: Idempotent upsert with unique key
{
operation: 'upsert',
from: { type: 'database', tag: 'orders-db', entity: 'orders' },
data: {
idempotencyKey: `order_${userId}_${timestamp}`,
// ... order data
}
}

// Good: Conditional update
{
operation: 'update',
from: { type: 'database', tag: 'inventory-db', entity: 'inventory' },
data: { quantity: { $decrement: 1 } },
where: {
productId: { $eq: productId },
quantity: { $gte: 1 } // Only if sufficient stock
}
}

Monitoring Transactions

Track transaction execution in the result metadata:

const result = await ductape.warehouse.transaction(operations);

console.log('Execution time:', result.executionTime, 'ms');
console.log('Steps executed:', Object.keys(result.stepResults).length);

for (const [stepId, stepResult] of Object.entries(result.stepResults)) {
console.log(`${stepId}:`, {
success: !stepResult.error,
affectedRows: stepResult.affectedRows,
error: stepResult.error
});
}

Next Steps