Handling Backpressure

Learn how to handle backpressure by pushing data into a stream as it's needed, rather than as it's ready.

When the server streams data faster than the client can process it, excess data will queue up in the client's memory. This issue is called backpressure, and it can lead to memory overflow errors, or data loss when the client's memory reaches capacity.
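
To make the queue build-up concrete, the following minimal sketch pushes values into a ReadableStream as soon as they are ready, with nothing reading them. The pushEagerly helper is hypothetical and not part of this recipe. It records controller.desiredSize, which reports how much room is left in the stream's internal queue and goes negative once the queue is over capacity. The sketch assumes the default queuing strategy (a high-water mark of 1 chunk).

```typescript
// Hypothetical helper (not part of the recipe): enqueues `count` values
// immediately, whether or not anything is reading from the stream.
function pushEagerly(count: number): {
  stream: ReadableStream<number>;
  desiredSizes: number[];
} {
  const desiredSizes: number[] = [];
  const stream = new ReadableStream<number>({
    // start() runs once, synchronously, when the stream is constructed
    start(controller) {
      for (let i = 1; i <= count; i++) {
        controller.enqueue(i);
        // desiredSize = highWaterMark - number of queued chunks
        desiredSizes.push(controller.desiredSize ?? 0);
      }
    },
  });
  return { stream, desiredSizes };
}

const { desiredSizes } = pushEagerly(5);
// desiredSizes is [0, -1, -2, -3, -4]: the queue grows past its
// capacity because nothing stops the producer.
console.log(desiredSizes);
```

A negative desiredSize is the stream's signal that the producer should slow down. The rest of this recipe avoids the problem by only producing values when the stream asks for them.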

In this recipe, you create an API endpoint that:

  • Simulates backpressure by generating data faster than a stream can read it
  • Handles backpressure by pushing data into a stream as it's needed, rather than as it's ready

Jump to the full example to see the full recipe.

First, create the data source. In this case, it will be an async generator function that yields a new integer indefinitely:

generator
// A generator that will yield positive integers
async function* integers() {
  let i = 1;
  while (true) {
    console.log(`yielding ${i}`);
    yield i++;
 
    await sleep(100);
  }
}
// Add a custom sleep function to create
// a delay that simulates how slow some
// Function responses are.
function sleep(ms: number) {
  return new Promise((resolve) => setTimeout(resolve, ms));
}

Next, create a function that wraps the generator in a ReadableStream. Because the stream calls the pull handler only when it wants more data, no new values are taken from the generator while no more data is being requested.

Pull data
// Wraps a generator into a ReadableStream
function createStream(iterator: AsyncGenerator<number, void, unknown>) {
  return new ReadableStream({
    // The stream calls the pull method whenever
    // it wants more data added to its queue.
    async pull(controller) {
      const { value, done } = await iterator.next();
      // done == true when the generator will yield
      // no more new values. If that's the case,
      // close the stream.
      if (done) {
        controller.close();
      } else {
        controller.enqueue(value);
      }
    },
  });
}
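
To check that pull really is driven by demand, you can count how often it runs. The sketch below is illustrative only: countPulls is a hypothetical helper, and it passes a queuing strategy with highWaterMark: 0 so that the stream never fetches ahead of the reader. With the default high-water mark of 1, as used by createStream, the stream may pull one value ahead to keep its queue filled.

```typescript
// Hypothetical helper (not part of the recipe): reads `reads` values
// from a stream and reports how many times pull() was called.
async function countPulls(reads: number): Promise<number> {
  let pulls = 0;
  const stream = new ReadableStream<number>(
    {
      pull(controller) {
        pulls++;
        controller.enqueue(pulls);
      },
    },
    // Assumption for this sketch: no read-ahead buffer, so pull()
    // runs only while a read request is waiting.
    { highWaterMark: 0 },
  );

  const reader = stream.getReader();
  for (let i = 0; i < reads; i++) {
    await reader.read();
  }
  return pulls;
}

// Three reads lead to three pulls, even though the source could
// produce values forever.
countPulls(3).then((pulls) => console.log(`pull() ran ${pulls} times`));
```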

Finally, iterate through a loop and read data from the stream. If values were pushed into the stream as soon as the generator had them ready, the stream would continue taking values from integers() indefinitely, filling up memory. Because the stream pulls a value only when it has room for one, the generator stays at most a small buffer ahead of the reader, and it stops producing values once you have iterated loopCount times and that buffer is full:

iterate-values
// Demonstrate handling backpressure
async function backpressureDemo() {
  // Set up a stream of integers
  const stream = createStream(integers());
 
  // Read values from the stream
  const reader = stream.getReader();
  const loopCount = 5;
  // Read as much data as you want
  for (let i = 0; i < loopCount; i++) {
    // Get the next value from the stream
    const { value } = await reader.read();
    console.log(`Stream value: ${value}`);
    await sleep(1000);
  }
}
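
The loop above stops reading after loopCount iterations, but the generator is left suspended rather than finished. If you want to release it explicitly, one option is to give the stream a cancel handler and call reader.cancel() when you are done. The following is a sketch under assumptions, not part of the recipe: countUp, createCancellableStream, readThenCancel, and the generatorCleanedUp flag are all hypothetical names introduced for illustration.

```typescript
// Hypothetical flag so the example can show that cleanup ran
let generatorCleanedUp = false;

// A generator like integers(), without the delay, plus a finally block
async function* countUp(): AsyncGenerator<number, void, unknown> {
  try {
    let i = 1;
    while (true) {
      yield i++;
    }
  } finally {
    // Runs when the consumer calls iterator.return()
    generatorCleanedUp = true;
  }
}

// Hypothetical variant of createStream with a cancel handler
function createCancellableStream(
  iterator: AsyncGenerator<number, void, unknown>,
): ReadableStream<number> {
  let cancelled = false;
  return new ReadableStream<number>({
    async pull(controller) {
      const result = await iterator.next();
      // Ignore a value that arrives after the reader has cancelled
      if (cancelled) return;
      if (result.done) {
        controller.close();
      } else {
        controller.enqueue(result.value);
      }
    },
    // Called when the consumer cancels the stream
    async cancel() {
      cancelled = true;
      await iterator.return(undefined);
    },
  });
}

async function readThenCancel(count: number): Promise<number[]> {
  const reader = createCancellableStream(countUp()).getReader();
  const values: number[] = [];
  for (let i = 0; i < count; i++) {
    const { value } = await reader.read();
    if (value !== undefined) values.push(value);
  }
  // Tell the stream, and through it the generator, that no more data is needed
  await reader.cancel();
  return values;
}
```

Calling readThenCancel(3) reads three values and then finishes the generator, so its finally block runs and any resources it holds can be released.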

The final file, including the route handler function, will look like this:

app/api/handle-backpressure/route.ts
// A generator that will yield positive integers
async function* integers() {
  let i = 1;
  while (true) {
    console.log(`yielding ${i}`);
    yield i++;
 
    await sleep(100);
  }
}
// Add a custom sleep function to create
// a delay that simulates how slow some
// Function responses are.
function sleep(ms: number) {
  return new Promise((resolve) => setTimeout(resolve, ms));
}
// Wraps a generator into a ReadableStream
function createStream(iterator: AsyncGenerator<number, void, unknown>) {
  return new ReadableStream({
    // The stream calls the pull method whenever
    // it wants more data added to its queue.
    async pull(controller) {
      const { value, done } = await iterator.next();
      // done == true when the generator will yield
      // no more new values. If that's the case,
      // close the stream.
      if (done) {
        controller.close();
      } else {
        controller.enqueue(value);
      }
    },
  });
}
// Demonstrate handling backpressure
async function backpressureDemo() {
  // Set up a stream of integers
  const stream = createStream(integers());
  // Read values from the stream
  const reader = stream.getReader();
  const loopCount = 5;
  // Read as much data as you want
  for (let i = 0; i < loopCount; i++) {
    // Get the next value from the stream
    const { value } = await reader.read();
    console.log(`Stream value: ${value}`);
    await sleep(1000);
  }
}
 
export async function GET() {
  backpressureDemo();
  return new Response('Check your console to see the result!');
}
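
The route handler above only logs values to the server console. If you want to send the values to the client instead, one possible approach is to encode them as bytes and pass the stream to the Response constructor. This is a minimal sketch under assumptions: toTextStream, finiteStream, and streamingGET are hypothetical names, and a short finite source stands in for integers() so that the response ends.

```typescript
// Hypothetical helper: turns a stream of numbers into UTF-8 encoded lines.
// pipeThrough() propagates backpressure from the consumer to the source.
function toTextStream(
  source: ReadableStream<number>,
): ReadableStream<Uint8Array> {
  const encoder = new TextEncoder();
  return source.pipeThrough(
    new TransformStream<number, Uint8Array>({
      transform(chunk, controller) {
        controller.enqueue(encoder.encode(`${chunk}\n`));
      },
    }),
  );
}

// Hypothetical finite, pull-based source used in place of integers()
function finiteStream(values: number[]): ReadableStream<number> {
  let index = 0;
  return new ReadableStream<number>({
    pull(controller) {
      if (index < values.length) {
        controller.enqueue(values[index++]);
      } else {
        controller.close();
      }
    },
  });
}

// Hypothetical route handler variant that streams the values to the client
function streamingGET(): Response {
  return new Response(toTextStream(finiteStream([1, 2, 3])));
}
```

Because both the source and the transform are pull-driven, values are only produced as quickly as the response body is consumed.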

If you're not using a framework, you must either add "type": "module" to your package.json or change your JavaScript Functions' file extensions from .js to .mjs.

Last updated on September 20, 2024