mirror of https://github.com/nodejs/node.git
doc: Provide 2 examples of SimpleProtocol parser
The first example uses Readable, and shows the use of readable.unshift(). The second uses the Transform class, showing that it's much simpler in this case.
pull/24507/merge
parent
88644eaa2d
commit
4926ffd14b
|
@ -84,7 +84,7 @@ method.
|
||||||
A `Readable Stream` has the following methods, members, and events.
|
A `Readable Stream` has the following methods, members, and events.
|
||||||
|
|
||||||
Note that `stream.Readable` is an abstract class designed to be
|
Note that `stream.Readable` is an abstract class designed to be
|
||||||
extended with an underlying implementation of the `_read(size, cb)`
|
extended with an underlying implementation of the `_read(size)`
|
||||||
method. (See below.)
|
method. (See below.)
|
||||||
|
|
||||||
### new stream.Readable([options])
|
### new stream.Readable([options])
|
||||||
|
@ -105,32 +105,39 @@ In classes that extend the Readable class, make sure to call the
|
||||||
constructor so that the buffering settings can be properly
|
constructor so that the buffering settings can be properly
|
||||||
initialized.
|
initialized.
|
||||||
|
|
||||||
### readable.\_read(size, callback)
|
### readable.\_read(size)
|
||||||
|
|
||||||
* `size` {Number} Number of bytes to read asynchronously
|
* `size` {Number} Number of bytes to read asynchronously
|
||||||
* `callback` {Function} Called with an error or with data
|
* `callback` {Function} Called with an error or with data
|
||||||
|
|
||||||
All Readable stream implementations must provide a `_read` method
|
Note: **This function should NOT be called directly.** It should be
|
||||||
to fetch data from the underlying resource.
|
|
||||||
|
|
||||||
Note: **This function MUST NOT be called directly.** It should be
|
|
||||||
implemented by child classes, and called by the internal Readable
|
implemented by child classes, and called by the internal Readable
|
||||||
class methods only.
|
class methods only.
|
||||||
|
|
||||||
Call the callback using the standard `callback(error, data)` pattern.
|
All Readable stream implementations must provide a `_read` method
|
||||||
When no more data can be fetched, call `callback(null, null)` to
|
to fetch data from the underlying resource.
|
||||||
signal the EOF.
|
|
||||||
|
|
||||||
This method is prefixed with an underscore because it is internal to
|
This method is prefixed with an underscore because it is internal to
|
||||||
the class that defines it, and should not be called directly by user
|
the class that defines it, and should not be called directly by user
|
||||||
programs. However, you **are** expected to override this method in
|
programs. However, you **are** expected to override this method in
|
||||||
your own extension classes.
|
your own extension classes.
|
||||||
|
|
||||||
|
When data is available, put it into the read queue by calling
|
||||||
|
`readable.push(chunk)`. If `push` returns false, then you should stop
|
||||||
|
reading. When `_read` is called again, you should start pushing more
|
||||||
|
data.
|
||||||
|
|
||||||
### readable.push(chunk)
|
### readable.push(chunk)
|
||||||
|
|
||||||
* `chunk` {Buffer | null | String} Chunk of data to push into the read queue
|
* `chunk` {Buffer | null | String} Chunk of data to push into the read queue
|
||||||
* return {Boolean} Whether or not more pushes should be performed
|
* return {Boolean} Whether or not more pushes should be performed
|
||||||
|
|
||||||
|
Note: **This function should be called by Readable implementors, NOT
|
||||||
|
by consumers of Readable subclasses.** The `_read()` function will not
|
||||||
|
be called again until at least one `push(chunk)` call is made. If no
|
||||||
|
data is available, then you MAY call `push('')` (an empty string) to
|
||||||
|
allow a future `_read` call, without adding any data to the queue.
|
||||||
|
|
||||||
The `Readable` class works by putting data into a read queue to be
|
The `Readable` class works by putting data into a read queue to be
|
||||||
pulled out later by calling the `read()` method when the `'readable'`
|
pulled out later by calling the `read()` method when the `'readable'`
|
||||||
event fires.
|
event fires.
|
||||||
|
@ -167,6 +174,115 @@ stream._read = function(size, cb) {
|
||||||
};
|
};
|
||||||
```
|
```
|
||||||
|
|
||||||
|
### readable.unshift(chunk)
|
||||||
|
|
||||||
|
* `chunk` {Buffer | null | String} Chunk of data to unshift onto the read queue
|
||||||
|
* return {Boolean} Whether or not more pushes should be performed
|
||||||
|
|
||||||
|
This is the counterpart of `readable.push(chunk)`. Rather than putting
|
||||||
|
the data at the *end* of the read queue, it puts it at the *front* of
|
||||||
|
the read queue.
|
||||||
|
|
||||||
|
This is useful in certain use-cases where a stream is being consumed
|
||||||
|
by a parser, which needs to "un-consume" some data that it has
|
||||||
|
optimistically pulled out of the source.
|
||||||
|
|
||||||
|
```javascript
|
||||||
|
// A parser for a simple data protocol.
|
||||||
|
// The "header" is a JSON object, followed by 2 \n characters, and
|
||||||
|
// then a message body.
|
||||||
|
//
|
||||||
|
// Note: This can be done more simply as a Transform stream. See below.
|
||||||
|
|
||||||
|
function SimpleProtocol(source, options) {
|
||||||
|
if (!(this instanceof SimpleProtocol))
|
||||||
|
return new SimpleProtocol(source, options);
|
||||||
|
|
||||||
|
Readable.call(this, options);
|
||||||
|
this._inBody = false;
|
||||||
|
this._sawFirstCr = false;
|
||||||
|
|
||||||
|
// source is a readable stream, such as a socket or file
|
||||||
|
this._source = source;
|
||||||
|
|
||||||
|
var self = this;
|
||||||
|
source.on('end', function() {
|
||||||
|
self.push(null);
|
||||||
|
});
|
||||||
|
|
||||||
|
// give it a kick whenever the source is readable
|
||||||
|
// read(0) will not consume any bytes
|
||||||
|
source.on('readable', function() {
|
||||||
|
self.read(0);
|
||||||
|
});
|
||||||
|
|
||||||
|
this._rawHeader = [];
|
||||||
|
this.header = null;
|
||||||
|
}
|
||||||
|
|
||||||
|
SimpleProtocol.prototype = Object.create(
|
||||||
|
Readable.prototype, { constructor: { value: SimpleProtocol }});
|
||||||
|
|
||||||
|
SimpleProtocol.prototype._read = function(n) {
|
||||||
|
if (!this._inBody) {
|
||||||
|
var chunk = this._source.read();
|
||||||
|
|
||||||
|
// if the source doesn't have data, we don't have data yet.
|
||||||
|
if (chunk === null)
|
||||||
|
return this.push('');
|
||||||
|
|
||||||
|
// check if the chunk has a \n\n
|
||||||
|
var split = -1;
|
||||||
|
for (var i = 0; i < chunk.length; i++) {
|
||||||
|
if (chunk[i] === 10) { // '\n'
|
||||||
|
if (this._sawFirstCr) {
|
||||||
|
split = i;
|
||||||
|
break;
|
||||||
|
} else {
|
||||||
|
this._sawFirstCr = true;
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
this._sawFirstCr = false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (split === -1) {
|
||||||
|
// still waiting for the \n\n
|
||||||
|
// stash the chunk, and try again.
|
||||||
|
this._rawHeader.push(chunk);
|
||||||
|
this.push('');
|
||||||
|
} else {
|
||||||
|
this._inBody = true;
|
||||||
|
var h = chunk.slice(0, split);
|
||||||
|
this._rawHeader.push(h);
|
||||||
|
var header = Buffer.concat(this._rawHeader).toString();
|
||||||
|
try {
|
||||||
|
this.header = JSON.parse(header);
|
||||||
|
} catch (er) {
|
||||||
|
this.emit('error', new Error('invalid simple protocol data'));
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
// now, because we got some extra data, unshift the rest
|
||||||
|
// back into the read queue so that our consumer will see it.
|
||||||
|
this.unshift(chunk.slice(split));
|
||||||
|
|
||||||
|
// and let them know that we are done parsing the header.
|
||||||
|
this.emit('header', this.header);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
// from there on, just provide the data to our consumer.
|
||||||
|
// careful not to push(null), since that would indicate EOF.
|
||||||
|
var chunk = this._source.read();
|
||||||
|
if (chunk) this.push(chunk);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
// Usage:
|
||||||
|
var parser = new SimpleProtocol(source);
|
||||||
|
// Now parser is a readable stream that will emit 'header'
|
||||||
|
// with the parsed header data.
|
||||||
|
```
|
||||||
|
|
||||||
### readable.wrap(stream)
|
### readable.wrap(stream)
|
||||||
|
|
||||||
* `stream` {Stream} An "old style" readable stream
|
* `stream` {Stream} An "old style" readable stream
|
||||||
|
@ -232,6 +348,8 @@ constructor.
|
||||||
* `size` {Number | null} Optional number of bytes to read.
|
* `size` {Number | null} Optional number of bytes to read.
|
||||||
* Return: {Buffer | String | null}
|
* Return: {Buffer | String | null}
|
||||||
|
|
||||||
|
Note: **This function SHOULD be called by Readable stream users.**
|
||||||
|
|
||||||
Call this method to consume data once the `'readable'` event is
|
Call this method to consume data once the `'readable'` event is
|
||||||
emitted.
|
emitted.
|
||||||
|
|
||||||
|
@ -243,8 +361,8 @@ If there is no data to consume, or if there are fewer bytes in the
|
||||||
internal buffer than the `size` argument, then `null` is returned, and
|
internal buffer than the `size` argument, then `null` is returned, and
|
||||||
a future `'readable'` event will be emitted when more is available.
|
a future `'readable'` event will be emitted when more is available.
|
||||||
|
|
||||||
Note that calling `stream.read(0)` will always return `null`, and will
|
Calling `stream.read(0)` will always return `null`, and will trigger a
|
||||||
trigger a refresh of the internal buffer, but otherwise be a no-op.
|
refresh of the internal buffer, but otherwise be a no-op.
|
||||||
|
|
||||||
### readable.pipe(destination, [options])
|
### readable.pipe(destination, [options])
|
||||||
|
|
||||||
|
@ -416,14 +534,14 @@ A "duplex" stream is one that is both Readable and Writable, such as a
|
||||||
TCP socket connection.
|
TCP socket connection.
|
||||||
|
|
||||||
Note that `stream.Duplex` is an abstract class designed to be
|
Note that `stream.Duplex` is an abstract class designed to be
|
||||||
extended with an underlying implementation of the `_read(size, cb)`
|
extended with an underlying implementation of the `_read(size)`
|
||||||
and `_write(chunk, callback)` methods as you would with a Readable or
|
and `_write(chunk, callback)` methods as you would with a Readable or
|
||||||
Writable stream class.
|
Writable stream class.
|
||||||
|
|
||||||
Since JavaScript doesn't have multiple prototypal inheritance, this
|
Since JavaScript doesn't have multiple prototypal inheritance, this
|
||||||
class prototypally inherits from Readable, and then parasitically from
|
class prototypally inherits from Readable, and then parasitically from
|
||||||
Writable. It is thus up to the user to implement both the lowlevel
|
Writable. It is thus up to the user to implement both the lowlevel
|
||||||
`_read(n,cb)` method as well as the lowlevel `_write(chunk,cb)` method
|
`_read(n)` method as well as the lowlevel `_write(chunk,cb)` method
|
||||||
on extension duplex classes.
|
on extension duplex classes.
|
||||||
|
|
||||||
### new stream.Duplex(options)
|
### new stream.Duplex(options)
|
||||||
|
@ -471,13 +589,13 @@ initialized.
|
||||||
* `callback` {Function} Call this function (optionally with an error
|
* `callback` {Function} Call this function (optionally with an error
|
||||||
argument) when you are done processing the supplied chunk.
|
argument) when you are done processing the supplied chunk.
|
||||||
|
|
||||||
All Transform stream implementations must provide a `_transform`
|
|
||||||
method to accept input and produce output.
|
|
||||||
|
|
||||||
Note: **This function MUST NOT be called directly.** It should be
|
Note: **This function MUST NOT be called directly.** It should be
|
||||||
implemented by child classes, and called by the internal Transform
|
implemented by child classes, and called by the internal Transform
|
||||||
class methods only.
|
class methods only.
|
||||||
|
|
||||||
|
All Transform stream implementations must provide a `_transform`
|
||||||
|
method to accept input and produce output.
|
||||||
|
|
||||||
`_transform` should do whatever has to be done in this specific
|
`_transform` should do whatever has to be done in this specific
|
||||||
Transform class, to handle the bytes being written, and pass them off
|
Transform class, to handle the bytes being written, and pass them off
|
||||||
to the readable portion of the interface. Do asynchronous I/O,
|
to the readable portion of the interface. Do asynchronous I/O,
|
||||||
|
@ -521,6 +639,82 @@ the class that defines it, and should not be called directly by user
|
||||||
programs. However, you **are** expected to override this method in
|
programs. However, you **are** expected to override this method in
|
||||||
your own extension classes.
|
your own extension classes.
|
||||||
|
|
||||||
|
### Example: `SimpleProtocol` parser
|
||||||
|
|
||||||
|
The example above of a simple protocol parser can be implemented much
|
||||||
|
more simply by using the higher level `Transform` stream class.
|
||||||
|
|
||||||
|
In this example, rather than providing the input as an argument, it
|
||||||
|
would be piped into the parser, which is a more idiomatic Node stream
|
||||||
|
approach.
|
||||||
|
|
||||||
|
```javascript
|
||||||
|
function SimpleProtocol(options) {
|
||||||
|
if (!(this instanceof SimpleProtocol))
|
||||||
|
return new SimpleProtocol(options);
|
||||||
|
|
||||||
|
Transform.call(this, options);
|
||||||
|
this._inBody = false;
|
||||||
|
this._sawFirstCr = false;
|
||||||
|
this._rawHeader = [];
|
||||||
|
this.header = null;
|
||||||
|
}
|
||||||
|
|
||||||
|
SimpleProtocol.prototype = Object.create(
|
||||||
|
Transform.prototype, { constructor: { value: SimpleProtocol }});
|
||||||
|
|
||||||
|
SimpleProtocol.prototype._transform = function(chunk, output, done) {
|
||||||
|
if (!this._inBody) {
|
||||||
|
// check if the chunk has a \n\n
|
||||||
|
var split = -1;
|
||||||
|
for (var i = 0; i < chunk.length; i++) {
|
||||||
|
if (chunk[i] === 10) { // '\n'
|
||||||
|
if (this._sawFirstCr) {
|
||||||
|
split = i;
|
||||||
|
break;
|
||||||
|
} else {
|
||||||
|
this._sawFirstCr = true;
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
this._sawFirstCr = false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (split === -1) {
|
||||||
|
// still waiting for the \n\n
|
||||||
|
// stash the chunk, and try again.
|
||||||
|
this._rawHeader.push(chunk);
|
||||||
|
} else {
|
||||||
|
this._inBody = true;
|
||||||
|
var h = chunk.slice(0, split);
|
||||||
|
this._rawHeader.push(h);
|
||||||
|
var header = Buffer.concat(this._rawHeader).toString();
|
||||||
|
try {
|
||||||
|
this.header = JSON.parse(header);
|
||||||
|
} catch (er) {
|
||||||
|
this.emit('error', new Error('invalid simple protocol data'));
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
// and let them know that we are done parsing the header.
|
||||||
|
this.emit('header', this.header);
|
||||||
|
|
||||||
|
// now, because we got some extra data, emit this first.
|
||||||
|
output(chunk.slice(split));
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
// from there on, just provide the data to our consumer as-is.
|
||||||
|
output(chunk);
|
||||||
|
}
|
||||||
|
done();
|
||||||
|
};
|
||||||
|
|
||||||
|
var parser = new SimpleProtocol();
|
||||||
|
source.pipe(parser);
|
||||||
|
|
||||||
|
// Now parser is a readable stream that will emit 'header'
|
||||||
|
// with the parsed header data.
|
||||||
|
```
|
||||||
|
|
||||||
|
|
||||||
## Class: stream.PassThrough
|
## Class: stream.PassThrough
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue