mirror of https://github.com/nodejs/node.git
tls: allow reading data into a static buffer
Refs: #25436 PR-URL: https://github.com/nodejs/node/pull/35753 Refs: https://github.com/nodejs/node/pull/25436 Reviewed-By: Matteo Collina <matteo.collina@gmail.com> Reviewed-By: Minwoo Jung <nodecorelab@gmail.com>pull/35808/head
parent
ab660a5416
commit
761c1b0797
|
@ -0,0 +1,104 @@
|
|||
'use strict';
|
||||
const common = require('../common.js');
|
||||
const bench = common.createBenchmark(main, {
|
||||
dur: [5],
|
||||
type: ['buf', 'asc', 'utf'],
|
||||
sendchunklen: [256, 32 * 1024, 128 * 1024, 16 * 1024 * 1024],
|
||||
recvbuflen: [0, 64 * 1024, 1024 * 1024],
|
||||
recvbufgenfn: ['true', 'false']
|
||||
});
|
||||
|
||||
const fixtures = require('../../test/common/fixtures');
|
||||
let options;
|
||||
let recvbuf;
|
||||
let received = 0;
|
||||
const tls = require('tls');
|
||||
|
||||
function main({ dur, type, sendchunklen, recvbuflen, recvbufgenfn }) {
|
||||
if (isFinite(recvbuflen) && recvbuflen > 0)
|
||||
recvbuf = Buffer.alloc(recvbuflen);
|
||||
|
||||
let encoding;
|
||||
let chunk;
|
||||
switch (type) {
|
||||
case 'buf':
|
||||
chunk = Buffer.alloc(sendchunklen, 'b');
|
||||
break;
|
||||
case 'asc':
|
||||
chunk = 'a'.repeat(sendchunklen);
|
||||
encoding = 'ascii';
|
||||
break;
|
||||
case 'utf':
|
||||
chunk = 'ü'.repeat(sendchunklen / 2);
|
||||
encoding = 'utf8';
|
||||
break;
|
||||
default:
|
||||
throw new Error('invalid type');
|
||||
}
|
||||
|
||||
options = {
|
||||
key: fixtures.readKey('rsa_private.pem'),
|
||||
cert: fixtures.readKey('rsa_cert.crt'),
|
||||
ca: fixtures.readKey('rsa_ca.crt'),
|
||||
ciphers: 'AES256-GCM-SHA384'
|
||||
};
|
||||
|
||||
let socketOpts;
|
||||
if (recvbuf === undefined) {
|
||||
socketOpts = { port: common.PORT, rejectUnauthorized: false };
|
||||
} else {
|
||||
let buffer = recvbuf;
|
||||
if (recvbufgenfn === 'true') {
|
||||
let bufidx = -1;
|
||||
const bufpool = [
|
||||
recvbuf,
|
||||
Buffer.from(recvbuf),
|
||||
Buffer.from(recvbuf),
|
||||
];
|
||||
buffer = () => {
|
||||
bufidx = (bufidx + 1) % bufpool.length;
|
||||
return bufpool[bufidx];
|
||||
};
|
||||
}
|
||||
socketOpts = {
|
||||
port: common.PORT,
|
||||
rejectUnauthorized: false,
|
||||
onread: {
|
||||
buffer,
|
||||
callback: function(nread, buf) {
|
||||
received += nread;
|
||||
}
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
const server = tls.createServer(options, (socket) => {
|
||||
socket.on('data', (buf) => {
|
||||
socket.on('drain', write);
|
||||
write();
|
||||
});
|
||||
|
||||
function write() {
|
||||
while (false !== socket.write(chunk, encoding));
|
||||
}
|
||||
});
|
||||
|
||||
let conn;
|
||||
server.listen(common.PORT, () => {
|
||||
conn = tls.connect(socketOpts, () => {
|
||||
setTimeout(done, dur * 1000);
|
||||
bench.start();
|
||||
conn.write('hello');
|
||||
});
|
||||
|
||||
conn.on('data', (chunk) => {
|
||||
received += chunk.length;
|
||||
});
|
||||
});
|
||||
|
||||
function done() {
|
||||
const mbits = (received * 8) / (1024 * 1024);
|
||||
bench.end(mbits);
|
||||
process.exit(0);
|
||||
}
|
||||
}
|
|
@ -1343,6 +1343,9 @@ being issued by trusted CA (`options.ca`).
|
|||
<!-- YAML
|
||||
added: v0.11.3
|
||||
changes:
|
||||
- version: REPLACEME
|
||||
pr-url: https://github.com/nodejs/node/pull/35753
|
||||
description: Added `onread` option.
|
||||
- version:
|
||||
- v14.1.0
|
||||
- v13.14.0
|
||||
|
@ -1456,6 +1459,10 @@ changes:
|
|||
[`tls.createSecureContext()`][]. If a `secureContext` is _not_ provided, one
|
||||
will be created by passing the entire `options` object to
|
||||
`tls.createSecureContext()`.
|
||||
* `onread` {Object} If the `socket` option is missing, incoming data is
|
||||
stored in a single `buffer` and passed to the supplied `callback` when
|
||||
data arrives on the socket, otherwise the option is ignored. See the
|
||||
`onread` option of [`net.Socket`][] for details.
|
||||
* ...: [`tls.createSecureContext()`][] options that are used if the
|
||||
`secureContext` option is missing, otherwise they are ignored.
|
||||
* ...: Any [`socket.connect()`][] option not already listed.
|
||||
|
|
|
@ -502,6 +502,7 @@ function TLSSocket(socket, opts) {
|
|||
pauseOnCreate: tlsOptions.pauseOnConnect,
|
||||
manualStart: true,
|
||||
highWaterMark: tlsOptions.highWaterMark,
|
||||
onread: !socket ? tlsOptions.onread : null,
|
||||
});
|
||||
|
||||
// Proxy for API compatibility
|
||||
|
@ -1592,6 +1593,7 @@ exports.connect = function connect(...args) {
|
|||
enableTrace: options.enableTrace,
|
||||
pskCallback: options.pskCallback,
|
||||
highWaterMark: options.highWaterMark,
|
||||
onread: options.onread,
|
||||
});
|
||||
|
||||
tlssock[kConnectOptions] = options;
|
||||
|
|
85
lib/net.js
85
lib/net.js
|
@ -304,57 +304,56 @@ function Socket(options) {
|
|||
if (options.handle) {
|
||||
this._handle = options.handle; // private
|
||||
this[async_id_symbol] = getNewAsyncId(this._handle);
|
||||
} else {
|
||||
const onread = options.onread;
|
||||
if (onread !== null && typeof onread === 'object' &&
|
||||
(isUint8Array(onread.buffer) || typeof onread.buffer === 'function') &&
|
||||
typeof onread.callback === 'function') {
|
||||
if (typeof onread.buffer === 'function') {
|
||||
this[kBuffer] = true;
|
||||
this[kBufferGen] = onread.buffer;
|
||||
} else {
|
||||
this[kBuffer] = onread.buffer;
|
||||
}
|
||||
this[kBufferCb] = onread.callback;
|
||||
}
|
||||
if (options.fd !== undefined) {
|
||||
const { fd } = options;
|
||||
let err;
|
||||
} else if (options.fd !== undefined) {
|
||||
const { fd } = options;
|
||||
let err;
|
||||
|
||||
// createHandle will throw ERR_INVALID_FD_TYPE if `fd` is not
|
||||
// a valid `PIPE` or `TCP` descriptor
|
||||
this._handle = createHandle(fd, false);
|
||||
// createHandle will throw ERR_INVALID_FD_TYPE if `fd` is not
|
||||
// a valid `PIPE` or `TCP` descriptor
|
||||
this._handle = createHandle(fd, false);
|
||||
|
||||
err = this._handle.open(fd);
|
||||
err = this._handle.open(fd);
|
||||
|
||||
// While difficult to fabricate, in some architectures
|
||||
// `open` may return an error code for valid file descriptors
|
||||
// which cannot be opened. This is difficult to test as most
|
||||
// un-openable fds will throw on `createHandle`
|
||||
// While difficult to fabricate, in some architectures
|
||||
// `open` may return an error code for valid file descriptors
|
||||
// which cannot be opened. This is difficult to test as most
|
||||
// un-openable fds will throw on `createHandle`
|
||||
if (err)
|
||||
throw errnoException(err, 'open');
|
||||
|
||||
this[async_id_symbol] = this._handle.getAsyncId();
|
||||
|
||||
if ((fd === 1 || fd === 2) &&
|
||||
(this._handle instanceof Pipe) && isWindows) {
|
||||
// Make stdout and stderr blocking on Windows
|
||||
err = this._handle.setBlocking(true);
|
||||
if (err)
|
||||
throw errnoException(err, 'open');
|
||||
throw errnoException(err, 'setBlocking');
|
||||
|
||||
this[async_id_symbol] = this._handle.getAsyncId();
|
||||
|
||||
if ((fd === 1 || fd === 2) &&
|
||||
(this._handle instanceof Pipe) && isWindows) {
|
||||
// Make stdout and stderr blocking on Windows
|
||||
err = this._handle.setBlocking(true);
|
||||
if (err)
|
||||
throw errnoException(err, 'setBlocking');
|
||||
|
||||
this._writev = null;
|
||||
this._write = makeSyncWrite(fd);
|
||||
// makeSyncWrite adjusts this value like the original handle would, so
|
||||
// we need to let it do that by turning it into a writable, own
|
||||
// property.
|
||||
ObjectDefineProperty(this._handle, 'bytesWritten', {
|
||||
value: 0, writable: true
|
||||
});
|
||||
}
|
||||
this._writev = null;
|
||||
this._write = makeSyncWrite(fd);
|
||||
// makeSyncWrite adjusts this value like the original handle would, so
|
||||
// we need to let it do that by turning it into a writable, own
|
||||
// property.
|
||||
ObjectDefineProperty(this._handle, 'bytesWritten', {
|
||||
value: 0, writable: true
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
const onread = options.onread;
|
||||
if (onread !== null && typeof onread === 'object' &&
|
||||
(isUint8Array(onread.buffer) || typeof onread.buffer === 'function') &&
|
||||
typeof onread.callback === 'function') {
|
||||
if (typeof onread.buffer === 'function') {
|
||||
this[kBuffer] = true;
|
||||
this[kBufferGen] = onread.buffer;
|
||||
} else {
|
||||
this[kBuffer] = onread.buffer;
|
||||
}
|
||||
this[kBufferCb] = onread.callback;
|
||||
}
|
||||
|
||||
// Shut down the socket when we're finished with it.
|
||||
this.on('end', onReadableStreamEnd);
|
||||
|
||||
|
|
|
@ -0,0 +1,206 @@
|
|||
'use strict';
|
||||
const common = require('../common');
|
||||
if (!common.hasCrypto)
|
||||
common.skip('missing crypto');
|
||||
|
||||
const assert = require('assert');
|
||||
const tls = require('tls');
|
||||
const fixtures = require('../common/fixtures');
|
||||
|
||||
const options = {
|
||||
key: fixtures.readKey('agent2-key.pem'),
|
||||
cert: fixtures.readKey('agent2-cert.pem')
|
||||
};
|
||||
|
||||
const smallMessage = Buffer.from('hello world');
|
||||
// Used to test .pause(), so needs to be larger than the internal buffer
|
||||
const largeMessage = Buffer.alloc(64 * 1024).fill('hello world');
|
||||
|
||||
// Test typical usage
|
||||
tls.createServer(options, common.mustCall(function(socket) {
|
||||
this.close();
|
||||
socket.end(smallMessage);
|
||||
})).listen(0, function() {
|
||||
let received = 0;
|
||||
const buffers = [];
|
||||
const sockBuf = Buffer.alloc(8);
|
||||
tls.connect({
|
||||
port: this.address().port,
|
||||
rejectUnauthorized: false,
|
||||
onread: {
|
||||
buffer: sockBuf,
|
||||
callback: function(nread, buf) {
|
||||
assert.strictEqual(buf, sockBuf);
|
||||
received += nread;
|
||||
buffers.push(Buffer.from(buf.slice(0, nread)));
|
||||
}
|
||||
}
|
||||
}).on('data', common.mustNotCall()).on('end', common.mustCall(() => {
|
||||
assert.strictEqual(received, smallMessage.length);
|
||||
assert.deepStrictEqual(Buffer.concat(buffers), smallMessage);
|
||||
}));
|
||||
});
|
||||
|
||||
// Test Uint8Array support
|
||||
tls.createServer(options, common.mustCall(function(socket) {
|
||||
this.close();
|
||||
socket.end(smallMessage);
|
||||
})).listen(0, function() {
|
||||
let received = 0;
|
||||
let incoming = new Uint8Array(0);
|
||||
const sockBuf = new Uint8Array(8);
|
||||
tls.connect({
|
||||
port: this.address().port,
|
||||
rejectUnauthorized: false,
|
||||
onread: {
|
||||
buffer: sockBuf,
|
||||
callback: function(nread, buf) {
|
||||
assert.strictEqual(buf, sockBuf);
|
||||
received += nread;
|
||||
const newIncoming = new Uint8Array(incoming.length + nread);
|
||||
newIncoming.set(incoming);
|
||||
newIncoming.set(buf.slice(0, nread), incoming.length);
|
||||
incoming = newIncoming;
|
||||
}
|
||||
}
|
||||
}).on('data', common.mustNotCall()).on('end', common.mustCall(() => {
|
||||
assert.strictEqual(received, smallMessage.length);
|
||||
assert.deepStrictEqual(incoming, new Uint8Array(smallMessage));
|
||||
}));
|
||||
});
|
||||
|
||||
// Test Buffer callback usage
|
||||
tls.createServer(options, common.mustCall(function(socket) {
|
||||
this.close();
|
||||
socket.end(smallMessage);
|
||||
})).listen(0, function() {
|
||||
let received = 0;
|
||||
const incoming = [];
|
||||
const bufPool = [ Buffer.alloc(2), Buffer.alloc(2), Buffer.alloc(2) ];
|
||||
let bufPoolIdx = -1;
|
||||
let bufPoolUsage = 0;
|
||||
tls.connect({
|
||||
port: this.address().port,
|
||||
rejectUnauthorized: false,
|
||||
onread: {
|
||||
buffer: () => {
|
||||
++bufPoolUsage;
|
||||
bufPoolIdx = (bufPoolIdx + 1) % bufPool.length;
|
||||
return bufPool[bufPoolIdx];
|
||||
},
|
||||
callback: function(nread, buf) {
|
||||
assert.strictEqual(buf, bufPool[bufPoolIdx]);
|
||||
received += nread;
|
||||
incoming.push(Buffer.from(buf.slice(0, nread)));
|
||||
}
|
||||
}
|
||||
}).on('data', common.mustNotCall()).on('end', common.mustCall(() => {
|
||||
assert.strictEqual(received, smallMessage.length);
|
||||
assert.deepStrictEqual(Buffer.concat(incoming), smallMessage);
|
||||
assert.strictEqual(bufPoolUsage, 7);
|
||||
}));
|
||||
});
|
||||
|
||||
// Test Uint8Array callback support
|
||||
tls.createServer(options, common.mustCall(function(socket) {
|
||||
this.close();
|
||||
socket.end(smallMessage);
|
||||
})).listen(0, function() {
|
||||
let received = 0;
|
||||
let incoming = new Uint8Array(0);
|
||||
const bufPool = [ new Uint8Array(2), new Uint8Array(2), new Uint8Array(2) ];
|
||||
let bufPoolIdx = -1;
|
||||
let bufPoolUsage = 0;
|
||||
tls.connect({
|
||||
port: this.address().port,
|
||||
rejectUnauthorized: false,
|
||||
onread: {
|
||||
buffer: () => {
|
||||
++bufPoolUsage;
|
||||
bufPoolIdx = (bufPoolIdx + 1) % bufPool.length;
|
||||
return bufPool[bufPoolIdx];
|
||||
},
|
||||
callback: function(nread, buf) {
|
||||
assert.strictEqual(buf, bufPool[bufPoolIdx]);
|
||||
received += nread;
|
||||
const newIncoming = new Uint8Array(incoming.length + nread);
|
||||
newIncoming.set(incoming);
|
||||
newIncoming.set(buf.slice(0, nread), incoming.length);
|
||||
incoming = newIncoming;
|
||||
}
|
||||
}
|
||||
}).on('data', common.mustNotCall()).on('end', common.mustCall(() => {
|
||||
assert.strictEqual(received, smallMessage.length);
|
||||
assert.deepStrictEqual(incoming, new Uint8Array(smallMessage));
|
||||
assert.strictEqual(bufPoolUsage, 7);
|
||||
}));
|
||||
});
|
||||
|
||||
// Test explicit socket pause
|
||||
tls.createServer(options, common.mustCall(function(socket) {
|
||||
this.close();
|
||||
// Need larger message here to observe the pause
|
||||
socket.end(largeMessage);
|
||||
})).listen(0, function() {
|
||||
let received = 0;
|
||||
const buffers = [];
|
||||
const sockBuf = Buffer.alloc(64);
|
||||
let pauseScheduled = false;
|
||||
const client = tls.connect({
|
||||
port: this.address().port,
|
||||
rejectUnauthorized: false,
|
||||
onread: {
|
||||
buffer: sockBuf,
|
||||
callback: function(nread, buf) {
|
||||
assert.strictEqual(buf, sockBuf);
|
||||
received += nread;
|
||||
buffers.push(Buffer.from(buf.slice(0, nread)));
|
||||
if (!pauseScheduled) {
|
||||
pauseScheduled = true;
|
||||
client.pause();
|
||||
setTimeout(() => {
|
||||
client.resume();
|
||||
}, 100);
|
||||
}
|
||||
}
|
||||
}
|
||||
}).on('data', common.mustNotCall()).on('end', common.mustCall(() => {
|
||||
assert.strictEqual(received, largeMessage.length);
|
||||
assert.deepStrictEqual(Buffer.concat(buffers), largeMessage);
|
||||
}));
|
||||
});
|
||||
|
||||
// Test implicit socket pause
|
||||
tls.createServer(options, common.mustCall(function(socket) {
|
||||
this.close();
|
||||
// Need larger message here to observe the pause
|
||||
socket.end(largeMessage);
|
||||
})).listen(0, function() {
|
||||
let received = 0;
|
||||
const buffers = [];
|
||||
const sockBuf = Buffer.alloc(64);
|
||||
let pauseScheduled = false;
|
||||
const client = tls.connect({
|
||||
port: this.address().port,
|
||||
rejectUnauthorized: false,
|
||||
onread: {
|
||||
buffer: sockBuf,
|
||||
callback: function(nread, buf) {
|
||||
assert.strictEqual(buf, sockBuf);
|
||||
received += nread;
|
||||
buffers.push(Buffer.from(buf.slice(0, nread)));
|
||||
if (!pauseScheduled) {
|
||||
pauseScheduled = true;
|
||||
setTimeout(() => {
|
||||
client.resume();
|
||||
}, 100);
|
||||
return false;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
}
|
||||
}).on('data', common.mustNotCall()).on('end', common.mustCall(() => {
|
||||
assert.strictEqual(received, largeMessage.length);
|
||||
assert.deepStrictEqual(Buffer.concat(buffers), largeMessage);
|
||||
}));
|
||||
});
|
Loading…
Reference in New Issue