Workflows
Build durable workflows with step functions and automatic retries.
Workflows
Ploy Start provides type-safe workflow handlers for building durable, multi-step processes with automatic state persistence and retries.
Setup
1. Configure Workflow Binding
Add a workflow binding to your ploy.yaml:
kind: worker
build: pnpm build
out: dist
workflow:
  ORDER_FLOW: order_processing
The key is the binding name (accessed via ctx.env); the value is the workflow function name.
2. Generate Types
pnpm types
This creates env.d.ts with your workflow binding typed as WorkflowBinding.
Defining Workflows
Use .workflow() to define a type-safe workflow:
import { ploy, z } from "@meetploy/start";
import type { Env } from "./env.js";
const worker = ploy<Env>()
.workflow(
"order_processing",
{
input: z.object({
orderId: z.string(),
amount: z.number(),
}),
output: z.object({
success: z.boolean(),
trackingNumber: z.string(),
}),
},
async (ctx) => {
// Workflow steps...
return { success: true, trackingNumber: "TRACK123" };
},
)
.build();
export default worker;
The workflow name in .workflow() must match the value in your ploy.yaml (e.g., "order_processing").
Workflow Context
The workflow handler receives a typed context:
interface EnhancedWorkflowContext<Env, Input> {
input: Input; // Validated workflow input
env: Env; // Environment bindings
executionId: string; // Unique execution ID
step: {
run: (name, fn, opts?) => Promise<T>;
sleep: (duration) => Promise<void>;
parallel: (name, fns) => Promise<T[]>;
};
log: (message, data?) => void;
}
Step Functions
ctx.step.run
Execute a named step with automatic persistence:
.workflow("order_processing", {...}, async (ctx) => {
// Step 1: Validate
const validation = await ctx.step.run("validate", async () => {
if (!ctx.input.orderId) {
throw new Error("Missing orderId");
}
return { valid: true, orderId: ctx.input.orderId };
});
// Step 2: Charge payment
const payment = await ctx.step.run("charge", async () => {
return { paymentId: `pay_${Date.now()}`, status: "paid" };
});
// Step 3: Fulfill
const fulfillment = await ctx.step.run("fulfill", async () => {
return { trackingNumber: `TRACK_${Date.now()}` };
});
return {
success: true,
trackingNumber: fulfillment.trackingNumber
};
})
Each step is persisted. If the workflow restarts, completed steps are skipped and their results are replayed from storage.
Step Options
Configure retries and timeouts:
await ctx.step.run(
"external-api",
async () => {
return await callExternalAPI();
},
{
retries: 3, // Retry up to 3 times
timeout: 30000, // Timeout after 30 seconds
},
);
ctx.step.sleep
Pause execution for a duration:
// Sleep for milliseconds
await ctx.step.sleep(5000); // 5 seconds
// Sleep for human-readable duration
await ctx.step.sleep("30s"); // 30 seconds
await ctx.step.sleep("5m"); // 5 minutes
await ctx.step.sleep("1h"); // 1 hour
Sleep is durable: the workflow state is saved, and execution resumes after the duration elapses.
ctx.step.parallel
Run multiple operations in parallel:
const [userResult, orderResult, inventoryResult] = await ctx.step.parallel(
"fetch-data",
[
async () => await fetchUser(ctx.input.userId),
async () => await fetchOrder(ctx.input.orderId),
async () => await checkInventory(ctx.input.items),
],
);
Triggering Workflows
Trigger workflows via the binding in HTTP routes:
Start Execution
.get("/workflow/trigger", {
response: z.object({ executionId: z.string() })
}, async (ctx) => {
const { executionId } = await ctx.env.ORDER_FLOW.trigger({
orderId: "order-123",
amount: 99.99
});
return { executionId };
})
Check Status
.get("/workflow/status/:id", {
params: z.object({ id: z.string() }),
response: z.object({ execution: z.unknown() })
}, async (ctx) => {
const execution = await ctx.env.ORDER_FLOW.getExecution(ctx.params.id);
return { execution };
})
Cancel Execution
.get("/workflow/cancel/:id", {
params: z.object({ id: z.string() }),
response: z.object({ cancelled: z.boolean() })
}, async (ctx) => {
await ctx.env.ORDER_FLOW.cancel(ctx.params.id);
return { cancelled: true };
})
Error Handling
Handle errors in individual steps:
.workflow("order_processing", {...}, async (ctx) => {
// Retry payment with backoff
let payment;
for (let attempt = 0; attempt < 3; attempt++) {
payment = await ctx.step.run(`charge_attempt_${attempt}`, async () => {
const result = await chargeCard(ctx.input.amount);
if (result.status === "declined") {
throw new Error("Payment declined");
}
return result;
});
if (payment.status === "paid") break;
// Wait before retry
await ctx.step.sleep(attempt * 1000);
}
if (!payment || payment.status !== "paid") {
throw new Error("Payment failed after retries");
}
return { success: true, paymentId: payment.id };
})
Full Example
import { ploy } from "@meetploy/start";
import { z } from "zod";
import type { Env } from "./env.js";
const worker = ploy<Env>()
// Trigger endpoint
.post(
"/orders",
{
body: z.object({
userId: z.string(),
items: z.array(
z.object({
productId: z.string(),
quantity: z.number(),
price: z.number(),
}),
),
}),
response: z.object({
orderId: z.string(),
executionId: z.string(),
}),
},
async (ctx) => {
const orderId = crypto.randomUUID();
const amount = ctx.body.items.reduce(
(sum, item) => sum + item.price * item.quantity,
0,
);
const { executionId } = await ctx.env.ORDER_FLOW.trigger({
orderId,
userId: ctx.body.userId,
items: ctx.body.items,
amount,
});
return { orderId, executionId };
},
)
// Status endpoint
.get(
"/orders/:id/status",
{
params: z.object({ id: z.string() }),
response: z.object({
status: z.string(),
result: z.unknown().optional(),
}),
},
async (ctx) => {
const execution = await ctx.env.ORDER_FLOW.getExecution(ctx.params.id);
return {
status: execution.status,
result: execution.result,
};
},
)
// Workflow definition
.workflow(
"order_processing",
{
input: z.object({
orderId: z.string(),
userId: z.string(),
items: z.array(
z.object({
productId: z.string(),
quantity: z.number(),
price: z.number(),
}),
),
amount: z.number(),
}),
output: z.object({
orderId: z.string(),
paymentId: z.string(),
trackingNumber: z.string(),
completedAt: z.string(),
}),
},
async (ctx) => {
ctx.log("Starting order processing", { orderId: ctx.input.orderId });
// Step 1: Validate inventory
const inventory = await ctx.step.run("check-inventory", async () => {
for (const item of ctx.input.items) {
const available = await checkStock(item.productId);
if (available < item.quantity) {
throw new Error(`Insufficient stock for ${item.productId}`);
}
}
return { available: true };
});
// Step 2: Reserve inventory
await ctx.step.run("reserve-inventory", async () => {
for (const item of ctx.input.items) {
await reserveStock(item.productId, item.quantity);
}
});
// Step 3: Process payment with retry
let payment;
for (let attempt = 0; attempt < 3; attempt++) {
payment = await ctx.step.run(`payment-attempt-${attempt}`, async () => {
return await processPayment(ctx.input.userId, ctx.input.amount);
});
if (payment.status === "succeeded") break;
await ctx.step.sleep("5s");
}
if (!payment || payment.status !== "succeeded") {
// Rollback inventory reservation
await ctx.step.run("rollback-inventory", async () => {
for (const item of ctx.input.items) {
await releaseStock(item.productId, item.quantity);
}
});
throw new Error("Payment failed");
}
// Step 4: Create shipment
const shipment = await ctx.step.run("create-shipment", async () => {
return await createShipment(ctx.input.orderId, ctx.input.items);
});
// Step 5: Send confirmation email
await ctx.step.run("send-confirmation", async () => {
await sendEmail(ctx.input.userId, {
subject: "Order Confirmed",
body: `Your order ${ctx.input.orderId} is on its way!`,
});
});
// Step 6: Wait and send follow-up
await ctx.step.sleep("24h");
await ctx.step.run("send-followup", async () => {
await sendEmail(ctx.input.userId, {
subject: "How was your order?",
body: "We hope you enjoyed your purchase!",
});
});
return {
orderId: ctx.input.orderId,
paymentId: payment.id,
trackingNumber: shipment.trackingNumber,
completedAt: new Date().toISOString(),
};
},
)
.build();
export default worker;
Best Practices
- Use unique step names - Each step needs a unique name for replay
- Keep steps atomic - One logical operation per step
- Handle failures gracefully - Use try/catch and implement rollback logic
- Log progress - Use ctx.log() for debugging
- Set timeouts - Prevent steps from hanging indefinitely
- Design for idempotency - Steps may be retried
How is this guide?
Last updated on