Ploy
Features

Queues

Add message queues to your workers for background processing.

Queues

Ploy Queues let you send messages to be processed asynchronously by your worker. Messages are durably stored and delivered at least once.

Configuration

Add a queue binding in your ploy.yaml:

ploy.yaml
kind: worker
build: pnpm build
out: dist
queue:
  TASKS: tasks

The key (TASKS) is the binding name available in your worker's env. The value (tasks) is the queue identifier.

Run ploy types to generate TypeScript types:

env.d.ts
import type { QueueBinding } from "@meetploy/types";

export interface Env {
	TASKS: QueueBinding;
}

Basic Example

src/index.ts
import type { Env } from "./env.js";
import type { Ploy, QueueMessageEvent } from "@meetploy/types";

export default {
	async fetch(request, env) {
		// Send a message to the queue
		const { messageId } = await env.TASKS.send({
			task: "process-order",
			orderId: "123",
		});

		return Response.json({ messageId });
	},

	// Handle incoming messages
	async message(event: QueueMessageEvent) {
		console.log("Processing:", event.payload);
	},
} satisfies Ploy<Env>;

Sending Messages

Single Message

const { messageId } = await env.TASKS.send({ task: "process" });

Delayed Message

const { messageId } = await env.TASKS.send(
	{ task: "reminder" },
	{ delaySeconds: 60 },
);

Batch Send

const { messageIds } = await env.TASKS.sendBatch([
	{ payload: { task: "batch-1" } },
	{ payload: { task: "batch-2" } },
	{ payload: { task: "batch-3" } },
]);

Message Handler

The message handler receives a QueueMessageEvent:

async message(event: QueueMessageEvent) {
  console.log({
    id: event.id,
    queue: event.queueName,
    payload: event.payload,
    attempt: event.attempt,
  });
}

Messages that throw an error are automatically retried with exponential backoff.

Next Steps

  • Workflows - Orchestrate multi-step processes
  • Workers - Learn about Ploy workers

How is this guide?

Last updated on

Queues