tls: Improve TLS flow control

Fixes #1775.
pull/5370/head
koichik 2011-09-27 18:25:10 +09:00
parent 49ac083dc1
commit 4cdf9d4158
1 changed files with 41 additions and 7 deletions

View File

@ -52,6 +52,7 @@ function CryptoStream(pair) {
this.readable = this.writable = true;
this._paused = false;
this._needDrain = false;
this._pending = [];
this._pendingCallbacks = [];
this._pendingBytes = 0;
@ -86,7 +87,7 @@ CryptoStream.prototype.write = function(data /* , encoding, cb */) {
data = new Buffer(data, encoding);
}
debug('clearIn data');
debug((this === this.pair.cleartext ? 'clear' : 'encrypted') + 'In data');
this._pending.push(data);
this._pendingCallbacks.push(cb);
@ -95,7 +96,26 @@ CryptoStream.prototype.write = function(data /* , encoding, cb */) {
this.pair._writeCalled = true;
this.pair.cycle();
return this._pendingBytes < 128 * 1024;
// In the following cases, write() should return a false,
// then this stream should eventually emit 'drain' event.
//
// 1. There are pending data more than 128k bytes.
// 2. A forward stream shown below is paused.
// A) EncryptedStream for CleartextStream.write().
// B) CleartextStream for EncryptedStream.write().
//
if (!this._needDrain) {
if (this._pendingBytes >= 128 * 1024) {
this._needDrain = true;
} else {
if (this === this.pair.cleartext) {
this._needDrain = this.pair.encrypted._paused;
} else {
this._needDrain = this.pair.cleartext._paused;
}
}
}
return !this._needDrain;
};
@ -380,11 +400,25 @@ CryptoStream.prototype._pull = function() {
assert(rv === tmp.length);
}
// If we've cleared all of incoming encrypted data, emit drain.
if (havePending && this._pending.length === 0) {
debug('drain');
this.emit('drain');
if (this.__destroyOnDrain) this.end();
// If pending data has cleared, 'drain' event should be emitted
// after write() returns a false.
// Except when a forward stream shown below is paused.
// A) EncryptedStream for CleartextStream._pull().
// B) CleartextStream for EncryptedStream._pull().
//
if (this._needDrain && this._pending.length === 0) {
var paused;
if (this === this.pair.cleartext) {
paused = this.pair.encrypted._paused;
} else {
paused = this.pair.cleartext._paused;
}
if (!paused) {
debug('drain');
process.nextTick(this.emit.bind(this, 'drain'));
this._needDrain = false;
if (this.__destroyOnDrain) this.end();
}
}
};