I recently encountered an issue in a Node.js v12 service where output streamed from a child process was intermittently missing from downstream processing. The pipeline streams the data into a ZIP archive and uploads it to S3. The service is deployed on AWS Lambda.
In this post, I describe the problem, walk through the debugging process, and evaluate several solutions.
Data pipeline
At a high level, the data pipeline inside the Lambda function looks like:
       .-------------------.          .---------------.        .----.
------>| custom executable |--------->| node-archiver |------->| S3 |
event  '-------------------'  stdout  '---------------'  pipe  '----'
The custom executable accepts parameters from the Lambda event and outputs data on its stdout stream. The data is compressed into a ZIP file using node-archiver and piped to a stream that uploads to S3.
The project has integration tests that run the pipeline with different parameters and verify the contents of the generated ZIP files. When the problem occurs, the ZIP files are valid archives, but the single compressed file inside is either empty or is missing the first chunk of 65536 bytes.
I confirmed this data loss in a Python REPL by comparing the data from successful and failed runs:
>>> with open('good.data', 'rb') as gf, open('bad.data', 'rb') as bf:
... good = gf.read()
... bad = bf.read()
...
>>> len(bad)
63532
>>> len(good)
129068
>>> bad in good
True
>>> good.find(bad)
65536
In this example, the bad data is missing the first 65536 bytes, but is otherwise correct.
Source code
This section includes distilled code samples to show how the data pipeline is constructed. The actual project code is structured very differently and includes configuration and error handling.
Create a stream to upload to S3
The first step initializes an upload to an S3 bucket. The output is a writable stream and a Promise that resolves when the upload completes:
import AWS from 'aws-sdk';
import stream from 'stream';
const s3 = new AWS.S3();
const bucketName = 'my-bucket';
const key = 'my-key';
const uploadStream = new stream.PassThrough();
const params = {
Bucket: bucketName,
Key: key,
Body: uploadStream,
};
const uploadPromise = s3.upload(params).promise();
Run the custom executable
Next, we run the custom executable and get its standard output stream:
import { spawn } from 'child_process';
const args = [...];
const myProcess = spawn('./myExecutable', args);
const myProcessStdout = myProcess.stdout;
Create the ZIP file
The last step adds the custom executable’s output to a ZIP file and streams the file to S3.
import archiver from 'archiver';

const archive = archiver('zip');
// Streams from previous steps
const inputStream = myProcessStdout;
const outputStream = uploadStream;
const filename = 'my.data';
// Promise that resolves when archiver is finished writing to the output
// stream
const archivePromise = new Promise((resolve, reject) => {
outputStream.on('finish', () => {
resolve();
});
outputStream.on('error', (err) => {
reject(err);
});
archive.on('error', (err) => {
reject(err);
});
});
archive.pipe(outputStream);
archive.append(inputStream, { name: filename });
archive.finalize();
Wait for asynchronous operations
Once the pipeline is established, we wait for the promises to resolve:
await Promise.all([archivePromise, uploadPromise]);
Debugging
When the pipeline fails, a valid ZIP file is uploaded to S3, but the data inside the archive is incomplete. No failures were observed when skipping the ZIP step, i.e. when piping the child_process stdout stream directly to the S3 upload stream. This suggests that the problem is related to node-archiver.
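For reference, the ZIP-free check looks roughly like this (a condensed sketch reusing names from the earlier samples):

import AWS from 'aws-sdk';
import stream from 'stream';
import { spawn } from 'child_process';

const s3 = new AWS.S3();
const uploadStream = new stream.PassThrough();
const uploadPromise = s3
  .upload({ Bucket: 'my-bucket', Key: 'my-key', Body: uploadStream })
  .promise();

// Pipe the child process output straight into the S3 upload stream,
// skipping node-archiver. pipe() runs in the same tick as spawn(),
// so no data is lost.
const args = []; // parameters from the Lambda event, as before
const myProcess = spawn('./myExecutable', args);
myProcess.stdout.pipe(uploadStream);

await uploadPromise;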
node-archiver
To figure out what’s happening, I read node-archiver’s source code.
Asynchronous queue
When adding an entry to a ZIP file, node-archiver’s append() function doesn’t immediately perform the operation. Instead, it adds a task to an async queue. On a subsequent tick of the event loop, the task pipes the input stream (the child_process stdout stream) into the output stream (the S3 upload stream).
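A simplified model of that behavior (not node-archiver’s actual implementation) is shown below; the important detail is that pipe() runs on a later tick than append():

// Rough model of the async queue: the pipe happens on a later tick,
// not at the moment the entry is added.
function queueAppend(inputStream, outputStream) {
  setImmediate(() => {
    // By the time this callback runs, the input stream may already be
    // flowing and may have dropped data.
    inputStream.pipe(outputStream);
  });
}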
Stream flowing mode
Running in the debugger revealed that the input stream may already be in “flowing mode” by the time node-archiver calls pipe(); the stream’s readableFlowing property is true. When in flowing mode, a stream’s data is available through the EventEmitter interface.
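One way to see this without a debugger is to log the property at a couple of points (a small diagnostic sketch):

// readableFlowing is null while nothing is consuming the stream,
// false when it is paused, and true when it is flowing.
console.log('at spawn time:', myProcessStdout.readableFlowing);
setImmediate(() => {
  // Roughly when node-archiver's queued task would call pipe().
  console.log('one tick later:', myProcessStdout.readableFlowing);
});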
The Node.js stream documentation describes flowing mode in detail:
All Readable streams begin in paused mode but can be switched to flowing mode in one of the following ways:
- Adding a ‘data’ event handler.
- Calling the stream.resume() method.
- Calling the stream.pipe() method to send the data to a Writable.
In this project, there are no ‘data’ event handlers, and the stream is already flowing by the time stream.pipe() is called. Therefore, something must have called stream.resume().
Race condition
In the debugger, I found that if the child process exits before its output stream is piped, Node.js flushes the stream by calling stream.resume(). This explains how the stream switches to flowing mode.
Therefore, there’s a race condition: the child process might exit and flush its output before node-archiver calls pipe(). In that case, some or all of the data from the child process could be discarded before being added to the ZIP file.
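The race can be reproduced outside the project with a fast-exiting child process and a deliberately deferred pipe() (a hedged sketch; echo stands in for the custom executable, and on Node.js v12 the ‘data’ line typically never prints):

import { spawn } from 'child_process';
import { PassThrough } from 'stream';

const child = spawn('echo', ['hello']);
const sink = new PassThrough();

// Deferring pipe() mimics node-archiver's async queue. If the child
// exits first, Node.js resumes (flushes) stdout and the data is gone
// before anything is listening.
setTimeout(() => {
  child.stdout.pipe(sink);
  sink.on('data', (chunk) => console.log('data:', chunk.toString()));
  sink.on('end', () => console.log('end of stream'));
}, 100);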
Solutions
To avoid the race condition that may result in lost data, pipe() must be called on the child_process stdout stream in the same tick of the event loop in which the stream was created. This section describes several ways to accomplish this.
Solution 1: zip-stream
Node-archiver builds on several low-level libraries, including
zip-stream. Using zip-stream
avoids the async queue. pipe()
is called immediately when adding a stream to
the archive, and no data from the child process is lost.
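Based on my reading of zip-stream’s entry() API, the ZIP step could look roughly like the following sketch (details may differ from the project’s actual code):

import ZipStream from 'zip-stream';

const archive = new ZipStream();
archive.pipe(outputStream);

// entry() starts consuming the input stream right away instead of
// queuing it, so the child process output is read in the same tick.
archive.entry(inputStream, { name: filename }, (err) => {
  if (err) throw err;
  archive.finish();
});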
Pros:
- Drop-in solution with minor code changes
Cons:
- The documentation suggests that the module “is meant to be wrapped internally by other modules”
- Less flexible than node-archiver in case requirements change
Solution 2: Wait for a patched node-archiver release
Node-archiver uses another low-level library, called archiver-utils. A patch to this library fixes the underlying problem. From https://github.com/archiverjs/archiver-utils/pull/17:
normalizeInputSource: always pipe stream through a PassThrough stream
This is needed to guarantee pausing the stream if it’s already flowing, since it will only be processed in a (distant) future iteration of the event loop, and will lose data if already flowing when added to the queue.
The latest version of node-archiver as of this writing (4.0.1) doesn’t include the patched archiver-utils; see https://github.com/archiverjs/node-archiver/issues/364.
Pros:
- Drop-in solution with no code changes
Cons:
- A patched node-archiver has yet to be released
Solution 3: Local workaround
Until a fixed version of node-archiver is released, we can fix the race condition by adding a workaround directly in our project.
The archiver-utils patch described above fixes the data loss by piping the input stream through a PassThrough stream. The PassThrough stream effectively pauses the data flow until it’s piped to another stream.
The updated ZIP file creation code looks like:
import { PassThrough } from 'stream';

// Pipe the child process output through a PassThrough in the same tick;
// the PassThrough consumes the data immediately and applies backpressure
// until archiver reads it on a later tick.
const passThrough = new PassThrough();
archive.append(inputStream.pipe(passThrough), { name: filename });
archive.finalize();
Because we spawn the child process and pipe its stdout stream in the same tick of the event loop, no data is lost, even if the child process exits quickly.
Conclusion
This issue highlights the importance of having multiple layers of automated testing and the value of open source software and the open source community.
Intermittent integration test failures uncovered this race condition and provided artifacts that aided debugging. Having access to the source code of Node.js and node-archiver helped explain the root cause of the issue. Discussions on GitHub led to a solution.