Queues
Handle queue messages with type-safe payload validation.
Ploy Start provides type-safe queue handlers with Zod payload validation for processing background jobs.
Setup
1. Configure Queue Binding
Add a queue binding to your ploy.yaml:
kind: worker
build: pnpm build
out: dist
queue:
  TASKS: tasks
2. Generate Types
pnpm types
This creates env.d.ts with your queue binding typed as QueueBinding.
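The generated file is not reproduced in this guide. Judging only from how the binding is called later on (`send` resolving to `{ messageId }`, `sendBatch` resolving to `{ messageIds }`, and a `delaySeconds` option), its shape is probably close to the sketch below. The names `QueueBindingLike`, `SendOptions`, `BatchEntry`, and `createFakeQueue` are assumptions made for illustration and are not exports of `@meetploy/start`.

```typescript
// Hypothetical shape of a queue binding, inferred from the calls in this guide.
interface SendOptions {
  delaySeconds?: number;
}

interface BatchEntry<P> extends SendOptions {
  payload: P;
}

interface QueueBindingLike<P> {
  send(payload: P, options?: SendOptions): Promise<{ messageId: string }>;
  sendBatch(entries: BatchEntry<P>[]): Promise<{ messageIds: string[] }>;
}

interface SentMessage<P> {
  messageId: string;
  payload: P;
  delaySeconds: number;
}

// In-memory stand-in that records what was sent instead of enqueuing it.
function createFakeQueue<P>(): QueueBindingLike<P> & { sent: SentMessage<P>[] } {
  const sent: SentMessage<P>[] = [];
  let counter = 0;

  // Stores one message and returns its generated ID.
  const record = (payload: P, delaySeconds: number = 0): string => {
    counter += 1;
    const messageId = `msg-${counter}`;
    sent.push({ messageId, payload, delaySeconds });
    return messageId;
  };

  return {
    sent,
    async send(payload: P, options?: SendOptions) {
      return { messageId: record(payload, options?.delaySeconds) };
    },
    async sendBatch(entries: BatchEntry<P>[]) {
      return {
        messageIds: entries.map((entry) => record(entry.payload, entry.delaySeconds)),
      };
    },
  };
}
```

A fake like this can be passed to route logic in unit tests so that assertions run against `sent` rather than a live queue.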
Defining Queue Handlers
Use .queue() to define a type-safe message handler:
import { ploy, z } from "@meetploy/start";
import type { Env } from "./env.js";

const worker = ploy<Env>()
  .queue(
    "TASKS",
    {
      payload: z.object({
        task: z.string(),
        data: z.record(z.unknown()).optional(),
      }),
    },
    async (ctx) => {
      // ctx.message.payload is typed
      console.log("Processing:", ctx.message.payload.task);
      ctx.ack();
    },
  )
  .build();

export default worker;
The queue name in .queue() must match the binding name in your ploy.yaml (e.g., "TASKS").
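A handler written this way can be exercised without a deployed queue by calling its logic with a hand-built context. The sketch below assumes the handler only touches `ctx.message` and the `ack`, `retry`, and `deadLetter` callbacks described in the next section. `createTestContext` and `handleTask` are hypothetical names, not part of `@meetploy/start`.

```typescript
// One entry per callback the handler invoked.
type Outcome =
  | { kind: "ack" }
  | { kind: "retry"; delayMs: number | undefined }
  | { kind: "deadLetter"; reason: string | undefined };

interface TestMessage<P> {
  id: string;
  queueName: string;
  payload: P;
  attempt: number;
  timestamp: Date;
}

interface TestContext<P> {
  message: TestMessage<P>;
  ack: () => void;
  retry: (delayMs?: number) => void;
  deadLetter: (reason?: string) => void;
  outcomes: Outcome[];
}

// Builds a context whose callbacks record what the handler decided.
function createTestContext<P>(payload: P, attempt: number = 1): TestContext<P> {
  const outcomes: Outcome[] = [];
  return {
    message: {
      id: "test-1",
      queueName: "TASKS",
      payload,
      attempt,
      timestamp: new Date(),
    },
    ack: () => {
      outcomes.push({ kind: "ack" });
    },
    retry: (delayMs?: number) => {
      outcomes.push({ kind: "retry", delayMs });
    },
    deadLetter: (reason?: string) => {
      outcomes.push({ kind: "deadLetter", reason });
    },
    outcomes,
  };
}

// Example handler logic, extracted into a function so it can be called directly.
function handleTask(ctx: TestContext<{ task: string }>): void {
  if (ctx.message.payload.task.length === 0) {
    ctx.deadLetter("Empty task name");
    return;
  }
  ctx.ack();
}
```

Keeping the handler body in a named function, and passing that function to `.queue()`, is one way to make it reachable from tests.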
Queue Context
The queue handler receives a typed context:
interface QueueContext<Env, State, Payload> {
  message: {
    id: string; // Unique message ID
    queueName: string; // Queue name
    payload: Payload; // Validated payload
    attempt: number; // Retry attempt (1-based)
    timestamp: Date; // When message was sent
  };
  env: Env; // Environment bindings
  state: State; // State from .state() calls
  ctx: ExecutionContext;
  ack: () => void; // Acknowledge success
  retry: (delayMs?: number) => void; // Retry later
  deadLetter: (reason?: string) => void; // Send to DLQ
}
Message Handling
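Each handler in this section ends by calling `ack()`, `retry()`, or `deadLetter()`. Because `attempt` is 1-based and `retry` accepts an optional delay in milliseconds, the choice between retrying and dead-lettering after a failure can be factored into a small pure function. The following is only a sketch: `decideFailureAction`, the three-attempt limit, and the capped exponential delay are illustrative assumptions, not defaults of Ploy Start.

```typescript
type FailureAction =
  | { kind: "retry"; delayMs: number }
  | { kind: "deadLetter"; reason: string };

interface RetryPolicy {
  maxAttempts: number; // Total deliveries allowed, including the first
  baseDelayMs: number; // Delay used after the first failed attempt
  maxDelayMs: number; // Upper bound on any single delay
}

// Assumed values for illustration only.
const defaultPolicy: RetryPolicy = {
  maxAttempts: 3,
  baseDelayMs: 1000,
  maxDelayMs: 30000,
};

// Decides what to do after the given (1-based) attempt has failed.
function decideFailureAction(
  attempt: number,
  policy: RetryPolicy = defaultPolicy,
): FailureAction {
  if (attempt >= policy.maxAttempts) {
    return { kind: "deadLetter", reason: `Failed after ${attempt} attempts` };
  }
  // Double the delay on each attempt, then clamp it to maxDelayMs.
  const uncapped = policy.baseDelayMs * Math.pow(2, attempt - 1);
  return { kind: "retry", delayMs: Math.min(uncapped, policy.maxDelayMs) };
}
```

Inside a `catch` block, the result would map onto `ctx.retry(action.delayMs)` or `ctx.deadLetter(action.reason)`.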
Acknowledge Success
Call ctx.ack() to mark the message as processed:
.queue("TASKS", {...}, async (ctx) => {
  await processTask(ctx.message.payload);
  ctx.ack(); // Message processed successfully
})
Retry Later
Call ctx.retry() to requeue the message, optionally passing a delay in milliseconds:
.queue("TASKS", {...}, async (ctx) => {
  try {
    await processTask(ctx.message.payload);
    ctx.ack();
  } catch (error) {
    if (ctx.message.attempt < 3) {
      ctx.retry(5000); // Retry in 5 seconds
    } else {
      ctx.deadLetter("Max retries exceeded");
    }
  }
})
Dead Letter Queue
Call ctx.deadLetter() to stop processing and log the failure:
.queue("TASKS", {...}, async (ctx) => {
  if (!isValidPayload(ctx.message.payload)) {
    ctx.deadLetter("Invalid payload");
    return;
  }
  // Process...
})
Sending Messages
Send messages to queues via the binding in your HTTP routes:
Single Message
.get("/queue/send", {
  response: z.object({ messageId: z.string() })
}, async (ctx) => {
  const { messageId } = await ctx.env.TASKS.send({
    task: "process-order",
    data: { orderId: "123" }
  });
  return { messageId };
})
Delayed Message
.get("/queue/send-delayed", {
  response: z.object({ messageId: z.string() })
}, async (ctx) => {
  const { messageId } = await ctx.env.TASKS.send(
    { task: "reminder" },
    { delaySeconds: 60 } // Delay by 60 seconds
  );
  return { messageId };
})
Batch Messages
.get("/queue/batch", {
  response: z.object({ messageIds: z.array(z.string()) })
}, async (ctx) => {
  const { messageIds } = await ctx.env.TASKS.sendBatch([
    { payload: { task: "job-1" } },
    { payload: { task: "job-2" } },
    { payload: { task: "job-3" }, delaySeconds: 30 }
  ]);
  return { messageIds };
})
Multiple Queues
Handle multiple queues by calling .queue() multiple times:
const worker = ploy<Env>()
  .queue(
    "EMAILS",
    {
      payload: z.object({
        to: z.string().email(),
        subject: z.string(),
        body: z.string(),
      }),
    },
    async (ctx) => {
      await sendEmail(ctx.message.payload);
      ctx.ack();
    },
  )
  .queue(
    "TASKS",
    {
      payload: z.object({
        task: z.string(),
        priority: z.enum(["low", "medium", "high"]),
      }),
    },
    async (ctx) => {
      await processTask(ctx.message.payload);
      ctx.ack();
    },
  )
  .build();
Using State in Queues
Access state from .state() calls in queue handlers:
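import { ploy, withDrizzle, z } from "@meetploy/start";
import { eq } from "drizzle-orm";
import type { Env } from "./env.js";
import * as schema from "./schema.js";

// Assumes ./schema.js exports an `orders` table
const { orders } = schema;

const worker = ploy<Env>()
  .state(withDrizzle("DB", schema))
  .queue(
    "ORDERS",
    {
      payload: z.object({ orderId: z.string() }),
    },
    async (ctx) => {
      // Access database via state
      const order = await ctx.state.db
        .select()
        .from(orders)
        .where(eq(orders.id, ctx.message.payload.orderId))
        .limit(1);
      if (order[0]) {
        // processOrder is an application-defined helper (not shown)
        await processOrder(order[0]);
      }
      ctx.ack();
    },
  )
  .build();
Full Example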
import { ploy, withDrizzle, z } from "@meetploy/start";
import { eq } from "drizzle-orm";
import type { Env } from "./env.js";
import * as schema from "./schema.js";

const worker = ploy<Env>()
  .state(withDrizzle("DB", schema))
  // Send message endpoint
  .post(
    "/orders",
    {
      body: z.object({
        userId: z.string(),
        items: z.array(
          z.object({
            productId: z.string(),
            quantity: z.number(),
          }),
        ),
      }),
      response: z.object({
        orderId: z.string(),
        messageId: z.string(),
      }),
    },
    async (ctx) => {
      // Create order in database
      const orderId = crypto.randomUUID();
      // Queue background processing
      const { messageId } = await ctx.env.ORDER_QUEUE.send({
        action: "process",
        orderId,
        userId: ctx.body.userId,
        items: ctx.body.items,
      });
      return { orderId, messageId };
    },
  )
  // Queue handler
  // processOrder, cancelOrder, and refundOrder are application-defined helpers (not shown)
  .queue(
    "ORDER_QUEUE",
    {
      payload: z.object({
        action: z.enum(["process", "cancel", "refund"]),
        orderId: z.string(),
        userId: z.string(),
        items: z
          .array(
            z.object({
              productId: z.string(),
              quantity: z.number(),
            }),
          )
          .optional(),
      }),
    },
    async (ctx) => {
      const { action, orderId, userId } = ctx.message.payload;
      console.log(`Processing ${action} for order ${orderId}`);
      try {
        switch (action) {
          case "process": {
            // items is optional in the schema, so check it instead of asserting with `!`
            const { items } = ctx.message.payload;
            if (!items) {
              ctx.deadLetter("Missing items for process action");
              return;
            }
            await processOrder(orderId, items);
            break;
          }
          case "cancel":
            await cancelOrder(orderId);
            break;
          case "refund":
            await refundOrder(orderId);
            break;
        }
        ctx.ack();
      } catch (error) {
        if (ctx.message.attempt < 3) {
          // Linear backoff: 1 second after the first attempt, 2 seconds after the second
          ctx.retry(ctx.message.attempt * 1000);
        } else {
          ctx.deadLetter(`Failed after ${ctx.message.attempt} attempts`);
        }
      }
    },
  )
  .build();

export default worker;
Best Practices
- Keep handlers idempotent - Messages may be delivered more than once
- Use unique IDs - Include IDs in payloads to detect duplicates
- Cap retries - Check ctx.message.attempt and dead-letter after a fixed number of attempts instead of retrying indefinitely
- Log message IDs - For debugging and tracing
- Validate payloads - Zod validation catches malformed messages
- Handle partial failures - Call ctx.ack() only after every step has succeeded, so an interrupted message can be retried
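As one illustration of the first two points, a handler can record the IDs it has already processed and skip repeats. The sketch below keeps them in memory and runs synchronously purely to show the control flow. A real worker would need asynchronous, durable storage shared across invocations, for example the database exposed through `.state()`. `SeenStore`, `createMemoryStore`, and `processOnce` are hypothetical names.

```typescript
// Minimal interface for remembering which keys were already handled.
interface SeenStore {
  has(key: string): boolean;
  add(key: string): void;
}

// In-memory implementation, suitable only for demonstration and tests.
function createMemoryStore(): SeenStore {
  const seen = new Set<string>();
  return {
    has: (key: string) => seen.has(key),
    add: (key: string) => {
      seen.add(key);
    },
  };
}

// Runs `work` at most once per key.
// Returns true if the work ran, false if the key was a duplicate.
function processOnce(store: SeenStore, key: string, work: () => void): boolean {
  if (store.has(key)) {
    return false;
  }
  work();
  // Mark the key only after the work succeeded, so a failed attempt can be retried.
  store.add(key);
  return true;
}
```

In a queue handler the key could be an ID carried in the payload, such as `orderId`, with `ctx.ack()` called whether or not the work ran, since a duplicate has already been handled.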