From 4926ffd14be0c4c53f63915d5ebf0ada3190b0ae Mon Sep 17 00:00:00 2001 From: isaacs Date: Thu, 28 Feb 2013 15:42:55 -0800 Subject: [PATCH] doc: Provide 2 examples of SimpleProtocol parser The first example uses Readable, and shows the use of readable.unshift(). The second uses the Transform class, showing that it's much simpler in this case. --- doc/api/stream.markdown | 226 +++++++++++++++++++++++++++++++++++++--- 1 file changed, 210 insertions(+), 16 deletions(-) diff --git a/doc/api/stream.markdown b/doc/api/stream.markdown index ebb771dd559..86f45ca9f9f 100644 --- a/doc/api/stream.markdown +++ b/doc/api/stream.markdown @@ -84,7 +84,7 @@ method. A `Readable Stream` has the following methods, members, and events. Note that `stream.Readable` is an abstract class designed to be -extended with an underlying implementation of the `_read(size, cb)` +extended with an underlying implementation of the `_read(size)` method. (See below.) ### new stream.Readable([options]) @@ -105,32 +105,39 @@ In classes that extend the Readable class, make sure to call the constructor so that the buffering settings can be properly initialized. -### readable.\_read(size, callback) +### readable.\_read(size) * `size` {Number} Number of bytes to read asynchronously * `callback` {Function} Called with an error or with data -All Readable stream implementations must provide a `_read` method -to fetch data from the underlying resource. - -Note: **This function MUST NOT be called directly.** It should be +Note: **This function should NOT be called directly.** It should be implemented by child classes, and called by the internal Readable class methods only. -Call the callback using the standard `callback(error, data)` pattern. -When no more data can be fetched, call `callback(null, null)` to -signal the EOF. +All Readable stream implementations must provide a `_read` method +to fetch data from the underlying resource. This method is prefixed with an underscore because it is internal to the class that defines it, and should not be called directly by user programs. However, you **are** expected to override this method in your own extension classes. +When data is available, put it into the read queue by calling +`readable.push(chunk)`. If `push` returns false, then you should stop +reading. When `_read` is called again, you should start pushing more +data. + ### readable.push(chunk) * `chunk` {Buffer | null | String} Chunk of data to push into the read queue * return {Boolean} Whether or not more pushes should be performed +Note: **This function should be called by Readable implementors, NOT +by consumers of Readable subclasses.** The `_read()` function will not +be called again until at least one `push(chunk)` call is made. If no +data is available, then you MAY call `push('')` (an empty string) to +allow a future `_read` call, without adding any data to the queue. + The `Readable` class works by putting data into a read queue to be pulled out later by calling the `read()` method when the `'readable'` event fires. @@ -167,6 +174,115 @@ stream._read = function(size, cb) { }; ``` +### readable.unshift(chunk) + +* `chunk` {Buffer | null | String} Chunk of data to unshift onto the read queue +* return {Boolean} Whether or not more pushes should be performed + +This is the corollary of `readable.push(chunk)`. Rather than putting +the data at the *end* of the read queue, it puts it at the *front* of +the read queue. + +This is useful in certain use-cases where a stream is being consumed +by a parser, which needs to "un-consume" some data that it has +optimistically pulled out of the source. + +```javascript +// A parser for a simple data protocol. +// The "header" is a JSON object, followed by 2 \n characters, and +// then a message body. +// +// Note: This can be done more simply as a Transform stream. See below. + +function SimpleProtocol(source, options) { + if (!(this instanceof SimpleProtocol)) + return new SimpleProtocol(options); + + Readable.call(this, options); + this._inBody = false; + this._sawFirstCr = false; + + // source is a readable stream, such as a socket or file + this._source = source; + + var self = this; + source.on('end', function() { + self.push(null); + }); + + // give it a kick whenever the source is readable + // read(0) will not consume any bytes + source.on('readable', function() { + self.read(0); + }); + + this._rawHeader = []; + this.header = null; +} + +SimpleProtocol.prototype = Object.create( + Readable.prototype, { constructor: { value: SimpleProtocol }}); + +SimpleProtocol.prototype._read = function(n) { + if (!this._inBody) { + var chunk = this._source.read(); + + // if the source doesn't have data, we don't have data yet. + if (chunk === null) + return this.push(''); + + // check if the chunk has a \n\n + var split = -1; + for (var i = 0; i < chunk.length; i++) { + if (chunk[i] === 10) { // '\n' + if (this._sawFirstCr) { + split = i; + break; + } else { + this._sawFirstCr = true; + } + } else { + this._sawFirstCr = false; + } + } + + if (split === -1) { + // still waiting for the \n\n + // stash the chunk, and try again. + this._rawHeader.push(chunk); + this.push(''); + } else { + this._inBody = true; + var h = chunk.slice(0, split); + this._rawHeader.push(h); + var header = Buffer.concat(this._rawHeader).toString(); + try { + this.header = JSON.parse(header); + } catch (er) { + this.emit('error', new Error('invalid simple protocol data')); + return; + } + // now, because we got some extra data, unshift the rest + // back into the read queue so that our consumer will see it. + this.unshift(b); + + // and let them know that we are done parsing the header. + this.emit('header', this.header); + } + } else { + // from there on, just provide the data to our consumer. + // careful not to push(null), since that would indicate EOF. + var chunk = this._source.read(); + if (chunk) this.push(chunk); + } +}; + +// Usage: +var parser = new SimpleProtocol(source); +// Now parser is a readable stream that will emit 'header' +// with the parsed header data. +``` + ### readable.wrap(stream) * `stream` {Stream} An "old style" readable stream @@ -232,6 +348,8 @@ constructor. * `size` {Number | null} Optional number of bytes to read. * Return: {Buffer | String | null} +Note: **This function SHOULD be called by Readable stream users.** + Call this method to consume data once the `'readable'` event is emitted. @@ -243,8 +361,8 @@ If there is no data to consume, or if there are fewer bytes in the internal buffer than the `size` argument, then `null` is returned, and a future `'readable'` event will be emitted when more is available. -Note that calling `stream.read(0)` will always return `null`, and will -trigger a refresh of the internal buffer, but otherwise be a no-op. +Calling `stream.read(0)` will always return `null`, and will trigger a +refresh of the internal buffer, but otherwise be a no-op. ### readable.pipe(destination, [options]) @@ -416,14 +534,14 @@ A "duplex" stream is one that is both Readable and Writable, such as a TCP socket connection. Note that `stream.Duplex` is an abstract class designed to be -extended with an underlying implementation of the `_read(size, cb)` +extended with an underlying implementation of the `_read(size)` and `_write(chunk, callback)` methods as you would with a Readable or Writable stream class. Since JavaScript doesn't have multiple prototypal inheritance, this class prototypally inherits from Readable, and then parasitically from Writable. It is thus up to the user to implement both the lowlevel -`_read(n,cb)` method as well as the lowlevel `_write(chunk,cb)` method +`_read(n)` method as well as the lowlevel `_write(chunk,cb)` method on extension duplex classes. ### new stream.Duplex(options) @@ -471,13 +589,13 @@ initialized. * `callback` {Function} Call this function (optionally with an error argument) when you are done processing the supplied chunk. -All Transform stream implementations must provide a `_transform` -method to accept input and produce output. - Note: **This function MUST NOT be called directly.** It should be implemented by child classes, and called by the internal Transform class methods only. +All Transform stream implementations must provide a `_transform` +method to accept input and produce output. + `_transform` should do whatever has to be done in this specific Transform class, to handle the bytes being written, and pass them off to the readable portion of the interface. Do asynchronous I/O, @@ -521,6 +639,82 @@ the class that defines it, and should not be called directly by user programs. However, you **are** expected to override this method in your own extension classes. +### Example: `SimpleProtocol` parser + +The example above of a simple protocol parser can be implemented much +more simply by using the higher level `Transform` stream class. + +In this example, rather than providing the input as an argument, it +would be piped into the parser, which is a more idiomatic Node stream +approach. + +```javascript +function SimpleProtocol(options) { + if (!(this instanceof SimpleProtocol)) + return new SimpleProtocol(options); + + Transform.call(this, options); + this._inBody = false; + this._sawFirstCr = false; + this._rawHeader = []; + this.header = null; +} + +SimpleProtocol.prototype = Object.create( + Transform.prototype, { constructor: { value: SimpleProtocol }}); + +SimpleProtocol.prototype._transform = function(chunk, output, done) { + if (!this._inBody) { + // check if the chunk has a \n\n + var split = -1; + for (var i = 0; i < chunk.length; i++) { + if (chunk[i] === 10) { // '\n' + if (this._sawFirstCr) { + split = i; + break; + } else { + this._sawFirstCr = true; + } + } else { + this._sawFirstCr = false; + } + } + + if (split === -1) { + // still waiting for the \n\n + // stash the chunk, and try again. + this._rawHeader.push(chunk); + } else { + this._inBody = true; + var h = chunk.slice(0, split); + this._rawHeader.push(h); + var header = Buffer.concat(this._rawHeader).toString(); + try { + this.header = JSON.parse(header); + } catch (er) { + this.emit('error', new Error('invalid simple protocol data')); + return; + } + // and let them know that we are done parsing the header. + this.emit('header', this.header); + + // now, because we got some extra data, emit this first. + output(b); + } + } else { + // from there on, just provide the data to our consumer as-is. + output(b); + } + done(); +}; + +var parser = new SimpleProtocol(); +source.pipe(parser) + +// Now parser is a readable stream that will emit 'header' +// with the parsed header data. +``` + ## Class: stream.PassThrough