Use ReadableStream with Durable Object and Workers
Stream ReadableStream from Durable Objects.
This example demonstrates:
- A Worker receives a request, and forwards it to a Durable Object my-id.
- The Durable Object streams an incrementing number every second, until it receives AbortSignal.
- The Worker reads and logs the values from the stream.
- The Worker then cancels the stream after 5 values.
import { DurableObject } from "cloudflare:workers";
// Send incremented counter value every secondasync function* dataSource(signal) {  let counter = 0;  while (!signal.aborted) {    yield counter++;    await new Promise((resolve) => setTimeout(resolve, 1_000));  }
  console.log("Data source cancelled");}
export class MyDurableObject extends DurableObject {  async fetch(request) {    const abortController = new AbortController();
    const stream = new ReadableStream({      async start(controller) {        if (request.signal.aborted) {          controller.close();          abortController.abort();          return;        }
        for await (const value of dataSource(abortController.signal)) {          controller.enqueue(new TextEncoder().encode(String(value)));        }      },      cancel() {        console.log("Stream cancelled");        abortController.abort();      },    });
    const headers = new Headers({      "Content-Type": "application/octet-stream",    });
    return new Response(stream, { headers });  }}
export default {  async fetch(request, env, ctx) {    const stub = env.MY_DURABLE_OBJECT.getByName("foo");    const response = await stub.fetch(request, { ...request });    if (!response.ok || !response.body) {      return new Response("Invalid response", { status: 500 });    }
    const reader = response.body      .pipeThrough(new TextDecoderStream())      .getReader();
    let data = [];    let i = 0;    while (true) {      // Cancel the stream after 5 messages      if (i > 5) {        reader.cancel();        break;      }      const { value, done } = await reader.read();
      if (value) {        console.log(`Got value ${value}`);        data = [...data, value];      }
      if (done) {        break;      }      i++;    }
    return Response.json(data);  },};import { DurableObject } from 'cloudflare:workers';
// Send incremented counter value every secondasync function* dataSource(signal: AbortSignal) {let counter = 0;while (!signal.aborted) {yield counter++;await new Promise((resolve) => setTimeout(resolve, 1_000));}
    console.log('Data source cancelled');}
export class MyDurableObject extends DurableObject<Env> {    async fetch(request: Request): Promise<Response> {        const abortController = new AbortController();
        const stream = new ReadableStream({            async start(controller) {                if (request.signal.aborted) {                    controller.close();                    abortController.abort();                    return;                }
                for await (const value of dataSource(abortController.signal)) {                    controller.enqueue(new TextEncoder().encode(String(value)));                }            },            cancel() {                console.log('Stream cancelled');                abortController.abort();            },        });
        const headers = new Headers({            'Content-Type': 'application/octet-stream',        });
        return new Response(stream, { headers });    }
}
export default {    async fetch(request, env, ctx): Promise<Response> {        const stub = env.MY_DURABLE_OBJECT.getByName("foo");        const response = await stub.fetch(request, { ...request });        if (!response.ok || !response.body) {            return new Response('Invalid response', { status: 500 });        }
        const reader = response.body.pipeThrough(new TextDecoderStream()).getReader();
        let data = [] as string[];        let i = 0;        while (true) {            // Cancel the stream after 5 messages            if (i > 5) {                reader.cancel();                break;            }            const { value, done } = await reader.read();
            if (value) {                console.log(`Got value ${value}`);                data = [...data, value];            }
            if (done) {                break;            }            i++;        }
        return Response.json(data);    },
} satisfies ExportedHandler<Env>;Was this helpful?
- Resources
- API
- New to Cloudflare?
- Directory
- Sponsorships
- Open Source
- Support
- Help Center
- System Status
- Compliance
- GDPR
- Company
- cloudflare.com
- Our team
- Careers
- © 2025 Cloudflare, Inc.
- Privacy Policy
- Terms of Use
- Report Security Issues
- Trademark