Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

### NEXT

- `PortManager`: Replace `uint64_t` hash token with exact-tuple `PortRangeKey` ([PR #1812](https://github.com/versatica/mediasoup/pull/1812), by @999purple999 and @penguinol).

### 3.20.1

- Node: Make all public methods/getters that return an object/array, return a clone of that object/array ([PR #1811](https://github.com/versatica/mediasoup/pull/1811)).
Expand Down Expand Up @@ -1196,7 +1198,7 @@ Migrate `npm-scripts.js` to `npm-scripts.mjs` (ES Module) ([PR #1093](https://gi
### 3.5.9

- `libwebrtc`: Apply patch by @sspanak and @Ivaka to avoid crash. Related issue: #357.
- `PortManager.cpp`: Do not use `UV_UDP_RECVMMSG` in Windows due to a bug in `libuv` 1.37.0.
- `PortManager`: Do not use `UV_UDP_RECVMMSG` in Windows due to a bug in `libuv` 1.37.0.
- Update Node deps.

### 3.5.8
Expand Down Expand Up @@ -1290,7 +1292,7 @@ Migrate `npm-scripts.js` to `npm-scripts.mjs` (ES Module) ([PR #1093](https://gi

### 3.4.7

- `PortManager.cpp`: Do not limit the number of failed `bind()` attempts to 20 since it does not work well in scenarios that launch tons of `Workers` with same port range. Instead iterate all ports in the range given to the Worker.
- `PortManager`: Do not limit the number of failed `bind()` attempts to 20 since it does not work well in scenarios that launch tons of `Workers` with same port range. Instead iterate all ports in the range given to the Worker.
- Do not copy `catch.hpp` into `test/include/` but make the GYP `mediasoup-worker-test` target include the corresponding folder in `deps/catch`.

### 3.4.6
Expand Down
102 changes: 91 additions & 11 deletions worker/include/RTC/PortManager.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,22 +3,86 @@

#include "common.hpp"
#include "RTC/Transport.hpp"
#include "Utils.hpp"
#include <uv.h>
#include <ankerl/unordered_dense.h>
#include <ostream>
#include <string>
#include <vector>

namespace RTC
{
class PortManager
{
private:
public:
enum class Protocol : uint8_t
{
UDP = 1,
TCP
};

/**
* Opaque-ish key identifying one (protocol, bind address, port range)
* tuple. Issued by Bind*() and consumed by Unbind(). Callers store it as
* an opaque token; equality and hashing are exact-tuple based, so distinct
* tuples never collide regardless of how close their numeric
* representations are.
*/
class PortRangeKey
{
private:
friend class PortManager;
friend struct PortRangeKeyHash;

public:
PortRangeKey() = default;
PortRangeKey(
Protocol protocol, const sockaddr_storage& bindAddr, uint16_t minPort, uint16_t maxPort);

bool operator==(const PortRangeKey& other) const noexcept;
bool operator!=(const PortRangeKey& other) const noexcept
{
return !(*this == other);
}

public:
Protocol GetProtocol() const
{
return this->protocol;
}
const sockaddr_storage& GetSockaddrStorage() const
{
return this->bindAddr;
}
uint16_t GetMinPort() const
{
return this->minPort;
}
uint16_t GetMaxPort() const
{
return this->maxPort;
}

private:
Protocol protocol{ Protocol::UDP };
sockaddr_storage bindAddr{};
uint16_t minPort{ 0u };
uint16_t maxPort{ 0u };
};

struct PortRangeKeyHash
{
/**
* Hash function. Uses ankerl::unordered_dense per-field hashes combined
* with the standard boost-style `hash_combine` seed mixer. We deliberately
* do NOT hash the raw `sockaddr_storage` bytes (padding is
* caller-controlled and would cause structurally-equal keys to hash
* differently); instead we hash the same fields that `operator==`
* inspects.
*/
size_t operator()(const PortRangeKey& key) const noexcept;
};

private:
struct PortRange
{
Expand All @@ -42,9 +106,9 @@ namespace RTC
uint16_t minPort,
uint16_t maxPort,
RTC::Transport::SocketFlags& flags,
uint64_t& hash)
PortRangeKey& key)
{
return reinterpret_cast<uv_udp_t*>(Bind(Protocol::UDP, ip, minPort, maxPort, flags, hash));
return reinterpret_cast<uv_udp_t*>(Bind(Protocol::UDP, ip, minPort, maxPort, flags, key));
}
static uv_tcp_t* BindTcp(std::string& ip, uint16_t port, RTC::Transport::SocketFlags& flags)
{
Expand All @@ -55,11 +119,11 @@ namespace RTC
uint16_t minPort,
uint16_t maxPort,
RTC::Transport::SocketFlags& flags,
uint64_t& hash)
PortRangeKey& key)
{
return reinterpret_cast<uv_tcp_t*>(Bind(Protocol::TCP, ip, minPort, maxPort, flags, hash));
return reinterpret_cast<uv_tcp_t*>(Bind(Protocol::TCP, ip, minPort, maxPort, flags, key));
}
static void Unbind(uint64_t hash, uint16_t port);
static void Unbind(const PortRangeKey& key, uint16_t port);
void Dump(int indentation = 0) const;

private:
Expand All @@ -71,15 +135,31 @@ namespace RTC
uint16_t minPort,
uint16_t maxPort,
RTC::Transport::SocketFlags& flags,
uint64_t& hash);
static uint64_t GeneratePortRangeHash(
Protocol protocol, sockaddr_storage* bindAddr, uint16_t minPort, uint16_t maxPort);
static PortRange& GetOrCreatePortRange(uint64_t hash, uint16_t minPort, uint16_t maxPort);
PortRangeKey& key);
static PortRange& GetOrCreatePortRange(const PortRangeKey& key, uint16_t minPort, uint16_t maxPort);
static uint8_t ConvertSocketFlags(RTC::Transport::SocketFlags& flags, Protocol protocol, int family);

private:
static thread_local ankerl::unordered_dense::map<uint64_t, PortRange> mapPortRanges;
static thread_local ankerl::unordered_dense::map<PortRangeKey, PortRange, PortRangeKeyHash> mapPortRanges;
};

/**
* For Catch2 to print it nicely.
*/
inline std::ostream& operator<<(std::ostream& os, const PortManager::PortRangeKey& k)
{
const std::string protocolStr = (k.GetProtocol() == PortManager::Protocol::UDP) ? "udp" : "tcp";

int family;
uint16_t port;
std::string ip;
auto* storage = const_cast<sockaddr_storage*>(std::addressof(k.GetSockaddrStorage()));

Utils::IP::GetAddressInfo(reinterpret_cast<sockaddr*>(storage), family, ip, port);

return os << "{protocol:" << protocolStr << ", family:" << family << ", ip:" << ip
<< ", minPort:" << k.GetMinPort() << ", maxPort:" << k.GetMaxPort() << "}";
}
} // namespace RTC

#endif
5 changes: 3 additions & 2 deletions worker/include/RTC/TcpServer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
#include "common.hpp"
#include "handles/TcpConnectionHandle.hpp"
#include "handles/TcpServerHandle.hpp"
#include "RTC/PortManager.hpp"
#include "RTC/TcpConnection.hpp"
#include "RTC/Transport.hpp"
#include <string>
Expand Down Expand Up @@ -37,7 +38,7 @@ namespace RTC
uint16_t minPort,
uint16_t maxPort,
RTC::Transport::SocketFlags& flags,
uint64_t& portRangeHash);
RTC::PortManager::PortRangeKey& portRangeKey);
~TcpServer() override;

/* Pure virtual methods inherited from ::TcpServerHandle. */
Expand All @@ -50,7 +51,7 @@ namespace RTC
Listener* listener{ nullptr };
RTC::TcpConnection::Listener* connListener{ nullptr };
bool fixedPort{ false };
uint64_t portRangeHash{ 0u };
RTC::PortManager::PortRangeKey portRangeKey{};
};
} // namespace RTC

Expand Down
5 changes: 3 additions & 2 deletions worker/include/RTC/UdpSocket.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

#include "common.hpp"
#include "handles/UdpSocketHandle.hpp"
#include "RTC/PortManager.hpp"
#include "RTC/Transport.hpp"
#include <string>

Expand Down Expand Up @@ -33,7 +34,7 @@ namespace RTC
uint16_t minPort,
uint16_t maxPort,
RTC::Transport::SocketFlags& flags,
uint64_t& portRangeHash);
RTC::PortManager::PortRangeKey& portRangeKey);
~UdpSocket() override;

/* Pure virtual methods inherited from ::UdpSocketHandle. */
Expand All @@ -45,7 +46,7 @@ namespace RTC
// Passed by argument.
Listener* listener{ nullptr };
bool fixedPort{ false };
uint64_t portRangeHash{ 0u };
RTC::PortManager::PortRangeKey portRangeKey{};
};
} // namespace RTC

Expand Down
1 change: 1 addition & 0 deletions worker/meson.build
Original file line number Diff line number Diff line change
Expand Up @@ -451,6 +451,7 @@ test_sources = [
'test/src/testHelpers.cpp',
'test/src/RTC/TestKeyFrameRequestManager.cpp',
'test/src/RTC/TestNackGenerator.cpp',
'test/src/RTC/TestPortManager.cpp',
'test/src/RTC/TestRateCalculator.cpp',
'test/src/RTC/TestRtpEncodingParameters.cpp',
'test/src/RTC/TestSeqManager.cpp',
Expand Down
9 changes: 5 additions & 4 deletions worker/src/RTC/PipeTransport.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
#include "RTC/PipeTransport.hpp"
#include "Logger.hpp"
#include "MediaSoupErrors.hpp"
#include "RTC/PortManager.hpp"
#include "RTC/SCTP/packet/Packet.hpp"
#include "Settings.hpp"
#include "Utils.hpp"
Expand Down Expand Up @@ -72,15 +73,15 @@ namespace RTC
{
if (this->listenInfo.portRange.min != 0 && this->listenInfo.portRange.max != 0)
{
uint64_t portRangeHash{ 0u };
RTC::PortManager::PortRangeKey portRangeKey;

this->udpSocket = new RTC::UdpSocket(
this,
this->listenInfo.ip,
this->listenInfo.portRange.min,
this->listenInfo.portRange.max,
this->listenInfo.flags,
portRangeHash);
portRangeKey);
}
else if (this->listenInfo.port != 0)
{
Expand All @@ -92,15 +93,15 @@ namespace RTC
// required.
else
{
uint64_t portRangeHash{ 0u };
RTC::PortManager::PortRangeKey portRangeKey;

this->udpSocket = new RTC::UdpSocket(
this,
this->listenInfo.ip,
Settings::configuration.rtcMinPort,
Settings::configuration.rtcMaxPort,
this->listenInfo.flags,
portRangeHash);
portRangeKey);
}

if (this->listenInfo.sendBufferSize != 0)
Expand Down
17 changes: 9 additions & 8 deletions worker/src/RTC/PlainTransport.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
#include "RTC/PlainTransport.hpp"
#include "Logger.hpp"
#include "MediaSoupErrors.hpp"
#include "RTC/PortManager.hpp"
#include "RTC/SCTP/packet/Packet.hpp"
#include "Settings.hpp"
#include "Utils.hpp"
Expand Down Expand Up @@ -151,15 +152,15 @@ namespace RTC
{
if (this->listenInfo.portRange.min != 0 && this->listenInfo.portRange.max != 0)
{
uint64_t portRangeHash{ 0u };
RTC::PortManager::PortRangeKey portRangeKey;

this->udpSocket = new RTC::UdpSocket(
this,
this->listenInfo.ip,
this->listenInfo.portRange.min,
this->listenInfo.portRange.max,
this->listenInfo.flags,
portRangeHash);
portRangeKey);
}
else if (this->listenInfo.port != 0)
{
Expand All @@ -171,15 +172,15 @@ namespace RTC
// required.
else
{
uint64_t portRangeHash{ 0u };
RTC::PortManager::PortRangeKey portRangeKey;

this->udpSocket = new RTC::UdpSocket(
this,
this->listenInfo.ip,
Settings::configuration.rtcMinPort,
Settings::configuration.rtcMaxPort,
this->listenInfo.flags,
portRangeHash);
portRangeKey);
}

if (this->listenInfo.sendBufferSize != 0)
Expand All @@ -198,15 +199,15 @@ namespace RTC
{
if (this->rtcpListenInfo.portRange.min != 0 && this->rtcpListenInfo.portRange.max != 0)
{
uint64_t portRangeHash{ 0u };
RTC::PortManager::PortRangeKey portRangeKey;

this->rtcpUdpSocket = new RTC::UdpSocket(
this,
this->rtcpListenInfo.ip,
this->rtcpListenInfo.portRange.min,
this->rtcpListenInfo.portRange.max,
this->rtcpListenInfo.flags,
portRangeHash);
portRangeKey);
}
else if (this->rtcpListenInfo.port != 0)
{
Expand All @@ -218,15 +219,15 @@ namespace RTC
// required.
else
{
uint64_t portRangeHash{ 0u };
RTC::PortManager::PortRangeKey portRangeKey;

this->rtcpUdpSocket = new RTC::UdpSocket(
this,
this->rtcpListenInfo.ip,
Settings::configuration.rtcMinPort,
Settings::configuration.rtcMaxPort,
this->rtcpListenInfo.flags,
portRangeHash);
portRangeKey);
}

if (this->rtcpListenInfo.sendBufferSize != 0)
Expand Down
Loading
Loading