// node/lib/stream.js
// (109 lines, 2.4 KiB, JavaScript)

'use strict';

// Stream is a hoisted function declaration further down in this file, so it
// can be exported and decorated here before its definition appears.
module.exports = Stream;

const EE = require('events').EventEmitter;
const util = require('util');

// Put EventEmitter on Stream's prototype chain.
util.inherits(Stream, EE);

// Expose the stream classes as properties of the exported constructor.
Stream.Readable = require('_stream_readable');
Stream.Writable = require('_stream_writable');
Stream.Duplex = require('_stream_duplex');
Stream.Transform = require('_stream_transform');
Stream.PassThrough = require('_stream_passthrough');

// Backwards-compat with node 0.4.x
Stream.Stream = Stream;

/**
 * Constructor for old-style streams. Note that the pipe method (the only
 * relevant part of this class) is overridden in the Readable class.
 *
 * It only runs the EventEmitter constructor on the new instance.
 */
function Stream() {
  EE.call(this);
}
/**
 * Old-style pipe: forwards this stream's 'data' events to `dest.write()`,
 * pauses/resumes the source based on the destination's write result and
 * 'drain' event, optionally ends/destroys the destination when the source
 * finishes, and removes every listener it added once either side is done.
 *
 * @param {Object} dest - Destination emitter. `write()` is called for each
 *   chunk while `dest.writable` is truthy; `end()` is called on source 'end'
 *   and `destroy()` (if it is a function) on source 'close'.
 * @param {Object} [options] - Pass `{ end: false }` to leave `dest` open when
 *   the source emits 'end' or 'close'.
 * @returns {Object} `dest`, to allow chaining: A.pipe(B).pipe(C).
 * @throws Re-throws an 'error' emitted by either stream when that stream has
 *   no 'error' listener left after the pipe's own listeners are removed.
 */
Stream.prototype.pipe = function(dest, options) {
  const source = this;

  // Guards onend/onclose so that dest is finished only once.
  let didOnEnd = false;

  function ondata(chunk) {
    if (dest.writable) {
      // A `false` return from write() signals back-pressure; pause the
      // source if it knows how to.
      if (dest.write(chunk) === false && source.pause) {
        source.pause();
      }
    }
  }

  source.on('data', ondata);

  function ondrain() {
    if (source.readable && source.resume) {
      source.resume();
    }
  }

  dest.on('drain', ondrain);

  // If the 'end' option is not supplied, dest.end() will be called when
  // source gets the 'end' or 'close' events. Only dest.end() once.
  if (!dest._isStdio && (!options || options.end !== false)) {
    source.on('end', onend);
    source.on('close', onclose);
  }

  function onend() {
    if (didOnEnd) return;
    didOnEnd = true;

    dest.end();
  }

  function onclose() {
    if (didOnEnd) return;
    didOnEnd = true;

    if (typeof dest.destroy === 'function') dest.destroy();
  }

  // don't leave dangling pipes when there are errors.
  // Must stay a regular function: `this` is the stream that emitted 'error'.
  function onerror(er) {
    cleanup();
    if (EE.listenerCount(this, 'error') === 0) {
      throw er; // Unhandled stream error in pipe.
    }
  }

  source.on('error', onerror);
  dest.on('error', onerror);

  // remove all the event listeners that were added.
  function cleanup() {
    source.removeListener('data', ondata);
    dest.removeListener('drain', ondrain);

    source.removeListener('end', onend);
    source.removeListener('close', onclose);

    source.removeListener('error', onerror);
    dest.removeListener('error', onerror);

    source.removeListener('end', cleanup);
    source.removeListener('close', cleanup);

    dest.removeListener('close', cleanup);
  }

  source.on('end', cleanup);
  source.on('close', cleanup);

  dest.on('close', cleanup);

  dest.emit('pipe', source);

  // Allow for unix-like usage: A.pipe(B).pipe(C)
  return dest;
};