diff --git a/lib/_stream_writable.js b/lib/_stream_writable.js index d2e3d8fc4e9..3b015663e07 100644 --- a/lib/_stream_writable.js +++ b/lib/_stream_writable.js @@ -219,6 +219,49 @@ function onwrite(stream, er) { stream.emit('drain'); }); } + + // if there's something in the buffer waiting, then process it + // It would be nice if there were TCO in JS, and we could just + // shift the top off the buffer and _write that, but that approach + // causes RangeErrors when you have a very large number of very + // small writes, and is not very efficient otherwise. + if (!state.bufferProcessing && state.buffer.length) { + state.bufferProcessing = true; + + for (var c = 0; c < state.buffer.length; c++) { + var chunkCb = state.buffer[c]; + var chunk = chunkCb[0]; + cb = chunkCb[1]; + + if (false === state.decodeStrings) + l = chunk[0].length; + else + l = chunk.length; + + state.writelen = l; + state.writecb = cb; + state.writechunk = chunk; + state.writing = true; + state.sync = true; + stream._write(chunk, state.onwrite); + state.sync = false; + + // if we didn't call the onwrite immediately, then + // it means that we need to wait until it does. + // also, that means that the chunk and cb are currently + // being processed, so move the buffer counter past them. + if (state.writing) { + c++; + break; + } + } + + state.bufferProcessing = false; + if (c < state.buffer.length) + state.buffer = state.buffer.slice(c); + else + state.buffer.length = 0; + } } Writable.prototype._write = function(chunk, cb) {