diff --git a/binding.cc b/binding.cc index 46db4f6..2981b8c 100644 --- a/binding.cc +++ b/binding.cc @@ -140,10 +140,25 @@ utp_napi_connection_drain (utp_napi_connection_t *self) { } inline static void -utp_napi_parse_address (struct sockaddr *name, char *ip, int *port) { - struct sockaddr_in *name_in = (struct sockaddr_in *) name; - *port = ntohs(name_in->sin_port); - uv_ip4_name(name_in, ip, 17); +utp_napi_parse_address (struct sockaddr *name, char *ip, int *port, int *family) { + if (name->sa_family == AF_INET6) { + struct sockaddr_in6 *name_in = (struct sockaddr_in6 *) name; + *port = ntohs(name_in->sin6_port); + uv_ip6_name(name_in, ip, 46); + *family = 6; + } else { + struct sockaddr_in *name_in = (struct sockaddr_in *) name; + *port = ntohs(name_in->sin_port); + uv_ip4_name(name_in, ip, 17); + *family = 4; + } +} + +static inline socklen_t +sockaddr_len (struct sockaddr *addr) { + return addr->sa_family == AF_INET6 + ? sizeof(struct sockaddr_in6) + : sizeof(struct sockaddr_in); } static void @@ -175,20 +190,23 @@ on_uv_read (uv_udp_t *handle, ssize_t nread, const uv_buf_t *buf, const struct s if (nread > 0) { const unsigned char *base = (const unsigned char *) buf->base; - if (utp_process_udp(self->utp, base, nread, addr, sizeof(struct sockaddr))) return; + socklen_t slen = sockaddr_len((struct sockaddr *) addr); + if (utp_process_udp(self->utp, base, nread, addr, slen)) return; } int port; - char ip[17]; - utp_napi_parse_address((struct sockaddr *) addr, ip, &port); + int family; + char ip[46]; + utp_napi_parse_address((struct sockaddr *) addr, ip, &port, &family); UTP_NAPI_CALLBACK(self->on_message, { napi_value ret; - napi_value argv[3]; + napi_value argv[4]; napi_create_int32(env, nread, &(argv[0])); napi_create_uint32(env, port, &(argv[1])); napi_create_string_utf8(env, ip, NAPI_AUTO_LENGTH, &(argv[2])); - NAPI_MAKE_CALLBACK_AND_ALLOC(env, NULL, ctx, callback, 3, argv, ret, nread) + napi_create_uint32(env, family, &(argv[3])); + NAPI_MAKE_CALLBACK_AND_ALLOC(env, NULL, ctx, callback, 4, argv, ret, nread) }) } @@ -299,23 +317,25 @@ static uint64 on_utp_accept (utp_callback_arguments *a) { utp_napi_t *self = (utp_napi_t *) utp_context_get_userdata(a->context); - struct sockaddr addr; + struct sockaddr_storage addr; socklen_t addr_len = sizeof(addr); - utp_getpeername(a->socket, &addr, &addr_len); + utp_getpeername(a->socket, (struct sockaddr *) &addr, &addr_len); int port; - char ip[17]; - utp_napi_parse_address(&addr, ip, &port); + int family; + char ip[46]; + utp_napi_parse_address((struct sockaddr *) &addr, ip, &port, &family); self->next_connection->socket = a->socket; utp_set_userdata(a->socket, self->next_connection); UTP_NAPI_CALLBACK(self->on_connection, { - napi_value argv[2]; + napi_value argv[3]; napi_create_uint32(env, port, &(argv[0])); napi_create_string_utf8(env, ip, NAPI_AUTO_LENGTH, &(argv[1])); + napi_create_uint32(env, family, &(argv[2])); napi_value next; - NAPI_MAKE_CALLBACK(env, NULL, ctx, callback, 2, argv, &next) // will never throw due to the event being NTed in js + NAPI_MAKE_CALLBACK(env, NULL, ctx, callback, 3, argv, &next) // will never throw due to the event being NTed in js utp_napi_connection_t *connection; size_t connection_size; napi_get_buffer_info(env, next, (void **) &connection, &connection_size); @@ -479,15 +499,21 @@ NAPI_METHOD(utp_napi_bind) { NAPI_ARGV(3) NAPI_ARGV_BUFFER_CAST(utp_napi_t *, self, 0) NAPI_ARGV_UINT32(port, 1) - NAPI_ARGV_UTF8(ip, 17, 2) + NAPI_ARGV_UTF8(ip, 46, 2) uv_udp_t *handle = &(self->handle); int err; - struct sockaddr_in addr; - - err = uv_ip4_addr((char *) &ip, port, &addr); - if (err < 0) UTP_NAPI_THROW(err) + struct sockaddr_storage addr; + socklen_t addrlen; + + if (strchr((char *) &ip, ':')) { + err = uv_ip6_addr((char *) &ip, port, (struct sockaddr_in6 *) &addr); + if (err < 0) UTP_NAPI_THROW(err) + } else { + err = uv_ip4_addr((char *) &ip, port, (struct sockaddr_in *) &addr); + if (err < 0) UTP_NAPI_THROW(err) + } err = uv_udp_bind(handle, (const struct sockaddr*) &addr, 0); if (err < 0) UTP_NAPI_THROW(err) @@ -510,14 +536,20 @@ NAPI_METHOD(utp_napi_local_port) { NAPI_ARGV_BUFFER_CAST(utp_napi_t *, self, 0) int err; - struct sockaddr name; + struct sockaddr_storage name; int name_len = sizeof(name); - err = uv_udp_getsockname(&(self->handle), &name, &name_len); + err = uv_udp_getsockname(&(self->handle), (struct sockaddr *) &name, &name_len); if (err < 0) UTP_NAPI_THROW(err) - struct sockaddr_in *name_in = (struct sockaddr_in *) &name; - int port = ntohs(name_in->sin_port); + int port; + if (name.ss_family == AF_INET6) { + struct sockaddr_in6 *name_in = (struct sockaddr_in6 *) &name; + port = ntohs(name_in->sin6_port); + } else { + struct sockaddr_in *name_in = (struct sockaddr_in *) &name; + port = ntohs(name_in->sin_port); + } NAPI_RETURN_UINT32(port) } @@ -542,7 +574,7 @@ NAPI_METHOD(utp_napi_send) { NAPI_ARGV_UINT32(offset, 3) NAPI_ARGV_UINT32(len, 4) NAPI_ARGV_UINT32(port, 5) - NAPI_ARGV_UTF8(ip, 17, 6) + NAPI_ARGV_UTF8(ip, 46, 6) uv_udp_send_t *req = &(send_req->req); @@ -550,10 +582,14 @@ NAPI_METHOD(utp_napi_send) { bufs.base = buf + offset; bufs.len = len; - struct sockaddr_in addr; + struct sockaddr_storage addr; int err; - err = uv_ip4_addr((char *) &ip, port, &addr); + if (strchr((char *) &ip, ':')) { + err = uv_ip6_addr((char *) &ip, port, (struct sockaddr_in6 *) &addr); + } else { + err = uv_ip4_addr((char *) &ip, port, (struct sockaddr_in *) &addr); + } if (err) UTP_NAPI_THROW(err) err = uv_udp_send(req, &(self->handle), &bufs, 1, (const struct sockaddr *) &addr, on_uv_send); @@ -703,21 +739,28 @@ NAPI_METHOD(utp_napi_connect) { NAPI_ARGV_BUFFER_CAST(utp_napi_t *, self, 0) NAPI_ARGV_BUFFER_CAST(utp_napi_connection_t *, conn, 1) NAPI_ARGV_UINT32(port, 2) - NAPI_ARGV_UTF8(ip, 17, 3) + NAPI_ARGV_UTF8(ip, 46, 3) int err; - struct sockaddr_in addr; + struct sockaddr_storage addr; + socklen_t addrlen; // TODO: error handle conn->socket = utp_create_socket(self->utp); utp_set_userdata(conn->socket, conn); - err = uv_ip4_addr((char *) &ip, port, &addr); + if (strchr((char *) &ip, ':')) { + err = uv_ip6_addr((char *) &ip, port, (struct sockaddr_in6 *) &addr); + addrlen = sizeof(struct sockaddr_in6); + } else { + err = uv_ip4_addr((char *) &ip, port, (struct sockaddr_in *) &addr); + addrlen = sizeof(struct sockaddr_in); + } if (err) UTP_NAPI_THROW(err) // TODO: error handle - utp_connect(conn->socket, (struct sockaddr *) &addr, sizeof(struct sockaddr_in)); + utp_connect(conn->socket, (struct sockaddr *) &addr, addrlen); return NULL; } diff --git a/index.js b/index.js index d6a4414..87f9a5c 100644 --- a/index.js +++ b/index.js @@ -1,278 +1,285 @@ const binding = require('./lib/binding') const Connection = require('./lib/connection') -const util = require('util') const events = require('events') const dns = require('dns') +const net = require('net') const set = require('unordered-set') const EMPTY = Buffer.alloc(0) -const IPv4Pattern = /^((?:[0-9]|[1-9][0-9]|1[0-9][0-9]|2[0-4][0-9]|25[0-5])[.]){3}(?:[0-9]|[1-9][0-9]|1[0-9][0-9]|2[0-4][0-9]|25[0-5])$/ - -module.exports = UTP - -function UTP (opts) { - if (!(this instanceof UTP)) return new UTP(opts) - events.EventEmitter.call(this) - - this.connections = [] - - this._sending = [] - this._sent = [] - this._offset = 0 - this._buffer = Buffer.allocUnsafe(2 * 65536) - this._handle = Buffer.alloc(binding.sizeof_utp_napi_t) - this._nextConnection = Buffer.alloc(binding.sizeof_utp_napi_connection_t) - this._address = null - this._inited = false - this._refed = true - this._closing = false - this._closed = false - this._allowHalfOpen = !opts || opts.allowHalfOpen !== false - this._acceptConnections = new Uint32Array(this._handle.buffer, this._handle.byteOffset + binding.offsetof_utp_napi_t_accept_connections, 1) - this.maxConnections = 0 -} - -util.inherits(UTP, events.EventEmitter) - -UTP.createServer = function (opts, onconnection) { - if (typeof opts === 'function') return UTP.createServer(null, opts) - const server = new UTP(opts) - if (onconnection) server.on('connection', onconnection) - return server -} -UTP.connect = function (port, host, opts) { - const udp = new UTP(opts) - return udp.connect(port, host).on('close', ononeoffclose) -} +module.exports = class UTP extends events.EventEmitter { + constructor (opts) { + super() + this.connections = [] -UTP.prototype._init = function () { - this._inited = true + this._sending = [] + this._sent = [] + this._offset = 0 + this._buffer = Buffer.allocUnsafe(2 * 65536) + this._handle = Buffer.alloc(binding.sizeof_utp_napi_t) + this._nextConnection = Buffer.alloc(binding.sizeof_utp_napi_connection_t) + this._address = null + this._inited = false + this._refed = true + this._closing = false + this._closed = false + this._allowHalfOpen = !opts || opts.allowHalfOpen !== false + this._acceptConnections = new Uint32Array(this._handle.buffer, this._handle.byteOffset + binding.offsetof_utp_napi_t_accept_connections, 1) + this.maxConnections = 0 + } - binding.utp_napi_init(this._handle, this, - this._nextConnection, - this._buffer, - this._onmessage, - this._onsend, - this._onconnection, - this._onclose, - this._realloc - ) + static createServer (opts, onconnection) { + if (typeof opts === 'function') return UTP.createServer(null, opts) + const server = new UTP(opts) + if (onconnection) server.on('connection', onconnection) + return server + } - if (!this._refed) this.unref() -} + static connect (port, host, opts) { + const udp = new UTP(opts) + return udp.connect(port, host).on('close', ononeoffclose) + } -UTP.prototype.firewall = function (yes) { - this._acceptConnections[0] = yes ? 0 : 1 -} + _init () { + this._inited = true -UTP.prototype.ref = function () { - if (this._inited) binding.utp_napi_ref(this._handle) - this._refed = true -} + binding.utp_napi_init(this._handle, this, + this._nextConnection, + this._buffer, + this._onmessage, + this._onsend, + this._onconnection, + this._onclose, + this._realloc + ) -UTP.prototype.unref = function () { - if (this._inited) binding.utp_napi_unref(this._handle) - this._refed = false -} + if (!this._refed) this.unref() + } -UTP.prototype.address = function () { - if (!this._address || this._closing) throw new Error('Socket not bound') - return { - address: this._address, - family: 'IPv4', - port: binding.utp_napi_local_port(this._handle) + firewall (yes) { + this._acceptConnections[0] = yes ? 0 : 1 } -} -UTP.prototype.getRecvBufferSize = function () { - if (!this._inited) throw new Error('getRecvBufferSize EBADF') - if (this._closing) return 0 - return binding.utp_napi_recv_buffer(this._handle, 0) -} + ref () { + if (this._inited) binding.utp_napi_ref(this._handle) + this._refed = true + } -UTP.prototype.setRecvBufferSize = function (n) { - if (!this._inited) throw new Error('setRecvBufferSize EBADF') - if (this._closing) return 0 - return binding.utp_napi_recv_buffer(this._handle, n) -} + unref () { + if (this._inited) binding.utp_napi_unref(this._handle) + this._refed = false + } -UTP.prototype.getSendBufferSize = function () { - if (!this._inited) throw new Error('getSendBufferSize EBADF') - if (this._closing) return 0 - return binding.utp_napi_send_buffer(this._handle, 0) -} + address () { + if (!this._address || this._closing) throw new Error('Socket not bound') + return { + address: this._address, + family: this._address.indexOf(':') !== -1 ? 'IPv6' : 'IPv4', + port: binding.utp_napi_local_port(this._handle) + } + } -UTP.prototype.setSendBufferSize = function (n) { - if (!this._inited) throw new Error('setSendBufferSize EBADF') - if (this._closing) return 0 - return binding.utp_napi_send_buffer(this._handle, n) -} + getRecvBufferSize () { + if (!this._inited) throw new Error('getRecvBufferSize EBADF') + if (this._closing) return 0 + return binding.utp_napi_recv_buffer(this._handle, 0) + } -UTP.prototype.setTTL = function (ttl) { - if (!this._inited) throw new Error('setTTL EBADF') - if (this._closing) return - binding.utp_napi_set_ttl(this._handle, ttl) -} + setRecvBufferSize (n) { + if (!this._inited) throw new Error('setRecvBufferSize EBADF') + if (this._closing) return 0 + return binding.utp_napi_recv_buffer(this._handle, n) + } -UTP.prototype.send = function (buf, offset, len, port, host, cb) { - if (!cb) cb = noop - if (!isIP(host)) return this._resolveAndSend(buf, offset, len, port, host, cb) - if (this._closing) return process.nextTick(cb, new Error('Socket is closed')) - if (!this._address) this.bind(0) + getSendBufferSize () { + if (!this._inited) throw new Error('getSendBufferSize EBADF') + if (this._closing) return 0 + return binding.utp_napi_send_buffer(this._handle, 0) + } - var send = this._sent.pop() - if (!send) { - send = new SendRequest() - binding.utp_napi_send_request_init(send._handle, send) + setSendBufferSize (n) { + if (!this._inited) throw new Error('setSendBufferSize EBADF') + if (this._closing) return 0 + return binding.utp_napi_send_buffer(this._handle, n) } - send._index = this._sending.push(send) - 1 - send._buffer = buf - send._callback = cb + setTTL (ttl) { + if (!this._inited) throw new Error('setTTL EBADF') + if (this._closing) return + binding.utp_napi_set_ttl(this._handle, ttl) + } - binding.utp_napi_send(this._handle, send._handle, send._buffer, offset, len, port, host) -} + send (buf, offset, len, port, host, cb) { + if (!cb) cb = noop + if (!isIP(host)) return this._resolveAndSend(buf, offset, len, port, host, cb) + if (this._closing) return process.nextTick(cb, new Error('Socket is closed')) + if (!this._address) this.bind(0) -UTP.prototype._resolveAndSend = function (buf, offset, len, port, host, cb) { - const self = this + var send = this._sent.pop() + if (!send) { + send = new SendRequest() + binding.utp_napi_send_request_init(send._handle, send) + } - dns.lookup(host, onlookup) + send._index = this._sending.push(send) - 1 + send._buffer = buf + send._callback = cb - function onlookup (err, ip) { - if (err) return cb(err) - if (!ip) return cb(new Error('Could not resolve ' + host)) - self.send(buf, offset, len, port, ip, cb) + binding.utp_napi_send(this._handle, send._handle, send._buffer, offset, len, port, host) } -} -UTP.prototype.close = function (onclose) { - if (this._closed) return process.nextTick(callOnClose, this, onclose) - if (onclose) this.once('close', onclose) - if (this._closing) return - this._closing = true - this._closeMaybe() -} - -UTP.prototype._closeMaybe = function () { - if (this._closing && !this.connections.length && !this._sending.length && this._inited && !this._closed) { - this._closed = true - binding.utp_napi_close(this._handle) + _resolveAndSend (buf, offset, len, port, host, cb) { + const self = this + const sockIs6 = self._address && self._address.indexOf(':') !== -1 + + dns.lookup(host, { all: true }, onlookup) + + function onlookup (err, addrs) { + if (err) return cb(err) + if (!addrs || !addrs.length) return cb(new Error('Could not resolve ' + host)) + var ip = addrs[0].address + for (var i = 0; i < addrs.length; i++) { + if ((addrs[i].family === 6) === sockIs6) { + ip = addrs[i].address + break + } + } + self.send(buf, offset, len, port, ip, cb) + } } -} - -UTP.prototype.connect = function (port, ip) { - if (!this._inited) this.bind() - if (!ip) ip = '127.0.0.1' - const conn = new Connection(this, port, ip, null, this._allowHalfOpen) - if (!isIP(ip)) conn._resolveAndConnect(port, ip) - else conn._connect(port, ip || '127.0.0.1') - return conn -} - -UTP.prototype.listen = function (port, ip, onlistening) { - if (!this._address) this.bind(port, ip, onlistening) - this.firewall(false) -} -UTP.prototype.bind = function (port, ip, onlistening) { - if (typeof port === 'function') return this.bind(0, null, port) - if (typeof ip === 'function') return this.bind(port, null, ip) - if (!port) port = 0 - if (!ip) ip = '0.0.0.0' + close (onclose) { + if (this._closed) return process.nextTick(callOnClose, this, onclose) + if (onclose) this.once('close', onclose) + if (this._closing) return + this._closing = true + this._closeMaybe() + } - if (!this._inited) this._init() - if (this._closing) return + _closeMaybe () { + if (this._closing && !this.connections.length && !this._sending.length && this._inited && !this._closed) { + this._closed = true + binding.utp_napi_close(this._handle) + } + } - if (this._address) { - this.emit('error', new Error('Socket already bound')) - return + connect (port, ip) { + const is6 = ip && ip.indexOf(':') !== -1 + if (this._inited) this.bind(0, is6 ? '::' : '0.0.0.0') + if (!ip) ip = '127.0.0.1' + const family = isIP(ip) + const conn = new Connection(this, port, ip, null, this._allowHalfOpen, family) + if (!family) conn._resolveAndConnect(port, ip) + else conn._connect(port, ip || '127.0.0.1') + return conn } - if (onlistening) this.once('listening', onlistening) - if (!isIP(ip)) return this._resolveAndBind(port, ip) + listen (port, ip, onlistening) { + if (!this._address) this.bind(port, ip, onlistening) + this.firewall(false) + } - this._address = ip + bind (port, ip, onlistening) { + if (typeof port === 'function') return this.bind(0, null, port) + if (typeof ip === 'function') return this.bind(port, null, ip) + if (!port) port = 0 + if (!ip) ip = '0.0.0.0' - try { - binding.utp_napi_bind(this._handle, port, ip) - } catch (err) { - this._address = null - process.nextTick(emitError, this, err) - return - } + if (!this._inited) this._init() + if (this._closing) return - process.nextTick(emitListening, this) -} + if (this._address) { + this.emit('error', new Error('Socket already bound')) + return + } -UTP.prototype._resolveAndBind = function (port, host) { - const self = this + if (onlistening) this.once('listening', onlistening) + if (!isIP(ip)) return this._resolveAndBind(port, ip) - dns.lookup(host, function (err, ip) { - if (err) return self.emit('error', err) - self.bind(port, ip) - }) -} + this._address = ip -UTP.prototype._realloc = function () { - this._buffer = Buffer.allocUnsafe(this._buffer.length) - this._offset = 0 - return this._buffer -} + try { + binding.utp_napi_bind(this._handle, port, ip) + } catch (err) { + this._address = null + process.nextTick(emitError, this, err) + return + } -UTP.prototype._onmessage = function (size, port, address) { - if (size < 0) { - this.emit('error', new Error('Read failed (status: ' + size + ')')) - return EMPTY + process.nextTick(emitListening, this) } - const message = this._buffer.slice(this._offset, this._offset += size) - this.emit('message', message, { address, family: 'IPv4', port }) + _resolveAndBind (port, host) { + const self = this + + dns.lookup(host, function (err, ip) { + if (err) return self.emit('error', err) + self.bind(port, ip) + }) + } - if (this._buffer.length - this._offset <= 65536) { + _realloc () { this._buffer = Buffer.allocUnsafe(this._buffer.length) this._offset = 0 return this._buffer } - return EMPTY -} + _onmessage (size, port, address, family) { + if (size < 0) { + this.emit('error', new Error('Read failed (status: ' + size + ')')) + return EMPTY + } -UTP.prototype._onsend = function (send, status) { - const cb = send._callback + const message = this._buffer.slice(this._offset, this._offset += size) + this.emit('message', message, { address, family: family === 6 ? 'IPv6' : 'IPv4', port }) - send._callback = send._buffer = null - set.remove(this._sending, send) - this._sent.push(send) - if (this._closing) this._closeMaybe() + if (this._buffer.length - this._offset <= 65536) { + this._buffer = Buffer.allocUnsafe(this._buffer.length) + this._offset = 0 + return this._buffer + } - cb(status < 0 ? new Error('Send failed (status: ' + status + ')') : null) -} + return EMPTY + } -UTP.prototype._onconnection = function (port, addr) { - const conn = new Connection(this, port, addr, this._nextConnection, this._allowHalfOpen) - process.nextTick(emitConnection, this, conn) - this._nextConnection = Buffer.alloc(binding.sizeof_utp_napi_connection_t) - return this._nextConnection -} + _onsend (send, status) { + const cb = send._callback -UTP.prototype._onclose = function () { - binding.utp_napi_destroy(this._handle, this._sent.map(toHandle)) - this._handle = null - this.emit('close') + send._callback = send._buffer = null + set.remove(this._sending, send) + this._sent.push(send) + if (this._closing) this._closeMaybe() + + cb(status < 0 ? new Error('Send failed (status: ' + status + ')') : null) + } + + _onconnection (port, addr, family) { + const conn = new Connection(this, port, addr, this._nextConnection, this._allowHalfOpen, family) + process.nextTick(emitConnection, this, conn) + this._nextConnection = Buffer.alloc(binding.sizeof_utp_napi_connection_t) + return this._nextConnection + } + + _onclose () { + binding.utp_napi_destroy(this._handle, this._sent.map(toHandle)) + this._handle = null + this.emit('close') + } } -function SendRequest () { - this._handle = Buffer.alloc(binding.sizeof_utp_napi_send_request_t) - this._buffer = null - this._callback = null - this._index = null +class SendRequest { + constructor () { + this._handle = Buffer.alloc(binding.sizeof_utp_napi_send_request_t) + this._buffer = null + this._callback = null + this._index = null + } } function noop () {} function isIP (ip) { - return IPv4Pattern.test(ip) + return net.isIP(ip) } function toHandle (obj) { diff --git a/lib/connection.js b/lib/connection.js index 122f0bc..6f94dc1 100644 --- a/lib/connection.js +++ b/lib/connection.js @@ -1,6 +1,5 @@ const binding = require('./binding') const stream = require('readable-stream') -const util = require('util') const unordered = require('unordered-set') const dns = require('dns') const timeout = require('timeout-refresh') @@ -13,220 +12,227 @@ const UTP_ERRORS = [ 'UTP_UNKNOWN' ] -module.exports = Connection - -function Connection (utp, port, address, handle, halfOpen) { - stream.Duplex.call(this) - - this.remoteAddress = address - this.remoteFamily = 'IPv4' - this.remotePort = port - this.destroyed = false - - this._index = -1 - this._utp = utp - this._handle = handle || Buffer.alloc(binding.sizeof_utp_napi_connection_t) - this._buffer = Buffer.allocUnsafe(65536 * 2) - this._offset = 0 - this._view = new Uint32Array(this._handle.buffer, this._handle.byteOffset, 2) - this._callback = null - this._writing = null - this._error = null - this._connected = false - this._needsConnect = !handle - this._timeout = null - this._contentSize = 0 - this._allowOpen = halfOpen ? 2 : 1 - - this.on('finish', this._shutdown) - - binding.utp_napi_connection_init(this._handle, this, this._buffer, - this._onread, - this._ondrain, - this._onend, - this._onerror, - this._onclose, - this._onconnect, - this._realloc - ) - - unordered.add(utp.connections, this) - if (utp.maxConnections && utp.connections.length >= utp.maxConnections) { - utp.firewall(true) - } -} - -util.inherits(Connection, stream.Duplex) - -Connection.prototype.setTimeout = function (ms, ontimeout) { - if (ontimeout) this.once('timeout', ontimeout) - if (this._timeout) this._timeout.destroy() - this._timeout = timeout(ms, this._ontimeout, this) -} - -Connection.prototype._ontimeout = function () { - this.emit('timeout') -} +module.exports = class Connection extends stream.Duplex { + constructor (utp, port, address, handle, halfOpen, family) { + super() -Connection.prototype.setInteractive = function (interactive) { - this.setPacketSize(this.interactive ? 0 : 65536) -} + this.remoteAddress = address + this.remoteFamily = family === 6 ? 'IPv6' : 'IPv4' + this.remotePort = port + this.destroyed = false -Connection.prototype.setContentSize = function (size) { - this._view[0] = size < 65536 ? (size >= 0 ? size : 0) : 65536 - this._contentSize = size -} + this._index = -1 + this._utp = utp + this._handle = handle || Buffer.alloc(binding.sizeof_utp_napi_connection_t) + this._buffer = Buffer.allocUnsafe(65536 * 2) + this._offset = 0 + this._view = new Uint32Array(this._handle.buffer, this._handle.byteOffset, 2) + this._callback = null + this._writing = null + this._error = null + this._connected = false + this._needsConnect = !handle + this._timeout = null + this._contentSize = 0 + this._allowOpen = halfOpen ? 2 : 1 + + this.on('finish', this._shutdown) + + binding.utp_napi_connection_init(this._handle, this, this._buffer, + this._onread, + this._ondrain, + this._onend, + this._onerror, + this._onclose, + this._onconnect, + this._realloc + ) + + unordered.add(utp.connections, this) + if (utp.maxConnections && utp.connections.length >= utp.maxConnections) { + utp.firewall(true) + } + } -Connection.prototype.setPacketSize = function (size) { - if (size > 65536) size = 65536 - this._view[0] = size - this._contentSize = 0 -} + setTimeout (ms, ontimeout) { + if (ontimeout) this.once('timeout', ontimeout) + if (this._timeout) this._timeout.destroy() + this._timeout = timeout(ms, this._ontimeout, this) + } -Connection.prototype.address = function () { - if (this.destroyed) return null - return this._utp.address() -} + _ontimeout () { + this.emit('timeout') + } -Connection.prototype._read = function () { - // TODO: backpressure -} + setInteractive (interactive) { + this.setPacketSize(this.interactive ? 0 : 65536) + } -Connection.prototype._write = function (data, enc, cb) { - if (this.destroyed) return + setContentSize (size) { + this._view[0] = size < 65536 ? (size >= 0 ? size : 0) : 65536 + this._contentSize = size + } - if (!this._connected || !binding.utp_napi_connection_write(this._handle, data)) { - this._callback = cb - this._writing = new Array(1) - this._writing[0] = data - return + setPacketSize (size) { + if (size > 65536) size = 65536 + this._view[0] = size + this._contentSize = 0 } - cb(null) -} + address () { + if (this.destroyed) return null + return this._utp.address() + } -Connection.prototype._writev = function (datas, cb) { - if (this.destroyed) return + _read () { + // TODO: backpressure + } - const bufs = new Array(datas.length) - for (var i = 0; i < datas.length; i++) bufs[i] = datas[i].chunk + _write (data, enc, cb) { + if (this.destroyed) return - if (bufs.length > 256) return this._write(Buffer.concat(bufs), null, cb) + if (!this._connected || !binding.utp_napi_connection_write(this._handle, data)) { + this._callback = cb + this._writing = new Array(1) + this._writing[0] = data + return + } - if (!binding.utp_napi_connection_writev(this._handle, bufs)) { - this._callback = cb - this._writing = bufs - return + cb(null) } - cb(null) -} + _writev (datas, cb) { + if (this.destroyed) return -Connection.prototype._realloc = function () { - this._buffer = Buffer.allocUnsafe(this._buffer.length) - this._offset = 0 - return this._buffer -} + const bufs = new Array(datas.length) + for (var i = 0; i < datas.length; i++) bufs[i] = datas[i].chunk -Connection.prototype._onread = function (size) { - if (!this._connected) this._onconnect() // makes the server wait for reads before writes - if (this._timeout) this._timeout.refresh() + if (bufs.length > 256) return this._write(Buffer.concat(bufs), null, cb) - const buf = this._buffer.slice(this._offset, this._offset += size) + if (!binding.utp_napi_connection_writev(this._handle, bufs)) { + this._callback = cb + this._writing = bufs + return + } - if (this._contentSize) { - if (size > this._contentSize) size = this._contentSize - this._contentSize -= size - if (this._contentSize < 65536) this._view[0] = this._contentSize + cb(null) } - this.push(buf) - - // 64kb + 4kb as max package buffer is 64kb and we wanna make sure we have room for that - // plus the next udp package - if (this._buffer.length - this._offset <= 69632) { + _realloc () { this._buffer = Buffer.allocUnsafe(this._buffer.length) this._offset = 0 return this._buffer } - return EMPTY -} + _onread (size) { + if (!this._connected) this._onconnect() // makes the server wait for reads before writes + if (this._timeout) this._timeout.refresh() -Connection.prototype._ondrain = function () { - this._writing = null - const cb = this._callback - this._callback = null - cb(null) -} + const buf = this._buffer.slice(this._offset, this._offset += size) -Connection.prototype._onclose = function () { - unordered.remove(this._utp.connections, this) - if (!this._utp.maxConnections || this._utp.connections.length < this._utp.maxConnections) { - this._utp.firewall(false) + if (this._contentSize) { + if (size > this._contentSize) size = this._contentSize + this._contentSize -= size + if (this._contentSize < 65536) this._view[0] = this._contentSize + } + + this.push(buf) + + // 64kb + 4kb as max package buffer is 64kb and we wanna make sure we have room for that + // plus the next udp package + if (this._buffer.length - this._offset <= 69632) { + this._buffer = Buffer.allocUnsafe(this._buffer.length) + this._offset = 0 + return this._buffer + } + + return EMPTY } - this._handle = null - if (this._error) this.emit('error', this._error) - this.emit('close') - this._utp._closeMaybe() -} -Connection.prototype._onerror = function (status) { - this.destroy(createUTPError(status)) -} + _ondrain () { + this._writing = null + const cb = this._callback + this._callback = null + cb(null) + } -Connection.prototype._onend = function () { - if (this._timeout) this._timeout.destroy() - this.push(null) - this._destroyMaybe() -} + _onclose () { + unordered.remove(this._utp.connections, this) + if (!this._utp.maxConnections || this._utp.connections.length < this._utp.maxConnections) { + this._utp.firewall(false) + } + this._handle = null + if (this._error) this.emit('error', this._error) + this.emit('close') + this._utp._closeMaybe() + } -Connection.prototype._resolveAndConnect = function (port, host) { - const self = this - dns.lookup(host, function (err, ip) { - if (err) return self.destroy(err) - if (!ip) return self.destroy(new Error('Could not resolve ' + host)) - self._connect(port, ip) - }) -} + _onerror (status) { + this.destroy(createUTPError(status)) + } -Connection.prototype._connect = function (port, ip) { - if (this.destroyed) return - this._needsConnect = false - this.remoteAddress = ip - binding.utp_napi_connect(this._utp._handle, this._handle, port, ip) -} + _onend () { + if (this._timeout) this._timeout.destroy() + this.push(null) + this._destroyMaybe() + } -Connection.prototype._onconnect = function () { - if (this._timeout) this._timeout.refresh() + _resolveAndConnect (port, host) { + const self = this + const sockIs6 = self._utp._address && self._utp._address.indexOf(':') !== -1 + dns.lookup(host, { all: true }, function (err, addrs) { + if (err) return self.destroy(err) + if (!addrs || !addrs.length) return self.destroy(new Error('Could not resolve ' + host)) + var ip = addrs[0].address + for (var i = 0; i < addrs.length; i++) { + if ((addrs[i].family === 6) === sockIs6) { + ip = addrs[i].address + break + } + } + self._connect(port, ip) + }) + } - this._connected = true - if (this._writing) { - const cb = this._callback - const data = this._writing[0] - this._callback = null - this._writing = null - this._write(data, null, cb) + _connect (port, ip) { + if (this.destroyed) return + this._needsConnect = false + this.remoteAddress = ip + this.remoteFamily = ip.indexOf(':') !== -1 ? 'IPv6' : 'IPv4' + binding.utp_napi_connect(this._utp._handle, this._handle, port, ip) } - this.emit('connect') -} -Connection.prototype.destroy = function (err) { - if (this.destroyed) return - this.destroyed = true - if (err) this._error = err - if (this._needsConnect) return process.nextTick(onbindingclose, this) - binding.utp_napi_connection_close(this._handle) -} + _onconnect () { + if (this._timeout) this._timeout.refresh() + + this._connected = true + if (this._writing) { + const cb = this._callback + const data = this._writing[0] + this._callback = null + this._writing = null + this._write(data, null, cb) + } + this.emit('connect') + } -Connection.prototype._destroyMaybe = function () { - if (this._allowOpen && !--this._allowOpen) this.destroy() -} + destroy (err) { + if (this.destroyed) return + this.destroyed = true + if (err) this._error = err + if (this._needsConnect) return process.nextTick(onbindingclose, this) + binding.utp_napi_connection_close(this._handle) + } -Connection.prototype._shutdown = function () { - if (this.destroyed) return - binding.utp_napi_connection_shutdown(this._handle) - this._destroyMaybe() + _destroyMaybe () { + if (this._allowOpen && !--this._allowOpen) this.destroy() + } + + _shutdown () { + if (this.destroyed) return + binding.utp_napi_connection_shutdown(this._handle) + this._destroyMaybe() + } } function onbindingclose (self) { diff --git a/test/sockets.js b/test/sockets.js index af9a7b0..9063b2f 100644 --- a/test/sockets.js +++ b/test/sockets.js @@ -81,7 +81,7 @@ tape('combine server and connection', function (t) { socket.on('connection', function (client) { gotClient = true - t.same(client.remotePort, socket.address().port) + t.notEqual(client.remotePort, socket.address().port, 'uses dedicated ephemeral port') t.same(client.remoteAddress, '127.0.0.1') client.pipe(client) })