Ploy
Ploy Start

Queues

Handle queue messages with type-safe payload validation.

Queues

Ploy Start provides type-safe queue handlers with Zod payload validation for processing background jobs.

Setup

1. Configure Queue Binding

Add a queue binding to your ploy.yaml:

ploy.yaml
kind: worker
build: pnpm build
out: dist
queue:
  TASKS: tasks

2. Generate Types

pnpm types

This creates env.d.ts with your queue binding typed as QueueBinding.

Defining Queue Handlers

Use .queue() to define a type-safe message handler:

src/index.ts
import { ploy, z } from "@meetploy/start";
import type { Env } from "./env.js";

const worker = ploy<Env>()
	.queue(
		"TASKS",
		{
			payload: z.object({
				task: z.string(),
				data: z.record(z.unknown()).optional(),
			}),
		},
		async (ctx) => {
			// ctx.message.payload is typed
			console.log("Processing:", ctx.message.payload.task);
			ctx.ack();
		},
	)
	.build();

export default worker;

The queue name in .queue() must match the binding name in your ploy.yaml (e.g., "TASKS").

Queue Context

The queue handler receives a typed context:

interface QueueContext<Env, State, Payload> {
	message: {
		id: string; // Unique message ID
		queueName: string; // Queue name
		payload: Payload; // Validated payload
		attempt: number; // Retry attempt (1-based)
		timestamp: Date; // When message was sent
	};
	env: Env; // Environment bindings
	state: State; // State from .state() calls
	ctx: ExecutionContext;
	ack: () => void; // Acknowledge success
	retry: (delayMs?: number) => void; // Retry later
	deadLetter: (reason?: string) => void; // Send to DLQ
}

Message Handling

Acknowledge Success

Call ctx.ack() to mark the message as processed:

.queue("TASKS", {...}, async (ctx) => {
  await processTask(ctx.message.payload);
  ctx.ack(); // Message processed successfully
})

Retry Later

Call ctx.retry() to requeue the message:

.queue("TASKS", {...}, async (ctx) => {
  try {
    await processTask(ctx.message.payload);
    ctx.ack();
  } catch (error) {
    if (ctx.message.attempt < 3) {
      ctx.retry(5000); // Retry in 5 seconds
    } else {
      ctx.deadLetter("Max retries exceeded");
    }
  }
})

Dead Letter Queue

Call ctx.deadLetter() to stop processing and log the failure:

.queue("TASKS", {...}, async (ctx) => {
  if (!isValidPayload(ctx.message.payload)) {
    ctx.deadLetter("Invalid payload");
    return;
  }
  // Process...
})

Sending Messages

Send messages to queues via the binding in your HTTP routes:

Single Message

.get("/queue/send", {
  response: z.object({ messageId: z.string() })
}, async (ctx) => {
  const { messageId } = await ctx.env.TASKS.send({
    task: "process-order",
    data: { orderId: "123" }
  });
  return { messageId };
})

Delayed Message

.get("/queue/send-delayed", {
  response: z.object({ messageId: z.string() })
}, async (ctx) => {
  const { messageId } = await ctx.env.TASKS.send(
    { task: "reminder" },
    { delaySeconds: 60 } // Delay by 60 seconds
  );
  return { messageId };
})

Batch Messages

.get("/queue/batch", {
  response: z.object({ messageIds: z.array(z.string()) })
}, async (ctx) => {
  const { messageIds } = await ctx.env.TASKS.sendBatch([
    { payload: { task: "job-1" } },
    { payload: { task: "job-2" } },
    { payload: { task: "job-3" }, delaySeconds: 30 }
  ]);
  return { messageIds };
})

Multiple Queues

Handle multiple queues by calling .queue() multiple times:

const worker = ploy<Env>()
	.queue(
		"EMAILS",
		{
			payload: z.object({
				to: z.string().email(),
				subject: z.string(),
				body: z.string(),
			}),
		},
		async (ctx) => {
			await sendEmail(ctx.message.payload);
			ctx.ack();
		},
	)

	.queue(
		"TASKS",
		{
			payload: z.object({
				task: z.string(),
				priority: z.enum(["low", "medium", "high"]),
			}),
		},
		async (ctx) => {
			await processTask(ctx.message.payload);
			ctx.ack();
		},
	)

	.build();

Using State in Queues

Access state from .state() calls in queue handlers:

import { withDrizzle } from "@meetploy/start";

const worker = ploy<Env>()
	.state(withDrizzle("DB", schema))

	.queue(
		"ORDERS",
		{
			payload: z.object({ orderId: z.string() }),
		},
		async (ctx) => {
			// Access database via state
			const order = await ctx.state.db
				.select()
				.from(orders)
				.where(eq(orders.id, ctx.message.payload.orderId))
				.limit(1);

			if (order[0]) {
				await processOrder(order[0]);
			}
			ctx.ack();
		},
	)

	.build();

Full Example

import { ploy, withDrizzle, z } from "@meetploy/start";
import { eq } from "drizzle-orm";
import type { Env } from "./env.js";
import * as schema from "./schema.js";

const worker = ploy<Env>()
	.state(withDrizzle("DB", schema))

	// Send message endpoint
	.post(
		"/orders",
		{
			body: z.object({
				userId: z.string(),
				items: z.array(
					z.object({
						productId: z.string(),
						quantity: z.number(),
					}),
				),
			}),
			response: z.object({
				orderId: z.string(),
				messageId: z.string(),
			}),
		},
		async (ctx) => {
			// Create order in database
			const orderId = crypto.randomUUID();

			// Queue background processing
			const { messageId } = await ctx.env.ORDER_QUEUE.send({
				action: "process",
				orderId,
				userId: ctx.body.userId,
				items: ctx.body.items,
			});

			return { orderId, messageId };
		},
	)

	// Queue handler
	.queue(
		"ORDER_QUEUE",
		{
			payload: z.object({
				action: z.enum(["process", "cancel", "refund"]),
				orderId: z.string(),
				userId: z.string(),
				items: z
					.array(
						z.object({
							productId: z.string(),
							quantity: z.number(),
						}),
					)
					.optional(),
			}),
		},
		async (ctx) => {
			const { action, orderId, userId } = ctx.message.payload;

			console.log(`Processing ${action} for order ${orderId}`);

			try {
				switch (action) {
					case "process":
						await processOrder(orderId, ctx.message.payload.items!);
						break;
					case "cancel":
						await cancelOrder(orderId);
						break;
					case "refund":
						await refundOrder(orderId);
						break;
				}
				ctx.ack();
			} catch (error) {
				if (ctx.message.attempt < 3) {
					ctx.retry(ctx.message.attempt * 1000);
				} else {
					ctx.deadLetter(`Failed after ${ctx.message.attempt} attempts`);
				}
			}
		},
	)

	.build();

export default worker;

Best Practices

  • Keep handlers idempotent - Messages may be delivered more than once
  • Use unique IDs - Include IDs in payloads to detect duplicates
  • Set appropriate retries - Don't retry indefinitely
  • Log message IDs - For debugging and tracing
  • Validate payloads - Zod validation catches malformed messages
  • Handle partial failures - Design for resilience

How is this guide?

Last updated on

Queues