From b79e4f6e56846a6050e9ac9a8f5b2fc2bf948b19 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?I=C3=B1aki=20Baz=20Castillo?= Date: Wed, 18 Jun 2025 12:44:06 +0200 Subject: [PATCH 1/7] [FOR TESTING] Add `consumer.degrade()` --- node/src/Consumer.ts | 78 ++++++++++++++++ worker/fbs/consumer.fbs | 6 ++ worker/fbs/request.fbs | 2 + worker/include/RTC/SimpleConsumer.hpp | 22 ++++- worker/src/Channel/ChannelRequest.cpp | 1 + worker/src/RTC/SimpleConsumer.cpp | 122 ++++++++++++++++++++++++++ 6 files changed, 230 insertions(+), 1 deletion(-) diff --git a/node/src/Consumer.ts b/node/src/Consumer.ts index ac092a5d2f..edef981e7b 100644 --- a/node/src/Consumer.ts +++ b/node/src/Consumer.ts @@ -463,6 +463,84 @@ export class ConsumerImpl ); } + /** + * Degrade RTP transmission. + * - delayMs: The delay (in ms) to be applied to the first packet. + * - lossPercent: Generate packet loss by given percent value. + * - durationMs: Duration that the degradation will take. + * + * @remarks + * - Only implemented in `SimpleConsumer`. + * - After `durationMs`, or if `consumer.degrade()` is called again with + * `durationMs: 0`, then degradation is immediately stopped and all delayed + * buffered packets are immediately sent (all together). + * + * @throws + * - If called on a non `SimpleConsumer` (due to method not implemented). + * + * @todo + * - `lossPercent` not implemented yet. + * + * @example + * ```ts + * consumer.degrade({ + * delayMs: 3000, + * lossPercent: 0, + * durationMs: 10000 + * }); + * ``` + */ + async degrade({ + delayMs = 0, + lossPercent = 0, + durationMs = 0, + }: { + delayMs?: number; + lossPercent?: number; + durationMs?: number; + } = {}): Promise { + if (delayMs > Math.pow(2, 16) - 1) { + delayMs = Math.pow(2, 16) - 1; + } else if (delayMs < 0) { + delayMs = 0; + } + + if (lossPercent > 100) { + lossPercent = 100; + } else if (lossPercent < 0) { + lossPercent = 0; + } + + if (durationMs > Math.pow(2, 32) - 1) { + durationMs = Math.pow(2, 32) - 1; + } else if (durationMs < 0) { + durationMs = 0; + } + + if (durationMs === 0) { + delayMs = 0; + lossPercent = 0; + } + + logger.debug( + `degrade() [delayMs:${delayMs}, lossPercent:${lossPercent}, durationMs:${durationMs}]` + ); + + /* Build Request. */ + const requestOffset = new FbsConsumer.DegradeRequestT( + delayMs, + lossPercent, + durationMs + ).pack(this.#channel.bufferBuilder); + + await this.#channel.request( + FbsRequest.Method.CONSUMER_DEGRADE, + FbsRequest.Body.Consumer_DegradeRequest, + requestOffset, + this.#internal.consumerId + ); + } + private handleWorkerNotifications(): void { this.#channel.on( this.#internal.consumerId, diff --git a/worker/fbs/consumer.fbs b/worker/fbs/consumer.fbs index 715ce2bd53..df97ad44b3 100644 --- a/worker/fbs/consumer.fbs +++ b/worker/fbs/consumer.fbs @@ -44,6 +44,12 @@ table EnableTraceEventRequest { events: [TraceEventType] (required); } +table DegradeRequest { + delay_ms: uint16; + loss_percent: uint8; + duration_ms: uint32; +} + table DumpResponse { data: ConsumerDump (required); } diff --git a/worker/fbs/request.fbs b/worker/fbs/request.fbs index d9f869e878..7c0126b120 100644 --- a/worker/fbs/request.fbs +++ b/worker/fbs/request.fbs @@ -60,6 +60,7 @@ enum Method: uint8 { CONSUMER_SET_PRIORITY, CONSUMER_REQUEST_KEY_FRAME, CONSUMER_ENABLE_TRACE_EVENT, + CONSUMER_DEGRADE, DATAPRODUCER_DUMP, DATAPRODUCER_GET_STATS, DATAPRODUCER_PAUSE, @@ -113,6 +114,7 @@ union Body { Consumer_SetPreferredLayersRequest: FBS.Consumer.SetPreferredLayersRequest, Consumer_SetPriorityRequest: FBS.Consumer.SetPriorityRequest, Consumer_EnableTraceEventRequest: FBS.Consumer.EnableTraceEventRequest, + Consumer_DegradeRequest: FBS.Consumer.DegradeRequest, DataConsumer_SetBufferedAmountLowThresholdRequest: FBS.DataConsumer.SetBufferedAmountLowThresholdRequest, DataConsumer_SendRequest: FBS.DataConsumer.SendRequest, DataConsumer_SetSubchannelsRequest: FBS.DataConsumer.SetSubchannelsRequest, diff --git a/worker/include/RTC/SimpleConsumer.hpp b/worker/include/RTC/SimpleConsumer.hpp index d215d2b8ed..e1f6f97fc7 100644 --- a/worker/include/RTC/SimpleConsumer.hpp +++ b/worker/include/RTC/SimpleConsumer.hpp @@ -5,11 +5,22 @@ #include "RTC/Consumer.hpp" #include "RTC/SeqManager.hpp" #include "RTC/Shared.hpp" +#include "handles/TimerHandle.hpp" +#include namespace RTC { - class SimpleConsumer : public RTC::Consumer, public RTC::RtpStreamSend::Listener + class SimpleConsumer : public RTC::Consumer, + public RTC::RtpStreamSend::Listener, + public TimerHandle::Listener { + private: + struct DelayedPacketItem + { + RtpPacket* packet{ nullptr }; + uint64_t arrivalTime{ 0 }; + }; + public: SimpleConsumer( RTC::Shared* shared, @@ -65,6 +76,10 @@ namespace RTC public: void HandleRequest(Channel::ChannelRequest* request) override; + /* Pure virtual methods inherited from TimerHandle::Listener. */ + public: + void OnTimer(TimerHandle* timer) override; + private: void UserOnTransportConnected() override; void UserOnTransportDisconnected() override; @@ -73,6 +88,7 @@ namespace RTC void CreateRtpStream(); void RequestKeyFrame(); void EmitScore() const; + void ClearDegradation(bool sendDelayedPackets); /* Pure virtual methods inherited from RtpStreamSend::Listener. */ public: @@ -90,6 +106,10 @@ namespace RTC std::unique_ptr> rtpSeqManager; bool managingBitrate{ false }; std::unique_ptr encodingContext; + TimerHandle* degradationTimer{ nullptr }; + uint32_t delayMs{ 0 }; + std::deque delayedPacketItems; + TimerHandle* delayTimer{ nullptr }; }; } // namespace RTC diff --git a/worker/src/Channel/ChannelRequest.cpp b/worker/src/Channel/ChannelRequest.cpp index 82830bef7c..b449a68fe8 100644 --- a/worker/src/Channel/ChannelRequest.cpp +++ b/worker/src/Channel/ChannelRequest.cpp @@ -67,6 +67,7 @@ namespace Channel { FBS::Request::Method::CONSUMER_SET_PRIORITY, "consumer.setPriority" }, { FBS::Request::Method::CONSUMER_REQUEST_KEY_FRAME, "consumer.requestKeyFrame" }, { FBS::Request::Method::CONSUMER_ENABLE_TRACE_EVENT, "consumer.enableTraceEvent" }, + { FBS::Request::Method::CONSUMER_DEGRADE, "consumer.degrade" }, { FBS::Request::Method::DATAPRODUCER_DUMP, "dataProducer.dump" }, { FBS::Request::Method::DATAPRODUCER_GET_STATS, "dataProducer.getStats" }, { FBS::Request::Method::DATAPRODUCER_PAUSE, "dataProducer.pause" }, diff --git a/worker/src/RTC/SimpleConsumer.cpp b/worker/src/RTC/SimpleConsumer.cpp index 0c2c9ba176..a48a397a37 100644 --- a/worker/src/RTC/SimpleConsumer.cpp +++ b/worker/src/RTC/SimpleConsumer.cpp @@ -74,6 +74,8 @@ namespace RTC this->shared->channelMessageRegistrator->UnregisterHandler(this->id); delete this->rtpStream; + + ClearDegradation(/*sendDelayedPackets*/ false); } flatbuffers::Offset SimpleConsumer::FillBuffer( @@ -168,6 +170,44 @@ namespace RTC break; } + case Channel::ChannelRequest::Method::CONSUMER_DEGRADE: + { + const auto* body = request->data->body_as(); + uint16_t delayMs = body->delayMs(); + uint8_t lossPercent = body->lossPercent(); + uint32_t durationMs = body->durationMs(); + + MS_DUMP( + "applying consumer degradation [delayMs:%" PRIu16 ", lossPercent:%" PRIu8 + ", durationMs:%" PRIu32 "]", + delayMs, + lossPercent, + durationMs); + + // First clear everything and send already delayed packets. + ClearDegradation(/*sendDelayedPackets*/ true); + + if (durationMs == 0 || (delayMs == 0 && lossPercent == 0)) + { + MS_DUMP("consumer degradation disabled"); + + break; + } + + this->degradationTimer = new TimerHandle(this); + + this->degradationTimer->Start(durationMs); + + this->delayMs = delayMs; + + if (this->delayMs) + { + this->delayTimer = new TimerHandle(this); + } + + break; + } + default: { // Pass it to the parent class. @@ -176,6 +216,36 @@ namespace RTC } } + void SimpleConsumer::OnTimer(TimerHandle* timer) + { + MS_TRACE(); + + if (timer == this->degradationTimer) + { + // First clear everything and send already delayed packets. + ClearDegradation(/*sendDelayedPackets*/ true); + } + else if (timer == this->delayTimer && !this->delayedPacketItems.empty()) + { + auto [packet, arrivalTime] = this->delayedPacketItems.front(); + + // Rearm the delay timer. + if (this->delayedPacketItems.size() > 1) + { + std::shared_ptr sharedPacket; + const uint64_t timeoutMs = this->delayedPacketItems[1].arrivalTime - arrivalTime; + + this->delayTimer->Start(timeoutMs); + + MS_DUMP("(degradation) sending delayed packet [seq:%" PRIu16 "]", packet->GetSequenceNumber()); + + SendRtpPacket(packet, sharedPacket); + + delete packet; + } + } + } + void SimpleConsumer::ProducerRtpStream(RTC::RtpStreamRecv* rtpStream, uint32_t /*mappedSsrc*/) { MS_TRACE(); @@ -314,6 +384,33 @@ namespace RTC packet->logger.consumerId = this->id; #endif + if (this->delayTimer) + { + // This is the first packet in the delay queue, remove it from the queue + // and send it normally. + if (!this->delayedPacketItems.empty() && this->delayedPacketItems.front().packet == packet) + { + this->delayedPacketItems.pop_front(); + } + // Otherwise insert the packet in the delay queue. + else + { + MS_DUMP( + "(degradation) adding packet to delay queue [seq:%" PRIu16 "]", + packet->GetSequenceNumber()); + + this->delayedPacketItems.push_back({ packet->Clone(), DepLibUV::GetTimeMs() }); + + // Only arm the timer for the first packet. + if (this->delayedPacketItems.size() == 1) + { + this->delayTimer->Start(this->delayMs); + } + + return; + } + } + if (!IsActive()) { #ifdef MS_RTC_LOGGER_RTP @@ -747,6 +844,31 @@ namespace RTC notificationOffset); } + void SimpleConsumer::ClearDegradation(bool sendDelayedPackets) + { + MS_TRACE(); + + delete this->degradationTimer; + this->degradationTimer = nullptr; + + delete this->delayTimer; + this->delayTimer = nullptr; + + for (auto& item : this->delayedPacketItems) + { + if (sendDelayedPackets) + { + std::shared_ptr sharedPacket; + + SendRtpPacket(item.packet, sharedPacket); + } + + delete item.packet; + } + + this->delayedPacketItems.clear(); + } + inline void SimpleConsumer::OnRtpStreamScore( RTC::RtpStream* /*rtpStream*/, uint8_t /*score*/, uint8_t /*previousScore*/) { From 05d69fff5ac8fa065fedda7a800da80bf6174bce Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?I=C3=B1aki=20Baz=20Castillo?= Date: Wed, 18 Jun 2025 14:22:06 +0200 Subject: [PATCH 2/7] add method to ConsumerTypes.ts --- node/src/Consumer.ts | 27 --------------------------- node/src/ConsumerTypes.ts | 37 +++++++++++++++++++++++++++++++++++++ 2 files changed, 37 insertions(+), 27 deletions(-) diff --git a/node/src/Consumer.ts b/node/src/Consumer.ts index edef981e7b..19da22f033 100644 --- a/node/src/Consumer.ts +++ b/node/src/Consumer.ts @@ -463,33 +463,6 @@ export class ConsumerImpl ); } - /** - * Degrade RTP transmission. - * - delayMs: The delay (in ms) to be applied to the first packet. - * - lossPercent: Generate packet loss by given percent value. - * - durationMs: Duration that the degradation will take. - * - * @remarks - * - Only implemented in `SimpleConsumer`. - * - After `durationMs`, or if `consumer.degrade()` is called again with - * `durationMs: 0`, then degradation is immediately stopped and all delayed - * buffered packets are immediately sent (all together). - * - * @throws - * - If called on a non `SimpleConsumer` (due to method not implemented). - * - * @todo - * - `lossPercent` not implemented yet. - * - * @example - * ```ts - * consumer.degrade({ - * delayMs: 3000, - * lossPercent: 0, - * durationMs: 10000 - * }); - * ``` - */ async degrade({ delayMs = 0, lossPercent = 0, diff --git a/node/src/ConsumerTypes.ts b/node/src/ConsumerTypes.ts index 74ca27615d..2f3bbfda87 100644 --- a/node/src/ConsumerTypes.ts +++ b/node/src/ConsumerTypes.ts @@ -389,4 +389,41 @@ export interface Consumer * Enable 'trace' event. */ enableTraceEvent(types?: ConsumerTraceEventType[]): Promise; + + /** + * Degrade RTP transmission. + * - delayMs: The delay (in ms) to be applied to the first packet. + * - lossPercent: Generate packet loss by given percent value. + * - durationMs: Duration that the degradation will take. + * + * @remarks + * - Only implemented in `SimpleConsumer`. + * - After `durationMs`, or if `consumer.degrade()` is called again with + * `durationMs: 0`, then degradation is immediately stopped and all delayed + * buffered packets are immediately sent (all together). + * + * @throws + * - If called on a non `SimpleConsumer` (due to method not implemented). + * + * @todo + * - `lossPercent` not implemented yet. + * + * @example + * ```ts + * consumer.degrade({ + * delayMs: 3000, + * lossPercent: 0, + * durationMs: 10000 + * }); + * ``` + */ + degrade({ + delayMs, + lossPercent, + durationMs, + }: { + delayMs?: number; + lossPercent?: number; + durationMs?: number; + }): Promise; } From b2da8fb5df29832595d7388a017b84f79532be5d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?I=C3=B1aki=20Baz=20Castillo?= Date: Wed, 18 Jun 2025 14:34:24 +0200 Subject: [PATCH 3/7] don't retransmit packets that are delayed on purpose --- worker/src/RTC/SimpleConsumer.cpp | 18 +++++++++++++++--- 1 file changed, 15 insertions(+), 3 deletions(-) diff --git a/worker/src/RTC/SimpleConsumer.cpp b/worker/src/RTC/SimpleConsumer.cpp index a48a397a37..1a86696f19 100644 --- a/worker/src/RTC/SimpleConsumer.cpp +++ b/worker/src/RTC/SimpleConsumer.cpp @@ -191,20 +191,23 @@ namespace RTC { MS_DUMP("consumer degradation disabled"); + request->Accept(); + break; } this->degradationTimer = new TimerHandle(this); + this->delayMs = delayMs; this->degradationTimer->Start(durationMs); - this->delayMs = delayMs; - if (this->delayMs) { this->delayTimer = new TimerHandle(this); } + request->Accept(); + break; } @@ -854,7 +857,7 @@ namespace RTC delete this->delayTimer; this->delayTimer = nullptr; - for (auto& item : this->delayedPacketItems) + for (const auto& item : this->delayedPacketItems) { if (sendDelayedPackets) { @@ -883,6 +886,15 @@ namespace RTC { MS_TRACE(); + // Ignore the transmitted packet if it was delayed on purpose. + for (const auto& item : this->delayedPacketItems) + { + if (item.packet == packet) + { + return; + } + } + this->listener->OnConsumerRetransmitRtpPacket(this, packet); // May emit 'trace' event. From 3928cbc97761d5fe0c694ab3eaad50379ffbab9c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?I=C3=B1aki=20Baz=20Castillo?= Date: Thu, 19 Jun 2025 15:29:26 +0300 Subject: [PATCH 4/7] improve it --- node/src/Consumer.ts | 48 +++++---- node/src/ConsumerTypes.ts | 20 ++-- worker/fbs/consumer.fbs | 5 +- worker/include/RTC/SimpleConsumer.hpp | 12 ++- worker/src/RTC/SimpleConsumer.cpp | 136 ++++++++++++++++++-------- 5 files changed, 147 insertions(+), 74 deletions(-) diff --git a/node/src/Consumer.ts b/node/src/Consumer.ts index 19da22f033..ef4f347fea 100644 --- a/node/src/Consumer.ts +++ b/node/src/Consumer.ts @@ -464,18 +464,32 @@ export class ConsumerImpl } async degrade({ - delayMs = 0, - lossPercent = 0, durationMs = 0, + maxDelayMs = 0, + delayPercent = 100, + lossPercent = 0, }: { - delayMs?: number; - lossPercent?: number; durationMs?: number; + maxDelayMs?: number; + delayPercent?: number; + lossPercent?: number; } = {}): Promise { - if (delayMs > Math.pow(2, 16) - 1) { - delayMs = Math.pow(2, 16) - 1; - } else if (delayMs < 0) { - delayMs = 0; + if (durationMs > Math.pow(2, 32) - 1) { + durationMs = Math.pow(2, 32) - 1; + } else if (durationMs < 0) { + durationMs = 0; + } + + if (maxDelayMs > Math.pow(2, 16) - 1) { + maxDelayMs = Math.pow(2, 16) - 1; + } else if (maxDelayMs < 0) { + maxDelayMs = 0; + } + + if (delayPercent > 100) { + delayPercent = 100; + } else if (delayPercent < 0) { + delayPercent = 0; } if (lossPercent > 100) { @@ -484,26 +498,22 @@ export class ConsumerImpl lossPercent = 0; } - if (durationMs > Math.pow(2, 32) - 1) { - durationMs = Math.pow(2, 32) - 1; - } else if (durationMs < 0) { - durationMs = 0; - } - if (durationMs === 0) { - delayMs = 0; + maxDelayMs = 0; + lossPercent = 0; lossPercent = 0; } logger.debug( - `degrade() [delayMs:${delayMs}, lossPercent:${lossPercent}, durationMs:${durationMs}]` + `degrade() [durationMs:${durationMs}, maxDelayMs:${maxDelayMs}, delayPercent:${delayPercent}, lossPercent:${lossPercent}]` ); /* Build Request. */ const requestOffset = new FbsConsumer.DegradeRequestT( - delayMs, - lossPercent, - durationMs + durationMs, + maxDelayMs, + delayPercent, + lossPercent ).pack(this.#channel.bufferBuilder); await this.#channel.request( diff --git a/node/src/ConsumerTypes.ts b/node/src/ConsumerTypes.ts index 2f3bbfda87..6a6119c892 100644 --- a/node/src/ConsumerTypes.ts +++ b/node/src/ConsumerTypes.ts @@ -392,9 +392,10 @@ export interface Consumer /** * Degrade RTP transmission. - * - delayMs: The delay (in ms) to be applied to the first packet. - * - lossPercent: Generate packet loss by given percent value. * - durationMs: Duration that the degradation will take. + * - maxDelayMs: Max delay (in ms) to be applied to each packet. + * - delayPercent: Only apply delay to this percent of the packets. + * - lossPercent: Generate packet loss by given percent value. * * @remarks * - Only implemented in `SimpleConsumer`. @@ -411,19 +412,22 @@ export interface Consumer * @example * ```ts * consumer.degrade({ - * delayMs: 3000, - * lossPercent: 0, * durationMs: 10000 + * maxDelayMs: 3000, + * delayPercent: 20, + * lossPercent: 0, * }); * ``` */ degrade({ - delayMs, - lossPercent, durationMs, + maxDelayMs, + delayPercent, + lossPercent, }: { - delayMs?: number; - lossPercent?: number; durationMs?: number; + maxDelayMs?: number; + delayPercent?: number; + lossPercent?: number; }): Promise; } diff --git a/worker/fbs/consumer.fbs b/worker/fbs/consumer.fbs index df97ad44b3..e145511ff2 100644 --- a/worker/fbs/consumer.fbs +++ b/worker/fbs/consumer.fbs @@ -45,9 +45,10 @@ table EnableTraceEventRequest { } table DegradeRequest { - delay_ms: uint16; - loss_percent: uint8; duration_ms: uint32; + max_delay_ms: uint16; + delay_percent: uint8; + loss_percent: uint8; } table DumpResponse { diff --git a/worker/include/RTC/SimpleConsumer.hpp b/worker/include/RTC/SimpleConsumer.hpp index e1f6f97fc7..f1396c3b50 100644 --- a/worker/include/RTC/SimpleConsumer.hpp +++ b/worker/include/RTC/SimpleConsumer.hpp @@ -6,7 +6,7 @@ #include "RTC/SeqManager.hpp" #include "RTC/Shared.hpp" #include "handles/TimerHandle.hpp" -#include +#include namespace RTC { @@ -18,7 +18,8 @@ namespace RTC struct DelayedPacketItem { RtpPacket* packet{ nullptr }; - uint64_t arrivalTime{ 0 }; + uint64_t arrivalTimeMs{ 0 }; + uint16_t delayMs{ 0 }; }; public: @@ -89,6 +90,7 @@ namespace RTC void RequestKeyFrame(); void EmitScore() const; void ClearDegradation(bool sendDelayedPackets); + bool ShouldDelayPacket(const RTC::RtpPacket* packet) const; /* Pure virtual methods inherited from RtpStreamSend::Listener. */ public: @@ -106,10 +108,12 @@ namespace RTC std::unique_ptr> rtpSeqManager; bool managingBitrate{ false }; std::unique_ptr encodingContext; + uint32_t maxDelayMs{ 0 }; + uint8_t delayPercent{ 0 }; + uint8_t lossPercent{ 0 }; TimerHandle* degradationTimer{ nullptr }; - uint32_t delayMs{ 0 }; - std::deque delayedPacketItems; TimerHandle* delayTimer{ nullptr }; + std::list delayedPacketItems; }; } // namespace RTC diff --git a/worker/src/RTC/SimpleConsumer.cpp b/worker/src/RTC/SimpleConsumer.cpp index 1a86696f19..6d782a7ceb 100644 --- a/worker/src/RTC/SimpleConsumer.cpp +++ b/worker/src/RTC/SimpleConsumer.cpp @@ -8,6 +8,9 @@ #include "Utils.hpp" #include "RTC/Codecs/Tools.hpp" #include "RTC/SimpleConsumer.hpp" +#include + +const uint64_t DEGRADATION_DELAY_CHECK_INTERVAL_MS = 15; namespace RTC { @@ -172,38 +175,46 @@ namespace RTC case Channel::ChannelRequest::Method::CONSUMER_DEGRADE: { - const auto* body = request->data->body_as(); - uint16_t delayMs = body->delayMs(); - uint8_t lossPercent = body->lossPercent(); - uint32_t durationMs = body->durationMs(); + const auto* body = request->data->body_as(); + uint32_t durationMs = body->durationMs(); + uint16_t maxDelayMs = body->maxDelayMs(); + uint8_t delayPercent = body->delayPercent(); + uint8_t lossPercent = body->lossPercent(); MS_DUMP( - "applying consumer degradation [delayMs:%" PRIu16 ", lossPercent:%" PRIu8 - ", durationMs:%" PRIu32 "]", - delayMs, - lossPercent, - durationMs); + "[DEGRADATION] applying consumer degradation [durationMs:%" PRIu32 ", maxDelayMs:%" PRIu16 + ", delayPercent:%" PRIu8 ", lossPercent:%" PRIu8 "]", + durationMs, + maxDelayMs, + delayPercent, + lossPercent); // First clear everything and send already delayed packets. ClearDegradation(/*sendDelayedPackets*/ true); - if (durationMs == 0 || (delayMs == 0 && lossPercent == 0)) + if (durationMs == 0) { - MS_DUMP("consumer degradation disabled"); + MS_DUMP("[DEGRADATION] consumer degradation disabled"); request->Accept(); break; } + this->maxDelayMs = maxDelayMs; + this->delayPercent = delayPercent; + this->lossPercent = lossPercent; this->degradationTimer = new TimerHandle(this); - this->delayMs = delayMs; this->degradationTimer->Start(durationMs); - if (this->delayMs) + if (this->maxDelayMs > 0 && this->delayPercent > 0) { this->delayTimer = new TimerHandle(this); + + // Check delayed packets every N ms. + this->delayTimer->Start( + DEGRADATION_DELAY_CHECK_INTERVAL_MS, DEGRADATION_DELAY_CHECK_INTERVAL_MS); } request->Accept(); @@ -225,26 +236,39 @@ namespace RTC if (timer == this->degradationTimer) { - // First clear everything and send already delayed packets. + // Clear everything and send already delayed packets. ClearDegradation(/*sendDelayedPackets*/ true); } - else if (timer == this->delayTimer && !this->delayedPacketItems.empty()) + else if (timer == this->delayTimer) { - auto [packet, arrivalTime] = this->delayedPacketItems.front(); + auto nowMs = DepLibUV::GetTimeMs(); - // Rearm the delay timer. - if (this->delayedPacketItems.size() > 1) + for (auto it = this->delayedPacketItems.begin(); it != this->delayedPacketItems.end();) { - std::shared_ptr sharedPacket; - const uint64_t timeoutMs = this->delayedPacketItems[1].arrivalTime - arrivalTime; + auto& item = *it; + + // Only send delayed packets whose arrival time + applied delay is less + // or equal than current time. Deleted the stored packet and remove the + // item from the list once the packet is sent. + if (item.arrivalTimeMs + item.delayMs <= nowMs) + { + std::shared_ptr sharedPacket; - this->delayTimer->Start(timeoutMs); + MS_DUMP( + "[DEGRADATION] sending delayed packet [seq:%" PRIu16 ", delayMs:%" PRIu16 "]", + item.packet->GetSequenceNumber(), + item.delayMs); - MS_DUMP("(degradation) sending delayed packet [seq:%" PRIu16 "]", packet->GetSequenceNumber()); + SendRtpPacket(item.packet, sharedPacket); - SendRtpPacket(packet, sharedPacket); + delete item.packet; - delete packet; + it = this->delayedPacketItems.erase(it); + } + else + { + ++it; + } } } } @@ -389,26 +413,17 @@ namespace RTC if (this->delayTimer) { - // This is the first packet in the delay queue, remove it from the queue - // and send it normally. - if (!this->delayedPacketItems.empty() && this->delayedPacketItems.front().packet == packet) + if (ShouldDelayPacket(packet)) { - this->delayedPacketItems.pop_front(); - } - // Otherwise insert the packet in the delay queue. - else - { - MS_DUMP( - "(degradation) adding packet to delay queue [seq:%" PRIu16 "]", - packet->GetSequenceNumber()); + auto nowMs = DepLibUV::GetTimeMs(); + auto delayMs = static_cast(Utils::Crypto::GetRandomUInt(0, this->maxDelayMs)); - this->delayedPacketItems.push_back({ packet->Clone(), DepLibUV::GetTimeMs() }); + MS_DUMP( + "[DEGRADATION] delaying packet [seq:%" PRIu16 ", delayMs:%" PRIu16 "]", + packet->GetSequenceNumber(), + delayMs); - // Only arm the timer for the first packet. - if (this->delayedPacketItems.size() == 1) - { - this->delayTimer->Start(this->delayMs); - } + this->delayedPacketItems.push_back({ packet->Clone(), nowMs, delayMs }); return; } @@ -851,6 +866,10 @@ namespace RTC { MS_TRACE(); + this->maxDelayMs = 0; + this->delayPercent = 0; + this->lossPercent = 0; + delete this->degradationTimer; this->degradationTimer = nullptr; @@ -863,6 +882,11 @@ namespace RTC { std::shared_ptr sharedPacket; + MS_DUMP( + "[DEGRADATION] terminated, sending delayed packet [seq:%" PRIu16 ", delayMs:%" PRIu16 "]", + item.packet->GetSequenceNumber(), + item.delayMs); + SendRtpPacket(item.packet, sharedPacket); } @@ -872,6 +896,36 @@ namespace RTC this->delayedPacketItems.clear(); } + bool SimpleConsumer::ShouldDelayPacket(const RTC::RtpPacket* packet) const + { + MS_TRACE(); + + static std::random_device rd; + static std::mt19937 gen(rd()); + static std::uniform_int_distribution dist(0, 99); + + if (!this->delayTimer) + { + return false; + } + + // Check if the packet is in the list of delayed packets, meaning that it's + // been sent from the onTimer() callback and hence must not be delayed + // again. + auto it = std::find_if( + this->delayedPacketItems.begin(), + this->delayedPacketItems.end(), + [&](const auto& item) { return item.packet == packet; }); + + if (it != this->delayedPacketItems.end()) + { + return false; + } + + // Take into account the given delay percent. + return dist(gen) < this->delayPercent; + } + inline void SimpleConsumer::OnRtpStreamScore( RTC::RtpStream* /*rtpStream*/, uint8_t /*score*/, uint8_t /*previousScore*/) { From a3ef7c39ee138a53c1092ad44b5fa1eba3c782a6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?I=C3=B1aki=20Baz=20Castillo?= Date: Thu, 19 Jun 2025 15:38:48 +0300 Subject: [PATCH 5/7] implement lossPercent --- node/src/Consumer.ts | 2 +- node/src/ConsumerTypes.ts | 7 ++-- worker/include/RTC/SimpleConsumer.hpp | 1 + worker/src/RTC/SimpleConsumer.cpp | 52 ++++++++++++++++++++------- 4 files changed, 43 insertions(+), 19 deletions(-) diff --git a/node/src/Consumer.ts b/node/src/Consumer.ts index ef4f347fea..5462ded006 100644 --- a/node/src/Consumer.ts +++ b/node/src/Consumer.ts @@ -466,7 +466,7 @@ export class ConsumerImpl async degrade({ durationMs = 0, maxDelayMs = 0, - delayPercent = 100, + delayPercent = 0, lossPercent = 0, }: { durationMs?: number; diff --git a/node/src/ConsumerTypes.ts b/node/src/ConsumerTypes.ts index 6a6119c892..2105bd62c3 100644 --- a/node/src/ConsumerTypes.ts +++ b/node/src/ConsumerTypes.ts @@ -400,15 +400,12 @@ export interface Consumer * @remarks * - Only implemented in `SimpleConsumer`. * - After `durationMs`, or if `consumer.degrade()` is called again with - * `durationMs: 0`, then degradation is immediately stopped and all delayed - * buffered packets are immediately sent (all together). + * `durationMs: 0`, then degradation is immediately stopped and all delayed + * buffered packets are immediately sent (all together). * * @throws * - If called on a non `SimpleConsumer` (due to method not implemented). * - * @todo - * - `lossPercent` not implemented yet. - * * @example * ```ts * consumer.degrade({ diff --git a/worker/include/RTC/SimpleConsumer.hpp b/worker/include/RTC/SimpleConsumer.hpp index f1396c3b50..d4f65582de 100644 --- a/worker/include/RTC/SimpleConsumer.hpp +++ b/worker/include/RTC/SimpleConsumer.hpp @@ -91,6 +91,7 @@ namespace RTC void EmitScore() const; void ClearDegradation(bool sendDelayedPackets); bool ShouldDelayPacket(const RTC::RtpPacket* packet) const; + bool ShouldDropPacket(const RTC::RtpPacket* packet) const; /* Pure virtual methods inherited from RtpStreamSend::Listener. */ public: diff --git a/worker/src/RTC/SimpleConsumer.cpp b/worker/src/RTC/SimpleConsumer.cpp index 6d782a7ceb..5206966f73 100644 --- a/worker/src/RTC/SimpleConsumer.cpp +++ b/worker/src/RTC/SimpleConsumer.cpp @@ -411,22 +411,26 @@ namespace RTC packet->logger.consumerId = this->id; #endif - if (this->delayTimer) + if (ShouldDropPacket(packet)) { - if (ShouldDelayPacket(packet)) - { - auto nowMs = DepLibUV::GetTimeMs(); - auto delayMs = static_cast(Utils::Crypto::GetRandomUInt(0, this->maxDelayMs)); + MS_DUMP("[DEGRADATION] dropping packet [seq:%" PRIu16 "]", packet->GetSequenceNumber()); - MS_DUMP( - "[DEGRADATION] delaying packet [seq:%" PRIu16 ", delayMs:%" PRIu16 "]", - packet->GetSequenceNumber(), - delayMs); + return; + } - this->delayedPacketItems.push_back({ packet->Clone(), nowMs, delayMs }); + if (ShouldDelayPacket(packet)) + { + auto nowMs = DepLibUV::GetTimeMs(); + auto delayMs = static_cast(Utils::Crypto::GetRandomUInt(0, this->maxDelayMs)); - return; - } + MS_DUMP( + "[DEGRADATION] delaying packet [seq:%" PRIu16 ", delayMs:%" PRIu16 "]", + packet->GetSequenceNumber(), + delayMs); + + this->delayedPacketItems.push_back({ packet->Clone(), nowMs, delayMs }); + + return; } if (!IsActive()) @@ -904,6 +908,11 @@ namespace RTC static std::mt19937 gen(rd()); static std::uniform_int_distribution dist(0, 99); + if (!this->degradationTimer) + { + return false; + } + if (!this->delayTimer) { return false; @@ -922,10 +931,27 @@ namespace RTC return false; } - // Take into account the given delay percent. + // Take into account use given delay percent. return dist(gen) < this->delayPercent; } + bool SimpleConsumer::ShouldDropPacket(const RTC::RtpPacket* packet) const + { + MS_TRACE(); + + static std::random_device rd; + static std::mt19937 gen(rd()); + static std::uniform_int_distribution dist(0, 99); + + if (!this->degradationTimer) + { + return false; + } + + // Take into account user given loss percent. + return dist(gen) < this->lossPercent; + } + inline void SimpleConsumer::OnRtpStreamScore( RTC::RtpStream* /*rtpStream*/, uint8_t /*score*/, uint8_t /*previousScore*/) { From a5bb38625625d251e7d35e03e8930b9f0cc889ad Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?I=C3=B1aki=20Baz=20Castillo?= Date: Mon, 7 Jul 2025 10:53:18 +0200 Subject: [PATCH 6/7] merge v3 and adapt things --- worker/include/RTC/SimpleConsumer.hpp | 13 +-- worker/src/RTC/SimpleConsumer.cpp | 121 ++++++++++++++------------ 2 files changed, 74 insertions(+), 60 deletions(-) diff --git a/worker/include/RTC/SimpleConsumer.hpp b/worker/include/RTC/SimpleConsumer.hpp index dd60e82754..6627002c6e 100644 --- a/worker/include/RTC/SimpleConsumer.hpp +++ b/worker/include/RTC/SimpleConsumer.hpp @@ -18,8 +18,11 @@ namespace RTC private: struct DelayedPacketItem { - RtpPacket* packet{ nullptr }; + // Original packet. + RTC::SharedRtpPacket sharedPacket{ nullptr }; + // Arrival time of the original packet. uint64_t arrivalTimeMs{ 0 }; + // Delay applied to the packet. uint16_t delayMs{ 0 }; }; @@ -78,10 +81,6 @@ namespace RTC public: void HandleRequest(Channel::ChannelRequest* request) override; - /* Pure virtual methods inherited from TimerHandle::Listener. */ - public: - void OnTimer(TimerHandle* timer) override; - private: void UserOnTransportConnected() override; void UserOnTransportDisconnected() override; @@ -101,6 +100,10 @@ namespace RTC void OnRtpStreamScore(RTC::RtpStream* rtpStream, uint8_t score, uint8_t previousScore) override; void OnRtpStreamRetransmitRtpPacket(RTC::RtpStreamSend* rtpStream, RTC::RtpPacket* packet) override; + /* Pure virtual methods inherited from TimerHandle::Listener. */ + public: + void OnTimer(TimerHandle* timer) override; + private: // Allocated by this. RTC::RtpStreamSend* rtpStream{ nullptr }; diff --git a/worker/src/RTC/SimpleConsumer.cpp b/worker/src/RTC/SimpleConsumer.cpp index 43f00420f4..ad6abcd792 100644 --- a/worker/src/RTC/SimpleConsumer.cpp +++ b/worker/src/RTC/SimpleConsumer.cpp @@ -238,49 +238,6 @@ namespace RTC } } - void SimpleConsumer::OnTimer(TimerHandle* timer) - { - MS_TRACE(); - - if (timer == this->degradationTimer) - { - // Clear everything and send already delayed packets. - ClearDegradation(/*sendDelayedPackets*/ true); - } - else if (timer == this->delayTimer) - { - auto nowMs = DepLibUV::GetTimeMs(); - - for (auto it = this->delayedPacketItems.begin(); it != this->delayedPacketItems.end();) - { - auto& item = *it; - - // Only send delayed packets whose arrival time + applied delay is less - // or equal than current time. Deleted the stored packet and remove the - // item from the list once the packet is sent. - if (item.arrivalTimeMs + item.delayMs <= nowMs) - { - std::shared_ptr sharedPacket; - - MS_DUMP( - "[DEGRADATION] sending delayed packet [seq:%" PRIu16 ", delayMs:%" PRIu16 "]", - item.packet->GetSequenceNumber(), - item.delayMs); - - SendRtpPacket(item.packet, sharedPacket); - - delete item.packet; - - it = this->delayedPacketItems.erase(it); - } - else - { - ++it; - } - } - } - } - void SimpleConsumer::ProducerRtpStream(RTC::RtpStreamRecv* rtpStream, uint32_t /*mappedSsrc*/) { MS_TRACE(); @@ -436,7 +393,20 @@ namespace RTC packet->GetSequenceNumber(), delayMs); - this->delayedPacketItems.push_back({ packet->Clone(), nowMs, delayMs }); + // Only clone once and only if necessary. + if (!sharedPacket.HasPacket()) + { + sharedPacket.Assign(packet); + } + // Assert that, if sharedPacket was already filled, both packet and + // sharedPacket are the very same RTP packet. + else + { + sharedPacket.AssertSamePacket(packet); + } + + // Store original packet into the delay buffer. + this->delayedPacketItems.push_back({ sharedPacket, nowMs, delayMs }); return; } @@ -982,8 +952,8 @@ namespace RTC FBS::Notification::Body::Consumer_ScoreNotification, notificationOffset); } - - void SimpleConsumer::ClearDegradation(bool sendDelayedPackets) + + void SimpleConsumer::ClearDegradation(bool sendDelayedPackets) { MS_TRACE(); @@ -997,26 +967,25 @@ namespace RTC delete this->delayTimer; this->delayTimer = nullptr; - for (const auto& item : this->delayedPacketItems) + for (auto& item : this->delayedPacketItems) { if (sendDelayedPackets) { - std::shared_ptr sharedPacket; + auto& sharedPacket = item.sharedPacket; + auto* packet = sharedPacket.GetPacket(); MS_DUMP( "[DEGRADATION] terminated, sending delayed packet [seq:%" PRIu16 ", delayMs:%" PRIu16 "]", - item.packet->GetSequenceNumber(), + packet->GetSequenceNumber(), item.delayMs); - SendRtpPacket(item.packet, sharedPacket); + SendRtpPacket(packet, sharedPacket); } - - delete item.packet; } this->delayedPacketItems.clear(); } - + bool SimpleConsumer::ShouldDelayPacket(const RTC::RtpPacket* packet) const { MS_TRACE(); @@ -1041,7 +1010,7 @@ namespace RTC auto it = std::find_if( this->delayedPacketItems.begin(), this->delayedPacketItems.end(), - [&](const auto& item) { return item.packet == packet; }); + [&](const auto& item) { return item.sharedPacket.GetPacket() == packet; }); if (it != this->delayedPacketItems.end()) { @@ -1086,7 +1055,7 @@ namespace RTC // Ignore the transmitted packet if it was delayed on purpose. for (const auto& item : this->delayedPacketItems) { - if (item.packet == packet) + if (item.sharedPacket.GetPacket() == packet) { return; } @@ -1097,4 +1066,46 @@ namespace RTC // May emit 'trace' event. EmitTraceEventRtpAndKeyFrameTypes(packet, this->rtpStream->HasRtx()); } + + void SimpleConsumer::OnTimer(TimerHandle* timer) + { + MS_TRACE(); + + if (timer == this->degradationTimer) + { + // Clear everything and send already delayed packets. + ClearDegradation(/*sendDelayedPackets*/ true); + } + else if (timer == this->delayTimer) + { + auto nowMs = DepLibUV::GetTimeMs(); + + for (auto it = this->delayedPacketItems.begin(); it != this->delayedPacketItems.end();) + { + auto& item = *it; + + // Only send delayed packets whose arrival time + applied delay is less + // or equal than current time. Deleted the stored packet and remove the + // item from the list once the packet is sent. + if (item.arrivalTimeMs + item.delayMs <= nowMs) + { + auto& sharedPacket = item.sharedPacket; + auto* packet = sharedPacket.GetPacket(); + + MS_DUMP( + "[DEGRADATION] sending delayed packet [seq:%" PRIu16 ", delayMs:%" PRIu16 "]", + packet->GetSequenceNumber(), + item.delayMs); + + SendRtpPacket(packet, sharedPacket); + + it = this->delayedPacketItems.erase(it); + } + else + { + ++it; + } + } + } + } } // namespace RTC From c121185f07d175b34227ded3747e4a7e0a480032 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?I=C3=B1aki=20Baz=20Castillo?= Date: Mon, 7 Jul 2025 12:50:31 +0200 Subject: [PATCH 7/7] add Producer degradation (WIP) --- node/src/Producer.ts | 61 +++++++++++++++++++++++++++++++++++++++ node/src/ProducerTypes.ts | 34 ++++++++++++++++++++++ worker/fbs/producer.fbs | 7 +++++ worker/fbs/request.fbs | 2 ++ 4 files changed, 104 insertions(+) diff --git a/node/src/Producer.ts b/node/src/Producer.ts index 9420b8f8eb..0e8062503f 100644 --- a/node/src/Producer.ts +++ b/node/src/Producer.ts @@ -311,6 +311,67 @@ export class ProducerImpl ); } + async degrade({ + durationMs = 0, + maxDelayMs = 0, + delayPercent = 0, + lossPercent = 0, + }: { + durationMs?: number; + maxDelayMs?: number; + delayPercent?: number; + lossPercent?: number; + } = {}): Promise { + if (durationMs > Math.pow(2, 32) - 1) { + durationMs = Math.pow(2, 32) - 1; + } else if (durationMs < 0) { + durationMs = 0; + } + + if (maxDelayMs > Math.pow(2, 16) - 1) { + maxDelayMs = Math.pow(2, 16) - 1; + } else if (maxDelayMs < 0) { + maxDelayMs = 0; + } + + if (delayPercent > 100) { + delayPercent = 100; + } else if (delayPercent < 0) { + delayPercent = 0; + } + + if (lossPercent > 100) { + lossPercent = 100; + } else if (lossPercent < 0) { + lossPercent = 0; + } + + if (durationMs === 0) { + maxDelayMs = 0; + lossPercent = 0; + lossPercent = 0; + } + + logger.debug( + `degrade() [durationMs:${durationMs}, maxDelayMs:${maxDelayMs}, delayPercent:${delayPercent}, lossPercent:${lossPercent}]` + ); + + /* Build Request. */ + const requestOffset = new FbsProducer.DegradeRequestT( + durationMs, + maxDelayMs, + delayPercent, + lossPercent + ).pack(this.#channel.bufferBuilder); + + await this.#channel.request( + FbsRequest.Method.CONSUMER_DEGRADE, + FbsRequest.Body.Consumer_DegradeRequest, + requestOffset, + this.#internal.producerId + ); + } + send(rtpPacket: Buffer): void { if (!Buffer.isBuffer(rtpPacket)) { throw new TypeError('rtpPacket must be a Buffer'); diff --git a/node/src/ProducerTypes.ts b/node/src/ProducerTypes.ts index 79c9df3bf6..98e420fe57 100644 --- a/node/src/ProducerTypes.ts +++ b/node/src/ProducerTypes.ts @@ -249,4 +249,38 @@ export interface Producer * Send RTP packet (just valid for Producers created on a DirectTransport). */ send(rtpPacket: Buffer): void; + + /** + * Degrade RTP transmission. + * - durationMs: Duration that the degradation will take. + * - maxDelayMs: Max delay (in ms) to be applied to each packet. + * - delayPercent: Only apply delay to this percent of the packets. + * - lossPercent: Generate packet loss by given percent value. + * + * @remarks + * - After `durationMs`, or if `producer.degrade()` is called again with + * `durationMs: 0`, then degradation is immediately stopped and all delayed + * buffered packets are immediately sent (all together). + * + * @example + * ```ts + * producer.degrade({ + * durationMs: 10000 + * maxDelayMs: 3000, + * delayPercent: 20, + * lossPercent: 0, + * }); + * ``` + */ + degrade({ + durationMs, + maxDelayMs, + delayPercent, + lossPercent, + }: { + durationMs?: number; + maxDelayMs?: number; + delayPercent?: number; + lossPercent?: number; + }): Promise; } diff --git a/worker/fbs/producer.fbs b/worker/fbs/producer.fbs index 8913cb9222..970d6e5d8b 100644 --- a/worker/fbs/producer.fbs +++ b/worker/fbs/producer.fbs @@ -18,6 +18,13 @@ table EnableTraceEventRequest { events: [TraceEventType] (required); } +table DegradeRequest { + duration_ms: uint32; + max_delay_ms: uint16; + delay_percent: uint8; + loss_percent: uint8; +} + table DumpResponse { id: string (required); kind: FBS.RtpParameters.MediaKind; diff --git a/worker/fbs/request.fbs b/worker/fbs/request.fbs index 7c0126b120..939693caeb 100644 --- a/worker/fbs/request.fbs +++ b/worker/fbs/request.fbs @@ -52,6 +52,7 @@ enum Method: uint8 { PRODUCER_PAUSE, PRODUCER_RESUME, PRODUCER_ENABLE_TRACE_EVENT, + PRODUCER_DEGRADE, CONSUMER_DUMP, CONSUMER_GET_STATS, CONSUMER_PAUSE, @@ -111,6 +112,7 @@ union Body { PipeTransport_ConnectRequest: FBS.PipeTransport.ConnectRequest, WebRtcTransport_ConnectRequest: FBS.WebRtcTransport.ConnectRequest, Producer_EnableTraceEventRequest: FBS.Producer.EnableTraceEventRequest, + Producer_DegradeRequest: FBS.Producer.DegradeRequest, Consumer_SetPreferredLayersRequest: FBS.Consumer.SetPreferredLayersRequest, Consumer_SetPriorityRequest: FBS.Consumer.SetPriorityRequest, Consumer_EnableTraceEventRequest: FBS.Consumer.EnableTraceEventRequest,