Async Tasks in Cloudflare Workers – Part 2: Decomposing tasks into multiple Workers
In Part 1, we explored strategies for managing asynchronous tasks within a single Cloudflare Worker, focusing on sequential execution and parallel background processing with waitUntil().
While this approach works well for simple cases, complex systems require robust error handling, retry mechanisms, and separation of concerns. These patterns become more manageable when we distribute asynchronous tasks across multiple workers.
In Part 2, we will:
- Distribute tasks across workers using Service Bindings
- Explore how to achieve enhanced fault tolerance with Cloudflare Queues
Let's start by breaking down the Marathon Sign-up Worker from Part 1!
1. Cloudflare Service Bindings: Service Decomposition
In our Marathon Registration API, we have two main tasks: storing the runner's data in a database and sending a confirmation email.
To decouple these tasks, we can create a separate Email Worker that handles sending emails.
When using Cloudflare’s Service Bindings, workers can call each other within the same thread using fetch or RPC.
This means there is no additional latency compared to a single-worker approach, and the performance remains consistent.
You can find the code examples for this blog post here.
Why Opt for Separation?
Transitioning to a (micro)service architecture with Cloudflare Workers provides several key benefits:
- Independently deployable: Each service can be updated and deployed without affecting others.
- Reusability: A single worker can serve multiple APIs or applications. Imagine an additional newsletter subscription service that also needs to send emails.
- Improved security: Services can operate without a public interface, reducing exposure and potential attack surfaces. The email worker can now be kept private and only accessed within the Cloudflare network.
Setting Up Service Bindings
Bindings can be set up quite easily in the wrangler.toml file.
services = [
  { binding = "WORKER_EMAIL", service = "worker-email" }
]
The only thing you have to make sure of is that the service name matches the name of the worker you want to bind to.
The worker that is being bound (in our case, the worker that sends the email) needs to extend the WorkerEntrypoint class.
import { WorkerEntrypoint } from "cloudflare:workers";

export class WorkerEmail extends WorkerEntrypoint {
  // Implement worker functions (RPC methods) here
}
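To make this more concrete, here is a minimal sketch of what the Email Worker could look like. The provider endpoint and the EMAIL_API_KEY secret are assumptions for illustration, not part of the original example; swap in your actual email service.
import { WorkerEntrypoint } from "cloudflare:workers";

export class WorkerEmail extends WorkerEntrypoint<{ EMAIL_API_KEY: string }> {
  // RPC method callable through the service binding
  async send(email: string, firstName: string): Promise<Response> {
    // Hypothetical email provider call
    return fetch("https://email-provider.example.com/send", {
      method: "POST",
      headers: {
        "Content-Type": "application/json",
        Authorization: `Bearer ${this.env.EMAIL_API_KEY}`,
      },
      body: JSON.stringify({
        to: email,
        subject: "Marathon registration confirmed",
        text: `Hi ${firstName}, thanks for registering!`,
      }),
    });
  }
}
Note that if the class is a named export rather than the worker's default export, the service binding may additionally need an entrypoint field in wrangler.toml (e.g., entrypoint = "WorkerEmail").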
Start both workers locally (in separate terminals) to check whether your binding is working correctly.
Now that the services are bound, we can either use RPC to call a function directly in the other worker, or use fetch to call one of its endpoints.
// using RPC
await c.env.WORKER_EMAIL.send(email, firstName);

// using fetch
await c.env.WORKER_EMAIL.fetch(
  new Request("https://worker-email/send", {
    method: "POST",
    headers: { "Content-Type": "application/json" },
    body: JSON.stringify({ email, firstName }),
  })
);
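For the fetch variant, the Email Worker needs a matching endpoint. A minimal sketch of handling a /send route directly in the class's fetch method follows; the routing logic is an assumption, and the actual worker might use a framework instead:
import { WorkerEntrypoint } from "cloudflare:workers";

export class WorkerEmail extends WorkerEntrypoint {
  // ... send() as sketched above ...

  // Handles HTTP requests made through the service binding's fetch()
  async fetch(request: Request): Promise<Response> {
    const url = new URL(request.url);
    if (request.method === "POST" && url.pathname === "/send") {
      const { email, firstName } = (await request.json()) as {
        email: string;
        firstName: string;
      };
      return this.send(email, firstName);
    }
    return new Response("Not found", { status: 404 });
  }
}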
Execution patterns
Since the calls remain within the same thread, invoking the new email worker is handled similarly to managing an asynchronous task within a single worker.
When using separate Cloudflare Workers, tasks become asynchronous: the calls return Promises, and the await keyword ensures the other worker completes its operation before the current handler finishes.
Services can be invoked using sequential logic:
app.post("/api/marathon-sequential", async (c) => {
const { firstName, lastName, email, address, distance } = await c.req.json();
await insertData(
firstName,
lastName,
email,
address,
distance,
c.env.DATABASE_URL
);
// using rpc
await c.env.WORKER_EMAIL.send(email, firstName);
return c.text("Thanks for registering for our Marathon", 200);
});
To improve response time, waitUntil() can be used to insert the data and send the email in parallel, similar to the example with a single worker.
app.post("/api/marathon-wait-until", async (c) => {
const { firstName, lastName, email, address, distance } = await c.req.json();
//using rpc
c.executionCtx.waitUntil(c.env.WORKER_EMAIL.send(email, firstName));
await insertData(
firstName,
lastName,
email,
address,
distance,
c.env.DATABASE_URL
);
return c.text("Thanks for registering for our Marathon", 200);
});
Fiberplane Studio nicely outlines the invocation of the different workers in both cases.
Excursion: Ensure Type Safety
When using TypeScript, you need to ensure type conformity across workers. This involves defining a type or interface for the service you bind to.
However, this approach can introduce several challenges, as discussed in the Cloudflare forum.
If both workers are in a monorepo, they can share the same type definition file, simplifying type management. For example:
export interface WorkerEmail {
  fetch(request: Request): Promise<Response>;
  send(email: string, firstName: string): Promise<Response>;
}
The Sign-up Worker can call the Mail Worker using the shared WorkerEmail interface, ensuring type safety and preventing runtime errors from mismatched method signatures:
import type { WorkerEmail } from "../../worker-email-types";

type Bindings = {
  WORKER_EMAIL: WorkerEmail;
};
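Since the route handlers in this post use Hono, the Bindings type can be passed to the Hono constructor as a generic, so c.env.WORKER_EMAIL is fully typed in every handler. A minimal sketch (the DATABASE_URL entry mirrors the insertData calls above):
import { Hono } from "hono";
import type { WorkerEmail } from "../../worker-email-types";

type Bindings = {
  WORKER_EMAIL: WorkerEmail;
  DATABASE_URL: string;
};

// c.env is now typed inside every route handler
const app = new Hono<{ Bindings: Bindings }>();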
If you're not working in a monorepo and your workers are in different repositories, it's more difficult for them to share the same type definition file.
Kent C. Dodds wrote about fully typed web apps, discussing end-to-end type safety from the database to the web UI.
He talks about boundaries and how to address them. If our workers are not in the same repo, we have another boundary to address.
In this scenario, things become a bit more challenging. You have two options:
- Define the interface separately in both workers: This approach can lead to duplication, as the interface would need to be maintained independently in multiple workers.
- Publish a shared type definition package: A better solution is to publish the type definitions as an NPM package (e.g., worker-email-types) and install it in both projects. This approach ensures consistency while keeping the type definitions centralized.
2. Cloudflare Queues: Enhancing Error Handling
So far, we can use concurrent execution and background tasks, both in a single worker and in separate workers. This way, our user won't be bothered if the email worker fails.
But how can we implement a more robust error handling strategy that ensures emails are sent even if the email worker fails?
Implementing a retry mechanism is a common approach to handle such scenarios.
While implementing and handling retries in the business logic of the worker itself is complex, we can leverage a message broker to simplify the process.
Cloudflare provides Queues to facilitate this approach.
Let's first modify our architecture to include a queue between the sign-up worker (producer) and the email worker (consumer). Afterwards, we will explore how to handle retries and failed messages in the consumer.
Workers can produce messages to a queue and consume messages from a queue.
The producer is unaware of the consumer, which effectively enables a fire-and-forget implementation for the worker.
The message broker is responsible for handling and delivering the messages.
Under the hood, a Cloudflare Queue leverages Durable Objects.
Fiberplane has a great hands-on introduction to building with Durable Objects here.
Why Opt for Queues?
Decoupling your services with a message broker like Cloudflare Queues offers several advantages:
- Enhanced Error Handling: Queues simplify retries and provide mechanisms like dead-letter queues for failed messages, allowing granular control over error management.
- Scalability: Producers and consumers are decoupled, enabling them to scale independently based on demand.
- Improved Reliability: Queues act as a buffer between services, ensuring tasks are not lost even if a worker experiences downtime.
- Event-Driven Integration: External services can interact with your internal Cloudflare Workers via queues, facilitating seamless, serverless integration.
- Efficient Processing: Strategies like batching and buffering reduce the frequency of worker invocations, optimizing performance and costs.
Development Considerations for Queues
When working with Cloudflare Queues during local development, there are some current limitations to be aware of:
- Local Testing of Queues:
  - Currently, it’s not possible to run two independent workers locally and connect them to the same queue.
  - Queues cannot be run remotely while connecting local workers to them.
- Workarounds for Testing:
  - One option is to combine producer and consumer logic within a single worker during local testing (see the sketch after this list). However, this approach does not simulate the separation of services in a production environment.
  - Alternatively, testing can be conducted by deploying workers to the Cloudflare network, either in a staging or production environment.
- Trade-offs with Queues:
  - Pro: Decoupling of services, enhanced error handling, and scalability.
  - Con: Increased complexity in architecture; harder to debug and monitor the full call chain compared to direct service bindings.
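For the single-worker workaround mentioned above, one wrangler.toml can declare the worker as both producer and consumer of the same queue, so wrangler dev can simulate the queue locally. A sketch:
[[queues.producers]]
queue = "registration-queue"
binding = "REGISTRATION_QUEUE"

[[queues.consumers]]
queue = "registration-queue"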
Setting up the producer: Sending Messages to the Queue
In the worker that sends the message, you can add a binding to the wrangler.toml file:
[[queues.producers]]
queue = "registration-queue"
binding = "REGISTRATION_QUEUE"
Once that binding is set up, sending messages to the queue is straightforward.
await c.env.REGISTRATION_QUEUE.send(messagePayload);
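If you are using TypeScript, the producer binding can be typed with the generic Queue type from @cloudflare/workers-types. A sketch, typed as string here because the example endpoint later in this post sends a JSON string:
type Bindings = {
  // Message body type: the JSON string produced with JSON.stringify
  REGISTRATION_QUEUE: Queue<string>;
  DATABASE_URL: string;
};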
Setting up the consumer: Handling Messages from the Queue
Consuming messages from a queue can be done in two ways.
The default method is for the queue to push messages to a consumer worker.
This works well for a serverless approach and ensures the worker is only invoked when there is a message in the queue.
However, queues also support polling, which can be useful for integrating with services outside the Cloudflare ecosystem; that approach is not covered in this post.
To set up a consumer worker, the consumer binding must be defined in the wrangler.toml file.
[[queues.consumers]]
queue = "registration-queue"
The consumer worker must implement a queue method to handle incoming messages:
// Queue consumer
async queue(batch: MessageBatch): Promise<void> {
  for (const msg of batch.messages) {
    console.log("Received", msg.body);
    const { email, firstName } = JSON.parse(msg.body as string);
    await this.send(email, firstName);
  }
}
In the example above, the consumer worker receives a batch of messages and processes them individually.
The wrangler.toml defines the batching strategy for a consumer:
[[queues.consumers]]
queue = "registration-queue"
max_batch_size = 10 # optional: defaults to 10
max_batch_timeout = 60 # optional: defaults to 5 seconds
The max_batch_size and max_batch_timeout settings represent "racing" conditions. That is to say, the batch is sent as soon as one of the conditions is met.
In the example, the message batch is sent either when 10 messages are in the queue or after 60 seconds. Nothing will be pushed if the queue is empty.
Error Handling and Retries
Now let's add some error handling and retries for the email worker (consumer).
This is convenient because Cloudflare Queues come with a built-in retry mechanism. By default, a queue tries to deliver a message three times; the queue keeps track of the remaining retries, and if the message is not delivered after three attempts, it is marked as failed.
You can modify this behavior in the consumer configuration, where you can define the maximum number of retries and set up a dead-letter queue for failed messages. Note that the dead-letter queue itself must be created like any other queue before messages can be routed to it.
[[queues.consumers]]
queue = "registration-queue"
max_retries = 10
dead_letter_queue = "failed-registration-message-queue"
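A dead-letter queue is consumed like any other queue, via a [[queues.consumers]] binding on "failed-registration-message-queue". As a sketch, the consumer could simply log failed registrations for manual follow-up; the alerting strategy is an assumption:
// Queue handler in the worker consuming the dead-letter queue
async queue(batch: MessageBatch): Promise<void> {
  for (const message of batch.messages) {
    // Persist or alert instead of retrying again, e.g., notify on-call staff
    console.error("Email delivery permanently failed:", message.body);
    message.ack();
  }
}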
Message Batches, Acknowledgements and Retries: Building Idempotent Consumers
Above, we set up message batching for our email worker and defined a retry and failure strategy.
What happens now if one message in the batch fails? If we don't specify individual acknowledgements, the entire batch will be marked as failed and all messages will be retried.
This means the successful emails would be sent out again, which is not ideal, as we might spam our runners.
To achieve idempotent consumers, we need to handle the messages in a batch individually.
Individual messages within the batch must be acknowledged:
async queue(batch: MessageBatch): Promise<void> {
  for (const message of batch.messages) {
    console.log("Received", message.body);
    const { email, firstName } = JSON.parse(message.body as string);
    await this.send(email, firstName);
    // Explicitly acknowledge this message so it won't be retried with the batch
    message.ack();
  }
}
With individual acknowledgment of messages, only the failed messages in a batch will be retried.
Retry is the process of negatively acknowledging (nacking) a message and putting it back in the queue.
While we can acknowledge messages individually, we can also negatively acknowledge them individually and specify a delay before retrying.
msg.retry({ delaySeconds: 1000 });
This allows for defining task-specific retry strategies. For example, when an API signals that it is receiving too many requests, a delay time can be defined before retrying.
Furthermore, since each message has an attempts property and the retry method accepts delaySeconds, a backoff strategy can also be defined.
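Putting these pieces together, a sketch of an idempotent consumer with per-message exponential backoff could look like this (the 2 ** attempts delay formula is an assumption; pick whatever curve suits your email provider):
async queue(batch: MessageBatch): Promise<void> {
  for (const message of batch.messages) {
    try {
      const { email, firstName } = JSON.parse(message.body as string);
      await this.send(email, firstName);
      message.ack();
    } catch (err) {
      // Exponential backoff: 2s after the first attempt, 4s after the second, ...
      message.retry({ delaySeconds: 2 ** message.attempts });
    }
  }
}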
Execution pattern
Since the producer and consumer are decoupled, the producer can send messages to the queue without waiting for the consumer to process them.
Unlike with Service Bindings, the decoupled asynchronous tasks no longer remain in the same thread.
The image above outlines the call chain from the sign-up worker in Fiberplane Studio.
app.post("/api/marathon-producer-queue", async (c) => {
const { firstName, lastName, email, address, distance } = await c.req.json();
const messagePayload = {
firstName: firstName,
email: email
};
const messageBody = JSON.stringify(messagePayload);
//produce a message for the Queue
console.log("Sending message to queue");
await c.env.REGISTRATION_QUEUE.send(messageBody);
await insertData(
firstName,
lastName,
email,
address,
distance,
c.env.DATABASE_URL
);
return c.text("Thanks for registering for our Marathon", 200);
});
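As a side note, send() also accepts options. For example, delivery of an individual message can itself be delayed; the 600 seconds below are just an illustrative value:
// Delay delivery of this message by ten minutes
await c.env.REGISTRATION_QUEUE.send(messageBody, { delaySeconds: 600 });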
To get a full call chain across multiple threads in different workers, including the queue, one needs to set up distributed tracing.
This can get quite complex and is not covered in this post.
Conclusion
In this post, we handled asynchronous tasks in separate Cloudflare Workers.
The post outlined two different ways of invoking the workers:
- Service Bindings: Decoupling asynchronous tasks (sign-up and email sending) into separate workers for better modularity, security, and deployability.
- Cloudflare Queues: Implementing robust error handling, retries, and batching to enhance reliability and scalability.
However, what if state needs to be maintained across the business process?
In a serverless environment, how can a failing email be related back to the person who signed up, and how can the corresponding database entry be checked?
If the person needs to be removed, should this be handled through manual data intervention?
This challenge becomes especially critical when there are not just two workers, but many working in tandem.
In Part 3, Cloudflare Workflows will be explored as a solution for state management across distributed Cloudflare services.