I recently encountered an issue in a Node.js v12 service where output streamed from a child process was intermittently missing from downstream processing. The pipeline streams the data into a ZIP archive and uploads it to S3. The service is deployed on AWS Lambda.

In this post, I describe the problem, walk through the debugging process, and evaluate several solutions.

Data pipeline

At a high level, the data pipeline inside the Lambda function looks like:


              .-------------------.            .---------------.            .----.
  ----------->| custom executable |----------->| node-archiver |----------->| S3 |
     event    '-------------------'   stdout   '---------------'    pipe    '----'

The custom executable accepts parameters from the Lambda event and outputs data on its stdout stream. The data is compressed into a ZIP file using node-archiver and piped to a stream that uploads to S3.

The project has integration tests that run the pipeline with different parameters and verify the contents of the generated ZIP files. When the problem occurs, the ZIP files are valid archives, but the single compressed file inside is either empty or missing its first 65536-byte chunk.

I confirmed this data loss in a Python REPL by comparing the data from successful and failed runs:

>>> with open('good.data', 'rb') as gf, open('bad.data', 'rb') as bf:
...     good = gf.read()
...     bad = bf.read()
...
>>> len(bad)
63532
>>> len(good)
129068
>>> bad in good
True
>>> good.find(bad)
65536

In this example, the bad data is missing the first 65536 bytes, but is otherwise correct.

Source code

This section includes distilled code samples to show how the data pipeline is constructed. The actual project code is structured very differently and includes configuration and error handling.

Create a stream to upload to S3

The first step initializes an upload to an S3 bucket. The output is a writable stream and a Promise that resolves when the upload completes:

import AWS from 'aws-sdk';
import stream from 'stream';

const s3 = new AWS.S3();

const bucketName = 'my-bucket';
const key = 'my-key';
const uploadStream = new stream.PassThrough();

const params = {
    Bucket: bucketName,
    Key: key,
    Body: uploadStream,
};
const uploadPromise = s3.upload(params).promise();

Run the custom executable

Next, we run the custom executable and get its standard output stream:

import { spawn } from 'child_process';

const args = [...];
const myProcess = spawn('./myExecutable', args);
const myProcessStdout = myProcess.stdout;

Create the ZIP file

The last step adds the custom executable’s output to a ZIP file and streams the file to S3.

import archiver from 'archiver';

const archive = archiver('zip');

// Streams from previous steps
const inputStream = myProcessStdout;
const outputStream = uploadStream;

const filename = 'my.data';

// Promise that resolves when archiver is finished writing to the output
// stream
const archivePromise = new Promise((resolve, reject) => {
    outputStream.on('finish', () => {
        resolve();
    });

    outputStream.on('error', (err) => {
        reject(err);
    });

    archive.on('error', (err) => {
        reject(err);
    });
});

archive.pipe(outputStream);
archive.append(inputStream, { name: filename });
archive.finalize();

Wait for asynchronous operations

Once the pipeline is established, we wait for the promises to resolve:

await Promise.all([archivePromise, uploadPromise]);

Debugging

When the pipeline fails, a valid ZIP file is uploaded to S3, but the data inside the archive is incomplete. No failures were observed when skipping the ZIP step, i.e. when directly piping the child_process stdout stream to the S3 upload stream. This suggests that the problem is related to node-archiver.
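
For reference, the ZIP-free variant of the pipeline used for this comparison looks roughly like the following (a simplified sketch; the actual test code differs):

// Bypass node-archiver: pipe the child process output straight into the
// S3 upload stream created earlier.
myProcessStdout.pipe(uploadStream);

await uploadPromise;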

node-archiver

To figure out what’s happening, I read node-archiver’s source code.

Asynchronous queue

When adding an entry to a ZIP file, node-archiver’s append() function doesn’t immediately perform the operation. Instead, it adds a task to an async queue. On a subsequent tick of the event loop, the task pipes the input stream (the child_process stdout stream) into the output stream (the S3 upload stream).
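
A simplified model of this behavior (not node-archiver's actual code) is a function that defers the pipe() call to a later turn of the event loop:

// Simplified stand-in for node-archiver's async queue: the entry is only
// piped on a later tick, not at the moment it is appended.
function appendLater(inputStream, outputStream) {
    setImmediate(() => {
        // By the time this runs, inputStream may already be flowing and may
        // have emitted (and discarded) data with no listeners attached.
        inputStream.pipe(outputStream);
    });
}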

Stream flowing mode

Running the pipeline in the debugger revealed that the input stream may already be in “flowing mode” by the time node-archiver calls pipe(); the stream’s readableFlowing property is true. When a stream is in flowing mode, its data is delivered through the EventEmitter interface.

The Node.js stream documentation describes flowing mode in detail:

All Readable streams begin in paused mode but can be switched to flowing mode in one of the following ways:

  • Adding a ‘data’ event handler.
  • Calling the stream.resume() method.
  • Calling the stream.pipe() method to send the data to a Writable.

In this project, there are no ‘data’ event handlers, and the stream is already flowing by the time stream.pipe() is called. Therefore, something must have called stream.resume().
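
These state transitions can be observed through the readableFlowing property; a quick illustration:

import { PassThrough } from 'stream';

const s = new PassThrough();
console.log(s.readableFlowing); // null: no mechanism for consuming data yet

s.on('data', () => {});
console.log(s.readableFlowing); // true: a 'data' handler switches the stream to flowing mode

s.pause();
console.log(s.readableFlowing); // false: explicitly paused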

Race condition

In the debugger, I found that if the child process exits before its output stream is piped, Node.js flushes the stream by calling stream.resume(). This explains how the stream switches to flowing mode.

Therefore, there’s a race condition where the child process might exit and flush its output before node-archiver calls pipe(). In that case, some or all of the data from the child process could be discarded before being added to the ZIP file.
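
The race can be reproduced outside the project by spawning a short-lived command and deferring consumption of its stdout, similar to the deferred pipe() in node-archiver's queue. This is a sketch, assuming a Unix-like environment where echo is available and exits before the timeout fires:

import { spawn } from 'child_process';

const child = spawn('echo', ['hello, world']);

// Simulate node-archiver's async queue: consume stdout only on a later tick.
setTimeout(() => {
    // If the child has already exited, Node.js has called resume() on its
    // stdout stream and the data was emitted with no 'data' listeners
    // attached, so nothing arrives here.
    console.log('readableFlowing:', child.stdout.readableFlowing);
    child.stdout.on('data', (chunk) => console.log('received:', chunk.length, 'bytes'));
}, 100);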

Solutions

To avoid the race condition that may result in lost data, pipe() must be called on the child_process stdout stream in the same tick of the event loop in which the stream was created. This section describes several ways to accomplish this.

Solution 1: zip-stream

Node-archiver builds on several lower-level libraries, including zip-stream. Using zip-stream directly avoids the async queue: pipe() is called immediately when a stream is added to the archive, so no data from the child process is lost.
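
For reference, a sketch of the ZIP step using zip-stream (based on its documented entry()/finish() API; details may need adapting):

import ZipStream from 'zip-stream';

const archive = new ZipStream();
archive.pipe(outputStream);

// entry() pipes the source stream right away; the callback fires once the
// entry has been fully written to the archive.
archive.entry(inputStream, { name: filename }, (err) => {
    if (err) throw err;
    archive.finish();
});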

Pros:

  • Drop-in solution with minor code changes

Cons:

  • The documentation suggests that the module “is meant to be wrapped internally by other modules”
  • Less flexible than node-archiver in case requirements change

Solution 2: Wait for a patched node-archiver release

Node-archiver uses another low-level library, called archiver-utils. A patch to this library fixes the underlying problem. From https://github.com/archiverjs/archiver-utils/pull/17:

normalizeInputSource: always pipe stream through a PassThrough stream

This is needed to guarantee pausing the stream if it’s already flowing, since it will only be processed in a (distant) future iteration of the event loop, and will lose data if already flowing when added to the queue.

The latest version of node-archiver as of this writing (4.0.1) doesn’t include the patched archiver-utils; see https://github.com/archiverjs/node-archiver/issues/364.

Pros:

  • Drop-in solution with no code changes

Cons:

  • A patched node-archiver has yet to be released

Solution 3: Local workaround

Until a fixed version of node-archiver is released, we can fix the race condition by adding a workaround directly in our project.

The archiver-utils patch described above fixes the data loss by piping the input stream through a PassThrough stream. The PassThrough stream effectively pauses the data flow until it’s piped to another stream.

The updated ZIP file creation code looks like:

import { PassThrough } from 'stream';

// Pipe the child process output through a PassThrough stream immediately;
// the PassThrough holds the data until node-archiver pipes it on a later tick.
const passThrough = new PassThrough();
inputStream.pipe(passThrough);

archive.append(passThrough, { name: filename });
archive.finalize();

Because we spawn the child process and pipe its stdout stream in the same tick of the event loop, no data is lost, even if the child process exits quickly.

Conclusion

This issue highlights the importance of having multiple layers of automated testing and the value of open source software and the open source community.

Intermittent integration test failures uncovered this race condition and provided artifacts that aided debugging. Having access to the source code of Node.js and node-archiver helped explain the root cause of the issue. Discussions on GitHub led to a solution.