Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
54 commits
Select commit Hold shift + click to select a range
463085c
First trial version
Jul 29, 2022
7fbca8a
Completed balancing code (compiled, not tested)
Aug 1, 2022
6602b15
Updated to latest master
Aug 4, 2022
29735ff
First version compiled with new recv buffer. Nothing tested.
Aug 19, 2022
3549bb8
Merged last upstream. Fixed compiling against ENABLE_NEW_RCVBUFFER
Aug 22, 2022
027072a
Fixed referring to buffer pointer in the socket
Aug 24, 2022
7a5853b
Fixed some bux during the first run
Aug 24, 2022
44803f2
Merge remote-tracking branch 'origin/master' into dev-add-group-balan…
Aug 24, 2022
6862b97
Fixed getting group status with the reading function
Aug 25, 2022
087b8db
Fixed bux in receiver buffer. Fixed handshake resource creation order…
Aug 30, 2022
b5d95d2
Added a workaround to read-ready check for a group-member socket
Aug 31, 2022
f03b09e
Added specific algorithm for tracking group loss for balancing groups
Sep 2, 2022
38510ab
Fixed broken release build
ethouris Sep 2, 2022
23dec40
Fixed bug: wrong loss range found in the buffer. Added broadcast grou…
ethouris Sep 2, 2022
e739133
Added common group losses handling
Sep 22, 2022
92639ad
Implemented receiver loss management for new bonding
ethouris Sep 22, 2022
2de148e
Cosmetics
ethouris Sep 22, 2022
047f126
Merge branch 'master' into dev-add-group-balancing
ethouris Sep 22, 2022
d07e71a
Minor bugfixes. Some line reordering and comment fixes
ethouris Sep 22, 2022
dbc3729
Fixed one deadlock. Added verification in snd-loss-removal procedure
Sep 23, 2022
b514108
Added handling of balancing group for srt-test-live. Fixed permissions.
ethouris Sep 26, 2022
a1bb540
Fixed balancing mode sending. Fixed loss list selective extraction. A…
Oct 5, 2022
1f8faac
Improved and fixed balancing loss detection
ethouris Oct 24, 2022
faf9bec
Merged against latest master
Oct 25, 2022
deeaf05
Some cosmetic fixes after the update
Oct 26, 2022
974a6ed
Merged with master (changes for AEAD)
Oct 27, 2022
c103433
Updated taking pts for stats and logs only where needed. Added more e…
Nov 3, 2022
b3f768f
Improved logging for the receiver side
ethouris Nov 3, 2022
e979a68
Updated from master and post-fixed
Nov 3, 2022
a5d0e73
Merge remote-tracking branch 'refs/remotes/ethouris/dev-add-group-bal…
Nov 3, 2022
b15876e
Added some comments regarding the new TSBPD triggering rules
Nov 7, 2022
37ec000
Updated to the latest master and fixed
Nov 8, 2022
e217b7d
Updated to latest upstream
Nov 23, 2022
251c9c5
Updated, part 1. Not tested (only UT)
Dec 6, 2022
4b39b7b
Merge branch 'master' into dev-add-group-balancing
Dec 6, 2022
631adb0
Some cosmetics and updates from upstream
Dec 7, 2022
26d11de
Updated to latest upstream
Dec 7, 2022
db18435
Fixed atomic operator bug (reported by gcc-11)
Dec 7, 2022
9f7db49
Fixed: for backup link sync use the common group buffer start sequence
Dec 8, 2022
35450ca
Fixed deadlock on 2L locked GroupLock. Fixed crash when discarding a …
Dec 15, 2022
a826e3c
Trial fix for the rogue ACK in backup groups
Dec 16, 2022
b116838
Some cosmetic log fixes
ethouris Dec 16, 2022
eaf51e4
Updated to latest upstream
ethouris Dec 19, 2022
dc2dcdf
Updated lock info in the comment
Jul 18, 2023
4302615
Updated and fixed
Feb 21, 2024
7bdd82c
Merged and post-fixed
Feb 22, 2024
432b0c7
Fixed some problems around macros that caused build breaks
Feb 22, 2024
f416392
Fixed gcc extension causing BB on Windows
Feb 22, 2024
5b99754
Fixed some build breaks reported by CI
Feb 26, 2024
328c359
Fixed C++03 incompat case
Feb 26, 2024
2e182a8
Updated with latest changes in receiver buffer. NOT TESTED, one IPE i…
Jun 20, 2024
498d980
Updated and fixed
Sep 4, 2024
b9d0c48
Updted and fixed
Apr 9, 2025
5115561
Merged with latest dev and fixed. Fixed managed flag on accepted memb…
Feb 13, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions apps/socketoptions.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,7 @@ const SocketOption srt_options [] {
{ "packetfilter", 0, SRTO_PACKETFILTER, SocketOption::PRE, SocketOption::STRING, nullptr },
{ "groupconnect", 0, SRTO_GROUPCONNECT, SocketOption::PRE, SocketOption::INT, nullptr},
{ "groupminstabletimeo", 0, SRTO_GROUPMINSTABLETIMEO, SocketOption::PRE, SocketOption::INT, nullptr},
{ "groupconfig", 0, SRTO_GROUPCONFIG, SocketOption::PRE, SocketOption::STRING, nullptr},
{ "bindtodevice", 0, SRTO_BINDTODEVICE, SocketOption::PRE, SocketOption::STRING, nullptr},
{ "retransmitalgo", 0, SRTO_RETRANSMITALGO, SocketOption::PRE, SocketOption::INT, nullptr },
{ "cryptomode", 0, SRTO_CRYPTOMODE, SocketOption::PRE, SocketOption::INT, nullptr },
Expand Down
9 changes: 5 additions & 4 deletions docs/API/API-functions.md
Original file line number Diff line number Diff line change
Expand Up @@ -2138,9 +2138,10 @@ number of bytes retrieved will be at most the maximum payload of one MTU.
The [`SRTO_PAYLOADSIZE`](API-socket-options.md#SRTO_PAYLOADSIZE) value configured by the sender
is not negotiated, and not known to the receiver.
The [`SRTO_PAYLOADSIZE`](API-socket-options.md#SRTO_PAYLOADSIZE) value set on the SRT receiver
is mainly used for heuristics. However, the receiver is prepared to receive
the whole MTU as configured with [`SRTO_MSS`](API-socket-options.md#SRTO_MSS).
In this mode, however, with default settings of [`SRTO_TSBPDMODE`](API-socket-options.md#SRTO_TSBPDMODE)
is mainly used for heuristics and as the minimum size of the buffer in this
call. However, the receiver is prepared to receive the whole MTU as configured
with [`SRTO_MSS`](API-socket-options.md#SRTO_MSS). In this mode, however, with
default settings of [`SRTO_TSBPDMODE`](API-socket-options.md#SRTO_TSBPDMODE)
and [`SRTO_TLPKTDROP`](API-socket-options.md#SRTO_TLPKTDROP), the message will be
received only when its time to play has come, and until then it will be kept in the
receiver buffer. Also, when the time to play has come for a message that is next to
Expand All @@ -2149,7 +2150,7 @@ the currently lost one, it will be delivered and the lost one dropped.
| Returns | |
|:----------------------------- |:--------------------------------------------------------- |
| Size value \> 0 | Size of the data received, if successful. |
| 0 | If the connection has been closed |
| 0 | No message is ready for retrieval |
| `SRT_ERROR` | (-1) when an error occurs |
| <img width=240px height=1px/> | <img width=710px height=1px/> |

Expand Down
68 changes: 49 additions & 19 deletions docs/dev/utilities.md
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,30 @@ Example:
cout << endl;
```

2.a. `BIT` macro - for simplifying macro definitions
----------------------------------------------------

The `BIT` macro allows to define symbolic constants assigned to bits.

Example:
```
#define SRTGROUP_MASK BIT(30)
```

Considered were other methods to define it, like:

* an inline function: requires `constexpr`, available in C++11
* a user-defined literal, like `30_bit`, available in C++17

but this can be used as long as C++03-compatibility must be maintained.

2.b. `IsSet`: test if a bit is set in a bitmask
-----------------------------------------------

This function should be used for testing if a runtime value of the type
representing a bit set through a 32-bit integer contains a single bit set.


3. DynamicStruct: a simple array that can be only indexed with a dedicated type.
--------------------------------------------------------------------------------

Expand All @@ -96,9 +120,14 @@ compile error.
4. FixedArray: a simple wrapper for a dynamically allocated array.
------------------------------------------------------------------

Provides a wrapper of all basic operations, `operator[]` as well as basic
It's a wrapper for a dynamically-allocated array with constant size. The
wrapper provides of all basic operations, `operator[]` as well as basic
container methods: `begin(), end(), data(), size()` to satisfy the concept
of the STL random-access container.
of the STL random-access container. Important properties:

* The size is constant for the lifetime, but it can be runtime-defined.
* You can use your custom type for indexer values in `operator[]`.
* The `operator[]` method checks the index value.


5. HeapSet: a partially sorted container using the heap tree concept
Expand All @@ -112,7 +141,7 @@ class HeapSet

This container implements a concept of a partially sorted container which
guarantees always the element at the head to be the earliest in the sorting
order, and allows elements to be added to the container with partial storing.
order, and allows elements to be added to the container with partial sorting.
The element is added at the quickest findable position in the tree, while
pulling the earliest element causes tree rebalancing.

Expand All @@ -121,7 +150,7 @@ The types for the template instantiation are:
- NodeType: The type of the value kept in the container (representation of
the contained objects). This type must be a lightweight-value type, so prefer
things like integers, pointers or iterators. There must also exist a trap
representation for this type.
representation for this type (a value of "no object").

- Access: a class that provides static methods according to the requirements

Expand Down Expand Up @@ -165,7 +194,7 @@ NodeType is possible, just use different AccessType).
HeapSet state attributes:

- `none()` : returns the trap representation for NodeType (as provided by
the AccessType class), for convemience
the AccessType class), for convenience
- `npos` : an internal static constant assigned from std::string::npos
- `raw()` : returns the constant reference to the internal heap array
- `empty(), size()` : same as for the internal array
Expand Down Expand Up @@ -245,7 +274,7 @@ Differences:

- inserts only a default value
- returns the reference to the value in the map
- works for value types that are not copiable
- works for value types that are not copyable

The reference is returned because to return the node you would have
to search for it after using operator[].
Expand Down Expand Up @@ -287,9 +316,10 @@ string with surrounding `[]` and values separated by space. Used in logging.
if the value is already there, in which case nothing is inserted.

* `Tie`: similar to `std::tie` for C++03: binds two variables by exposing
they references so that this can be used in the assignment
their references so that this can be used in the assignment

* `All`: returns a pair of iterators extracted from `begin()` and `end()`
* `All`: returns a pair of iterators extracted from `begin()` and `end()`.
This can be used in conjunction with `Tie` by assigning to its result

* `Size`: a version of std::size from C++11 - for a fixed array it returns
the number of declared elements; for other types it's size() method result.
Expand All @@ -300,10 +330,10 @@ string with surrounding `[]` and values separated by space. Used in logging.
iterator concept is supported, though; for random-access containers
you should do it manually with checking size() and distance()

* `FringeValues`: Takes all values from the container and marks in the
output map, how many values of that kind were found. The output map
will then contain only unique values as keys and the value is the
number of found occurrences of this very value
* `FringeValues`: Take the values from the container and counts how many
times each unique value occur by inserting the values into the map.
The values in the output map represent the number of times a particular
value occurs.


12. CallbackHolder
Expand All @@ -321,7 +351,7 @@ the call regarding the opaque pointer.

This utility is used in window.cpp where it is required to calculate the median
value basing on the value in the very middle and filtered out values exceeding
its range of 1/8 and 8 times. Returned is a structure that shows the median and
its range by 1/8 and 8 times. Returned is a structure that shows the median and
also the lower and upper value used for filtering.

This calculation does more-less the following:
Expand Down Expand Up @@ -357,11 +387,11 @@ number of elements taken into account, through a pair.
* AccumulatePassFilterParallel

This function sums up all values in the array (from p to end) and
simultaneously elements from `para`, stated it points to an array of the same
size. The first array is used as a driver for which elements to include and
which to skip, and this is done for both arrays at particular index position.
Returner is the sum of the elements passed from the first array and from the
`para` array, as well as the number of included elements.
simultaneously elements from `para`, assuming that it points to an array of
the same size. The first array is used as a driver for which elements to
include and which to skip, and this is done for both arrays at particular index
position. Returned is the sum of the elements passed from the first array and
from the `para` array, as well as the number of included elements.


14. DriftTracer
Expand All @@ -377,7 +407,7 @@ up to maximum history is kept in the container. A special value is declared as
taken as a legitimate difference to fix.

The values of `drift()` and `overdrift()` can be read at any time, however if
you want to depend on the fact that they have been changed lately, you have to
you want to rely on the fact that they have been changed lately, you have to
check the return value from update().

IMPORTANT: drift() can be called at any time, just remember that this value may
Expand Down
56 changes: 39 additions & 17 deletions srtcore/api.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,17 @@ void CUDTSocket::setBrokenClosed()

bool CUDTSocket::readReady() const
{
#if SRT_ENABLE_BONDING

// If this is a group member socket, then reading happens exclusively from
// the group and the socket is only used as a connection point, packet
// dispatching and single link management. Data buffering and hence ability
// to deliver a packet through API is exclusively the matter of group,
// therefore a single socket is never "read ready".

if (m_GroupOf)
return false;
#endif
if (m_UDT.m_bConnected && m_UDT.isRcvBufferReady())
return true;

Expand Down Expand Up @@ -306,7 +317,7 @@ void CUDTUnited::cleanupAllSockets()
{
CUDTSocket* s = i->second;

#if ENABLE_BONDING
#if SRT_ENABLE_BONDING
if (s->m_GroupOf)
{
s->removeFromGroup(false);
Expand All @@ -329,7 +340,7 @@ void CUDTUnited::cleanupAllSockets()
}
m_Sockets.clear();

#if ENABLE_BONDING
#if SRT_ENABLE_BONDING
for (groups_t::iterator j = m_Groups.begin(); j != m_Groups.end(); ++j)
{
delete j->second;
Expand Down Expand Up @@ -902,6 +913,10 @@ int CUDTUnited::newConnection(const SRTSOCKET listener,
// be removed from the accept queue at this time.
should_submit_to_accept = g->groupPending_LOCKED();

// Ok, whether handled in the background, or reported through accept,
// all group-member sockets should be managed.
ns->core().m_bManaged = true;

// Update the status in the group so that the next
// operation can include the socket in the group operation.
CUDTGroup::SocketData* gm = ns->m_GroupMemberData;
Expand All @@ -914,14 +929,17 @@ int CUDTUnited::newConnection(const SRTSOCKET listener,
gm->laststatus = SRTS_CONNECTED;

g->setGroupConnected();


// Add also per-direction subscription for the about-to-be-accepted socket.
// Both first accepted socket that makes the group-accept and every next
// socket that adds a new link.
int read_modes = SRT_EPOLL_IN | SRT_EPOLL_ERR;
// In the new recvbuffer mode (and common receiver buffer) there's no waiting for reception
// on a socket and no reading from a socket directly is being done; instead the reading API
// is directly bound to the group and reading happens directly from the group's buffer.
// This includes also a situation of a newly connected socket, which will be delivering packets
// into the same common receiver buffer for the group, so readable will be the group itself
// when it has its own common buffer read-ready, by whatever reason. Packets to the buffer
// will be delivered by the sockets' receiver threads, so all these things happen strictly
// in the background.

// Keep per-socket sender ready EID.
int write_modes = SRT_EPOLL_OUT | SRT_EPOLL_ERR;
epoll_add_usock_INTERNAL(g->m_RcvEID, ns, &read_modes);
epoll_add_usock_INTERNAL(g->m_SndEID, ns, &write_modes);

// With app reader, do not set groupPacketArrival (block the
Expand Down Expand Up @@ -1865,7 +1883,7 @@ SRTSOCKET CUDTUnited::groupConnect(CUDTGroup* pg, SRT_SOCKGROUPCONFIG* targets,
// Do it after setting all stored options, as some of them may
// influence some group data.

groups::SocketData data = groups::prepareSocketData(ns);
groups::SocketData data = groups::prepareSocketData(ns, g.type());
if (targets[tii].token != -1)
{
// Reuse the token, if specified by the caller
Expand Down Expand Up @@ -1958,7 +1976,6 @@ SRTSOCKET CUDTUnited::groupConnect(CUDTGroup* pg, SRT_SOCKGROUPCONFIG* targets,
// connection succeeded or failed and whether the new socket is
// ready to use or needs to be closed.
epoll_add_usock_INTERNAL(g.m_SndEID, ns, &connect_modes);
epoll_add_usock_INTERNAL(g.m_RcvEID, ns, &connect_modes);

// Adding a socket on which we need to block to BOTH these tracking EIDs
// and the blocker EID. We'll simply remove from them later all sockets that
Expand Down Expand Up @@ -2085,7 +2102,6 @@ SRTSOCKET CUDTUnited::groupConnect(CUDTGroup* pg, SRT_SOCKGROUPCONFIG* targets,
f->sndstate = SRT_GST_BROKEN;
f->rcvstate = SRT_GST_BROKEN;
epoll_remove_socket_INTERNAL(g.m_SndEID, ns);
epoll_remove_socket_INTERNAL(g.m_RcvEID, ns);
}
else
{
Expand Down Expand Up @@ -2171,7 +2187,6 @@ SRTSOCKET CUDTUnited::groupConnect(CUDTGroup* pg, SRT_SOCKGROUPCONFIG* targets,

epoll_remove_socket_INTERNAL(eid, y->second);
epoll_remove_socket_INTERNAL(g.m_SndEID, y->second);
epoll_remove_socket_INTERNAL(g.m_RcvEID, y->second);
}
}

Expand Down Expand Up @@ -2211,7 +2226,6 @@ SRTSOCKET CUDTUnited::groupConnect(CUDTGroup* pg, SRT_SOCKGROUPCONFIG* targets,

epoll_remove_socket_INTERNAL(eid, s);
epoll_remove_socket_INTERNAL(g.m_SndEID, s);
epoll_remove_socket_INTERNAL(g.m_RcvEID, s);

continue;
}
Expand Down Expand Up @@ -2401,7 +2415,7 @@ void CUDTUnited::deleteGroup_LOCKED(CUDTGroup* g)
CUDTSocket* s = i->second;
if (s->m_GroupOf == g)
{
HLOGC(smlog.Debug, log << "deleteGroup: IPE: existing @" << s->id() << " points to a dead group!");
LOGC(smlog.Error, log << "deleteGroup: IPE: existing @" << s->id() << " points to a dead group!");
s->m_GroupOf = NULL;
s->m_GroupMemberData = NULL;
}
Expand All @@ -2414,7 +2428,7 @@ void CUDTUnited::deleteGroup_LOCKED(CUDTGroup* g)
CUDTSocket* s = i->second;
if (s->m_GroupOf == g)
{
HLOGC(smlog.Debug, log << "deleteGroup: IPE: closed @" << s->id() << " points to a dead group!");
LOGC(smlog.Error, log << "deleteGroup: IPE: closed @" << s->id() << " points to a dead group!");
s->m_GroupOf = NULL;
s->m_GroupMemberData = NULL;
}
Expand Down Expand Up @@ -2965,6 +2979,8 @@ int CUDTUnited::select(std::set<SRTSOCKET>* readfds, std::set<SRTSOCKET>* writef
return count;
}

// XXX This may crash when a member socket is added to selectEx.
// Consider revising to prevent a member socket from being used.
int CUDTUnited::selectEx(const vector<SRTSOCKET>& fds,
vector<SRTSOCKET>* readfds,
vector<SRTSOCKET>* writefds,
Expand All @@ -2991,7 +3007,7 @@ int CUDTUnited::selectEx(const vector<SRTSOCKET>& fds,
{
CUDTSocket* s = locateSocket(*i);

if ((!s) || s->core().m_bBroken || (s->m_Status == SRTS_CLOSED))
if ((!s) || s->core().m_bBroken || (s->m_Status == SRTS_CLOSED) || s->m_GroupOf)
{
if (exceptfds)
{
Expand Down Expand Up @@ -3350,6 +3366,12 @@ void CUDTUnited::checkBrokenSockets()
continue;
}
else

// Additional note on group receiver: with the new group
// receiver m_pRcvBuffer in the socket core is NULL always,
// but that's not a problem - you can close the member socket
// safely without worrying about reading data because they are
// in the group anyway.
{
CUDT& u = s->core();

Expand Down
Loading
Loading