From c4161f32f5e2f36db25629b3bf2ab617e66070ae Mon Sep 17 00:00:00 2001 From: Ryan Dahl Date: Wed, 15 Dec 2010 15:47:02 -0800 Subject: [PATCH] Add callback to socket.write() --- doc/api/net.markdown | 7 +++++-- lib/net.js | 25 ++++++++++++++++++++++--- test/simple/test-net-connect-buffer.js | 22 ++++++++++++++++++---- test/simple/test-net-pingpong.js | 9 +++++++-- 4 files changed, 52 insertions(+), 11 deletions(-) diff --git a/doc/api/net.markdown b/doc/api/net.markdown index 229ecaf38f4..6146fd751d8 100644 --- a/doc/api/net.markdown +++ b/doc/api/net.markdown @@ -196,7 +196,7 @@ context of the defined or default list of trusted CA certificates. Returns a JSON structure detailing the peer's certificate, containing a dictionary with keys for the certificate `'subject'`, `'issuer'`, `'valid_from'` and `'valid_to'`. -#### stream.write(data, [encoding]) +#### stream.write(data, [encoding], [callback]) Sends data on the stream. The second parameter specifies the encoding in the case of a string--it defaults to UTF8 encoding. @@ -205,7 +205,10 @@ Returns `true` if the entire data was flushed successfully to the kernel buffer. Returns `false` if all or part of the data was queued in user memory. `'drain'` will be emitted when the buffer is again free. -#### stream.write(data, [encoding], [fileDescriptor]) +The optional `callback` parameter will be executed when the data is finally +written out - this may not be immediately. + +#### stream.write(data, [encoding], [fileDescriptor], [callback]) For UNIX sockets, it is possible to send a file descriptor through the stream. Simply add the `fileDescriptor` argument and listen for the `'fd'` diff --git a/lib/net.js b/lib/net.js index e6ec7961f0f..db491be59bb 100644 --- a/lib/net.js +++ b/lib/net.js @@ -178,6 +178,7 @@ function initStream(self) { self._writeQueue = []; self._writeQueueEncoding = []; self._writeQueueFD = []; + self._writeQueueCallbacks = []; self._writeWatcher = ioWatchers.alloc(); self._writeWatcher.socket = self; @@ -296,6 +297,7 @@ Stream.prototype.write = function(data /* [encoding], [fd], [cb] */) { this._writeQueue = []; this._writeQueueEncoding = []; this._writeQueueFD = []; + this._writeQueueCallbacks = []; } // Slow. There is already a write queue, so let's append to it. @@ -311,9 +313,22 @@ Stream.prototype.write = function(data /* [encoding], [fd], [cb] */) { this._writeQueueEncoding[last] === encoding) { // optimization - concat onto last this._writeQueue[last] += data; + + if (cb) { + if (!this._writeQueueCallbacks[last]) { + this._writeQueueCallbacks[last] = cb; + } else { + // awful + this._writeQueueCallbacks[last] = function () { + this._writeQueueCallbacks[last](); + cb(); + }; + } + } } else { this._writeQueue.push(data); this._writeQueueEncoding.push(encoding); + this._writeQueueCallbacks.push(cb); } if (fd != undefined) { @@ -325,7 +340,7 @@ Stream.prototype.write = function(data /* [encoding], [fd], [cb] */) { // Fast. // The most common case. There is no write queue. Just push the data // directly to the socket. - return this._writeOut(data, encoding, fd); + return this._writeOut(data, encoding, fd, cb); } }; @@ -337,7 +352,7 @@ Stream.prototype.write = function(data /* [encoding], [fd], [cb] */) { // 2. Write data to socket. Return true if flushed. // 3. Slice out remaining // 4. Unshift remaining onto _writeQueue. Return false. -Stream.prototype._writeOut = function(data, encoding, fd) { +Stream.prototype._writeOut = function(data, encoding, fd, cb) { if (!this.writable) { throw new Error('Stream is not writable'); } @@ -388,6 +403,7 @@ Stream.prototype._writeOut = function(data, encoding, fd) { // Unshift whatever didn't fit onto the buffer this._writeQueue.unshift(data.slice(charsWritten)); this._writeQueueEncoding.unshift(encoding); + this._writeQueueCallbacks.unshift(cb); this._writeWatcher.start(); queuedData = true; } @@ -416,6 +432,7 @@ Stream.prototype._writeOut = function(data, encoding, fd) { if (queuedData) { return false; } else { + if (cb) cb(); return true; } } @@ -434,6 +451,7 @@ Stream.prototype._writeOut = function(data, encoding, fd) { // data should be the next thing to write. this._writeQueue.unshift(leftOver); this._writeQueueEncoding.unshift(null); + this._writeQueueCallbacks.unshift(cb); // If didn't successfully write any bytes, enqueue our fd and try again if (!bytesWritten) { @@ -450,6 +468,7 @@ Stream.prototype.flush = function() { while (this._writeQueue && this._writeQueue.length) { var data = this._writeQueue.shift(); var encoding = this._writeQueueEncoding.shift(); + var cb = this._writeQueueCallbacks.shift(); var fd = this._writeQueueFD.shift(); if (data === END_OF_FILE) { @@ -457,7 +476,7 @@ Stream.prototype.flush = function() { return true; } - var flushed = this._writeOut(data, encoding, fd); + var flushed = this._writeOut(data, encoding, fd, cb); if (!flushed) return false; } if (this._writeWatcher) this._writeWatcher.stop(); diff --git a/test/simple/test-net-connect-buffer.js b/test/simple/test-net-connect-buffer.js index f4cada937a5..6e6bc885280 100644 --- a/test/simple/test-net-connect-buffer.js +++ b/test/simple/test-net-connect-buffer.js @@ -3,6 +3,8 @@ var assert = require('assert'); var net = require('net'); var tcpPort = common.PORT; +var fooWritten = false; +var connectHappened = false; var tcp = net.Server(function(s) { tcp.close(); @@ -25,23 +27,35 @@ var tcp = net.Server(function(s) { process.exit(1); }); }); -tcp.listen(tcpPort, startClient); -function startClient() { +tcp.listen(common.PORT, function () { var socket = net.Stream(); console.log('Connecting to socket'); socket.connect(tcpPort); + socket.on('connect', function() { console.log('socket connected'); + connectHappened = true; }); assert.equal('opening', socket.readyState); - assert.equal(false, socket.write('foo')); + var r = socket.write('foo', function () { + fooWritten = true; + assert.ok(connectHappened); + console.error("foo written"); + }); + + assert.equal(false, r); socket.end('bar'); assert.equal('opening', socket.readyState); -} +}); + +process.on('exit', function () { + assert.ok(connectHappened); + assert.ok(fooWritten); +}); diff --git a/test/simple/test-net-pingpong.js b/test/simple/test-net-pingpong.js index 66307287b81..c154e6783c8 100644 --- a/test/simple/test-net-pingpong.js +++ b/test/simple/test-net-pingpong.js @@ -8,6 +8,7 @@ var tests_run = 0; function pingPongTest(port, host) { var N = 1000; var count = 0; + var sentPongs = 0; var sent_final_ping = false; var server = net.createServer({ allowHalfOpen: true }, function(socket) { @@ -25,7 +26,10 @@ function pingPongTest(port, host) { assert.equal(true, socket.readable); assert.equal(true, count <= N); if (/PING/.exec(data)) { - socket.write('PONG'); + socket.write('PONG', function () { + sentPongs++; + console.error('sent PONG'); + }); } }); @@ -85,8 +89,9 @@ function pingPongTest(port, host) { }); client.addListener('close', function() { - console.log('client.endd'); + console.log('client.end'); assert.equal(N + 1, count); + assert.equal(N + 1, sentPongs); assert.equal(true, sent_final_ping); tests_run += 1; });