From 0a430ae52122113cd474674e629e82b8cfca5b69 Mon Sep 17 00:00:00 2001 From: Martin Heidegger Date: Mon, 24 May 2021 03:48:39 +0900 Subject: [PATCH 1/3] moving from readable-streams to streamx --- binding.cc | 56 ++++--- index.js | 18 +- lib/connection.js | 420 ++++++++++++++++++++++++++-------------------- package.json | 4 +- test/net.js | 114 ++++++++----- test/timeouts.js | 16 +- 6 files changed, 369 insertions(+), 259 deletions(-) diff --git a/binding.cc b/binding.cc index ca10b25..dce6614 100644 --- a/binding.cc +++ b/binding.cc @@ -13,8 +13,20 @@ return NULL; \ } -#define NAPI_MAKE_CALLBACK_AND_ALLOC(env, nil, ctx, cb, n, argv, res, nread) \ - if (napi_make_callback(env, nil, ctx, cb, n, argv, &res) == napi_pending_exception) { \ +#define NAPI_MAKE_CALLBACK_AND_ALLOC(env, nil, ctx, cb, n, argv, nread) \ + napi_value res; \ + napi_status stat = napi_make_callback(env, nil, ctx, cb, n, argv, &res); \ + if (stat == napi_ok) { \ + bool is_buf; \ + napi_is_buffer(env, res, &is_buf); \ + if (is_buf) { \ + UTP_NAPI_BUFFER_ALLOC(self, res, nread) \ + } else { \ + size_t size = nread <= 0 ? 0 : nread; \ + self->buf.base += size; \ + self->buf.len -= size; \ + } \ + } else if (stat == napi_pending_exception) { \ napi_value fatal_exception; \ napi_get_and_clear_last_exception(env, &fatal_exception); \ napi_fatal_exception(env, fatal_exception); \ @@ -25,7 +37,7 @@ }) \ } \ } else { \ - UTP_NAPI_BUFFER_ALLOC(self, res, nread) \ + printf("[UTP-NATIVE]: Unexpected result of callback %i\n", stat); \ } #define UTP_NAPI_CALLBACK(fn, src) \ @@ -43,14 +55,8 @@ char *buf; \ size_t buf_len; \ napi_get_buffer_info(env, ret, (void **) &buf, &buf_len); \ - if (buf_len == 0) { \ - size_t size = nread <= 0 ? 0 : nread; \ - self->buf.base += size; \ - self->buf.len -= size; \ - } else { \ - self->buf.base = buf; \ - self->buf.len = buf_len; \ - } + self->buf.base = buf; \ + self->buf.len = buf_len; \ typedef struct { uint32_t min_recv_packet_size; @@ -71,6 +77,7 @@ typedef struct { napi_ref on_close; napi_ref on_connect; napi_ref realloc; + bool destroyed; } utp_napi_connection_t; typedef struct { @@ -175,12 +182,11 @@ on_uv_read (uv_udp_t *handle, ssize_t nread, const uv_buf_t *buf, const struct s utp_napi_parse_address((struct sockaddr *) addr, ip, &port); UTP_NAPI_CALLBACK(self->on_message, { - napi_value ret; napi_value argv[3]; napi_create_int32(env, nread, &(argv[0])); napi_create_uint32(env, port, &(argv[1])); napi_create_string_utf8(env, ip, NAPI_AUTO_LENGTH, &(argv[2])); - NAPI_MAKE_CALLBACK_AND_ALLOC(env, NULL, ctx, callback, 3, argv, ret, nread) + NAPI_MAKE_CALLBACK_AND_ALLOC(env, NULL, ctx, callback, 3, argv, nread) }) } @@ -189,7 +195,12 @@ on_uv_close (uv_handle_t *handle) { utp_napi_t *self = (utp_napi_t *) handle->data; self->pending_close--; - if (self->pending_close > 0) return; + if (self->pending_close == 1) { + uv_close((uv_handle_t *) &(self->handle), on_uv_close); + } + if (self->pending_close > 0) { + return; + } UTP_NAPI_CALLBACK(self->on_close, { NAPI_MAKE_CALLBACK(env, NULL, ctx, callback, 0, NULL, NULL); @@ -219,6 +230,12 @@ on_utp_firewall (utp_callback_arguments *a) { inline static void utp_napi_connection_destroy (utp_napi_connection_t *self) { + if (self->destroyed) { + return; + } + if (self->buf.base == NULL) { + return; + } UTP_NAPI_CALLBACK(self->on_close, { NAPI_MAKE_CALLBACK(env, NULL, ctx, callback, 0, NULL, NULL) }) @@ -226,6 +243,7 @@ utp_napi_connection_destroy (utp_napi_connection_t *self) { self->env = env; self->buf.base = NULL; self->buf.len = 0; + self->destroyed = true; napi_delete_reference(self->env, self->ctx); napi_delete_reference(self->env, self->on_read); @@ -258,12 +276,14 @@ on_utp_state_change (utp_callback_arguments *a) { } case UTP_STATE_EOF: { + if (self->destroyed) { + return 0; + } if (self->recv_packet_size) { UTP_NAPI_CALLBACK(self->on_read, { - napi_value ret; napi_value argv[1]; napi_create_uint32(env, self->recv_packet_size, &(argv[0])); - NAPI_MAKE_CALLBACK_AND_ALLOC(env, NULL, ctx, callback, 1, argv, ret, self->recv_packet_size) + NAPI_MAKE_CALLBACK_AND_ALLOC(env, NULL, ctx, callback, 1, argv, self->recv_packet_size) self->recv_packet_size = 0; }) } @@ -342,10 +362,9 @@ on_utp_read (utp_callback_arguments *a) { } UTP_NAPI_CALLBACK(self->on_read, { - napi_value ret; napi_value argv[1]; napi_create_uint32(env, self->recv_packet_size, &(argv[0])); - NAPI_MAKE_CALLBACK_AND_ALLOC(env, NULL, ctx, callback, 1, argv, ret, self->recv_packet_size) + NAPI_MAKE_CALLBACK_AND_ALLOC(env, NULL, ctx, callback, 1, argv, self->recv_packet_size) self->recv_packet_size = 0; }) @@ -425,7 +444,6 @@ NAPI_METHOD(utp_napi_close) { err = uv_udp_recv_stop(&(self->handle)); if (err < 0) UTP_NAPI_THROW(err) - uv_close((uv_handle_t *) &(self->handle), on_uv_close); uv_close((uv_handle_t *) &(self->timer), on_uv_close); return NULL; diff --git a/index.js b/index.js index 7295799..0118413 100644 --- a/index.js +++ b/index.js @@ -5,8 +5,6 @@ const events = require('events') const dns = require('dns') const set = require('unordered-set') -const EMPTY = Buffer.alloc(0) - module.exports = UTP function UTP (opts) { @@ -132,16 +130,17 @@ UTP.prototype._closeMaybe = function () { if (this._closing && !this.connections.length && !this._sending.length && this._inited && !this._closed) { this._closed = true binding.utp_napi_close(this._handle) + } else { + for (const conn of this.connections) { + conn.destroy(new Error('server closed')) + } } } UTP.prototype.connect = function (port, ip) { if (!this._inited) this.bind() if (!ip) ip = '127.0.0.1' - const conn = new Connection(this, port, ip, null, this._allowHalfOpen) - if (!isIP(ip)) conn._resolveAndConnect(port, ip) - else conn._connect(port, ip || '127.0.0.1') - return conn + return new Connection(this, port, ip, null, this._allowHalfOpen) } UTP.prototype.listen = function (port, ip, onlistening) { @@ -197,19 +196,20 @@ UTP.prototype._realloc = function () { UTP.prototype._onmessage = function (size, port, address) { if (size < 0) { this.emit('error', new Error('Read failed (status: ' + size + ')')) - return EMPTY + return } const message = this._buffer.slice(this._offset, this._offset += size) this.emit('message', message, { address, family: 'IPv4', port }) if (this._buffer.length - this._offset <= 65536) { + // max package buffer is 64kb and we wanna make sure we have room for that + // returning the buffer indicates to the native code that + // the buffer has changed this._buffer = Buffer.allocUnsafe(this._buffer.length) this._offset = 0 return this._buffer } - - return EMPTY } UTP.prototype._onsend = function (send, status) { diff --git a/lib/connection.js b/lib/connection.js index 122f0bc..0b0f6e7 100644 --- a/lib/connection.js +++ b/lib/connection.js @@ -1,11 +1,9 @@ const binding = require('./binding') -const stream = require('readable-stream') -const util = require('util') +const streamx = require('streamx') const unordered = require('unordered-set') const dns = require('dns') const timeout = require('timeout-refresh') -const EMPTY = Buffer.alloc(0) const UTP_ERRORS = [ 'UTP_ECONNREFUSED', 'UTP_ECONNRESET', @@ -13,224 +11,274 @@ const UTP_ERRORS = [ 'UTP_UNKNOWN' ] -module.exports = Connection - -function Connection (utp, port, address, handle, halfOpen) { - stream.Duplex.call(this) - - this.remoteAddress = address - this.remoteFamily = 'IPv4' - this.remotePort = port - this.destroyed = false - - this._index = -1 - this._utp = utp - this._handle = handle || Buffer.alloc(binding.sizeof_utp_napi_connection_t) - this._buffer = Buffer.allocUnsafe(65536 * 2) - this._offset = 0 - this._view = new Uint32Array(this._handle.buffer, this._handle.byteOffset, 2) - this._callback = null - this._writing = null - this._error = null - this._connected = false - this._needsConnect = !handle - this._timeout = null - this._contentSize = 0 - this._allowOpen = halfOpen ? 2 : 1 - - this.on('finish', this._shutdown) - - binding.utp_napi_connection_init(this._handle, this, this._buffer, - this._onread, - this._ondrain, - this._onend, - this._onerror, - this._onclose, - this._onconnect, - this._realloc - ) - - unordered.add(utp.connections, this) - if (utp.maxConnections && utp.connections.length >= utp.maxConnections) { - utp.firewall(true) +module.exports = class Connection extends streamx.Duplex { + constructor (utp, port, address, handle, halfOpen) { + super({ + mapWritable: Buffer.from + }) + + this.remoteAddress = address || '127.0.0.1' + this.remoteFamily = 'IPv4' + this.remotePort = port + + this._index = -1 + this._utp = utp + this._connectCalled = !!handle + this._handle = handle || Buffer.alloc(binding.sizeof_utp_napi_connection_t) + this._buffer = Buffer.allocUnsafe(65536 * 2) + this._offset = 0 + this._view = new Uint32Array(this._handle.buffer, this._handle.byteOffset, 2) + this._callback = null + this._writing = null + this._timeout = null + this._contentSize = 0 + this._endCalled = false + this._inited = false + if (!halfOpen) { + this.once('end', () => { + if (this.writable) { + this.end() + } + }) + this.once('finish', () => { + if (this.writable) { + this.push(null) + } + }) + } + + this.once('error', unregister) + this.once('close', unregister) + + function unregister () { + this.off('error', unregister) + this.off('close', unregister) + process.nextTick(() => { + unordered.remove(utp.connections, this) + if (!utp.maxConnections || utp.connections.length < utp.maxConnections) { + utp.firewall(false) + } + utp._closeMaybe() + }) + } + + unordered.add(utp.connections, this) + if (utp.maxConnections && utp.connections.length >= utp.maxConnections) { + utp.firewall(true) + } } -} - -util.inherits(Connection, stream.Duplex) - -Connection.prototype.setTimeout = function (ms, ontimeout) { - if (ontimeout) this.once('timeout', ontimeout) - if (this._timeout) this._timeout.destroy() - this._timeout = timeout(ms, this._ontimeout, this) -} -Connection.prototype._ontimeout = function () { - this.emit('timeout') -} + _open (cb) { + const remoteAddress = this.remoteAddress + if (this._connectCalled) { + this._init(cb) + } else { + if (!isIP(remoteAddress)) this._resolveAndConnect(cb) + else this._connect(cb) + } + } -Connection.prototype.setInteractive = function (interactive) { - this.setPacketSize(this.interactive ? 0 : 65536) -} + _init (initCb) { + this._inited = true + let opened = false + binding.utp_napi_connection_init(this._handle, this, this._buffer, + onread, + this._ondrain, + this._onend, + onerror, + this._onclose, + onconnect, + this._realloc + ) + + function onread (size) { + if (!opened) { + onconnect.call(this) + } + return this._onread(size) + } + + function onerror (code) { + const error = createUTPError(code) + if (!opened) { + opened = true + initCb(error) + } else { + this.destroy(error) + } + } + + function onconnect () { + if (opened) return + opened = true + if (this._timeout) this._timeout.refresh() + + if (this._writing) { + const cb = this._callback + const data = this._writing[0] + this._callback = null + this._writing = null + this._write(data, cb) + } + process.nextTick(() => this.emit('connect')) + initCb() + } + } -Connection.prototype.setContentSize = function (size) { - this._view[0] = size < 65536 ? (size >= 0 ? size : 0) : 65536 - this._contentSize = size -} + _final (cb) { + // Sends the write-end message + binding.utp_napi_connection_shutdown(this._handle) + this._connectCalled = false + // The final message is sent out, + process.nextTick(cb) + } -Connection.prototype.setPacketSize = function (size) { - if (size > 65536) size = 65536 - this._view[0] = size - this._contentSize = 0 -} + _destroy (cb) { + if (!this._inited) { + return cb() + } + this._oncloseHook = cb + if (this._connectCalled) { + binding.utp_napi_connection_close(this._handle) + } else { + binding.utp_napi_connection_on_close(this._handle) + } + } -Connection.prototype.address = function () { - if (this.destroyed) return null - return this._utp.address() -} + setTimeout (ms, ontimeout) { + if (ontimeout) this.once('timeout', ontimeout) + if (this._timeout) this._timeout.destroy() + this._timeout = timeout(ms, this._ontimeout, this) + } -Connection.prototype._read = function () { - // TODO: backpressure -} + _ontimeout () { + this.emit('timeout') + } -Connection.prototype._write = function (data, enc, cb) { - if (this.destroyed) return + setInteractive (interactive) { + this.setPacketSize(interactive ? 0 : 65536) + } - if (!this._connected || !binding.utp_napi_connection_write(this._handle, data)) { - this._callback = cb - this._writing = new Array(1) - this._writing[0] = data - return + setContentSize (size) { + this._view[0] = size < 65536 ? (size >= 0 ? size : 0) : 65536 + this._contentSize = size } - cb(null) -} + setPacketSize (size) { + if (size > 65536) size = 65536 + this._view[0] = size + this._contentSize = 0 + } -Connection.prototype._writev = function (datas, cb) { - if (this.destroyed) return + address () { + if (this.destroyed) return null + return this._utp.address() + } - const bufs = new Array(datas.length) - for (var i = 0; i < datas.length; i++) bufs[i] = datas[i].chunk + _read (cb) { + // TODO: backpressure + cb(null) + } - if (bufs.length > 256) return this._write(Buffer.concat(bufs), null, cb) + _write (data, cb) { + if (this.destroyed) return - if (!binding.utp_napi_connection_writev(this._handle, bufs)) { - this._callback = cb - this._writing = bufs - return + if (!binding.utp_napi_connection_write(this._handle, data)) { + this._callback = cb + this._writing = [data] + return + } + cb(null) } - cb(null) -} + _writev (datas, cb) { + if (this.destroyed) return -Connection.prototype._realloc = function () { - this._buffer = Buffer.allocUnsafe(this._buffer.length) - this._offset = 0 - return this._buffer -} + const bufs = new Array(datas.length) + for (var i = 0; i < datas.length; i++) bufs[i] = datas[i].chunk -Connection.prototype._onread = function (size) { - if (!this._connected) this._onconnect() // makes the server wait for reads before writes - if (this._timeout) this._timeout.refresh() + if (bufs.length > 256) return this._write(Buffer.concat(bufs), null, cb) - const buf = this._buffer.slice(this._offset, this._offset += size) + if (!binding.utp_napi_connection_writev(this._handle, bufs)) { + this._callback = cb + this._writing = bufs + return + } - if (this._contentSize) { - if (size > this._contentSize) size = this._contentSize - this._contentSize -= size - if (this._contentSize < 65536) this._view[0] = this._contentSize + cb(null) } - this.push(buf) - - // 64kb + 4kb as max package buffer is 64kb and we wanna make sure we have room for that - // plus the next udp package - if (this._buffer.length - this._offset <= 69632) { + _realloc () { this._buffer = Buffer.allocUnsafe(this._buffer.length) this._offset = 0 return this._buffer } - return EMPTY -} - -Connection.prototype._ondrain = function () { - this._writing = null - const cb = this._callback - this._callback = null - cb(null) -} - -Connection.prototype._onclose = function () { - unordered.remove(this._utp.connections, this) - if (!this._utp.maxConnections || this._utp.connections.length < this._utp.maxConnections) { - this._utp.firewall(false) + _onread (size) { + if (this._timeout) this._timeout.refresh() + + const buf = this._buffer.slice(this._offset, this._offset += size) + if (this._contentSize) { + if (size > this._contentSize) size = this._contentSize + this._contentSize -= size + if (this._contentSize < 65536) this._view[0] = this._contentSize + } + + this.push(buf) + + if (this._buffer.length - this._offset <= 69632) { + // 64kb + 4kb as max package buffer is 64kb and we wanna make sure we have room for that + // plus the next udp package, returning the buffer indicates to the native code that + // it should use the new buffer now + this._buffer = Buffer.allocUnsafe(this._buffer.length) + this._offset = 0 + return this._buffer + } } - this._handle = null - if (this._error) this.emit('error', this._error) - this.emit('close') - this._utp._closeMaybe() -} - -Connection.prototype._onerror = function (status) { - this.destroy(createUTPError(status)) -} - -Connection.prototype._onend = function () { - if (this._timeout) this._timeout.destroy() - this.push(null) - this._destroyMaybe() -} - -Connection.prototype._resolveAndConnect = function (port, host) { - const self = this - dns.lookup(host, function (err, ip) { - if (err) return self.destroy(err) - if (!ip) return self.destroy(new Error('Could not resolve ' + host)) - self._connect(port, ip) - }) -} - -Connection.prototype._connect = function (port, ip) { - if (this.destroyed) return - this._needsConnect = false - this.remoteAddress = ip - binding.utp_napi_connect(this._utp._handle, this._handle, port, ip) -} -Connection.prototype._onconnect = function () { - if (this._timeout) this._timeout.refresh() - - this._connected = true - if (this._writing) { + _ondrain () { + this._writing = null const cb = this._callback - const data = this._writing[0] this._callback = null - this._writing = null - this._write(data, null, cb) + cb(null) } - this.emit('connect') -} -Connection.prototype.destroy = function (err) { - if (this.destroyed) return - this.destroyed = true - if (err) this._error = err - if (this._needsConnect) return process.nextTick(onbindingclose, this) - binding.utp_napi_connection_close(this._handle) -} + _onclose () { + if (this._oncloseHook) { + const cb = this._oncloseHook + this._oncloseHook = null + process.nextTick(cb) + } else { + this.destroy(new Error('Connection closed.')) + } + } -Connection.prototype._destroyMaybe = function () { - if (this._allowOpen && !--this._allowOpen) this.destroy() -} + _onend () { + if (this._timeout) this._timeout.destroy() + if (!this._endCalled) { + this._endCalled = true + this.push(null) + } + } -Connection.prototype._shutdown = function () { - if (this.destroyed) return - binding.utp_napi_connection_shutdown(this._handle) - this._destroyMaybe() -} + _resolveAndConnect (cb) { + const { remoteAddress: host } = this + dns.lookup(host, (err, ip) => { + if (err) return cb(err) + if (!ip) return cb(new Error(`Could not resolve ${host}`)) + this.remoteAddress = ip + this._connect(cb) + }) + } -function onbindingclose (self) { - binding.utp_napi_connection_on_close(self._handle) + _connect (cb) { + if (this.destroyed) { + return + } + this._connectCalled = true + binding.utp_napi_connect(this._utp._handle, this._handle, this.remotePort, this.remoteAddress) + this._init(cb) + } } function createUTPError (code) { @@ -240,3 +288,7 @@ function createUTPError (code) { err.errno = code return err } + +function isIP (ip) { + return /^\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}$/.test(ip) +} diff --git a/package.json b/package.json index c4cadf7..63c145a 100644 --- a/package.json +++ b/package.json @@ -6,7 +6,7 @@ "gypfile": true, "scripts": { "test-timeouts": "tape test/timeouts.js", - "test": "standard && tape test/net.js test/sockets.js test/udp.js", + "test": "standard && tape test/*.js", "install": "node-gyp-build", "fetch-libutp": "git submodule update --recursive --init", "prebuild": "prebuildify --napi --strip", @@ -21,7 +21,7 @@ "dependencies": { "napi-macros": "^2.0.0", "node-gyp-build": "^4.2.0", - "readable-stream": "^3.0.2", + "streamx": "^2.10.3", "timeout-refresh": "^1.0.0", "unordered-set": "^2.0.1" }, diff --git a/test/net.js b/test/net.js index 77fcf6a..149835d 100644 --- a/test/net.js +++ b/test/net.js @@ -1,4 +1,5 @@ const tape = require('tape') +const { Writable, pipeline } = require('streamx') const utp = require('../') tape('server + connect', function (t) { @@ -9,15 +10,19 @@ tape('server + connect', function (t) { socket.write('hello mike') socket.end() }) + server.once('close', () => t.end()) server.listen(function () { var socket = utp.connect(server.address().port) - - socket.on('connect', function () { - socket.destroy() + pipeline( + socket, + new Writable(), + error => t.error(error) + ) + socket.once('connect', function () { + socket.end() server.close() t.ok(connected, 'connected successfully') - t.end() }) socket.write('hello joe') @@ -29,9 +34,13 @@ tape('server + connect with resolve', function (t) { const server = utp.createServer(function (socket) { connected = true + socket.once('open', function () { + socket.end() + socket.pipe(new Writable()) + }) socket.write('hello mike') - socket.end() }) + server.once('close', () => t.end()) server.listen(function () { const socket = utp.connect(server.address().port, 'localhost') @@ -40,7 +49,6 @@ tape('server + connect with resolve', function (t) { socket.destroy() server.close() t.ok(connected, 'connected successfully') - t.end() }) socket.write('hello joe') @@ -56,25 +64,32 @@ tape('bad resolve', function (t) { t.fail('should not connect') }) - socket.on('error', function () { - t.pass('errored') - }) - - socket.on('close', function () { - t.pass('closed') - t.end() - }) + pipeline( + socket, + new Writable(), + error => { + t.ok(error) + t.pass('closed') + t.end() + } + ) }) tape('server immediate close', function (t) { - t.plan(2) + t.plan(3) const server = utp.createServer(function (socket) { socket.write('hi') - socket.end() - server.close(function () { - t.pass('closed') + socket.once('open', function () { + socket.end() }) + pipeline( + socket, + new Writable(), + error => { + t.error(error) + } + ) }) server.listen(0, function () { @@ -84,10 +99,16 @@ tape('server immediate close', function (t) { socket.once('connect', function () { socket.end() }) - - socket.on('close', function () { - t.pass('client closed') - }) + pipeline( + socket, + new Writable(), + error => { + t.error(error) + server.close(function () { + t.pass('closed') + }) + } + ) }) }) @@ -136,9 +157,11 @@ tape('server listens on a port in use', function (t) { tape('echo server', function (t) { const server = utp.createServer(function (socket) { - socket.pipe(socket) socket.on('data', function (data) { t.same(data, Buffer.from('hello')) + if (socket.writable) { + socket.write(data) + } }) socket.on('end', function () { socket.end() @@ -162,10 +185,12 @@ tape('echo server back and fourth', function (t) { var echoed = 0 const server = utp.createServer(function (socket) { - socket.pipe(socket) socket.on('data', function (data) { echoed++ t.same(data, Buffer.from('hello')) + if (socket.writable) { + socket.write(data) + } }) }) @@ -228,8 +253,12 @@ tape('echo big message with setContentSize', function (t) { const server = utp.createServer(function (socket) { socket.setContentSize(big.length) - socket.on('data', () => packets++) - socket.pipe(socket) + socket.on('data', (data) => { + packets++ + if (socket.writable) { + socket.write(data) + } + }) }) server.listen(0, function () { @@ -335,9 +364,8 @@ tape('flushes', function (t) { var sent = '' const server = utp.createServer(function (socket) { var buf = '' - socket.setEncoding('utf-8') socket.on('data', function (data) { - buf += data + buf += data.toString('utf8') }) socket.on('end', function () { server.close() @@ -350,7 +378,7 @@ tape('flushes', function (t) { server.listen(0, function () { const socket = utp.connect(server.address().port) for (var i = 0; i < 50; i++) { - socket.write(i + '\n') + socket.write(Buffer.from(i + '\n')) sent += i + '\n' } socket.end() @@ -359,24 +387,24 @@ tape('flushes', function (t) { tape('close waits for connections to close', function (t) { var sent = '' + var buf = '' const server = utp.createServer(function (socket) { - var buf = '' - socket.setEncoding('utf-8') socket.on('data', function (data) { - buf += data + buf += data.toString('utf8') }) socket.on('end', function () { - socket.end() - t.same(buf, sent) - t.end() + server.close() }) - server.close() + }) + server.on('close', () => { + t.same(buf, sent) + t.end() }) server.listen(0, function () { const socket = utp.connect(server.address().port) for (var i = 0; i < 50; i++) { - socket.write(i + '\n') + socket.write(Buffer.from(i + '\n')) sent += i + '\n' } socket.end() @@ -405,7 +433,7 @@ tape('disable half open', function (t) { }) tape('timeout', function (t) { - t.plan(3) + t.plan(5) var serverClosed = false var clientClosed = false @@ -419,10 +447,16 @@ tape('timeout', function (t) { socket.resume() socket.write('hi') socket.on('close', function () { + t.pass('server-socket closed') serverClosed = true done() }) }) + server.on('close', () => { + t.ok(clientClosed) + t.ok(serverClosed) + t.end() + }) server.listen(0, function () { const socket = utp.connect(server.address().port) @@ -432,6 +466,7 @@ tape('timeout', function (t) { socket.destroy() }) socket.on('close', function () { + t.pass('client-socket closed') clientClosed = true done() }) @@ -440,8 +475,5 @@ tape('timeout', function (t) { function done () { if (--missing) return server.close() - t.ok(clientClosed) - t.ok(serverClosed) - t.end() } }) diff --git a/test/timeouts.js b/test/timeouts.js index 42f2f91..2dd50f8 100644 --- a/test/timeouts.js +++ b/test/timeouts.js @@ -6,6 +6,7 @@ tape('connection timeout. this may take >20s', function (t) { const socket = dgram.createSocket('udp4') socket.bind(0, function () { const connection = utp.connect(socket.address().port) + connection.resume() connection.on('error', function (err) { socket.close() t.same(err.message, 'UTP_ETIMEDOUT') @@ -30,6 +31,7 @@ tape('write timeout. this may take >20s', function (t) { server.listen(function () { connection = utp.connect(server.address().port) + connection.resume() connection.on('connect', function () { t.pass('connected to server') }) @@ -44,9 +46,15 @@ tape('server max connections. this may take >20s', function (t) { var inc = 0 const server = utp.createServer({ allowHalfOpen: false }, function (socket) { inc++ - t.ok(inc < 3) + if (inc > 2) { + t.fail('too many connections') + } socket.write('hi') }) + server.on('close', function () { + t.pass('should error') + t.end() + }) server.maxConnections = 2 server.listen(0, function () { @@ -61,13 +69,13 @@ tape('server max connections. this may take >20s', function (t) { c.on('connect', function () { t.fail('only 2 connections') }) - c.on('error', function () { + c.on('error', function (error) { + t.equals(error.message, 'UTP_ETIMEDOUT') a.destroy() b.destroy() c.destroy() - server.close() t.pass('should error') - t.end() + server.close() }) }) }) From 54790942507088de2af7d2368fc6e32290d9f4f1 Mon Sep 17 00:00:00 2001 From: Martin Heidegger Date: Tue, 25 May 2021 16:00:00 +0900 Subject: [PATCH 2/3] reverted simplification for bindings --- binding.cc | 25 ++++++++++--------------- index.js | 5 ++++- lib/connection.js | 3 +++ 3 files changed, 17 insertions(+), 16 deletions(-) diff --git a/binding.cc b/binding.cc index dce6614..d24ece2 100644 --- a/binding.cc +++ b/binding.cc @@ -15,18 +15,7 @@ #define NAPI_MAKE_CALLBACK_AND_ALLOC(env, nil, ctx, cb, n, argv, nread) \ napi_value res; \ - napi_status stat = napi_make_callback(env, nil, ctx, cb, n, argv, &res); \ - if (stat == napi_ok) { \ - bool is_buf; \ - napi_is_buffer(env, res, &is_buf); \ - if (is_buf) { \ - UTP_NAPI_BUFFER_ALLOC(self, res, nread) \ - } else { \ - size_t size = nread <= 0 ? 0 : nread; \ - self->buf.base += size; \ - self->buf.len -= size; \ - } \ - } else if (stat == napi_pending_exception) { \ + if (napi_make_callback(env, nil, ctx, cb, n, argv, &res) == napi_pending_exception) { \ napi_value fatal_exception; \ napi_get_and_clear_last_exception(env, &fatal_exception); \ napi_fatal_exception(env, fatal_exception); \ @@ -37,7 +26,7 @@ }) \ } \ } else { \ - printf("[UTP-NATIVE]: Unexpected result of callback %i\n", stat); \ + UTP_NAPI_BUFFER_ALLOC(self, res, nread) \ } #define UTP_NAPI_CALLBACK(fn, src) \ @@ -55,8 +44,14 @@ char *buf; \ size_t buf_len; \ napi_get_buffer_info(env, ret, (void **) &buf, &buf_len); \ - self->buf.base = buf; \ - self->buf.len = buf_len; \ + if (buf_len == 0) { \ + size_t size = nread <= 0 ? 0 : nread; \ + self->buf.base += size; \ + self->buf.len -= size; \ + } else { \ + self->buf.base = buf; \ + self->buf.len = buf_len; \ + } typedef struct { uint32_t min_recv_packet_size; diff --git a/index.js b/index.js index 0118413..857c6ef 100644 --- a/index.js +++ b/index.js @@ -7,6 +7,8 @@ const set = require('unordered-set') module.exports = UTP +const EMPTY = Buffer.alloc(0) + function UTP (opts) { if (!(this instanceof UTP)) return new UTP(opts) events.EventEmitter.call(this) @@ -196,7 +198,7 @@ UTP.prototype._realloc = function () { UTP.prototype._onmessage = function (size, port, address) { if (size < 0) { this.emit('error', new Error('Read failed (status: ' + size + ')')) - return + return EMPTY } const message = this._buffer.slice(this._offset, this._offset += size) @@ -210,6 +212,7 @@ UTP.prototype._onmessage = function (size, port, address) { this._offset = 0 return this._buffer } + return EMPTY } UTP.prototype._onsend = function (send, status) { diff --git a/lib/connection.js b/lib/connection.js index 0b0f6e7..32dcaf8 100644 --- a/lib/connection.js +++ b/lib/connection.js @@ -4,6 +4,7 @@ const unordered = require('unordered-set') const dns = require('dns') const timeout = require('timeout-refresh') +const EMPTY = Buffer.alloc(0) const UTP_ERRORS = [ 'UTP_ECONNREFUSED', 'UTP_ECONNRESET', @@ -234,6 +235,8 @@ module.exports = class Connection extends streamx.Duplex { this._offset = 0 return this._buffer } + + return EMPTY } _ondrain () { From cdbb968c4e6d44d78997fa2257cb9da72a010d7b Mon Sep 17 00:00:00 2001 From: Martin Heidegger Date: Tue, 25 May 2021 16:01:54 +0900 Subject: [PATCH 3/3] limiting package script tests to fast tests --- package.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/package.json b/package.json index 63c145a..696dde7 100644 --- a/package.json +++ b/package.json @@ -6,7 +6,7 @@ "gypfile": true, "scripts": { "test-timeouts": "tape test/timeouts.js", - "test": "standard && tape test/*.js", + "test": "standard && tape test/net.js test/sockets.js test/udp.j", "install": "node-gyp-build", "fetch-libutp": "git submodule update --recursive --init", "prebuild": "prebuildify --napi --strip",