Add callback to socket.write()

pull/5370/head
Ryan Dahl 2010-12-15 15:47:02 -08:00
parent c970968ee6
commit c4161f32f5
4 changed files with 52 additions and 11 deletions

View File

@ -196,7 +196,7 @@ context of the defined or default list of trusted CA certificates.
Returns a JSON structure detailing the peer's certificate, containing a dictionary
with keys for the certificate `'subject'`, `'issuer'`, `'valid_from'` and `'valid_to'`.
#### stream.write(data, [encoding])
#### stream.write(data, [encoding], [callback])
Sends data on the stream. The second parameter specifies the encoding in the
case of a string--it defaults to UTF8 encoding.
@ -205,7 +205,10 @@ Returns `true` if the entire data was flushed successfully to the kernel
buffer. Returns `false` if all or part of the data was queued in user memory.
`'drain'` will be emitted when the buffer is again free.
#### stream.write(data, [encoding], [fileDescriptor])
The optional `callback` parameter will be executed when the data is finally
written out - this may not be immediately.
#### stream.write(data, [encoding], [fileDescriptor], [callback])
For UNIX sockets, it is possible to send a file descriptor through the
stream. Simply add the `fileDescriptor` argument and listen for the `'fd'`

View File

@ -178,6 +178,7 @@ function initStream(self) {
self._writeQueue = [];
self._writeQueueEncoding = [];
self._writeQueueFD = [];
self._writeQueueCallbacks = [];
self._writeWatcher = ioWatchers.alloc();
self._writeWatcher.socket = self;
@ -296,6 +297,7 @@ Stream.prototype.write = function(data /* [encoding], [fd], [cb] */) {
this._writeQueue = [];
this._writeQueueEncoding = [];
this._writeQueueFD = [];
this._writeQueueCallbacks = [];
}
// Slow. There is already a write queue, so let's append to it.
@ -311,9 +313,22 @@ Stream.prototype.write = function(data /* [encoding], [fd], [cb] */) {
this._writeQueueEncoding[last] === encoding) {
// optimization - concat onto last
this._writeQueue[last] += data;
if (cb) {
if (!this._writeQueueCallbacks[last]) {
this._writeQueueCallbacks[last] = cb;
} else {
// awful
this._writeQueueCallbacks[last] = function () {
this._writeQueueCallbacks[last]();
cb();
};
}
}
} else {
this._writeQueue.push(data);
this._writeQueueEncoding.push(encoding);
this._writeQueueCallbacks.push(cb);
}
if (fd != undefined) {
@ -325,7 +340,7 @@ Stream.prototype.write = function(data /* [encoding], [fd], [cb] */) {
// Fast.
// The most common case. There is no write queue. Just push the data
// directly to the socket.
return this._writeOut(data, encoding, fd);
return this._writeOut(data, encoding, fd, cb);
}
};
@ -337,7 +352,7 @@ Stream.prototype.write = function(data /* [encoding], [fd], [cb] */) {
// 2. Write data to socket. Return true if flushed.
// 3. Slice out remaining
// 4. Unshift remaining onto _writeQueue. Return false.
Stream.prototype._writeOut = function(data, encoding, fd) {
Stream.prototype._writeOut = function(data, encoding, fd, cb) {
if (!this.writable) {
throw new Error('Stream is not writable');
}
@ -388,6 +403,7 @@ Stream.prototype._writeOut = function(data, encoding, fd) {
// Unshift whatever didn't fit onto the buffer
this._writeQueue.unshift(data.slice(charsWritten));
this._writeQueueEncoding.unshift(encoding);
this._writeQueueCallbacks.unshift(cb);
this._writeWatcher.start();
queuedData = true;
}
@ -416,6 +432,7 @@ Stream.prototype._writeOut = function(data, encoding, fd) {
if (queuedData) {
return false;
} else {
if (cb) cb();
return true;
}
}
@ -434,6 +451,7 @@ Stream.prototype._writeOut = function(data, encoding, fd) {
// data should be the next thing to write.
this._writeQueue.unshift(leftOver);
this._writeQueueEncoding.unshift(null);
this._writeQueueCallbacks.unshift(cb);
// If didn't successfully write any bytes, enqueue our fd and try again
if (!bytesWritten) {
@ -450,6 +468,7 @@ Stream.prototype.flush = function() {
while (this._writeQueue && this._writeQueue.length) {
var data = this._writeQueue.shift();
var encoding = this._writeQueueEncoding.shift();
var cb = this._writeQueueCallbacks.shift();
var fd = this._writeQueueFD.shift();
if (data === END_OF_FILE) {
@ -457,7 +476,7 @@ Stream.prototype.flush = function() {
return true;
}
var flushed = this._writeOut(data, encoding, fd);
var flushed = this._writeOut(data, encoding, fd, cb);
if (!flushed) return false;
}
if (this._writeWatcher) this._writeWatcher.stop();

View File

@ -3,6 +3,8 @@ var assert = require('assert');
var net = require('net');
var tcpPort = common.PORT;
var fooWritten = false;
var connectHappened = false;
var tcp = net.Server(function(s) {
tcp.close();
@ -25,23 +27,35 @@ var tcp = net.Server(function(s) {
process.exit(1);
});
});
tcp.listen(tcpPort, startClient);
function startClient() {
tcp.listen(common.PORT, function () {
var socket = net.Stream();
console.log('Connecting to socket');
socket.connect(tcpPort);
socket.on('connect', function() {
console.log('socket connected');
connectHappened = true;
});
assert.equal('opening', socket.readyState);
assert.equal(false, socket.write('foo'));
var r = socket.write('foo', function () {
fooWritten = true;
assert.ok(connectHappened);
console.error("foo written");
});
assert.equal(false, r);
socket.end('bar');
assert.equal('opening', socket.readyState);
}
});
process.on('exit', function () {
assert.ok(connectHappened);
assert.ok(fooWritten);
});

View File

@ -8,6 +8,7 @@ var tests_run = 0;
function pingPongTest(port, host) {
var N = 1000;
var count = 0;
var sentPongs = 0;
var sent_final_ping = false;
var server = net.createServer({ allowHalfOpen: true }, function(socket) {
@ -25,7 +26,10 @@ function pingPongTest(port, host) {
assert.equal(true, socket.readable);
assert.equal(true, count <= N);
if (/PING/.exec(data)) {
socket.write('PONG');
socket.write('PONG', function () {
sentPongs++;
console.error('sent PONG');
});
}
});
@ -85,8 +89,9 @@ function pingPongTest(port, host) {
});
client.addListener('close', function() {
console.log('client.endd');
console.log('client.end');
assert.equal(N + 1, count);
assert.equal(N + 1, sentPongs);
assert.equal(true, sent_final_ping);
tests_run += 1;
});