diff --git a/.github/workflows/test-node.yml b/.github/workflows/test-node.yml new file mode 100644 index 0000000..a93c431 --- /dev/null +++ b/.github/workflows/test-node.yml @@ -0,0 +1,23 @@ +name: Build Status +on: + push: + branches: + - master + pull_request: + branches: + - master +jobs: + build: + strategy: + matrix: + node-version: [lts/*] + os: [ubuntu-latest, macos-latest, windows-latest] + runs-on: ${{ matrix.os }} + steps: + - uses: actions/checkout@v2 + - name: Use Node.js ${{ matrix.node-version }} + uses: actions/setup-node@v2 + with: + node-version: ${{ matrix.node-version }} + - run: npm install + - run: npm test diff --git a/.gitignore b/.gitignore index 8f8bb55..c749896 100644 --- a/.gitignore +++ b/.gitignore @@ -1,4 +1,6 @@ node_modules +package-lock.json build +coverage sandbox.js sandbox/ diff --git a/README.md b/README.md index a56e180..62901b0 100644 --- a/README.md +++ b/README.md @@ -165,7 +165,7 @@ receives a client message because of that. The socket api allows you to reuse the same underlying UDP socket to both connect to other clients on accept incoming connections. It also mimicks the node core [dgram socket](https://nodejs.org/api/dgram.html#dgram_class_dgram_socket) api. -#### `socket = utp([options])` +#### `socket = new utp.Socket([options])` Create a new utp socket. @@ -249,7 +249,7 @@ npm install To rebuild it simply do: ```sh -node-gyp build +npx node-gyp-build ``` ## License diff --git a/binding.cc b/binding.cc index 46db4f6..6c80182 100644 --- a/binding.cc +++ b/binding.cc @@ -314,12 +314,14 @@ on_utp_accept (utp_callback_arguments *a) { napi_value argv[2]; napi_create_uint32(env, port, &(argv[0])); napi_create_string_utf8(env, ip, NAPI_AUTO_LENGTH, &(argv[1])); - napi_value next; - NAPI_MAKE_CALLBACK(env, NULL, ctx, callback, 2, argv, &next) // will never throw due to the event being NTed in js - utp_napi_connection_t *connection; - size_t connection_size; - napi_get_buffer_info(env, next, (void **) &connection, &connection_size); - self->next_connection = connection; + napi_value next = NULL; + NAPI_MAKE_CALLBACK(env, NULL, ctx, callback, 2, argv, &next) + if (next != NULL) { + utp_napi_connection_t *connection; + size_t connection_size; + napi_get_buffer_info(env, next, (void **) &connection, &connection_size); + self->next_connection = connection; + } }) return 0; @@ -636,27 +638,13 @@ NAPI_METHOD(utp_napi_connection_init) { return NULL; } -NAPI_METHOD(utp_napi_connection_on_close) { - // To trigger a manual teardown if connect was never called - // on a client connection +NAPI_METHOD(utp_napi_connection_destroy) { NAPI_ARGV(1) NAPI_ARGV_BUFFER_CAST(utp_napi_connection_t *, self, 0) - utp_napi_connection_destroy(self); - return NULL; -} -NAPI_METHOD(utp_napi_connection_write) { - NAPI_ARGV(2) - NAPI_ARGV_BUFFER_CAST(utp_napi_connection_t *, self, 0) - NAPI_ARGV_BUFFER(buf, 1) - - self->send_buffer_next = self->send_buffer; - self->send_buffer_next->iov_base = buf; - self->send_buffer_next->iov_len = buf_len; - self->send_buffer_missing = 1; + utp_napi_connection_destroy(self); - int drained = utp_napi_connection_drain(self); - NAPI_RETURN_UINT32(drained) + return NULL; } NAPI_METHOD(utp_napi_connection_writev) { @@ -740,10 +728,9 @@ NAPI_INIT() { NAPI_EXPORT_FUNCTION(utp_napi_send_buffer) NAPI_EXPORT_FUNCTION(utp_napi_recv_buffer) NAPI_EXPORT_FUNCTION(utp_napi_connection_init) - NAPI_EXPORT_FUNCTION(utp_napi_connection_write) NAPI_EXPORT_FUNCTION(utp_napi_connection_writev) NAPI_EXPORT_FUNCTION(utp_napi_connection_close) NAPI_EXPORT_FUNCTION(utp_napi_connection_shutdown) - NAPI_EXPORT_FUNCTION(utp_napi_connection_on_close) + NAPI_EXPORT_FUNCTION(utp_napi_connection_destroy) NAPI_EXPORT_FUNCTION(utp_napi_connect) } diff --git a/index.js b/index.js index d6a4414..7650771 100644 --- a/index.js +++ b/index.js @@ -1,300 +1,360 @@ -const binding = require('./lib/binding') -const Connection = require('./lib/connection') -const util = require('util') -const events = require('events') +const EventEmitter = require('events') +const net = require('net') const dns = require('dns') +const { Duplex } = require('streamx') +const timeout = require('timeout-refresh') const set = require('unordered-set') +const b4a = require('b4a') +const binding = require('./lib/binding') +const UTPConnection = require('./lib/utp-connection') +const UTPSocket = require('./lib/utp-socket') -const EMPTY = Buffer.alloc(0) -const IPv4Pattern = /^((?:[0-9]|[1-9][0-9]|1[0-9][0-9]|2[0-4][0-9]|25[0-5])[.]){3}(?:[0-9]|[1-9][0-9]|1[0-9][0-9]|2[0-4][0-9]|25[0-5])$/ - -module.exports = UTP - -function UTP (opts) { - if (!(this instanceof UTP)) return new UTP(opts) - events.EventEmitter.call(this) - - this.connections = [] - - this._sending = [] - this._sent = [] - this._offset = 0 - this._buffer = Buffer.allocUnsafe(2 * 65536) - this._handle = Buffer.alloc(binding.sizeof_utp_napi_t) - this._nextConnection = Buffer.alloc(binding.sizeof_utp_napi_connection_t) - this._address = null - this._inited = false - this._refed = true - this._closing = false - this._closed = false - this._allowHalfOpen = !opts || opts.allowHalfOpen !== false - this._acceptConnections = new Uint32Array(this._handle.buffer, this._handle.byteOffset + binding.offsetof_utp_napi_t_accept_connections, 1) - this.maxConnections = 0 -} +class Socket extends EventEmitter { + constructor (opts) { + super() -util.inherits(UTP, events.EventEmitter) + this.connections = [] + this.maxConnections = 0 -UTP.createServer = function (opts, onconnection) { - if (typeof opts === 'function') return UTP.createServer(null, opts) - const server = new UTP(opts) - if (onconnection) server.on('connection', onconnection) - return server -} + this._address = null + this._socket = new UTPSocket() + this._allowHalfOpen = !opts || opts.allowHalfOpen !== false -UTP.connect = function (port, host, opts) { - const udp = new UTP(opts) - return udp.connect(port, host).on('close', ononeoffclose) -} + this._socket.onerror = this._onerror.bind(this) + this._socket.onconnection = this._onconnection.bind(this) + this._socket.onmessage = this._onmessage.bind(this) + this._socket.onclose = this._onclose.bind(this) + } -UTP.prototype._init = function () { - this._inited = true + firewall (enable) { + this._socket.firewall(enable) + } - binding.utp_napi_init(this._handle, this, - this._nextConnection, - this._buffer, - this._onmessage, - this._onsend, - this._onconnection, - this._onclose, - this._realloc - ) + address () { + if (!this._socket.bound || this._socket.closing) throw new Error('Socket not bound') + return { + address: this._address, + family: 'IPv4', + port: binding.utp_napi_local_port(this._socket._handle) + } + } - if (!this._refed) this.unref() -} + getRecvBufferSize () { + if (!this._socket.inited) throw new Error('getRecvBufferSize EBADF') + if (this._socket.closing) return 0 + return binding.utp_napi_recv_buffer(this._socket._handle, 0) + } -UTP.prototype.firewall = function (yes) { - this._acceptConnections[0] = yes ? 0 : 1 -} + setRecvBufferSize (n) { + if (!this._socket.inited) throw new Error('setRecvBufferSize EBADF') + if (this._socket.closing) return 0 + return binding.utp_napi_recv_buffer(this._socket._handle, n) + } -UTP.prototype.ref = function () { - if (this._inited) binding.utp_napi_ref(this._handle) - this._refed = true -} + getSendBufferSize () { + if (!this._socket.inited) throw new Error('getSendBufferSize EBADF') + if (this._socket.closing) return 0 + return binding.utp_napi_send_buffer(this._socket._handle, 0) + } -UTP.prototype.unref = function () { - if (this._inited) binding.utp_napi_unref(this._handle) - this._refed = false -} + setSendBufferSize (n) { + if (!this._socket.inited) throw new Error('setSendBufferSize EBADF') + if (this._socket.closing) return 0 + return binding.utp_napi_send_buffer(this._socket._handle, n) + } -UTP.prototype.address = function () { - if (!this._address || this._closing) throw new Error('Socket not bound') - return { - address: this._address, - family: 'IPv4', - port: binding.utp_napi_local_port(this._handle) + setTTL (ttl) { + if (!this._socket.inited) throw new Error('setTTL EBADF') + if (this._socket.closing) return + binding.utp_napi_set_ttl(this._socket._handle, ttl) } -} -UTP.prototype.getRecvBufferSize = function () { - if (!this._inited) throw new Error('getRecvBufferSize EBADF') - if (this._closing) return 0 - return binding.utp_napi_recv_buffer(this._handle, 0) -} + connect (port, ip) { + if (!this._socket.bound) this.bind() + if (!ip) ip = '127.0.0.1' + + return new Connection( + this, + null, + port, + ip, + this._allowHalfOpen + ) + } -UTP.prototype.setRecvBufferSize = function (n) { - if (!this._inited) throw new Error('setRecvBufferSize EBADF') - if (this._closing) return 0 - return binding.utp_napi_recv_buffer(this._handle, n) -} + listen (port, ip, onlistening) { + if (!this._socket.bound) this.bind(port, ip, onlistening) + this.firewall(false) + } -UTP.prototype.getSendBufferSize = function () { - if (!this._inited) throw new Error('getSendBufferSize EBADF') - if (this._closing) return 0 - return binding.utp_napi_send_buffer(this._handle, 0) -} + send (buf, offset, len, port, host, onsent) { + if (!this._socket.bound) this.bind() + if (!net.isIPv4(host)) return this._resolveAndSend(buf, offset, len, port, host, onsent) -UTP.prototype.setSendBufferSize = function (n) { - if (!this._inited) throw new Error('setSendBufferSize EBADF') - if (this._closing) return 0 - return binding.utp_napi_send_buffer(this._handle, n) -} + try { + const request = this._socket.send(buf, offset, len, port, host) -UTP.prototype.setTTL = function (ttl) { - if (!this._inited) throw new Error('setTTL EBADF') - if (this._closing) return - binding.utp_napi_set_ttl(this._handle, ttl) -} + if (onsent) request.onsent = onsent + } catch (err) { + if (onsent) onsent(err) + } + } + + bind (port, ip, onlistening) { + if (typeof port === 'function') return this.bind(0, null, port) + if (typeof ip === 'function') return this.bind(port, null, ip) + if (!port) port = 0 + if (!ip) ip = '0.0.0.0' + + if (this._socket.closed || this._socket.closing) return + + if (this._address) { + this.emit('error', new Error('Socket already bound')) + return + } + + if (onlistening) this.once('listening', onlistening) + + if (!net.isIPv4(ip)) return this._resolveAndBind(port, ip) -UTP.prototype.send = function (buf, offset, len, port, host, cb) { - if (!cb) cb = noop - if (!isIP(host)) return this._resolveAndSend(buf, offset, len, port, host, cb) - if (this._closing) return process.nextTick(cb, new Error('Socket is closed')) - if (!this._address) this.bind(0) + this._address = ip - var send = this._sent.pop() - if (!send) { - send = new SendRequest() - binding.utp_napi_send_request_init(send._handle, send) + try { + this._socket.bind(port, ip) + this.emit('listening') + } catch (err) { + this._address = null + this.emit('error', err) + } } - send._index = this._sending.push(send) - 1 - send._buffer = buf - send._callback = cb + close (cb) { + if (this._socket.closed) return cb() - binding.utp_napi_send(this._handle, send._handle, send._buffer, offset, len, port, host) -} + if (cb) this.once('close', cb) -UTP.prototype._resolveAndSend = function (buf, offset, len, port, host, cb) { - const self = this + this._socket.close() + } - dns.lookup(host, onlookup) + _resolveAndSend (buf, offset, len, port, host, onsent) { + dns.lookup(host, { family: 4 }, (err, ip) => { + if (err) this.emit('error', err) + else this.send(buf, offset, len, port, ip, onsent) + }) + } - function onlookup (err, ip) { - if (err) return cb(err) - if (!ip) return cb(new Error('Could not resolve ' + host)) - self.send(buf, offset, len, port, ip, cb) + _resolveAndBind (port, host) { + dns.lookup(host, { family: 4 }, (err, ip) => { + if (err) this.emit('error', err) + else this.bind(port, ip) + }) } -} -UTP.prototype.close = function (onclose) { - if (this._closed) return process.nextTick(callOnClose, this, onclose) - if (onclose) this.once('close', onclose) - if (this._closing) return - this._closing = true - this._closeMaybe() -} + _onerror (err) { + this.emit('error', err) + } -UTP.prototype._closeMaybe = function () { - if (this._closing && !this.connections.length && !this._sending.length && this._inited && !this._closed) { - this._closed = true - binding.utp_napi_close(this._handle) + _onconnection (port, ip, handle) { + this.emit('connection', new Connection(this, handle, port, ip, this._allowHalfOpen)) } -} -UTP.prototype.connect = function (port, ip) { - if (!this._inited) this.bind() - if (!ip) ip = '127.0.0.1' - const conn = new Connection(this, port, ip, null, this._allowHalfOpen) - if (!isIP(ip)) conn._resolveAndConnect(port, ip) - else conn._connect(port, ip || '127.0.0.1') - return conn -} + _onmessage (buffer, address) { + this.emit('message', buffer, address) + } + + _onclose () { + this.emit('close') + } + + static createServer (opts, onconnection) { + if (typeof opts === 'function') { + onconnection = opts + opts = {} + } + const server = new Socket(opts) + if (onconnection) server.on('connection', onconnection) + return server + } -UTP.prototype.listen = function (port, ip, onlistening) { - if (!this._address) this.bind(port, ip, onlistening) - this.firewall(false) + static connect (port, host, opts) { + const socket = new Socket(opts) + return socket + .connect(port, host) + .on('close', () => socket.close()) + } } -UTP.prototype.bind = function (port, ip, onlistening) { - if (typeof port === 'function') return this.bind(0, null, port) - if (typeof ip === 'function') return this.bind(port, null, ip) - if (!port) port = 0 - if (!ip) ip = '0.0.0.0' +module.exports = Socket.Socket = Socket + +class Connection extends Duplex { + constructor (socket, connection, port, address, halfOpen) { + super({ mapWritable: toBuffer, eagerOpen: true }) + + this.remoteAddress = address + this.remoteFamily = 'IPv4' + this.remotePort = port - if (!this._inited) this._init() - if (this._closing) return + this._index = -1 + this._socket = socket + this._connection = connection || new UTPConnection(this._socket._socket) + this._view = new Uint32Array(this._connection._handle.buffer, this._connection._handle.byteOffset, 2) + this._timeout = null + this._contentSize = 0 - if (this._address) { - this.emit('error', new Error('Socket already bound')) - return + this._opening = null + this._destroying = null + + this._connection.onerror = this._onerror.bind(this) + this._connection.ondata = this._ondata.bind(this) + this._connection.onend = this._onend.bind(this) + this._connection.onconnect = this._onconnect.bind(this) + this._connection.onclose = this._onclose.bind(this) + + set.add(this._socket.connections, this) + + if ( + this._socket.maxConnections > 0 && + this._socket.connections.length >= this._socket.maxConnections + ) { + this._socket.firewall(true) + } + + if (!halfOpen) this.on('end', () => this.end()) } - if (onlistening) this.once('listening', onlistening) - if (!isIP(ip)) return this._resolveAndBind(port, ip) + setTimeout (ms, ontimeout) { + if (ontimeout) this.once('timeout', ontimeout) + if (this._timeout) this._timeout.destroy() + this._timeout = timeout(ms, this._ontimeout, this) + } - this._address = ip + setInteractive (interactive) { + this.setPacketSize(interactive ? 0 : 65536) + } - try { - binding.utp_napi_bind(this._handle, port, ip) - } catch (err) { - this._address = null - process.nextTick(emitError, this, err) - return + setContentSize (size) { + this._view[0] = size < 65536 ? (size >= 0 ? size : 0) : 65536 + this._contentSize = size } - process.nextTick(emitListening, this) -} + setPacketSize (size) { + if (size > 65536) size = 65536 + this._view[0] = size + this._contentSize = 0 + } + + address () { + return this.destroyed ? null : this._socket.address() + } -UTP.prototype._resolveAndBind = function (port, host) { - const self = this + _open (cb) { + if (this._connection.connected) { + if (this._connection.writable) return cb(null) + } else { + if (net.isIPv4(this.remoteAddress)) this._connect() + else this._resolveAndConnect() + } - dns.lookup(host, function (err, ip) { - if (err) return self.emit('error', err) - self.bind(port, ip) - }) -} + this._opening = cb + } -UTP.prototype._realloc = function () { - this._buffer = Buffer.allocUnsafe(this._buffer.length) - this._offset = 0 - return this._buffer -} + _continueOpen (err) { + const cb = this._opening -UTP.prototype._onmessage = function (size, port, address) { - if (size < 0) { - this.emit('error', new Error('Read failed (status: ' + size + ')')) - return EMPTY + if (cb) { + this._opening = null + cb(err) + } } - const message = this._buffer.slice(this._offset, this._offset += size) - this.emit('message', message, { address, family: 'IPv4', port }) + _predestroy () { + this._continueOpen(new Error('Socket was destroyed')) + } - if (this._buffer.length - this._offset <= 65536) { - this._buffer = Buffer.allocUnsafe(this._buffer.length) - this._offset = 0 - return this._buffer + _destroy (cb) { + set.remove(this._socket.connections, this) + + if ( + this._socket.maxConnections <= 0 || + this._socket.connections.length < this._socket.maxConnections + ) { + this._socket.firewall(false) + } + + if (this._connection.closed) cb(null) + else { + this._destroying = cb + this._connection.close() + } } - return EMPTY -} + _final (cb) { + this._connection.shutdown() + cb(null) + } -UTP.prototype._onsend = function (send, status) { - const cb = send._callback + _writev (batch, cb) { + this._connection.writev(batch.length > 256 ? [b4a.concat(batch)] : batch, cb) + } - send._callback = send._buffer = null - set.remove(this._sending, send) - this._sent.push(send) - if (this._closing) this._closeMaybe() + _connect () { + this._connection.connect(this.remotePort, this.remoteAddress) + } - cb(status < 0 ? new Error('Send failed (status: ' + status + ')') : null) -} + _resolveAndConnect () { + dns.lookup(this.remoteAddress, { family: 4 }, (err, ip) => { + if (err) this.destroy(err) + else { + this.remoteAddress = ip + this._connect() + } + }) + } -UTP.prototype._onconnection = function (port, addr) { - const conn = new Connection(this, port, addr, this._nextConnection, this._allowHalfOpen) - process.nextTick(emitConnection, this, conn) - this._nextConnection = Buffer.alloc(binding.sizeof_utp_napi_connection_t) - return this._nextConnection -} + _ontimeout () { + this.emit('timeout') + } -UTP.prototype._onclose = function () { - binding.utp_napi_destroy(this._handle, this._sent.map(toHandle)) - this._handle = null - this.emit('close') -} + _onerror (err) { + this.destroy(err) + } -function SendRequest () { - this._handle = Buffer.alloc(binding.sizeof_utp_napi_send_request_t) - this._buffer = null - this._callback = null - this._index = null -} + _ondata (buffer) { + if (this._timeout) this._timeout.refresh() -function noop () {} + let size = buffer.byteLength -function isIP (ip) { - return IPv4Pattern.test(ip) -} + if (this._contentSize) { + if (size > this._contentSize) size = this._contentSize + this._contentSize -= size + if (this._contentSize < 65536) this._view[0] = this._contentSize + } -function toHandle (obj) { - return obj._handle -} + this.push(buffer) + } -function callOnClose (self, onclose) { - if (onclose) onclose.call(self) -} + _onend () { + if (this._timeout) this._timeout.destroy() + this.push(null) + } -function emitListening (self) { - self.emit('listening') -} + _onconnect () { + this._continueOpen() + this.emit('connect') + } -function emitConnection (self, connection) { - self.emit('connection', connection) -} + _onclose () { + const cb = this._destroying -function emitError (self, err) { - self.emit('error', err) + if (cb) { + this._destroying = null + cb(null) + } else { + this.destroy() + } + } } -function ononeoffclose () { - this._utp.close() +function toBuffer (data) { + return typeof data === 'string' ? b4a.from(data) : data } diff --git a/lib/connection.js b/lib/connection.js deleted file mode 100644 index 122f0bc..0000000 --- a/lib/connection.js +++ /dev/null @@ -1,242 +0,0 @@ -const binding = require('./binding') -const stream = require('readable-stream') -const util = require('util') -const unordered = require('unordered-set') -const dns = require('dns') -const timeout = require('timeout-refresh') - -const EMPTY = Buffer.alloc(0) -const UTP_ERRORS = [ - 'UTP_ECONNREFUSED', - 'UTP_ECONNRESET', - 'UTP_ETIMEDOUT', - 'UTP_UNKNOWN' -] - -module.exports = Connection - -function Connection (utp, port, address, handle, halfOpen) { - stream.Duplex.call(this) - - this.remoteAddress = address - this.remoteFamily = 'IPv4' - this.remotePort = port - this.destroyed = false - - this._index = -1 - this._utp = utp - this._handle = handle || Buffer.alloc(binding.sizeof_utp_napi_connection_t) - this._buffer = Buffer.allocUnsafe(65536 * 2) - this._offset = 0 - this._view = new Uint32Array(this._handle.buffer, this._handle.byteOffset, 2) - this._callback = null - this._writing = null - this._error = null - this._connected = false - this._needsConnect = !handle - this._timeout = null - this._contentSize = 0 - this._allowOpen = halfOpen ? 2 : 1 - - this.on('finish', this._shutdown) - - binding.utp_napi_connection_init(this._handle, this, this._buffer, - this._onread, - this._ondrain, - this._onend, - this._onerror, - this._onclose, - this._onconnect, - this._realloc - ) - - unordered.add(utp.connections, this) - if (utp.maxConnections && utp.connections.length >= utp.maxConnections) { - utp.firewall(true) - } -} - -util.inherits(Connection, stream.Duplex) - -Connection.prototype.setTimeout = function (ms, ontimeout) { - if (ontimeout) this.once('timeout', ontimeout) - if (this._timeout) this._timeout.destroy() - this._timeout = timeout(ms, this._ontimeout, this) -} - -Connection.prototype._ontimeout = function () { - this.emit('timeout') -} - -Connection.prototype.setInteractive = function (interactive) { - this.setPacketSize(this.interactive ? 0 : 65536) -} - -Connection.prototype.setContentSize = function (size) { - this._view[0] = size < 65536 ? (size >= 0 ? size : 0) : 65536 - this._contentSize = size -} - -Connection.prototype.setPacketSize = function (size) { - if (size > 65536) size = 65536 - this._view[0] = size - this._contentSize = 0 -} - -Connection.prototype.address = function () { - if (this.destroyed) return null - return this._utp.address() -} - -Connection.prototype._read = function () { - // TODO: backpressure -} - -Connection.prototype._write = function (data, enc, cb) { - if (this.destroyed) return - - if (!this._connected || !binding.utp_napi_connection_write(this._handle, data)) { - this._callback = cb - this._writing = new Array(1) - this._writing[0] = data - return - } - - cb(null) -} - -Connection.prototype._writev = function (datas, cb) { - if (this.destroyed) return - - const bufs = new Array(datas.length) - for (var i = 0; i < datas.length; i++) bufs[i] = datas[i].chunk - - if (bufs.length > 256) return this._write(Buffer.concat(bufs), null, cb) - - if (!binding.utp_napi_connection_writev(this._handle, bufs)) { - this._callback = cb - this._writing = bufs - return - } - - cb(null) -} - -Connection.prototype._realloc = function () { - this._buffer = Buffer.allocUnsafe(this._buffer.length) - this._offset = 0 - return this._buffer -} - -Connection.prototype._onread = function (size) { - if (!this._connected) this._onconnect() // makes the server wait for reads before writes - if (this._timeout) this._timeout.refresh() - - const buf = this._buffer.slice(this._offset, this._offset += size) - - if (this._contentSize) { - if (size > this._contentSize) size = this._contentSize - this._contentSize -= size - if (this._contentSize < 65536) this._view[0] = this._contentSize - } - - this.push(buf) - - // 64kb + 4kb as max package buffer is 64kb and we wanna make sure we have room for that - // plus the next udp package - if (this._buffer.length - this._offset <= 69632) { - this._buffer = Buffer.allocUnsafe(this._buffer.length) - this._offset = 0 - return this._buffer - } - - return EMPTY -} - -Connection.prototype._ondrain = function () { - this._writing = null - const cb = this._callback - this._callback = null - cb(null) -} - -Connection.prototype._onclose = function () { - unordered.remove(this._utp.connections, this) - if (!this._utp.maxConnections || this._utp.connections.length < this._utp.maxConnections) { - this._utp.firewall(false) - } - this._handle = null - if (this._error) this.emit('error', this._error) - this.emit('close') - this._utp._closeMaybe() -} - -Connection.prototype._onerror = function (status) { - this.destroy(createUTPError(status)) -} - -Connection.prototype._onend = function () { - if (this._timeout) this._timeout.destroy() - this.push(null) - this._destroyMaybe() -} - -Connection.prototype._resolveAndConnect = function (port, host) { - const self = this - dns.lookup(host, function (err, ip) { - if (err) return self.destroy(err) - if (!ip) return self.destroy(new Error('Could not resolve ' + host)) - self._connect(port, ip) - }) -} - -Connection.prototype._connect = function (port, ip) { - if (this.destroyed) return - this._needsConnect = false - this.remoteAddress = ip - binding.utp_napi_connect(this._utp._handle, this._handle, port, ip) -} - -Connection.prototype._onconnect = function () { - if (this._timeout) this._timeout.refresh() - - this._connected = true - if (this._writing) { - const cb = this._callback - const data = this._writing[0] - this._callback = null - this._writing = null - this._write(data, null, cb) - } - this.emit('connect') -} - -Connection.prototype.destroy = function (err) { - if (this.destroyed) return - this.destroyed = true - if (err) this._error = err - if (this._needsConnect) return process.nextTick(onbindingclose, this) - binding.utp_napi_connection_close(this._handle) -} - -Connection.prototype._destroyMaybe = function () { - if (this._allowOpen && !--this._allowOpen) this.destroy() -} - -Connection.prototype._shutdown = function () { - if (this.destroyed) return - binding.utp_napi_connection_shutdown(this._handle) - this._destroyMaybe() -} - -function onbindingclose (self) { - binding.utp_napi_connection_on_close(self._handle) -} - -function createUTPError (code) { - const str = UTP_ERRORS[code < 0 ? 3 : code] - const err = new Error(str) - err.code = str - err.errno = code - return err -} diff --git a/lib/utp-connection.js b/lib/utp-connection.js new file mode 100644 index 0000000..e3d9d48 --- /dev/null +++ b/lib/utp-connection.js @@ -0,0 +1,198 @@ +const set = require('unordered-set') +const b4a = require('b4a') +const binding = require('./binding') + +const EMPTY = b4a.alloc(0) + +const INITIALIZED = 1 +const CONNECTING = 1 << 1 +const CONNECTED = 1 << 2 +const CLOSING = 1 << 3 +const CLOSED = 1 << 4 +const CLOSABLE = 1 << 5 +const WRITABLE = 1 << 6 +const SHUTDOWN = 1 << 7 + +class UTPConnection { + constructor (socket, handle) { + this._socket = socket + this._handle = handle || b4a.alloc(binding.sizeof_utp_napi_connection_t) + + this._index = null + this._state = 0 + this._buffer = b4a.allocUnsafe(65536 * 2) + this._offset = 0 + this._writing = null + + this.onerror = noop + this.onclose = noop + this.onconnect = noop + this.ondata = noop + this.onend = noop + + set.add(this._socket._connections, this) + + // Server connections start out connected, but are only writable after the + // first read + if (handle) { + this._init() + this._state |= (CONNECTED | CLOSABLE) + } + } + + get inited () { + return (this._state & INITIALIZED) !== 0 + } + + get connecting () { + return (this._state & CONNECTING) !== 0 + } + + get connected () { + return (this._state & CONNECTED) !== 0 + } + + get closing () { + return (this._state & CLOSING) !== 0 + } + + get closed () { + return (this._state & CLOSED) !== 0 + } + + get writable () { + return (this._state & WRITABLE) !== 0 + } + + connect (port, ip) { + if ((this._state & (CONNECTED | CONNECTING)) !== 0) throw new Error('Connection is already connected') + if ((this._state & INITIALIZED) === 0) this._init() + + this._socket._ensureBound() + + this._state |= (CONNECTING | CLOSABLE) + + binding.utp_napi_connect(this._socket._handle, this._handle, port, ip) + } + + writev (batch, cb) { + if ((this._state & WRITABLE) === 0) return cb(new Error('Connection is not writable')) + + const drained = binding.utp_napi_connection_writev(this._handle, batch) === 1 + + if (drained) cb(null) + else this._writing = [cb, batch] + + return drained + } + + shutdown () { + if ((this._state & INITIALIZED) === 0) return + if ((this._state & SHUTDOWN) !== 0) return + + this._state &= ~(CONNECTED | CONNECTING | WRITABLE) + this._state |= SHUTDOWN + + binding.utp_napi_connection_shutdown(this._handle) + } + + close () { + if ((this._state & CLOSED) !== 0) return + if ((this._state & INITIALIZED) === 0) return this._onclose() + + if ((this._state & CLOSING) === 0) { + this._state |= CLOSING + + if ((this._state & CLOSABLE) !== 0) { + binding.utp_napi_connection_close(this._handle) + } else { + binding.utp_napi_connection_destroy(this._handle) + } + } + } + + _init () { + if ((this._state & INITIALIZED) !== 0) return + + this._state |= INITIALIZED + + binding.utp_napi_connection_init(this._handle, this, this._buffer, + this._onread, + this._ondrain, + this._onend, + this._onerror, + this._onclose, + this._onconnect, + this._realloc + ) + } + + _onread (size) { + if ((this._state & (WRITABLE | SHUTDOWN)) === 0) this._onconnect() + + const buffer = this._buffer.subarray(this._offset, this._offset += size) + + this.ondata(buffer) + + if (this._buffer.length - this._offset <= 69632) return this._realloc() + + return EMPTY + } + + _ondrain () { + const cb = this._writing[0] + this._wiriting = null + cb(null) + } + + _onend () { + this.onend() + } + + _onerror (code) { + this.onerror(createUTPError(code)) + } + + _onclose () { + set.remove(this._socket._connections, this) + + this._state &= ~CLOSING + this._state |= CLOSED + + if (this._socket.idle) this._socket._onidle() + + this.onclose() + } + + _onconnect () { + this._state &= ~CONNECTING + this._state |= (CONNECTED | WRITABLE) + + this.onconnect() + } + + _realloc () { + this._buffer = b4a.allocUnsafe(this._buffer.length) + this._offset = 0 + return this._buffer + } +} + +module.exports = UTPConnection + +const UTP_ERRORS = [ + 'UTP_ECONNREFUSED', + 'UTP_ECONNRESET', + 'UTP_ETIMEDOUT', + 'UTP_UNKNOWN' +] + +function createUTPError (code) { + const str = UTP_ERRORS[code < 0 ? 3 : code] + const err = new Error(str) + err.code = str + err.errno = code + return err +} + +function noop () {} diff --git a/lib/utp-send-request.js b/lib/utp-send-request.js new file mode 100644 index 0000000..ebce89a --- /dev/null +++ b/lib/utp-send-request.js @@ -0,0 +1,79 @@ +const set = require('unordered-set') +const b4a = require('b4a') +const binding = require('./binding') + +const INITIALIZED = 1 +const SENDING = 1 << 1 + +class UTPSendRequest { + constructor (socket) { + this._socket = socket + + this._index = null + this._handle = b4a.alloc(binding.sizeof_utp_napi_send_request_t) + this._state = 0 + this._buffer = null + + this.onsent = noop + + this._init() + } + + get inited () { + return (this._state & INITIALIZED) !== 0 + } + + get sending () { + return (this._state & SENDING) !== 0 + } + + send (buffer, offset, length, port, ip) { + if ((this._state & SENDING) !== 0) throw new Error('Request is already sending') + + this._state |= SENDING + + set.add(this._socket._sending, this) + + this._buffer = buffer + + binding.utp_napi_send(this._socket._handle, this._handle, + buffer, + offset, + length, + port, + ip + ) + } + + finish (status) { + if ((this._state & SENDING) === 0) throw new Error('Request is not sending') + + this._state &= ~SENDING + + set.remove(this._socket._sending, this) + + this._socket._sent.push(this) + + this._buffer = null + + if (this._socket.idle) this._socket._onidle() + + this.onsent(status < 0 ? new Error('Send failed (status: ' + status + ')') : null) + } + + static toHandle (request) { + return request._handle + } + + _init () { + if ((this._state & INITIALIZED) !== 0) return + + this._state |= INITIALIZED + + binding.utp_napi_send_request_init(this._handle, this) + } +} + +module.exports = UTPSendRequest + +function noop () {} diff --git a/lib/utp-socket.js b/lib/utp-socket.js new file mode 100644 index 0000000..8c718cd --- /dev/null +++ b/lib/utp-socket.js @@ -0,0 +1,182 @@ +const b4a = require('b4a') +const binding = require('./binding') +const UTPConnection = require('./utp-connection') +const UTPSendRequest = require('./utp-send-request') + +const EMPTY = b4a.alloc(0) + +const INITIALIZED = 1 +const BOUND = 1 << 1 +const CLOSING = 1 << 2 +const CLOSED = 1 << 3 +const UNREFED = 1 << 4 + +class UTPSocket { + constructor () { + this._connections = [] + this._sending = [] + this._sent = [] + this._handle = b4a.alloc(binding.sizeof_utp_napi_t) + this._state = 0 + this._nextConnection = b4a.alloc(binding.sizeof_utp_napi_connection_t) + this._buffer = b4a.allocUnsafe(65536 * 2) + this._offset = 0 + this._accept = new Uint32Array( + this._handle.buffer, + this._handle.byteOffset + binding.offsetof_utp_napi_t_accept_connections, + 1 + ) + + this.onmessage = noop + this.onerror = noop + this.onclose = noop + this.onconnection = noop + } + + get inited () { + return (this._state & INITIALIZED) !== 0 + } + + get bound () { + return (this._state & BOUND) !== 0 + } + + get closing () { + return (this._state & CLOSING) !== 0 + } + + get closed () { + return (this._state & CLOSED) !== 0 + } + + get unrefed () { + return (this._state & UNREFED) !== 0 + } + + get idle () { + return this._connections.length === 0 && this._sending.length === 0 + } + + firewall (enable) { + this._accept[0] = enable ? 0 : 1 + } + + ref () { + this._state &= ~UNREFED + + if ((this._state & INITIALIZED) !== 0) binding.utp_napi_ref(this._handle) + } + + unref () { + this._state |= UNREFED + + if ((this._state & INITIALIZED) !== 0) binding.utp_napi_unref(this._handle) + } + + bind (port, ip) { + if ((this._state & BOUND) !== 0) throw new Error('Socket is already bound') + if ((this._state & INITIALIZED) === 0) this._init() + + // This will throw if not successfully bound + binding.utp_napi_bind(this._handle, port, ip) + + this._state |= BOUND + } + + send (buffer, offset, len, port, ip) { + if ((this._state & (CLOSED | CLOSING)) !== 0) throw new Error('Socket is closed') + + this._ensureBound() + + const request = this._sent.pop() || new UTPSendRequest(this) + + request.send(buffer, offset, len, port, ip) + + return request + } + + close () { + if ((this._state & CLOSED) !== 0) return + if ((this._state & INITIALIZED) === 0) return this._onclose() + + if ((this._state & CLOSING) === 0) { + this._state |= CLOSING + + if (this.idle) this._onidle() + } + } + + _init () { + if ((this._state & INITIALIZED) !== 0) return + + this._state |= INITIALIZED + + binding.utp_napi_init(this._handle, this, this._nextConnection, this._buffer, + this._onmessage, + this._onsend, + this._onconnection, + this._onclose, + this._realloc + ) + + if ((this._state & UNREFED) === 0) this.ref() + else this.unref() + } + + _onidle () { + if ((this._state & CLOSED) === 0 && (this._state & CLOSING) !== 0) { + binding.utp_napi_close(this._handle) + } + } + + _onmessage (size, port, ip) { + if (size < 0) { + this.onerror(new Error('Read failed (status: ' + size + ')')) + return EMPTY + } + + const message = this._buffer.subarray(this._offset, this._offset += size) + + this.onmessage(message, { address: ip, family: 'IPv4', port }) + + if (this._buffer.length - this._offset <= 65536) return this._realloc() + + return EMPTY + } + + _onsend (request, status) { + request.finish(status) + } + + _onconnection (port, ip) { + const connection = new UTPConnection(this, this._nextConnection) + + this.onconnection(port, ip, connection) + + this._nextConnection = b4a.alloc(binding.sizeof_utp_napi_connection_t) + return this._nextConnection + } + + _onclose () { + this._state &= ~(BOUND | CLOSING) + this._state |= CLOSED + + binding.utp_napi_destroy(this._handle, this._sent.map(UTPSendRequest.toHandle)) + + this.onclose() + } + + _realloc () { + this._buffer = b4a.allocUnsafe(this._buffer.length) + this._offset = 0 + return this._buffer + } + + _ensureBound () { + if ((this._state & BOUND) === 0) this.bind(0, '127.0.0.1') + } +} + +module.exports = UTPSocket + +function noop () {} diff --git a/package.json b/package.json index cb8edea..4933efd 100644 --- a/package.json +++ b/package.json @@ -5,8 +5,8 @@ "main": "index.js", "gypfile": true, "scripts": { - "test-timeouts": "tape test/timeouts.js", - "test": "standard && tape test/net.js test/sockets.js test/udp.js", + "test-timeouts": "brittle test/timeouts.js", + "test": "standard && brittle test/net.js test/sockets.js test/udp.js", "install": "node-gyp-build", "fetch-libutp": "git submodule update --recursive --init", "prebuild": "prebuildify --napi --strip", @@ -19,16 +19,18 @@ "node": ">=8.12" }, "dependencies": { + "b4a": "^1.1.1", "napi-macros": "^2.0.0", "node-gyp-build": "^4.2.0", - "readable-stream": "^3.0.2", + "queue-tick": "^1.0.0", + "streamx": "^2.12.0", "timeout-refresh": "^1.0.0", "unordered-set": "^2.0.1" }, "devDependencies": { + "brittle": "^2.3.1", "prebuildify": "^4.1.2", - "standard": "^14.3.1", - "tape": "^4.11.0" + "standard": "^17.0.0" }, "repository": { "type": "git", diff --git a/test/net.js b/test/net.js index 77fcf6a..3d25e96 100644 --- a/test/net.js +++ b/test/net.js @@ -1,402 +1,478 @@ -const tape = require('tape') -const utp = require('../') +const test = require('brittle') +const utp = require('..') -tape('server + connect', function (t) { - var connected = false +test('server + connect', (t) => withServer(t, async (server) => { + const close = t.test('connect and close sockets') + close.plan(4) - const server = utp.createServer(function (socket) { - connected = true - socket.write('hello mike') - socket.end() + server.on('connection', (socket) => { + close.pass('server socket connected') + socket + .on('close', () => close.pass('server socket closed')) + .resume() + .end() }) - server.listen(function () { - var socket = utp.connect(server.address().port) - - socket.on('connect', function () { - socket.destroy() - server.close() - t.ok(connected, 'connected successfully') - t.end() - }) - - socket.write('hello joe') + server.listen(() => { + const socket = utp.connect(server.address().port) + socket + .on('connect', () => { + socket.end() + close.pass('client socket connected') + }) + .on('close', () => close.pass('client socket closed')) + .write('foo') }) -}) -tape('server + connect with resolve', function (t) { - var connected = false + await close +})) - const server = utp.createServer(function (socket) { - connected = true - socket.write('hello mike') - socket.end() +test('server + connect with resolve', (t) => withServer(t, async (server) => { + const close = t.test('connect and close sockets') + close.plan(4) + + server.on('connection', (socket) => { + close.pass('server socket connected') + socket + .on('close', () => close.pass('server socket closed')) + .resume() + .end() }) - server.listen(function () { + server.listen(() => { const socket = utp.connect(server.address().port, 'localhost') - - socket.on('connect', function () { - socket.destroy() - server.close() - t.ok(connected, 'connected successfully') - t.end() - }) - - socket.write('hello joe') + socket + .on('connect', () => { + socket.end() + close.pass('client socket connected') + }) + .on('close', () => close.pass('client socket closed')) + .write('foo') }) -}) -tape('bad resolve', function (t) { - t.plan(2) + await close +})) - const socket = utp.connect(10000, 'domain.does-not-exist') +test('emits end and close', (t) => withServer(t, async (server) => { + const end = t.test('end') + end.plan(4) - socket.on('connect', function () { - t.fail('should not connect') + server.on('connection', (socket) => { + socket + .on('end', () => { + end.pass('server socket ended') + socket.end() + }) + .on('close', () => { + end.pass('server socket closed') + }) + .resume() }) - socket.on('error', function () { - t.pass('errored') + server.listen(() => { + const socket = utp.connect(server.address().port) + socket + .on('connect', () => socket.end()) + .on('end', () => end.pass('client socket ended')) + .on('close', () => end.pass('client socket closed')) + .write('foo') }) - socket.on('close', function () { - t.pass('closed') - t.end() - }) -}) + await end +})) -tape('server immediate close', function (t) { - t.plan(2) +test('client immediately destroys', (t) => withServer(t, async (server) => { + const close = t.test('close sockets') + close.plan(1) - const server = utp.createServer(function (socket) { - socket.write('hi') - socket.end() - server.close(function () { - t.pass('closed') - }) + server.on('connection', () => close.fail('server socket connected')) + + server.listen(() => { + const socket = utp.connect(server.address().port) + socket + .on('close', () => close.pass('client socket closed')) + .destroy() + }) + + await close +})) + +// The socket socket for some reason never emits the `close` event. The test is +// therefore skipped for now. +test.skip('client destroys on connect', (t) => withServer(t, async (server) => { + const close = t.test('close sockets') + close.plan(4) + + server.on('connection', (socket) => { + close.pass('server socket connected') + socket + .on('end', () => { + close.pass('server socket ended') + socket.destroy() + }) + .on('close', () => close.pass('server socket closed')) + .resume() }) - server.listen(0, function () { - var socket = utp.connect(server.address().port) + server.listen(() => { + const socket = utp.connect(server.address().port) + socket + .on('connect', () => socket.destroy()) + .on('close', () => close.pass('client socket closed')) + .write('foo') + }) - socket.write('hi') - socket.once('connect', function () { - socket.end() - }) + await close +})) - socket.on('close', function () { - t.pass('client closed') - }) +test('server immediately destroys', (t) => withServer(t, async (server) => { + const close = t.test('close sockets') + close.plan(2) + + server.on('connection', (socket) => { + socket + .on('close', () => close.pass('server socket closed')) + .destroy() }) -}) -tape.skip('only server sends', function (t) { - // this is skipped because it doesn't work. - // utpcat has the same issue so this seems to be a bug - // in libutp it self - // in practice this is less of a problem as most protocols - // exchange a handshake message. would be great to get fixed though - var server = utp.createServer(function (socket) { - socket.write('hi') + server.listen(() => { + const socket = utp.connect(server.address().port) + socket + .on('close', () => close.pass('client socket closed')) + .on('error', () => { /* UTP_ECONNRESET */ }) + .write('foo') }) - server.listen(0, function () { - var socket = utp.connect(server.address().port) + await close +})) - socket.on('data', function (data) { - t.same(data, Buffer.from('hi')) - socket.destroy() - server.close() - }) - }) -}) +test('client sends first', (t) => withServer(t, async (server) => { + const writes = t.test('write and close sockets') + writes.plan(3) -tape('server listens on a port in use', function (t) { - if (Number(process.versions.node.split('.')[0]) === 0) { - t.pass('skipping since node 0.10 forces SO_REUSEADDR') - t.end() - return - } + server.on('connection', (socket) => { + socket + .on('data', (data) => { + writes.alike(data, Buffer.from('foo')) + socket.end() + }) + .on('close', () => writes.pass('server socket closed')) + }) - const server = utp.createServer() - server.listen(0, function () { - const server2 = utp.createServer() - server2.listen(server.address().port, function () { - t.fail('should not be listening') - }) - server2.on('error', function () { - server.close() - server2.close() - t.pass('had error') - t.end() - }) + server.listen(() => { + const socket = utp.connect(server.address().port) + socket + .on('connect', () => { + socket.end('foo') + }) + .on('close', () => writes.pass('client socket closed')) }) -}) -tape('echo server', function (t) { - const server = utp.createServer(function (socket) { - socket.pipe(socket) - socket.on('data', function (data) { - t.same(data, Buffer.from('hello')) - }) - socket.on('end', function () { - socket.end() - }) + await writes +})) + +test('server sends first', (t) => withServer(t, async (server) => { + const writes = t.test('write and close sockets') + writes.plan(4) + + server.on('connection', (socket) => { + socket + .on('close', () => writes.pass('server socket closed')) + .resume() + .end('foo') }) - server.listen(0, function () { + server.listen(() => { const socket = utp.connect(server.address().port) - socket.write('hello') - socket.on('data', function (data) { - socket.end() - server.close() - t.same(data, Buffer.from('hello')) - t.end() + // Due to https://github.com/bittorrent/libutp/issues/118, the server socket + // won't open before the client writes. + socket.setTimeout(100, () => { + writes.pass('client socket timed out') + socket.write('foo') }) + + socket + .on('data', (data) => { + writes.alike(data, Buffer.from('foo')) + socket.end() + }) + .on('close', () => writes.pass('client socket closed')) }) + + await writes +})) + +test('bad resolve', (t) => { + t.plan(2) + + const socket = utp.connect(10000, 'domain.does-not-exist') + socket + .on('connect', () => t.fail('should not connect')) + .on('error', () => t.pass('errored')) + .on('close', () => t.pass('closed')) }) -tape('echo server back and fourth', function (t) { - var echoed = 0 +test('server listens on a port in use', (t) => withServer(t, (a) => withServer(t, async (b) => { + const error = t.test('error on listen') + error.plan(1) - const server = utp.createServer(function (socket) { - socket.pipe(socket) - socket.on('data', function (data) { - echoed++ - t.same(data, Buffer.from('hello')) - }) + a.listen(() => { + b + .on('error', () => error.pass('had error')) + .listen(a.address().port, () => { + error.fail('should not be listening') + }) }) - server.listen(0, function () { - const socket = utp.connect(server.address().port) + await error +}))) - var rounds = 10 +test('echo server', (t) => withServer(t, async (server) => { + const writes = t.test('write and close sockets') + writes.plan(4) - socket.write('hello') - socket.on('data', function (data) { - if (--rounds) return socket.write(data) - socket.end() - server.close() - t.same(echoed, 10) - t.same(Buffer.from('hello'), data) - t.end() - }) + server.on('connection', (socket) => { + socket.pipe(socket) + socket + .on('data', (data) => writes.alike(data, Buffer.from('hello'))) + .on('close', () => writes.pass('server socket closed')) }) -}) -tape('echo big message', function (t) { - var packets = 0 + server.listen(() => { + const socket = utp.connect(server.address().port) + socket + .on('data', (data) => writes.alike(data, Buffer.from('hello'))) + .on('close', () => writes.pass('client socket closed')) + .end('hello') + }) - const big = Buffer.alloc(8 * 1024 * 1024) - big.fill('yolo') + await writes +})) - const server = utp.createServer(function (socket) { - socket.on('data', () => packets++) - socket.pipe(socket) +test('echo server back and fourth', (t) => withServer(t, async (server) => { + const writes = t.test('write and close sockets') + writes.plan(22) + + let echoed = 0 + + server.on('connection', (socket) => { + socket + .on('data', (data) => { + echoed++ + writes.alike(data, Buffer.from('hello')) + }) + .on('close', () => writes.pass('server socket closed')) + .pipe(socket) }) - server.listen(0, function () { - const then = Date.now() + server.listen(() => { const socket = utp.connect(server.address().port) - const buffer = Buffer.alloc(big.length) - var ptr = 0 + let rounds = 10 - socket.write(big) - socket.on('data', function (data) { - packets++ - data.copy(buffer, ptr) - ptr += data.length - if (big.length === ptr) { - socket.end() - server.close() - t.same(buffer, big) - t.pass('echo took ' + (Date.now() - then) + 'ms (' + packets + ' packets)') - t.end() - } - }) + socket + .on('data', (data) => { + writes.alike(data, Buffer.from('hello')) + if (--rounds) socket.write(data) + else socket.end() + }) + .on('close', () => writes.pass('client socket closed')) + .write('hello') }) -}) -tape('echo big message with setContentSize', function (t) { - var packets = 0 + await writes - const big = Buffer.alloc(8 * 1024 * 1024) - big.fill('yolo') + t.is(echoed, 10) +})) - const server = utp.createServer(function (socket) { - socket.setContentSize(big.length) - socket.on('data', () => packets++) - socket.pipe(socket) +test('echo big message', (t) => withServer(t, async (server) => { + const writes = t.test('write and close sockets') + writes.plan(3) + + let packets = 0 + + const sending = Buffer.alloc(8 * 1024 * 1024) + sending.fill('yolo') + + server.on('connection', (socket) => { + socket + .on('data', () => packets++) + .on('close', () => writes.pass('server socket closed')) + .pipe(socket) }) - server.listen(0, function () { + server.listen(() => { const then = Date.now() const socket = utp.connect(server.address().port) - const buffer = Buffer.alloc(big.length) + const buffer = Buffer.alloc(sending.length) - var ptr = 0 + let total = 0 - socket.setContentSize(big.length) - socket.write(big) - socket.on('data', function (data) { - packets++ - data.copy(buffer, ptr) - ptr += data.length - if (big.length === ptr) { - socket.end() - server.close() - t.same(buffer, big) - t.pass('echo took ' + (Date.now() - then) + 'ms (' + packets + ' packets)') - t.end() - } - }) - }) -}) - -tape('two connections', function (t) { - var count = 0 - var gotA = false - var gotB = false + socket + .on('data', (data) => { + packets++ + data.copy(buffer, total) + total += data.length - const server = utp.createServer(function (socket) { - count++ - socket.pipe(socket) + if (sending.length === total) { + writes.alike(buffer, sending) + writes.comment('echo took ' + (Date.now() - then) + 'ms (' + packets + ' packets)') + socket.end() + } + }) + .on('close', () => writes.pass('client socket closed')) + .write(sending) }) - server.listen(0, function () { - const socket1 = utp.connect(server.address().port) - const socket2 = utp.connect(server.address().port) + await writes +})) - socket1.write('a') - socket2.write('b') +test('echo big message with setContentSize', (t) => withServer(t, async (server) => { + const writes = t.test('write and close sockets') + writes.plan(3) - socket1.on('data', function (data) { - gotA = true - t.same(data, Buffer.from('a')) - if (gotB) done() - }) + let packets = 0 - socket2.on('data', function (data) { - gotB = true - t.same(data, Buffer.from('b')) - if (gotA) done() - }) + const sending = Buffer.alloc(8 * 1024 * 1024) + sending.fill('yolo') - function done () { - socket1.end() - socket2.end() - server.close() - t.ok(gotA) - t.ok(gotB) - t.same(count, 2) - t.end() - } + server.on('connection', (socket) => { + socket.setContentSize(sending.length) + socket + .on('data', () => packets++) + .on('close', () => writes.pass('server socket closed')) + .pipe(socket) }) -}) -tape('emits close', function (t) { - var serverClosed = false - var clientClosed = false + server.listen(() => { + const then = Date.now() + const socket = utp.connect(server.address().port) + const buffer = Buffer.alloc(sending.length) + + let total = 0 + + socket.setContentSize(sending.length) + socket + .on('data', (data) => { + packets++ + data.copy(buffer, total) + total += data.length + + if (sending.length === total) { + writes.alike(buffer, sending) + writes.comment('echo took ' + (Date.now() - then) + 'ms (' + packets + ' packets)') + socket.end() + } + }) + .on('close', () => writes.pass('client socket closed')) + .write(sending) + }) - const server = utp.createServer(function (socket) { - socket.resume() - socket.on('end', function () { - socket.end() - }) - socket.on('close', function () { - serverClosed = true - if (clientClosed) done() - }) + await writes +})) + +test('two connections', (t) => withServer(t, async (server) => { + const writes = t.test('write and close sockets') + writes.plan(6) + + server.on('connection', (socket) => { + socket + .on('close', () => writes.pass('server socket closed')) + .pipe(socket) }) - server.listen(0, function () { - const socket = utp.connect(server.address().port) - socket.write('hi') - socket.end() // utp does not support half open - socket.resume() - socket.on('close', function () { - clientClosed = true - if (serverClosed) done() - }) + server.listen(() => { + const a = utp.connect(server.address().port) + const b = utp.connect(server.address().port) + + a + .on('data', (data) => writes.alike(data, Buffer.from('a'))) + .on('close', () => writes.pass('a closed')) + .end('a') + + b + .on('data', (data) => writes.alike(data, Buffer.from('b'))) + .on('close', () => writes.pass('b closed')) + .end('b') }) - function done () { - server.close() - t.ok(serverClosed) - t.ok(clientClosed) - t.end() - } -}) + await writes +})) -tape('flushes', function (t) { - var sent = '' - const server = utp.createServer(function (socket) { - var buf = '' - socket.setEncoding('utf-8') - socket.on('data', function (data) { - buf += data - }) - socket.on('end', function () { - server.close() - socket.end() - t.same(buf, sent) - t.end() - }) +test('flushes', (t) => withServer(t, async (server) => { + const writes = t.test('writes') + writes.plan(1) + + const sent = [] + server.on('connection', (socket) => { + const recv = [] + socket + .on('data', (data) => recv.push(data)) + .on('end', () => { + socket.end() + writes.alike(Buffer.concat(recv), Buffer.concat(sent)) + }) }) - server.listen(0, function () { + server.listen(() => { const socket = utp.connect(server.address().port) - for (var i = 0; i < 50; i++) { - socket.write(i + '\n') - sent += i + '\n' + for (let i = 0; i < 50; i++) { + const data = Buffer.from([0x30 + i]) + socket.write(data) + sent.push(data) } socket.end() }) -}) -tape('close waits for connections to close', function (t) { - var sent = '' - const server = utp.createServer(function (socket) { - var buf = '' - socket.setEncoding('utf-8') - socket.on('data', function (data) { - buf += data - }) - socket.on('end', function () { - socket.end() - t.same(buf, sent) - t.end() - }) - server.close() + await writes +})) + +test('close waits for connections to close', (t) => withServer(t, async (server) => { + const close = t.test('close') + close.plan(2) + + const sent = [] + server.on('connection', (socket) => { + const recv = [] + socket + .on('data', (data) => recv.push(data)) + .on('end', () => { + socket.end() + close.alike(Buffer.concat(recv), Buffer.concat(sent)) + }) + server.close(() => close.pass('server closed')) }) - server.listen(0, function () { + server.listen(() => { const socket = utp.connect(server.address().port) - for (var i = 0; i < 50; i++) { - socket.write(i + '\n') - sent += i + '\n' + for (let i = 0; i < 50; i++) { + const data = Buffer.from([0x30 + i]) + socket.write(data) + sent.push(data) } socket.end() }) -}) -tape('disable half open', function (t) { + await close +})) + +test('disable half open', (t) => { t.plan(2) - const server = utp.createServer({ allowHalfOpen: false }, function (socket) { - socket.on('data', function (data) { - t.same(data, Buffer.from('a')) - }) - socket.on('close', function () { - server.close(function () { - t.pass('everything closed') + + const server = utp.createServer({ allowHalfOpen: false }, (socket) => { + socket + .on('data', (data) => { + t.alike(data, Buffer.from('a')) + }) + .on('close', () => { + server.close(() => { + t.pass('everything closed') + }) }) - }) }) - server.listen(0, function () { + server.listen(() => { const socket = utp.connect(server.address().port, '127.0.0.1', { allowHalfOpen: true }) socket.write('a') @@ -404,44 +480,71 @@ tape('disable half open', function (t) { }) }) -tape('timeout', function (t) { - t.plan(3) +test('timeout', (t) => withServer(t, async (server) => { + const close = t.test('close') + close.plan(2) - var serverClosed = false - var clientClosed = false - var missing = 2 - - const server = utp.createServer(function (socket) { - socket.setTimeout(100, function () { - t.pass('timed out') - socket.destroy() - }) - socket.resume() - socket.write('hi') - socket.on('close', function () { - serverClosed = true - done() - }) + server.on('connection', (socket) => { + socket + .on('close', () => close.pass('server closed')) + .setTimeout(100, () => socket.destroy()) }) - server.listen(0, function () { + server.listen(() => { const socket = utp.connect(server.address().port) - socket.write('hi') - socket.resume() - socket.on('end', function () { - socket.destroy() - }) - socket.on('close', function () { - clientClosed = true - done() - }) + socket + .on('end', () => socket.end()) + .on('close', () => close.pass('client closed')) + .write('hello') // why required? + }) + + await close +})) + +test('exception in connection listener', (t) => withServer(t, async (server) => { + const close = t.test('close sockets') + close.plan(3) + + const err = new Error() + + server.on('connection', (socket) => { + socket + .on('close', () => close.pass('server socket closed')) + .resume() + .end() + throw err }) - function done () { - if (--missing) return + process.once('uncaughtException', (thrown) => { + close.is(thrown, err) server.close() - t.ok(clientClosed) - t.ok(serverClosed) - t.end() + }) + + server.listen(() => { + const socket = utp.connect(server.address().port) + socket + .on('close', () => close.pass('client socket closed')) + .end('foo') + }) + + await close +})) + +async function withServer (t, cb) { + const server = utp.createServer() + + try { + await cb(server) + } finally { + const close = t.test('close server') + close.plan(2) + + close.is(server.connections.length, 0, 'connections closed') + + for (const connection of server.connections) connection.destroy() + + server.close(() => close.pass('server closed')) + + await close } -}) +} diff --git a/test/sockets.js b/test/sockets.js index af9a7b0..81783a8 100644 --- a/test/sockets.js +++ b/test/sockets.js @@ -1,130 +1,129 @@ -const tape = require('tape') +const test = require('brittle') const dgram = require('dgram') const utp = require('../') -tape('dgram-like socket', function (t) { - const socket = utp() +test('dgram-like socket', (t) => { + t.plan(3) + + const socket = new utp.Socket() socket.on('message', function (buf, rinfo) { - t.same(rinfo.port, socket.address().port) - t.same(rinfo.address, '127.0.0.1') - t.same(buf, Buffer.from('hello')) + t.is(rinfo.port, socket.address().port) + t.is(rinfo.address, '127.0.0.1') + t.alike(buf, Buffer.from('hello')) socket.close() - t.end() }) - socket.bind(function () { + socket.bind(() => { socket.send(Buffer.from('hello'), 0, 5, socket.address().port, '127.0.0.1') }) }) -tape('double close', function (t) { - const socket = utp() +test('double close', (t) => { + t.plan(1) + + const socket = new utp.Socket() - socket.on('close', function () { - socket.close(function () { + socket.on('close', () => { + socket.close(() => { t.pass('closed twice') - t.end() }) }) - socket.bind(0, function () { + socket.bind(0, () => { socket.close() }) }) -tape('echo socket', function (t) { - const socket = utp() +test('echo socket', (t) => { + t.plan(3) + + const socket = new utp.Socket() socket.on('message', function (buf, rinfo) { socket.send(buf, 0, buf.length, rinfo.port, rinfo.address) }) - socket.bind(function () { - var other = dgram.createSocket('udp4') + socket.bind(() => { + const other = dgram.createSocket('udp4') other.on('message', function (buf, rinfo) { - t.same(rinfo.port, socket.address().port) - t.same(rinfo.address, '127.0.0.1') - t.same(buf, Buffer.from('hello')) + t.is(rinfo.port, socket.address().port) + t.is(rinfo.address, '127.0.0.1') + t.alike(buf, Buffer.from('hello')) socket.close() other.close() - t.end() }) other.send(Buffer.from('hello'), 0, 5, socket.address().port, '127.0.0.1') }) }) -tape('echo socket with resolve', function (t) { - const socket = utp() +test('echo socket with resolve', (t) => { + t.plan(3) + + const socket = new utp.Socket() socket.on('message', function (buf, rinfo) { socket.send(buf, 0, buf.length, rinfo.port, 'localhost') }) - socket.bind(function () { + socket.bind(() => { const other = dgram.createSocket('udp4') other.on('message', function (buf, rinfo) { - t.same(rinfo.port, socket.address().port) - t.same(rinfo.address, '127.0.0.1') - t.same(buf, Buffer.from('hello')) + t.is(rinfo.port, socket.address().port) + t.is(rinfo.address, '127.0.0.1') + t.alike(buf, Buffer.from('hello')) socket.close() other.close() - t.end() }) other.send(Buffer.from('hello'), 0, 5, socket.address().port, '127.0.0.1') }) }) -tape('combine server and connection', function (t) { - const socket = utp() - var gotClient = false +test('combine server and connection', (t) => { + t.plan(3) + + const socket = new utp.Socket() socket.on('connection', function (client) { - gotClient = true - t.same(client.remotePort, socket.address().port) - t.same(client.remoteAddress, '127.0.0.1') + t.is(client.remotePort, socket.address().port) + t.is(client.remoteAddress, '127.0.0.1') client.pipe(client) }) - socket.listen(function () { - var client = socket.connect(socket.address().port) + socket.listen(() => { + const client = socket.connect(socket.address().port) client.write('hi') client.on('data', function (data) { + client.end() socket.close() - client.destroy() - t.same(data, Buffer.from('hi')) - t.ok(gotClient) - t.end() + t.alike(data, Buffer.from('hi')) }) }) }) -tape('both ends write first', function (t) { - var missing = 2 - const socket = utp() +test('both ends write first', async (t) => { + const close = t.test('close') + close.plan(2) + + const socket = new utp.Socket() socket.on('connection', function (connection) { connection.write('a') connection.on('data', function (data) { - t.same(data, Buffer.from('b')) + close.alike(data, Buffer.from('b')) connection.end() - done() }) }) - socket.listen(0, function () { - var connection = socket.connect(socket.address().port) + socket.listen(0, () => { + const connection = socket.connect(socket.address().port) connection.write('b') connection.on('data', function (data) { - t.same(data, Buffer.from('a')) + close.alike(data, Buffer.from('a')) connection.end() - done() }) }) - function done () { - if (--missing) return - socket.close() - t.end() - } + await close + socket.close() }) diff --git a/test/timeouts.js b/test/timeouts.js index 42f2f91..1c8b1c7 100644 --- a/test/timeouts.js +++ b/test/timeouts.js @@ -1,22 +1,25 @@ -const tape = require('tape') +const test = require('brittle') const dgram = require('dgram') const utp = require('../') -tape('connection timeout. this may take >20s', function (t) { +test('connection timeout. this may take >20s', (t) => { + t.plan(1) + const socket = dgram.createSocket('udp4') - socket.bind(0, function () { + socket.bind(0, () => { const connection = utp.connect(socket.address().port) connection.on('error', function (err) { socket.close() - t.same(err.message, 'UTP_ETIMEDOUT') - t.end() + t.is(err.message, 'UTP_ETIMEDOUT') }) }) }) -tape('write timeout. this may take >20s', function (t) { +test('write timeout. this may take >20s', (t) => { + t.plan(3) + const server = utp.createServer() - var connection + let connection server.on('connection', function (socket) { t.pass('server received connection') @@ -24,24 +27,25 @@ tape('write timeout. this may take >20s', function (t) { socket.destroy() }) - server.on('close', function () { + server.on('close', () => { connection.write('hello?') }) - server.listen(function () { + server.listen(() => { connection = utp.connect(server.address().port) - connection.on('connect', function () { + connection.on('connect', () => { t.pass('connected to server') }) connection.on('error', function (err) { - t.same(err.message, 'UTP_ETIMEDOUT') - t.end() + t.is(err.message, 'UTP_ETIMEDOUT') }) }) }) -tape('server max connections. this may take >20s', function (t) { - var inc = 0 +test('server max connections. this may take >20s', (t) => { + t.plan(4) + + let inc = 0 const server = utp.createServer({ allowHalfOpen: false }, function (socket) { inc++ t.ok(inc < 3) @@ -49,25 +53,24 @@ tape('server max connections. this may take >20s', function (t) { }) server.maxConnections = 2 - server.listen(0, function () { - var a = utp.connect(server.address().port) + server.listen(0, () => { + const a = utp.connect(server.address().port) a.write('hi') - a.on('connect', function () { - var b = utp.connect(server.address().port) + a.on('connect', () => { + const b = utp.connect(server.address().port) b.write('hi') - b.on('connect', function () { - var c = utp.connect(server.address().port) + b.on('connect', () => { + const c = utp.connect(server.address().port) c.write('hi') - c.on('connect', function () { + c.on('connect', () => { t.fail('only 2 connections') }) - c.on('error', function () { + c.on('error', () => { a.destroy() b.destroy() c.destroy() server.close() t.pass('should error') - t.end() }) }) }) diff --git a/test/udp.js b/test/udp.js index 047a61a..02480a7 100644 --- a/test/udp.js +++ b/test/udp.js @@ -1,84 +1,94 @@ -const tape = require('tape') +const test = require('brittle') const utp = require('../') -tape('bind', function (t) { - const sock = utp() +test('bind', (t) => { + t.plan(4) - sock.bind(function () { + const sock = new utp.Socket() + + sock.bind(() => { const { port, address } = sock.address() - t.same(address, '0.0.0.0') - t.same(typeof port, 'number') + t.is(address, '0.0.0.0') + t.is(typeof port, 'number') t.ok(port > 0 && port < 65536) - sock.close(() => t.end()) + sock.close(() => t.pass()) }) }) -tape('bind, close, bind', function (t) { - const sock = utp() +test('bind, close, bind', (t) => { + t.plan(6) + + const sock = new utp.Socket() - sock.bind(function () { + sock.bind(() => { const { port, address } = sock.address() - t.same(address, '0.0.0.0') - t.same(typeof port, 'number') + t.is(address, '0.0.0.0') + t.is(typeof port, 'number') t.ok(port > 0 && port < 65536) - sock.close(function () { - const otherSock = utp() + sock.close(() => { + const otherSock = new utp.Socket() - otherSock.bind(port, function () { + otherSock.bind(port, () => { const addr = otherSock.address() - t.same(addr.port, port) - t.same(addr.address, address) - otherSock.close(() => t.end()) + t.is(addr.port, port) + t.is(addr.address, address) + otherSock.close(() => t.pass()) }) }) }) }) -tape('bind after error', function (t) { - const a = utp() - const b = utp() +test('bind after error', (t) => { + t.plan(3) - a.listen(function () { - b.once('error', function (err) { - t.ok(err, 'should error') - b.listen(function () { - t.pass('should still bind') - a.close(() => b.close(() => t.end())) + const a = new utp.Socket() + const b = new utp.Socket() + + a.listen(() => { + b + .once('error', (err) => { + t.ok(err, 'should error') + b.listen(() => { + t.pass('should still bind') + a.close(() => b.close(() => t.pass())) + }) }) - }) - b.listen(a.address().port) + .listen(a.address().port) }) }) -tape('send message', function (t) { - const sock = utp() +test('send message', (t) => { + t.plan(4) + + const sock = new utp.Socket() - sock.bind(0, '127.0.0.1', function () { + sock.bind(0, '127.0.0.1', () => { const addr = sock.address() - sock.on('message', function (message, rinfo) { - t.same(rinfo, addr) - t.same(message, Buffer.from('hello')) - sock.close(() => t.end()) + sock.on('message', (message, rinfo) => { + t.alike(rinfo, addr) + t.alike(message, Buffer.from('hello')) + sock.close(() => t.pass()) }) - sock.send(Buffer.from('hello'), 0, 5, addr.port, addr.address, function (err) { - t.error(err, 'no error') + sock.send(Buffer.from('hello'), 0, 5, addr.port, addr.address, (err) => { + t.absent(err, 'no error') }) }) }) -tape('send after close', function (t) { - const sock = utp() +test('send after close', (t) => { + t.plan(2) + + const sock = new utp.Socket() - sock.bind(0, '127.0.0.1', function () { + sock.bind(0, '127.0.0.1', () => { const { port, address } = sock.address() - sock.send(Buffer.from('hello'), 0, 5, port, address, function (err) { - t.error(err, 'no error') - sock.close(function () { - sock.send(Buffer.from('world'), 0, 5, port, address, function (err) { + sock.send(Buffer.from('hello'), 0, 5, port, address, (err) => { + t.absent(err, 'no error') + sock.close(() => { + sock.send(Buffer.from('world'), 0, 5, port, address, (err) => { t.ok(err, 'should error') - t.end() }) }) }) diff --git a/ucat.js b/ucat.js index 5e343ad..c3403d3 100755 --- a/ucat.js +++ b/ucat.js @@ -1,10 +1,10 @@ #!/usr/bin/env node const socket = require('./')({ allowHalfOpen: false }) -var host = null -var port = 0 +let host = null +let port = 0 -for (var i = 2; i < process.argv.length; i++) { +for (let i = 2; i < process.argv.length; i++) { if (process.argv[i][0] !== '-') { const parts = process.argv[i].split(':') port = Number(parts.pop()) || 0