diff --git a/node/src/test/test-Consumer.ts b/node/src/test/test-Consumer.ts index b486dc6309..89d306d374 100644 --- a/node/src/test/test-Consumer.ts +++ b/node/src/test/test-Consumer.ts @@ -307,7 +307,10 @@ test('transport.consume() succeeds', async () => { producerScore: 0, producerScores: [0], }); - expect(audioConsumer.preferredLayers).toBeUndefined(); + expect(audioConsumer.preferredLayers).toEqual({ + spatialLayer: 0, + temporalLayer: 0, + }); expect(audioConsumer.currentLayers).toBeUndefined(); expect(audioConsumer.appData).toEqual({ baz: 'LOL' }); @@ -879,7 +882,10 @@ test('consumer.setPreferredLayers() succeed', async () => { await audioConsumer.setPreferredLayers({ spatialLayer: 1, temporalLayer: 1 }); - expect(audioConsumer.preferredLayers).toBeUndefined(); + expect(audioConsumer.preferredLayers).toEqual({ + spatialLayer: 0, + temporalLayer: 0, + }); await videoConsumer.setPreferredLayers({ spatialLayer: 2, temporalLayer: 3 }); diff --git a/node/src/test/test-Producer.ts b/node/src/test/test-Producer.ts index a75aa67f36..8689929032 100644 --- a/node/src/test/test-Producer.ts +++ b/node/src/test/test-Producer.ts @@ -14,7 +14,8 @@ import * as FbsProducer from '../fbs/producer'; type TestContext = { mediaCodecs: mediasoup.types.RouterRtpCodecCapability[]; - audioProducerOptions: mediasoup.types.ProducerOptions; + audioProducerOptions1: mediasoup.types.ProducerOptions; + audioProducerOptions2: mediasoup.types.ProducerOptions; videoProducerOptions: mediasoup.types.ProducerOptions; worker?: mediasoup.types.Worker; router?: mediasoup.types.Router; @@ -33,6 +34,11 @@ const ctx: TestContext = { foo: '111', }, }, + { + kind: 'audio', + mimeType: 'audio/PCMA', + clockRate: 8000, + }, { kind: 'video', mimeType: 'video/VP8', @@ -51,7 +57,7 @@ const ctx: TestContext = { rtcpFeedback: [], // Will be ignored. }, ]), - audioProducerOptions: utils.deepFreeze({ + audioProducerOptions1: utils.deepFreeze({ kind: 'audio', rtpParameters: { mid: 'AUDIO', @@ -81,11 +87,34 @@ const ctx: TestContext = { ], // Missing encodings on purpose. rtcp: { - cname: 'audio-1', + cname: 'audio-cname', }, }, appData: { foo: 1, bar: '2' }, }), + audioProducerOptions2: utils.deepFreeze({ + kind: 'audio', + rtpParameters: { + mid: 'AUDIO-2', + codecs: [ + { + mimeType: 'audio/PCMA', + payloadType: 1, + clockRate: 8000, + }, + ], + headerExtensions: [ + { + uri: 'urn:ietf:params:rtp-hdrext:sdes:mid', + id: 10, + }, + ], + encodings: [{ ssrc: 20000000 }], + rtcp: { + cname: 'audio-cname', + }, + }, + }), videoProducerOptions: utils.deepFreeze({ kind: 'video', rtpParameters: { @@ -129,7 +158,7 @@ const ctx: TestContext = { { ssrc: 22222228, rtx: { ssrc: 22222229 } }, ], rtcp: { - cname: 'video-1', + cname: 'video-cname', }, }, appData: { foo: 1, bar: '2' }, @@ -158,33 +187,53 @@ afterEach(async () => { test('webRtcTransport1.produce() succeeds', async () => { const onObserverNewProducer = jest.fn(); - ctx.webRtcTransport1!.observer.once('newproducer', onObserverNewProducer); + ctx.webRtcTransport1!.observer.on('newproducer', onObserverNewProducer); - const audioProducer = await ctx.webRtcTransport1!.produce( - ctx.audioProducerOptions + const audioProducer1 = await ctx.webRtcTransport1!.produce( + ctx.audioProducerOptions1 ); - expect(onObserverNewProducer).toHaveBeenCalledTimes(1); - expect(onObserverNewProducer).toHaveBeenCalledWith(audioProducer); - expect(typeof audioProducer.id).toBe('string'); - expect(audioProducer.closed).toBe(false); - expect(audioProducer.kind).toBe('audio'); - expect(typeof audioProducer.rtpParameters).toBe('object'); - expect(audioProducer.type).toBe('simple'); + const audioProducer2 = await ctx.webRtcTransport1!.produce( + ctx.audioProducerOptions2 + ); + + expect(onObserverNewProducer).toHaveBeenCalledTimes(2); + expect(onObserverNewProducer).toHaveBeenCalledWith(audioProducer1); + expect(onObserverNewProducer).toHaveBeenCalledWith(audioProducer2); + + expect(typeof audioProducer1.id).toBe('string'); + expect(audioProducer1.closed).toBe(false); + expect(audioProducer1.kind).toBe('audio'); + expect(typeof audioProducer1.rtpParameters).toBe('object'); + expect(audioProducer1.type).toBe('simple'); // Private API. - expect(typeof audioProducer.consumableRtpParameters).toBe('object'); - expect(audioProducer.paused).toBe(false); - expect(audioProducer.score).toEqual([]); - expect(audioProducer.appData).toEqual({ foo: 1, bar: '2' }); + expect(typeof audioProducer1.consumableRtpParameters).toBe('object'); + expect(audioProducer1.paused).toBe(false); + expect(audioProducer1.score).toEqual([]); + expect(audioProducer1.appData).toEqual({ foo: 1, bar: '2' }); + + expect(typeof audioProducer2.id).toBe('string'); + expect(audioProducer2.closed).toBe(false); + expect(audioProducer2.kind).toBe('audio'); + expect(typeof audioProducer2.rtpParameters).toBe('object'); + expect(audioProducer2.type).toBe('simple'); + // Private API. + expect(typeof audioProducer2.consumableRtpParameters).toBe('object'); + expect(audioProducer2.paused).toBe(false); + expect(audioProducer2.score).toEqual([]); + expect(audioProducer2.appData).toEqual({}); await expect(ctx.router!.dump()).resolves.toMatchObject({ - mapProducerIdConsumerIds: [{ key: audioProducer.id, values: [] }], + mapProducerIdConsumerIds: expect.arrayContaining([ + { key: audioProducer1.id, values: [] }, + { key: audioProducer2.id, values: [] }, + ]), mapConsumerIdProducerId: [], }); await expect(ctx.webRtcTransport1!.dump()).resolves.toMatchObject({ id: ctx.webRtcTransport1!.id, - producerIds: [audioProducer.id], + producerIds: expect.arrayContaining([audioProducer1.id, audioProducer2.id]), consumerIds: [], }); }, 2000); @@ -489,7 +538,7 @@ test('transport.produce() with no MID and with single encoding without RID or SS test('producer.dump() succeeds', async () => { const audioProducer = await ctx.webRtcTransport1!.produce( - ctx.audioProducerOptions + ctx.audioProducerOptions1 ); const dump1 = await audioProducer.dump(); @@ -599,7 +648,7 @@ test('producer.dump() succeeds', async () => { test('producer.getStats() succeeds', async () => { const audioProducer = await ctx.webRtcTransport1!.produce( - ctx.audioProducerOptions + ctx.audioProducerOptions1 ); const videoProducer = await ctx.webRtcTransport2!.produce( @@ -613,7 +662,7 @@ test('producer.getStats() succeeds', async () => { test('producer.pause() and resume() succeed', async () => { const audioProducer = await ctx.webRtcTransport1!.produce( - ctx.audioProducerOptions + ctx.audioProducerOptions1 ); const onObserverPause = jest.fn(); @@ -647,7 +696,7 @@ test('producer.pause() and resume() succeed', async () => { test('producer.pause() and resume() emit events', async () => { const audioProducer = await ctx.webRtcTransport1!.produce( - ctx.audioProducerOptions + ctx.audioProducerOptions1 ); const promises = []; @@ -672,7 +721,7 @@ test('producer.pause() and resume() emit events', async () => { test('producer.enableTraceEvent() succeed', async () => { const audioProducer = await ctx.webRtcTransport1!.produce( - ctx.audioProducerOptions + ctx.audioProducerOptions1 ); await audioProducer.enableTraceEvent(['rtp', 'pli']); @@ -705,7 +754,7 @@ test('producer.enableTraceEvent() succeed', async () => { test('producer.enableTraceEvent() with wrong arguments rejects with TypeError', async () => { const audioProducer = await ctx.webRtcTransport1!.produce( - ctx.audioProducerOptions + ctx.audioProducerOptions1 ); // @ts-expect-error --- Testing purposes. @@ -776,7 +825,7 @@ test('Producer emits "score"', async () => { test('producer.close() succeeds', async () => { const audioProducer = await ctx.webRtcTransport1!.produce( - ctx.audioProducerOptions + ctx.audioProducerOptions1 ); const onObserverClose = jest.fn(); @@ -801,7 +850,7 @@ test('producer.close() succeeds', async () => { test('Producer methods reject if closed', async () => { const audioProducer = await ctx.webRtcTransport1!.produce( - ctx.audioProducerOptions + ctx.audioProducerOptions1 ); audioProducer.close(); diff --git a/rust/tests/integration/consumer.rs b/rust/tests/integration/consumer.rs index 3dbd4e51ce..f959cdba1e 100644 --- a/rust/tests/integration/consumer.rs +++ b/rust/tests/integration/consumer.rs @@ -474,7 +474,13 @@ fn consume_succeeds() { producer_scores: vec![0] } ); - assert_eq!(audio_consumer.preferred_layers(), None); + assert_eq!( + audio_consumer.preferred_layers(), + Some(ConsumerLayers { + spatial_layer: 0, + temporal_layer: Some(0) + }) + ); assert_eq!(audio_consumer.current_layers(), None); assert_eq!( audio_consumer @@ -1320,7 +1326,13 @@ fn set_preferred_layers_succeeds() { .await .expect("Failed to set preferred layers consumer"); - assert_eq!(audio_consumer.preferred_layers(), None); + assert_eq!( + audio_consumer.preferred_layers(), + Some(ConsumerLayers { + spatial_layer: 0, + temporal_layer: Some(0) + }) + ); } { diff --git a/worker/include/RTC/Consumer.hpp b/worker/include/RTC/Consumer.hpp index faf9526fec..a16c325cc2 100644 --- a/worker/include/RTC/Consumer.hpp +++ b/worker/include/RTC/Consumer.hpp @@ -5,6 +5,7 @@ #include "Channel/ChannelRequest.hpp" #include "Channel/ChannelSocket.hpp" #include "FBS/consumer.h" +#include "FBS/transport.h" #include "RTC/ConsumerTypes.hpp" #include "RTC/RTCP/CompoundPacket.hpp" #include "RTC/RTCP/FeedbackRtpNack.hpp" @@ -132,7 +133,7 @@ namespace RTC RTC::RTP::RtpStreamRecv* rtpStream, uint8_t score, uint8_t previousScore) = 0; virtual void ProducerRtcpSenderReport(RTC::RTP::RtpStreamRecv* rtpStream, bool first) = 0; void ProducerClosed(); - void SetExternallyManagedBitrate() + virtual void SetExternallyManagedBitrate() { this->externallyManagedBitrate = true; } diff --git a/worker/include/RTC/ConsumerTypes.hpp b/worker/include/RTC/ConsumerTypes.hpp index 0544167cbd..54f6ffd853 100644 --- a/worker/include/RTC/ConsumerTypes.hpp +++ b/worker/include/RTC/ConsumerTypes.hpp @@ -19,6 +19,7 @@ namespace RTC } VideoLayers(const VideoLayers& other) = default; + bool operator==(const VideoLayers& other) const { return spatial == other.spatial && temporal == other.temporal; @@ -29,6 +30,8 @@ namespace RTC return !(*this == other); } + void Dump(int indentation) const; + void Reset() { spatial = -1; diff --git a/worker/include/RTC/SimulcastConsumer.hpp b/worker/include/RTC/MultiStreamConsumer.hpp similarity index 91% rename from worker/include/RTC/SimulcastConsumer.hpp rename to worker/include/RTC/MultiStreamConsumer.hpp index c63d7c8254..c23ee2d0a3 100644 --- a/worker/include/RTC/SimulcastConsumer.hpp +++ b/worker/include/RTC/MultiStreamConsumer.hpp @@ -1,5 +1,5 @@ -#ifndef MS_RTC_SIMULCAST_CONSUMER_HPP -#define MS_RTC_SIMULCAST_CONSUMER_HPP +#ifndef MS_RTC_MULTI_STREAM_CONSUMER_HPP +#define MS_RTC_MULTI_STREAM_CONSUMER_HPP #include "FBS/consumer.h" #include "RTC/Consumer.hpp" @@ -10,16 +10,17 @@ namespace RTC { - class SimulcastConsumer : public RTC::Consumer, public RTC::RTP::RtpStreamSend::Listener + class MultiStreamConsumer : public RTC::Consumer, public RTC::RTP::RtpStreamSend::Listener { public: - SimulcastConsumer( + MultiStreamConsumer( RTC::Shared* shared, const std::string& id, const std::string& producerId, + RTC::RtpParameters::Type type, RTC::Consumer::Listener* listener, const FBS::Transport::ConsumeRequest* data); - ~SimulcastConsumer() override; + ~MultiStreamConsumer() override; public: flatbuffers::Offset FillBuffer( @@ -60,6 +61,14 @@ namespace RTC void ProducerRtpStreamScore( RTC::RTP::RtpStreamRecv* rtpStream, uint8_t score, uint8_t previousScore) override; void ProducerRtcpSenderReport(RTC::RTP::RtpStreamRecv* rtpStream, bool first) override; + void SetExternallyManagedBitrate() override + { + // Only allow externally managed bitrate video. + if (this->kind == RTC::Media::Kind::VIDEO) + { + this->externallyManagedBitrate = true; + } + } uint8_t GetBitratePriority() const override; uint32_t IncreaseLayer(uint32_t bitrate, bool considerLoss) override; void ApplyLayers() override; @@ -116,6 +125,7 @@ namespace RTC absl::flat_hash_map mapMappedSsrcSpatialLayer; std::vector rtpStreams; std::vector producerRtpStreams; // Indexed by spatial layer. + bool keyFrameSupported{ false }; bool syncRequired{ false }; int16_t spatialLayerToSync{ -1 }; bool lastSentPacketHasMarker{ false }; diff --git a/worker/include/RTC/SimpleConsumer.hpp b/worker/include/RTC/SimpleConsumer.hpp deleted file mode 100644 index 83b8f9c735..0000000000 --- a/worker/include/RTC/SimpleConsumer.hpp +++ /dev/null @@ -1,104 +0,0 @@ -#ifndef MS_RTC_SIMPLE_CONSUMER_HPP -#define MS_RTC_SIMPLE_CONSUMER_HPP - -#include "FBS/transport.h" -#include "RTC/Consumer.hpp" -#include "RTC/SeqManager.hpp" -#include "RTC/Shared.hpp" -#include - -namespace RTC -{ - class SimpleConsumer : public RTC::Consumer, public RTC::RTP::RtpStreamSend::Listener - { - public: - SimpleConsumer( - RTC::Shared* shared, - const std::string& id, - const std::string& producerId, - RTC::Consumer::Listener* listener, - const FBS::Transport::ConsumeRequest* data); - ~SimpleConsumer() override; - - public: - flatbuffers::Offset FillBuffer( - flatbuffers::FlatBufferBuilder& builder) const; - flatbuffers::Offset FillBufferStats( - flatbuffers::FlatBufferBuilder& builder) override; - flatbuffers::Offset FillBufferScore( - flatbuffers::FlatBufferBuilder& builder) const override; - bool IsActive() const override - { - // clang-format off - return ( - RTC::Consumer::IsActive() && - this->producerRtpStream && - // If there is no RTP inactivity check do not consider the stream - // inactive despite it has score 0. - (this->producerRtpStream->GetScore() > 0u || !this->producerRtpStream->HasRtpInactivityCheckEnabled()) - ); - // clang-format on - } - void ProducerRtpStream(RTC::RTP::RtpStreamRecv* rtpStream, uint32_t mappedSsrc) override; - void ProducerNewRtpStream(RTC::RTP::RtpStreamRecv* rtpStream, uint32_t mappedSsrc) override; - void ProducerRtpStreamScore( - RTC::RTP::RtpStreamRecv* rtpStream, uint8_t score, uint8_t previousScore) override; - void ProducerRtcpSenderReport(RTC::RTP::RtpStreamRecv* rtpStream, bool first) override; - uint8_t GetBitratePriority() const override; - uint32_t IncreaseLayer(uint32_t bitrate, bool considerLoss) override; - void ApplyLayers() override; - uint32_t GetDesiredBitrate() const override; - void SendRtpPacket(RTC::RTP::Packet* packet, RTC::RTP::SharedPacket& sharedPacket) override; - const std::vector& GetRtpStreams() const override - { - return this->rtpStreams; - } - bool GetRtcp(RTC::RTCP::CompoundPacket* packet, uint64_t nowMs) override; - void NeedWorstRemoteFractionLost(uint32_t mappedSsrc, uint8_t& worstRemoteFractionLost) override; - void ReceiveNack(RTC::RTCP::FeedbackRtpNackPacket* nackPacket) override; - void ReceiveKeyFrameRequest(RTC::RTCP::FeedbackPs::MessageType messageType, uint32_t ssrc) override; - void ReceiveRtcpReceiverReport(RTC::RTCP::ReceiverReport* report) override; - void ReceiveRtcpXrReceiverReferenceTime(RTC::RTCP::ReceiverReferenceTime* report) override; - uint32_t GetTransmissionRate(uint64_t nowMs) override; - float GetRtt() const override; - - /* Methods inherited from Channel::ChannelSocket::RequestHandler. */ - public: - void HandleRequest(Channel::ChannelRequest* request) override; - - private: - void UserOnTransportConnected() override; - void UserOnTransportDisconnected() override; - void UserOnPaused() override; - void UserOnResumed() override; - void CreateRtpStream(); - void RequestKeyFrame(); - void StorePacketInTargetLayerRetransmissionBuffer( - RTC::RTP::Packet* packet, RTC::RTP::SharedPacket& sharedPacket); - void EmitScore() const; - - /* Pure virtual methods inherited from RtpStreamSend::Listener. */ - public: - void OnRtpStreamScore(RTC::RTP::RtpStream* rtpStream, uint8_t score, uint8_t previousScore) override; - void OnRtpStreamRetransmitRtpPacket( - RTC::RTP::RtpStreamSend* rtpStream, RTC::RTP::Packet* packet) override; - - private: - // Allocated by this. - RTC::RTP::RtpStreamSend* rtpStream{ nullptr }; - // Others. - std::vector rtpStreams; - RTC::RTP::RtpStreamRecv* producerRtpStream{ nullptr }; - bool keyFrameSupported{ false }; - bool syncRequired{ false }; - RTC::SeqManager rtpSeqManager; - bool managingBitrate{ false }; - std::unique_ptr encodingContext; - // Buffer to store packets that arrive earlier than the first packet of the - // video key frame. - std::map::SeqLowerThan> - targetLayerRetransmissionBuffer; - }; -} // namespace RTC - -#endif diff --git a/worker/meson.build b/worker/meson.build index cb48c93bf3..ad0cf99aed 100644 --- a/worker/meson.build +++ b/worker/meson.build @@ -104,6 +104,7 @@ common_sources = [ 'src/RTC/ActiveSpeakerObserver.cpp', 'src/RTC/AudioLevelObserver.cpp', 'src/RTC/Consumer.cpp', + 'src/RTC/ConsumerTypes.cpp', 'src/RTC/DataConsumer.cpp', 'src/RTC/DataProducer.cpp', 'src/RTC/DirectTransport.cpp', @@ -126,8 +127,7 @@ common_sources = [ 'src/RTC/SeqManager.cpp', 'src/RTC/Serializable.cpp', 'src/RTC/Shared.cpp', - 'src/RTC/SimpleConsumer.cpp', - 'src/RTC/SimulcastConsumer.cpp', + 'src/RTC/MultiStreamConsumer.cpp', 'src/RTC/SrtpSession.cpp', 'src/RTC/SvcConsumer.cpp', 'src/RTC/TcpConnection.cpp', @@ -410,7 +410,7 @@ test_sources = [ 'test/src/RTC/TestRateCalculator.cpp', 'test/src/RTC/TestRtpEncodingParameters.cpp', 'test/src/RTC/TestSeqManager.cpp', - 'test/src/RTC/TestSimpleConsumer.cpp', + 'test/src/RTC/TestMultiStreamConsumer.cpp', 'test/src/RTC/TestTransportCongestionControlServer.cpp', 'test/src/RTC/TestTransportTuple.cpp', 'test/src/RTC/TestTrendCalculator.cpp', diff --git a/worker/src/RTC/ConsumerTypes.cpp b/worker/src/RTC/ConsumerTypes.cpp new file mode 100644 index 0000000000..f465359e76 --- /dev/null +++ b/worker/src/RTC/ConsumerTypes.cpp @@ -0,0 +1,21 @@ +#define MS_CLASS "RTC::ConsumerTypes" +// #define MS_LOG_DEV_LEVEL 3 + +#include "RTC/ConsumerTypes.hpp" +#include "Logger.hpp" + +namespace RTC +{ + namespace ConsumerTypes + { + void VideoLayers::Dump(int indentation) const + { + MS_TRACE(); + + MS_DUMP_CLEAN(indentation, ""); + MS_DUMP_CLEAN( + indentation, " spatial:%" PRIi16 ", temporal:%" PRIi16, this->spatial, this->temporal); + MS_DUMP_CLEAN(indentation, ""); + } + } // namespace ConsumerTypes +} // namespace RTC diff --git a/worker/src/RTC/SimulcastConsumer.cpp b/worker/src/RTC/MultiStreamConsumer.cpp similarity index 80% rename from worker/src/RTC/SimulcastConsumer.cpp rename to worker/src/RTC/MultiStreamConsumer.cpp index ccde25bd08..6917280249 100644 --- a/worker/src/RTC/SimulcastConsumer.cpp +++ b/worker/src/RTC/MultiStreamConsumer.cpp @@ -1,7 +1,8 @@ -#define MS_CLASS "RTC::SimulcastConsumer" -// #define MS_LOG_DEV_LEVEL 3 +#define MS_CLASS "RTC::MultiStreamConsumer" +// TODO +#define MS_LOG_DEV_LEVEL 3 -#include "RTC/SimulcastConsumer.hpp" +#include "RTC/MultiStreamConsumer.hpp" #include "DepLibUV.hpp" #include "Logger.hpp" #include "MediaSoupErrors.hpp" @@ -24,23 +25,29 @@ namespace RTC /* Instance methods. */ - SimulcastConsumer::SimulcastConsumer( + MultiStreamConsumer::MultiStreamConsumer( RTC::Shared* shared, const std::string& id, const std::string& producerId, + RTC::RtpParameters::Type type, RTC::Consumer::Listener* listener, const FBS::Transport::ConsumeRequest* data) - : RTC::Consumer::Consumer( - shared, id, producerId, listener, data, RTC::RtpParameters::Type::SIMULCAST) + : RTC::Consumer::Consumer(shared, id, producerId, listener, data, type) { MS_TRACE(); + MS_ASSERT( + this->type == RTC::RtpParameters::Type::SIMPLE || + this->type == RTC::RtpParameters::Type::SIMULCAST, + "type must be SIMPLE or SIMULCAST"); + // We allow a single encoding in simulcast (so we can enable temporal layers // with a single simulcast stream). // NOTE: No need to check this->consumableRtpEncodings.size() > 0 here since // it's already done in Consumer constructor. - auto& encoding = this->rtpParameters.encodings[0]; + auto& encoding = this->rtpParameters.encodings[0]; + const auto* mediaCodec = this->rtpParameters.GetCodecForEncoding(encoding); // Ensure there are as many spatial layers as encodings. if (encoding.spatialLayers != this->consumableRtpEncodings.size()) @@ -48,6 +55,12 @@ namespace RTC MS_THROW_TYPE_ERROR("encoding.spatialLayers does not match number of consumableRtpEncodings"); } + if (!RTC::RTP::Codecs::Tools::IsValidTypeForCodec(this->type, mediaCodec->mimeType)) + { + MS_THROW_TYPE_ERROR( + "%s codec not supported for simulcast", mediaCodec->mimeType.ToString().c_str()); + } + // Fill mapMappedSsrcSpatialLayer. for (size_t idx{ 0u }; idx < this->consumableRtpEncodings.size(); ++idx) { @@ -56,6 +69,8 @@ namespace RTC this->mapMappedSsrcSpatialLayer[encoding.ssrc] = static_cast(idx); } + this->keyFrameSupported = RTC::RTP::Codecs::Tools::CanBeKeyFrame(mediaCodec->mimeType); + // Set preferredLayers (if given). if (flatbuffers::IsFieldPresent(data, FBS::Transport::ConsumeRequest::VT_PREFERREDLAYERS)) { @@ -96,15 +111,6 @@ namespace RTC this->producerRtpStreams.insert( this->producerRtpStreams.begin(), this->consumableRtpEncodings.size(), nullptr); - // Create the encoding context. - const auto* mediaCodec = this->rtpParameters.GetCodecForEncoding(encoding); - - if (!RTC::RTP::Codecs::Tools::IsValidTypeForCodec(this->type, mediaCodec->mimeType)) - { - MS_THROW_TYPE_ERROR( - "%s codec not supported for simulcast", mediaCodec->mimeType.ToString().c_str()); - } - // Let's chosee an initial output seq number between 1000 and 32768 to avoid // libsrtp bug: // https://github.com/versatica/mediasoup/issues/1437 @@ -113,6 +119,7 @@ namespace RTC this->rtpSeqManager = RTC::SeqManager(initialOutputSeq); + // Create the encoding context. RTC::RTP::Codecs::EncodingContext::Params params; params.spatialLayers = encoding.spatialLayers; @@ -121,7 +128,15 @@ namespace RTC this->encodingContext.reset( RTC::RTP::Codecs::Tools::GetEncodingContext(mediaCodec->mimeType, params)); - MS_ASSERT(this->encodingContext, "no encoding context for this codec"); + // Specific for Opus codec. + if ( + this->encodingContext && mediaCodec->mimeType.type == RTC::RtpCodecMimeType::Type::AUDIO && + (mediaCodec->mimeType.subtype == RTC::RtpCodecMimeType::Subtype::OPUS || + mediaCodec->mimeType.subtype == RTC::RtpCodecMimeType::Subtype::MULTIOPUS)) + { + // ignoreDtx is set to false by default. + this->encodingContext->SetIgnoreDtx(data->ignoreDtx()); + } // Create RtpStreamSend instance for sending a single stream to the remote. CreateRtpStream(); @@ -133,7 +148,7 @@ namespace RTC /*channelRequestHandler*/ nullptr); } - SimulcastConsumer::~SimulcastConsumer() + MultiStreamConsumer::~MultiStreamConsumer() { MS_TRACE(); @@ -143,7 +158,7 @@ namespace RTC this->targetLayerRetransmissionBuffer.clear(); } - flatbuffers::Offset SimulcastConsumer::FillBuffer( + flatbuffers::Offset MultiStreamConsumer::FillBuffer( flatbuffers::FlatBufferBuilder& builder) const { MS_TRACE(); @@ -163,12 +178,12 @@ namespace RTC this->currentSpatialLayer, this->preferredLayers.temporal, this->targetLayers.temporal, - this->encodingContext->GetCurrentTemporalLayer()); + this->encodingContext ? this->encodingContext->GetCurrentTemporalLayer() : 1); return FBS::Consumer::CreateDumpResponse(builder, dump); } - flatbuffers::Offset SimulcastConsumer::FillBufferStats( + flatbuffers::Offset MultiStreamConsumer::FillBufferStats( flatbuffers::FlatBufferBuilder& builder) { MS_TRACE(); @@ -189,7 +204,7 @@ namespace RTC return FBS::Consumer::CreateGetStatsResponseDirect(builder, &rtpStreams); } - flatbuffers::Offset SimulcastConsumer::FillBufferScore( + flatbuffers::Offset MultiStreamConsumer::FillBufferScore( flatbuffers::FlatBufferBuilder& builder) const { MS_TRACE(); @@ -213,7 +228,7 @@ namespace RTC builder, this->rtpStream->GetScore(), producerScore, this->producerRtpStreamScores); } - void SimulcastConsumer::HandleRequest(Channel::ChannelRequest* request) + void MultiStreamConsumer::HandleRequest(Channel::ChannelRequest* request) { MS_TRACE(); @@ -305,7 +320,7 @@ namespace RTC } } - void SimulcastConsumer::ProducerRtpStream(RTC::RTP::RtpStreamRecv* rtpStream, uint32_t mappedSsrc) + void MultiStreamConsumer::ProducerRtpStream(RTC::RTP::RtpStreamRecv* rtpStream, uint32_t mappedSsrc) { MS_TRACE(); @@ -318,7 +333,7 @@ namespace RTC this->producerRtpStreams[spatialLayer] = rtpStream; } - void SimulcastConsumer::ProducerNewRtpStream(RTC::RTP::RtpStreamRecv* rtpStream, uint32_t mappedSsrc) + void MultiStreamConsumer::ProducerNewRtpStream(RTC::RTP::RtpStreamRecv* rtpStream, uint32_t mappedSsrc) { MS_TRACE(); @@ -339,7 +354,7 @@ namespace RTC } } - void SimulcastConsumer::ProducerRtpStreamScore( + void MultiStreamConsumer::ProducerRtpStreamScore( RTC::RTP::RtpStreamRecv* /*rtpStream*/, uint8_t score, uint8_t previousScore) { MS_TRACE(); @@ -352,6 +367,12 @@ namespace RTC // All Producer streams are dead. if (!IsActive()) { + // TODO: REMOVE + if (this->kind == RTC::Media::Kind::AUDIO) + { + MS_DUMP("----- all Producers are inactive, calling UpdateTargetLayers(-1, -1)"); + } + UpdateTargetLayers(-1, -1); } // Just check target layers if the stream has died or reborned. @@ -362,7 +383,7 @@ namespace RTC } } - void SimulcastConsumer::ProducerRtcpSenderReport(RTC::RTP::RtpStreamRecv* rtpStream, bool first) + void MultiStreamConsumer::ProducerRtcpSenderReport(RTC::RTP::RtpStreamRecv* rtpStream, bool first) { MS_TRACE(); @@ -389,12 +410,24 @@ namespace RTC } } - uint8_t SimulcastConsumer::GetBitratePriority() const + uint8_t MultiStreamConsumer::GetBitratePriority() const { MS_TRACE(); + // Only for video. + if (this->kind == RTC::Media::Kind::AUDIO) + { + return 0u; + } + MS_ASSERT(this->externallyManagedBitrate, "bitrate is not externally managed"); + // Audio SimpleConsumer does not play the BWE game. + if (this->kind == RTC::Media::Kind::AUDIO && this->type == RTC::RtpParameters::Type::SIMPLE) + { + return 0u; + } + if (!IsActive()) { return 0u; @@ -403,11 +436,12 @@ namespace RTC return this->priority; } - uint32_t SimulcastConsumer::IncreaseLayer(uint32_t bitrate, bool considerLoss) + uint32_t MultiStreamConsumer::IncreaseLayer(uint32_t bitrate, bool considerLoss) { MS_TRACE(); MS_ASSERT(this->externallyManagedBitrate, "bitrate is not externally managed"); + MS_ASSERT(this->kind == RTC::Media::Kind::VIDEO, "should be video"); MS_ASSERT(IsActive(), "should be active"); // If already in the preferred layers, do nothing. @@ -626,11 +660,12 @@ namespace RTC } } - void SimulcastConsumer::ApplyLayers() + void MultiStreamConsumer::ApplyLayers() { MS_TRACE(); MS_ASSERT(this->externallyManagedBitrate, "bitrate is not externally managed"); + MS_ASSERT(this->kind == RTC::Media::Kind::VIDEO, "should be video"); MS_ASSERT(IsActive(), "should be active"); auto provisionalTargetLayers = this->provisionalTargetLayers; @@ -640,6 +675,15 @@ namespace RTC if (provisionalTargetLayers != this->targetLayers) { + // TODO: REMOVE + if (this->kind == RTC::Media::Kind::AUDIO) + { + MS_DUMP( + "----- calling UpdateTargetLayers(%d, %d)", + provisionalTargetLayers.spatial, + provisionalTargetLayers.temporal); + } + UpdateTargetLayers(provisionalTargetLayers.spatial, provisionalTargetLayers.temporal); // If this looks like a spatial layer downgrade due to BWE limitations, set @@ -660,10 +704,16 @@ namespace RTC } } - uint32_t SimulcastConsumer::GetDesiredBitrate() const + uint32_t MultiStreamConsumer::GetDesiredBitrate() const { MS_TRACE(); + // Only for video. + if (this->kind == RTC::Media::Kind::AUDIO) + { + return 0u; + } + MS_ASSERT(this->externallyManagedBitrate, "bitrate is not externally managed"); if (!IsActive()) @@ -703,7 +753,7 @@ namespace RTC } // NOLINTNEXTLINE(misc-no-recursion) - void SimulcastConsumer::SendRtpPacket(RTC::RTP::Packet* packet, RTC::RTP::SharedPacket& sharedPacket) + void MultiStreamConsumer::SendRtpPacket(RTC::RTP::Packet* packet, RTC::RTP::SharedPacket& sharedPacket) { MS_TRACE(); @@ -723,9 +773,26 @@ namespace RTC packet->logger.Discarded(RTC::RtcLogger::RtpPacket::DiscardReason::CONSUMER_INACTIVE); #endif + // TODO: REMOVE + if (this->kind == RTC::Media::Kind::AUDIO) + { + MS_DUMP( + "----- audio packet dropped CONSUMER_INACTIVE | seq:%u", packet->GetSequenceNumber()); + } + this->rtpSeqManager.Drop(packet->GetSequenceNumber()); } + // TODO: REMOVE + if (this->kind == RTC::Media::Kind::AUDIO && spatialLayer != this->currentSpatialLayer) + { + MS_DUMP( + "----- !IsActive() => audio packet ignored because spatialLayer %d != this->currentSpatialLayer %d | seq:%u", + spatialLayer, + this->currentSpatialLayer, + packet->GetSequenceNumber()); + } + return; } @@ -739,9 +806,26 @@ namespace RTC packet->logger.Discarded(RTC::RtcLogger::RtpPacket::DiscardReason::INVALID_TARGET_LAYER); #endif + // TODO: REMOVE + if (this->kind == RTC::Media::Kind::AUDIO) + { + MS_DUMP( + "----- audio packet dropped INVALID_TARGET_LAYER | seq:%u", packet->GetSequenceNumber()); + } + this->rtpSeqManager.Drop(packet->GetSequenceNumber()); } + // TODO: REMOVE + if (this->kind == RTC::Media::Kind::AUDIO && spatialLayer != this->currentSpatialLayer) + { + MS_DUMP( + "----- this->targetLayers.temporal == -1 => audio packet ignored because spatialLayer %d != this->currentSpatialLayer %d | seq:%u", + spatialLayer, + this->currentSpatialLayer, + packet->GetSequenceNumber()); + } + return; } @@ -761,9 +845,27 @@ namespace RTC packet->logger.Discarded(RTC::RtcLogger::RtpPacket::DiscardReason::UNSUPPORTED_PAYLOAD_TYPE); #endif + // TODO: REMOVE + if (this->kind == RTC::Media::Kind::AUDIO) + { + MS_DUMP( + "----- audio packet dropped UNSUPPORTED_PAYLOAD_TYPE | seq:%u", + packet->GetSequenceNumber()); + } + this->rtpSeqManager.Drop(packet->GetSequenceNumber()); } + // TODO: REMOVE + if (this->kind == RTC::Media::Kind::AUDIO && spatialLayer != this->currentSpatialLayer) + { + MS_DUMP( + "----- unsupported codec => audio packet ignored because spatialLayer %d != this->currentSpatialLayer %d | seq:%u", + spatialLayer, + this->currentSpatialLayer, + packet->GetSequenceNumber()); + } + return; } @@ -776,12 +878,18 @@ namespace RTC spatialLayer == this->targetLayers.spatial) { // Ignore if not a key frame. - if (!packet->IsKeyFrame()) + if (this->keyFrameSupported && !packet->IsKeyFrame()) { #ifdef MS_RTC_LOGGER_RTP packet->logger.Discarded(RTC::RtcLogger::RtpPacket::DiscardReason::NOT_A_KEYFRAME); #endif + // TODO: REMOVE + if (this->kind == RTC::Media::Kind::AUDIO) + { + MS_DUMP("----- audio packet dropped NOT_A_KEYFRAME | seq:%u", packet->GetSequenceNumber()); + } + // NOTE: Don't drop the packet in the RTP sequence manager since this // packet doesn't belong to the current spatial layer. @@ -805,18 +913,34 @@ namespace RTC // NOTE: Don't drop the packet in the RTP sequence manager since this // packet doesn't belong to the current spatial layer. + // TODO: REMOVE + if (this->kind == RTC::Media::Kind::AUDIO) + { + MS_DUMP( + "----- audio packet ignored (spatialLayer %d != this->currentSpatialLayer %d) | seq:%u", + spatialLayer, + this->currentSpatialLayer, + packet->GetSequenceNumber()); + } + return; } // If we need to sync and this is not a key frame, ignore the packet. // NOTE: syncRequired is true if packet is a key frame of the target spatial // layer or if transport just connected or consumer resumed. - if (this->syncRequired && !packet->IsKeyFrame()) + if (this->syncRequired && this->keyFrameSupported && !packet->IsKeyFrame()) { #ifdef MS_RTC_LOGGER_RTP packet->logger.Discarded(RTC::RtcLogger::RtpPacket::DiscardReason::NOT_A_KEYFRAME); #endif + // TODO: REMOVE + if (this->kind == RTC::Media::Kind::AUDIO) + { + MS_DUMP("----- audio packet dropped NOT_A_KEYFRAME | seq:%u", packet->GetSequenceNumber()); + } + // NOTE: No need to drop the packet in the RTP sequence manager since here // we are blocking all packets but the key frame that would trigger sync // below. @@ -839,9 +963,25 @@ namespace RTC packet->logger.Discarded(RTC::RtcLogger::RtpPacket::DiscardReason::EMPTY_PAYLOAD); #endif + // TODO: REMOVE + if (this->kind == RTC::Media::Kind::AUDIO) + { + MS_DUMP("----- audio packet dropped EMPTY_PAYLOAD | seq:%u", packet->GetSequenceNumber()); + } + this->rtpSeqManager.Drop(packet->GetSequenceNumber()); } + // TODO: REMOVE + if (this->kind == RTC::Media::Kind::AUDIO && spatialLayer != this->currentSpatialLayer) + { + MS_DUMP( + "----- unsupported codec => audio packet ignored because spatialLayer %d != this->currentSpatialLayer %d | seq:%u", + spatialLayer, + this->currentSpatialLayer, + packet->GetSequenceNumber()); + } + return; } @@ -967,6 +1107,14 @@ namespace RTC RTC::RtcLogger::RtpPacket::DiscardReason::TOO_HIGH_TIMESTAMP_EXTRA_NEEDED); #endif + // TODO: REMOVE + if (this->kind == RTC::Media::Kind::AUDIO) + { + MS_DUMP( + "----- audio packet dropped TOO_HIGH_TIMESTAMP_EXTRA_NEEDED | seq:%u", + packet->GetSequenceNumber()); + } + // NOTE: Don't drop the packet in the RTP sequence manager since this // packet doesn't belong to the current spatial layer. @@ -997,7 +1145,10 @@ namespace RTC // https://github.com/versatica/mediasoup/issues/408 this->rtpSeqManager.Sync(packet->GetSequenceNumber() - (this->lastSentPacketHasMarker ? 1 : 2)); - this->encodingContext->SyncRequired(); + if (this->encodingContext) + { + this->encodingContext->SyncRequired(); + } this->syncRequired = false; this->spatialLayerToSync = -1; @@ -1018,6 +1169,14 @@ namespace RTC RTC::RtcLogger::RtpPacket::DiscardReason::PACKET_PREVIOUS_TO_SPATIAL_LAYER_SWITCH); #endif + // TODO: REMOVE + if (this->kind == RTC::Media::Kind::AUDIO) + { + MS_DUMP( + "----- audio packet dropped PACKET_PREVIOUS_TO_SPATIAL_LAYER_SWITCH | seq:%u", + packet->GetSequenceNumber()); + } + this->rtpSeqManager.Drop(packet->GetSequenceNumber()); return; @@ -1033,6 +1192,16 @@ namespace RTC if (shouldSwitchCurrentSpatialLayer) { + // TODO: REMOVE + if (this->kind == RTC::Media::Kind::AUDIO) + { + MS_DUMP( + "----- setting this->currentSpatialLayer = %d [previous value:%d] | seq:%u", + this->targetLayers.spatial, + this->currentSpatialLayer, + packet->GetSequenceNumber()); + } + // Update current spatial layer. this->currentSpatialLayer = this->targetLayers.spatial; @@ -1040,8 +1209,11 @@ namespace RTC this->checkingForOldPacketsInSpatialLayer = true; // Update target and current temporal layer. - this->encodingContext->SetTargetTemporalLayer(this->targetLayers.temporal); - this->encodingContext->SetCurrentTemporalLayer(packet->GetTemporalLayer()); + if (this->encodingContext) + { + this->encodingContext->SetTargetTemporalLayer(this->targetLayers.temporal); + this->encodingContext->SetCurrentTemporalLayer(packet->GetTemporalLayer()); + } // Reset the score of our RtpStream to 10. this->rtpStream->ResetScore(10u, /*notify*/ false); @@ -1053,9 +1225,12 @@ namespace RTC EmitScore(); // Rewrite payload if needed. - packet->ProcessPayload(this->encodingContext.get(), marker); + if (this->encodingContext) + { + packet->ProcessPayload(this->encodingContext.get(), marker); + } } - else + else if (this->encodingContext) { auto previousTemporalLayer = this->encodingContext->GetCurrentTemporalLayer(); @@ -1067,6 +1242,11 @@ namespace RTC #ifdef MS_RTC_LOGGER_RTP packet->logger.Discarded(RTC::RtcLogger::RtpPacket::DiscardReason::DROPPED_BY_CODEC); #endif + // TODO: REMOVE + if (this->kind == RTC::Media::Kind::AUDIO) + { + MS_DUMP("----- audio packet dropped DROPPED_BY_CODEC | seq:%u", packet->GetSequenceNumber()); + } this->rtpSeqManager.Drop(packet->GetSequenceNumber()); @@ -1146,6 +1326,12 @@ namespace RTC #ifdef MS_RTC_LOGGER_RTP packet->logger.Discarded(RTC::RtcLogger::RtpPacket::DiscardReason::SEND_RTP_STREAM_DISCARDED); #endif + + // TODO: REMOVE + if (this->kind == RTC::Media::Kind::AUDIO) + { + MS_DUMP("----- audio packet dropped SEND_RTP_STREAM_DISCARDED | seq:%u", origSeq); + } } // Restore packet fields. @@ -1210,7 +1396,7 @@ namespace RTC } } - bool SimulcastConsumer::GetRtcp(RTC::RTCP::CompoundPacket* packet, uint64_t nowMs) + bool MultiStreamConsumer::GetRtcp(RTC::RTCP::CompoundPacket* packet, uint64_t nowMs) { MS_TRACE(); @@ -1242,7 +1428,7 @@ namespace RTC return true; } - void SimulcastConsumer::NeedWorstRemoteFractionLost( + void MultiStreamConsumer::NeedWorstRemoteFractionLost( uint32_t /*mappedSsrc*/, uint8_t& worstRemoteFractionLost) { MS_TRACE(); @@ -1258,7 +1444,7 @@ namespace RTC worstRemoteFractionLost = std::max(fractionLost, worstRemoteFractionLost); } - void SimulcastConsumer::ReceiveNack(RTC::RTCP::FeedbackRtpNackPacket* nackPacket) + void MultiStreamConsumer::ReceiveNack(RTC::RTCP::FeedbackRtpNackPacket* nackPacket) { MS_TRACE(); @@ -1273,7 +1459,7 @@ namespace RTC this->rtpStream->ReceiveNack(nackPacket); } - void SimulcastConsumer::ReceiveKeyFrameRequest( + void MultiStreamConsumer::ReceiveKeyFrameRequest( RTC::RTCP::FeedbackPs::MessageType messageType, uint32_t ssrc) { MS_TRACE(); @@ -1305,21 +1491,21 @@ namespace RTC } } - void SimulcastConsumer::ReceiveRtcpReceiverReport(RTC::RTCP::ReceiverReport* report) + void MultiStreamConsumer::ReceiveRtcpReceiverReport(RTC::RTCP::ReceiverReport* report) { MS_TRACE(); this->rtpStream->ReceiveRtcpReceiverReport(report); } - void SimulcastConsumer::ReceiveRtcpXrReceiverReferenceTime(RTC::RTCP::ReceiverReferenceTime* report) + void MultiStreamConsumer::ReceiveRtcpXrReceiverReferenceTime(RTC::RTCP::ReceiverReferenceTime* report) { MS_TRACE(); this->rtpStream->ReceiveRtcpXrReceiverReferenceTime(report); } - uint32_t SimulcastConsumer::GetTransmissionRate(uint64_t nowMs) + uint32_t MultiStreamConsumer::GetTransmissionRate(uint64_t nowMs) { MS_TRACE(); @@ -1331,14 +1517,14 @@ namespace RTC return this->rtpStream->GetBitrate(nowMs); } - float SimulcastConsumer::GetRtt() const + float MultiStreamConsumer::GetRtt() const { MS_TRACE(); return this->rtpStream->GetRtt(); } - void SimulcastConsumer::UserOnTransportConnected() + void MultiStreamConsumer::UserOnTransportConnected() { MS_TRACE(); @@ -1352,7 +1538,7 @@ namespace RTC } } - void SimulcastConsumer::UserOnTransportDisconnected() + void MultiStreamConsumer::UserOnTransportDisconnected() { MS_TRACE(); @@ -1361,10 +1547,18 @@ namespace RTC this->rtpStream->Pause(); this->targetLayerRetransmissionBuffer.clear(); + // TODO: REMOVE + if (this->kind == RTC::Media::Kind::AUDIO) + { + MS_DUMP( + "----- calling UpdateTargetLayers(-1, -1) [current this->currentSpatialLayer:%d]", + this->currentSpatialLayer); + } + UpdateTargetLayers(-1, -1); } - void SimulcastConsumer::UserOnPaused() + void MultiStreamConsumer::UserOnPaused() { MS_TRACE(); @@ -1373,6 +1567,14 @@ namespace RTC this->rtpStream->Pause(); this->targetLayerRetransmissionBuffer.clear(); + // TODO: REMOVE + if (this->kind == RTC::Media::Kind::AUDIO) + { + MS_DUMP( + "----- calling UpdateTargetLayers(-1, -1) [current this->currentSpatialLayer:%d]", + this->currentSpatialLayer); + } + UpdateTargetLayers(-1, -1); if (this->externallyManagedBitrate) @@ -1381,7 +1583,7 @@ namespace RTC } } - void SimulcastConsumer::UserOnResumed() + void MultiStreamConsumer::UserOnResumed() { MS_TRACE(); @@ -1396,7 +1598,7 @@ namespace RTC } } - void SimulcastConsumer::CreateRtpStream() + void MultiStreamConsumer::CreateRtpStream() { MS_TRACE(); @@ -1480,7 +1682,7 @@ namespace RTC } } - void SimulcastConsumer::RequestKeyFrames() + void MultiStreamConsumer::RequestKeyFrames() { MS_TRACE(); @@ -1507,7 +1709,7 @@ namespace RTC } } - void SimulcastConsumer::RequestKeyFrameForTargetSpatialLayer() + void MultiStreamConsumer::RequestKeyFrameForTargetSpatialLayer() { MS_TRACE(); @@ -1528,7 +1730,7 @@ namespace RTC this->listener->OnConsumerKeyFrameRequested(this, mappedSsrc); } - void SimulcastConsumer::RequestKeyFrameForCurrentSpatialLayer() + void MultiStreamConsumer::RequestKeyFrameForCurrentSpatialLayer() { MS_TRACE(); @@ -1549,10 +1751,16 @@ namespace RTC this->listener->OnConsumerKeyFrameRequested(this, mappedSsrc); } - void SimulcastConsumer::MayChangeLayers(bool force) + void MultiStreamConsumer::MayChangeLayers(bool force) { MS_TRACE(); + // TODO: REMOVE + if (this->kind == RTC::Media::Kind::AUDIO) + { + MS_DUMP("***** MayChangeLayers(force:%s)", force ? "true" : "false"); + } + RTC::ConsumerTypes::VideoLayers newTargetLayers; if (RecalculateTargetLayers(newTargetLayers)) @@ -1564,19 +1772,41 @@ namespace RTC // will let us change it when it considers. if (this->externallyManagedBitrate) { + // TODO: REMOVE + if (this->kind == RTC::Media::Kind::AUDIO) + { + MS_DUMP("----- this->externallyManagedBitrate == true"); + } + if (newTargetLayers.spatial != this->targetLayers.spatial || force) { + // TODO: REMOVE + if (this->kind == RTC::Media::Kind::AUDIO) + { + MS_DUMP( + "----- this->externallyManagedBitrate == true => calling listener->OnConsumerNeedBitrateChange()"); + } + this->listener->OnConsumerNeedBitrateChange(this); } } else { + // TODO: REMOVE + if (this->kind == RTC::Media::Kind::AUDIO) + { + MS_DUMP( + "----- calling UpdateTargetLayers(%d, %d)", + newTargetLayers.spatial, + newTargetLayers.temporal); + } + UpdateTargetLayers(newTargetLayers.spatial, newTargetLayers.temporal); } } } - bool SimulcastConsumer::RecalculateTargetLayers(RTC::ConsumerTypes::VideoLayers& newTargetLayers) const + bool MultiStreamConsumer::RecalculateTargetLayers(RTC::ConsumerTypes::VideoLayers& newTargetLayers) const { MS_TRACE(); @@ -1651,10 +1881,17 @@ namespace RTC return (newTargetLayers != this->targetLayers); } - void SimulcastConsumer::UpdateTargetLayers(int16_t newTargetSpatialLayer, int16_t newTargetTemporalLayer) + void MultiStreamConsumer::UpdateTargetLayers( + int16_t newTargetSpatialLayer, int16_t newTargetTemporalLayer) { MS_TRACE(); + // TODO: REMOVE + if (this->kind == RTC::Media::Kind::AUDIO) + { + MS_DUMP("***** UpdateTargetLayers(%d, %d)", newTargetSpatialLayer, newTargetTemporalLayer); + } + // If we don't have yet a RTP timestamp reference, set it now. if ( newTargetSpatialLayer != -1 && (this->tsReferenceSpatialLayer == -1 || @@ -1675,13 +1912,23 @@ namespace RTC if (newTargetSpatialLayer == -1) { + // TODO: REMOVE + if (this->kind == RTC::Media::Kind::AUDIO) + { + MS_DUMP( + "----- newTargetSpatialLayer == -1 => setting this->targetLayers.spatial = -1, this->targetLayers.temporal = -1, this->currentSpatialLayer = -1"); + } + // Unset current and target layers. this->targetLayers.spatial = -1; this->targetLayers.temporal = -1; this->currentSpatialLayer = -1; - this->encodingContext->SetTargetTemporalLayer(-1); - this->encodingContext->SetCurrentTemporalLayer(-1); + if (this->encodingContext) + { + this->encodingContext->SetTargetTemporalLayer(-1); + this->encodingContext->SetCurrentTemporalLayer(-1); + } MS_DEBUG_TAG( simulcast, "target layers changed [spatial:-1, temporal:-1, consumerId:%s]", this->id.c_str()); @@ -1691,12 +1938,21 @@ namespace RTC return; } + // TODO: REMOVE + if (this->kind == RTC::Media::Kind::AUDIO) + { + MS_DUMP( + "----- setting this->targetLayers.spatial = %d, this->targetLayers.temporal = %d", + newTargetSpatialLayer, + newTargetTemporalLayer); + } + this->targetLayers.spatial = newTargetSpatialLayer; this->targetLayers.temporal = newTargetTemporalLayer; // If the new target spatial layer matches the current one, apply the new // target temporal layer now. - if (this->targetLayers.spatial == this->currentSpatialLayer) + if (this->encodingContext && this->targetLayers.spatial == this->currentSpatialLayer) { this->encodingContext->SetTargetTemporalLayer(this->targetLayers.temporal); } @@ -1716,14 +1972,14 @@ namespace RTC } } - bool SimulcastConsumer::CanSwitchToSpatialLayer(int16_t spatialLayer) const + bool MultiStreamConsumer::CanSwitchToSpatialLayer(int16_t spatialLayer) const { MS_TRACE(); // This method assumes that the caller has verified that there is a valid // Producer RtpStream for the given spatial layer. MS_ASSERT( - this->producerRtpStreams.at(spatialLayer), + this->producerRtpStreams.size() > spatialLayer, "no Producer RtpStream for the given spatialLayer:%" PRIi16, spatialLayer); @@ -1737,7 +1993,7 @@ namespace RTC this->producerRtpStreams.at(spatialLayer)->GetSenderReportNtpMs()); } - void SimulcastConsumer::StorePacketInTargetLayerRetransmissionBuffer( + void MultiStreamConsumer::StorePacketInTargetLayerRetransmissionBuffer( RTC::RTP::Packet* packet, RTC::RTP::SharedPacket& sharedPacket) { MS_TRACE(); @@ -1770,7 +2026,7 @@ namespace RTC } } - void SimulcastConsumer::EmitScore() const + void MultiStreamConsumer::EmitScore() const { MS_TRACE(); @@ -1786,14 +2042,14 @@ namespace RTC notificationOffset); } - void SimulcastConsumer::EmitLayersChange() const + void MultiStreamConsumer::EmitLayersChange() const { MS_TRACE(); MS_DEBUG_DEV( "current layers changed to [spatial:%" PRIi16 ", temporal:%" PRIi16 ", consumerId:%s]", this->currentSpatialLayer, - this->encodingContext->GetCurrentTemporalLayer(), + this->encodingContext ? this->encodingContext->GetCurrentTemporalLayer() : 1, this->id.c_str()); flatbuffers::Offset layersOffset; @@ -1803,7 +2059,7 @@ namespace RTC layersOffset = FBS::Consumer::CreateConsumerLayers( this->shared->channelNotifier->GetBufferBuilder(), this->currentSpatialLayer, - this->encodingContext->GetCurrentTemporalLayer()); + this->encodingContext ? this->encodingContext->GetCurrentTemporalLayer() : 1); } auto notificationOffset = FBS::Consumer::CreateLayersChangeNotification( @@ -1816,12 +2072,18 @@ namespace RTC notificationOffset); } - RTC::RTP::RtpStreamRecv* SimulcastConsumer::GetProducerCurrentRtpStream() const + RTC::RTP::RtpStreamRecv* MultiStreamConsumer::GetProducerCurrentRtpStream() const { MS_TRACE(); if (this->currentSpatialLayer == -1) { + // TODO: REMOVE + if (this->kind == RTC::Media::Kind::AUDIO) + { + MS_DUMP("----- this->currentSpatialLayer == -1 => nullptr"); + } + return nullptr; } @@ -1829,7 +2091,7 @@ namespace RTC return this->producerRtpStreams.at(this->currentSpatialLayer); } - RTC::RTP::RtpStreamRecv* SimulcastConsumer::GetProducerTargetRtpStream() const + RTC::RTP::RtpStreamRecv* MultiStreamConsumer::GetProducerTargetRtpStream() const { MS_TRACE(); @@ -1842,7 +2104,7 @@ namespace RTC return this->producerRtpStreams.at(this->targetLayers.spatial); } - RTC::RTP::RtpStreamRecv* SimulcastConsumer::GetProducerTsReferenceRtpStream() const + RTC::RTP::RtpStreamRecv* MultiStreamConsumer::GetProducerTsReferenceRtpStream() const { MS_TRACE(); @@ -1855,7 +2117,7 @@ namespace RTC return this->producerRtpStreams.at(this->tsReferenceSpatialLayer); } - void SimulcastConsumer::OnRtpStreamScore( + void MultiStreamConsumer::OnRtpStreamScore( RTC::RTP::RtpStream* /*rtpStream*/, uint8_t /*score*/, uint8_t /*previousScore*/) { MS_TRACE(); @@ -1875,7 +2137,7 @@ namespace RTC } } - void SimulcastConsumer::OnRtpStreamRetransmitRtpPacket( + void MultiStreamConsumer::OnRtpStreamRetransmitRtpPacket( RTC::RTP::RtpStreamSend* /*rtpStream*/, RTC::RTP::Packet* packet) { MS_TRACE(); diff --git a/worker/src/RTC/SimpleConsumer.cpp b/worker/src/RTC/SimpleConsumer.cpp deleted file mode 100644 index 3b3962e6ff..0000000000 --- a/worker/src/RTC/SimpleConsumer.cpp +++ /dev/null @@ -1,882 +0,0 @@ -#include "FBS/consumer.h" -#define MS_CLASS "RTC::SimpleConsumer" -// #define MS_LOG_DEV_LEVEL 3 - -#include "DepLibUV.hpp" -#include "Logger.hpp" -#include "MediaSoupErrors.hpp" -#include "Utils.hpp" -#include "RTC/RTP/Codecs/Tools.hpp" -#include "RTC/SimpleConsumer.hpp" -#ifdef MS_RTC_LOGGER_RTP -#include "RTC/RtcLogger.hpp" -#endif -#include // std::numeric_limits - -namespace RTC -{ - /* Static. */ - - static constexpr size_t TargetLayerRetransmissionBufferSize{ 15u }; - - /* Instance methods. */ - - SimpleConsumer::SimpleConsumer( - RTC::Shared* shared, - const std::string& id, - const std::string& producerId, - RTC::Consumer::Listener* listener, - const FBS::Transport::ConsumeRequest* data) - : RTC::Consumer::Consumer(shared, id, producerId, listener, data, RTC::RtpParameters::Type::SIMPLE) - { - MS_TRACE(); - - // Ensure there is a single encoding. - if (this->consumableRtpEncodings.size() != 1u) - { - MS_THROW_TYPE_ERROR("invalid consumableRtpEncodings with size != 1"); - } - - auto& encoding = this->rtpParameters.encodings[0]; - const auto* mediaCodec = this->rtpParameters.GetCodecForEncoding(encoding); - - this->keyFrameSupported = RTC::RTP::Codecs::Tools::CanBeKeyFrame(mediaCodec->mimeType); - - // Create RtpStreamSend instance for sending a single stream to the remote. - CreateRtpStream(); - - // Let's chosee an initial output seq number between 1000 and 32768 to avoid - // libsrtp bug: - // https://github.com/versatica/mediasoup/issues/1437 - const uint16_t initialOutputSeq = - Utils::Crypto::GetRandomUInt(1000u, std::numeric_limits::max() / 2); - - this->rtpSeqManager = RTC::SeqManager(initialOutputSeq); - - // Create the encoding context for Opus. - if ( - mediaCodec->mimeType.type == RTC::RtpCodecMimeType::Type::AUDIO && - (mediaCodec->mimeType.subtype == RTC::RtpCodecMimeType::Subtype::OPUS || - mediaCodec->mimeType.subtype == RTC::RtpCodecMimeType::Subtype::MULTIOPUS)) - { - RTC::RTP::Codecs::EncodingContext::Params params; - - this->encodingContext.reset( - RTC::RTP::Codecs::Tools::GetEncodingContext(mediaCodec->mimeType, params)); - - // ignoreDtx is set to false by default. - this->encodingContext->SetIgnoreDtx(data->ignoreDtx()); - } - - // NOTE: This may throw. - this->shared->channelMessageRegistrator->RegisterHandler( - this->id, - /*channelRequestHandler*/ this, - /*channelNotificationHandler*/ nullptr); - } - - SimpleConsumer::~SimpleConsumer() - { - MS_TRACE(); - - this->shared->channelMessageRegistrator->UnregisterHandler(this->id); - - delete this->rtpStream; - this->targetLayerRetransmissionBuffer.clear(); - } - - flatbuffers::Offset SimpleConsumer::FillBuffer( - flatbuffers::FlatBufferBuilder& builder) const - { - MS_TRACE(); - - // Call the parent method. - auto base = RTC::Consumer::FillBuffer(builder); - // Add rtpStream. - std::vector> rtpStreams; - rtpStreams.emplace_back(this->rtpStream->FillBuffer(builder)); - - auto dump = FBS::Consumer::CreateConsumerDumpDirect(builder, base, &rtpStreams); - - return FBS::Consumer::CreateDumpResponse(builder, dump); - } - - flatbuffers::Offset SimpleConsumer::FillBufferStats( - flatbuffers::FlatBufferBuilder& builder) - { - MS_TRACE(); - - std::vector> rtpStreams; - - // Add stats of our send stream. - rtpStreams.emplace_back(this->rtpStream->FillBufferStats(builder)); - - // Add stats of our recv stream. - if (this->producerRtpStream) - { - rtpStreams.emplace_back(this->producerRtpStream->FillBufferStats(builder)); - } - - return FBS::Consumer::CreateGetStatsResponseDirect(builder, &rtpStreams); - } - - flatbuffers::Offset SimpleConsumer::FillBufferScore( - flatbuffers::FlatBufferBuilder& builder) const - { - MS_TRACE(); - - MS_ASSERT(this->producerRtpStreamScores, "producerRtpStreamScores not set"); - - uint8_t producerScore{ 0 }; - - if (this->producerRtpStream) - { - producerScore = this->producerRtpStream->GetScore(); - } - - return FBS::Consumer::CreateConsumerScoreDirect( - builder, this->rtpStream->GetScore(), producerScore, this->producerRtpStreamScores); - } - - void SimpleConsumer::HandleRequest(Channel::ChannelRequest* request) - { - MS_TRACE(); - - switch (request->method) - { - case Channel::ChannelRequest::Method::CONSUMER_DUMP: - { - auto dumpOffset = FillBuffer(request->GetBufferBuilder()); - - request->Accept(FBS::Response::Body::Consumer_DumpResponse, dumpOffset); - - break; - } - - case Channel::ChannelRequest::Method::CONSUMER_REQUEST_KEY_FRAME: - { - if (IsActive()) - { - RequestKeyFrame(); - } - - request->Accept(); - - break; - } - - case Channel::ChannelRequest::Method::CONSUMER_SET_PREFERRED_LAYERS: - { - // Accept with empty preferred layers object. - - auto responseOffset = - FBS::Consumer::CreateSetPreferredLayersResponse(request->GetBufferBuilder()); - - request->Accept(FBS::Response::Body::Consumer_SetPreferredLayersResponse, responseOffset); - - break; - } - - default: - { - // Pass it to the parent class. - RTC::Consumer::HandleRequest(request); - } - } - } - - void SimpleConsumer::ProducerRtpStream(RTC::RTP::RtpStreamRecv* rtpStream, uint32_t /*mappedSsrc*/) - { - MS_TRACE(); - - this->producerRtpStream = rtpStream; - } - - void SimpleConsumer::ProducerNewRtpStream(RTC::RTP::RtpStreamRecv* rtpStream, uint32_t /*mappedSsrc*/) - { - MS_TRACE(); - - this->producerRtpStream = rtpStream; - - // Emit the score event. - EmitScore(); - } - - void SimpleConsumer::ProducerRtpStreamScore( - RTC::RTP::RtpStreamRecv* /*rtpStream*/, uint8_t /*score*/, uint8_t /*previousScore*/) - { - MS_TRACE(); - - // Emit the score event. - EmitScore(); - } - - void SimpleConsumer::ProducerRtcpSenderReport(RTC::RTP::RtpStreamRecv* /*rtpStream*/, bool /*first*/) - { - MS_TRACE(); - - // Do nothing. - } - - uint8_t SimpleConsumer::GetBitratePriority() const - { - MS_TRACE(); - - MS_ASSERT(this->externallyManagedBitrate, "bitrate is not externally managed"); - - // Audio SimpleConsumer does not play the BWE game. - if (this->kind != RTC::Media::Kind::VIDEO) - { - return 0u; - } - - if (!IsActive()) - { - return 0u; - } - - return this->priority; - } - - uint32_t SimpleConsumer::IncreaseLayer(uint32_t bitrate, bool /*considerLoss*/) - { - MS_TRACE(); - - MS_ASSERT(this->externallyManagedBitrate, "bitrate is not externally managed"); - MS_ASSERT(this->kind == RTC::Media::Kind::VIDEO, "should be video"); - MS_ASSERT(IsActive(), "should be active"); - - // If this is not the first time this method is called within the same iteration, - // return 0 since a video SimpleConsumer does not keep state about this. - if (this->managingBitrate) - { - return 0u; - } - - this->managingBitrate = true; - - // Video SimpleConsumer does not really play the BWE game when. However, let's - // be honest and try to be nice. - auto nowMs = DepLibUV::GetTimeMs(); - auto desiredBitrate = this->producerRtpStream->GetBitrate(nowMs); - - if (desiredBitrate < bitrate) - { - return desiredBitrate; - } - else - { - return bitrate; - } - } - - void SimpleConsumer::ApplyLayers() - { - MS_TRACE(); - - MS_ASSERT(this->externallyManagedBitrate, "bitrate is not externally managed"); - MS_ASSERT(this->kind == RTC::Media::Kind::VIDEO, "should be video"); - MS_ASSERT(IsActive(), "should be active"); - - this->managingBitrate = false; - - // SimpleConsumer does not play the BWE game (even if video kind). - } - - uint32_t SimpleConsumer::GetDesiredBitrate() const - { - MS_TRACE(); - - MS_ASSERT(this->externallyManagedBitrate, "bitrate is not externally managed"); - - // Audio SimpleConsumer does not play the BWE game. - if (this->kind != RTC::Media::Kind::VIDEO) - { - return 0u; - } - - if (!IsActive()) - { - return 0u; - } - - auto nowMs = DepLibUV::GetTimeMs(); - auto desiredBitrate = this->producerRtpStream->GetBitrate(nowMs); - - // If consumer.rtpParameters.encodings[0].maxBitrate was given and it's - // greater than computed one, then use it. - auto maxBitrate = this->rtpParameters.encodings[0].maxBitrate; - - desiredBitrate = std::max(maxBitrate, desiredBitrate); - - return desiredBitrate; - } - - // NOLINTNEXTLINE(misc-no-recursion) - void SimpleConsumer::SendRtpPacket(RTC::RTP::Packet* packet, RTC::RTP::SharedPacket& sharedPacket) - { - MS_TRACE(); - -#ifdef MS_RTC_LOGGER_RTP - packet->logger.consumerId = this->id; -#endif - - if (!IsActive()) - { -#ifdef MS_RTC_LOGGER_RTP - packet->logger.Discarded(RTC::RtcLogger::RtpPacket::DiscardReason::CONSUMER_INACTIVE); -#endif - - this->rtpSeqManager.Drop(packet->GetSequenceNumber()); - - return; - } - - // If we need to sync, support key frames and this is not a key frame, ignore - // the packet. - if (this->syncRequired && this->keyFrameSupported && !packet->IsKeyFrame()) - { -#ifdef MS_RTC_LOGGER_RTP - packet->logger.Discarded(RTC::RtcLogger::RtpPacket::DiscardReason::NOT_A_KEYFRAME); -#endif - - // NOTE: No need to drop the packet in the RTP sequence manager since here - // we are blocking all packets but the key frame that would trigger sync - // below. - - // Store the packet for the scenario in which this packet is part of the - // key frame and it arrived before the first packet of the key frame. - StorePacketInTargetLayerRetransmissionBuffer(packet, sharedPacket); - - return; - } - - auto payloadType = packet->GetPayloadType(); - - // NOTE: This may happen if this Consumer supports just some codecs of those - // in the corresponding Producer. - if (!this->supportedCodecPayloadTypes[payloadType]) - { - MS_WARN_DEV("payload type not supported [payloadType:%" PRIu8 "]", payloadType); - -#ifdef MS_RTC_LOGGER_RTP - packet->logger.Discarded(RTC::RtcLogger::RtpPacket::DiscardReason::UNSUPPORTED_PAYLOAD_TYPE); -#endif - - this->rtpSeqManager.Drop(packet->GetSequenceNumber()); - - return; - } - - // Packets with only padding are not forwarded. - if (packet->GetPayloadLength() == 0) - { -#ifdef MS_RTC_LOGGER_RTP - packet->logger.Discarded(RTC::RtcLogger::RtpPacket::DiscardReason::EMPTY_PAYLOAD); -#endif - - this->rtpSeqManager.Drop(packet->GetSequenceNumber()); - - return; - } - - bool marker; - - // Process the payload if needed. Drop packet if necessary. - if (this->encodingContext && !packet->ProcessPayload(this->encodingContext.get(), marker)) - { - MS_DEBUG_DEV( - "discarding packet [ssrc:%" PRIu32 ", seq:%" PRIu16 ", ts:%" PRIu32 "]", - packet->GetSsrc(), - packet->GetSequenceNumber(), - packet->GetTimestamp()); - -#ifdef MS_RTC_LOGGER_RTP - packet->logger.Discarded(RTC::RtcLogger::RtpPacket::DiscardReason::DROPPED_BY_CODEC); -#endif - - this->rtpSeqManager.Drop(packet->GetSequenceNumber()); - - return; - } - - // Whether this is the first packet after re-sync. - const bool isSyncPacket = this->syncRequired; - - // Whether packets stored in the target layer retransmission buffer must be - // sent once this packet is sent. - bool sendPacketsInTargetLayerRetransmissionBuffer{ false }; - - // Sync sequence number and timestamp if required. - if (isSyncPacket) - { - if (packet->IsKeyFrame()) - { - MS_DEBUG_TAG( - rtp, - "sync key frame received [ssrc:%" PRIu32 ", seq:%" PRIu16 ", ts:%" PRIu32 "]", - packet->GetSsrc(), - packet->GetSequenceNumber(), - packet->GetTimestamp()); - - sendPacketsInTargetLayerRetransmissionBuffer = true; - } - - this->rtpSeqManager.Sync(packet->GetSequenceNumber() - 1); - - this->syncRequired = false; - } - - // Update RTP seq number and timestamp. - uint16_t seq; - - this->rtpSeqManager.Input(packet->GetSequenceNumber(), seq); - - // Save original packet fields. - auto origSsrc = packet->GetSsrc(); - auto origSeq = packet->GetSequenceNumber(); - - // Rewrite packet. - packet->SetSsrc(this->rtpParameters.encodings[0].ssrc); - packet->SetSequenceNumber(seq); - -#ifdef MS_RTC_LOGGER_RTP - packet->logger.sendRtpTimestamp = packet->GetTimestamp(); - packet->logger.sendSeqNumber = seq; -#endif - - if (isSyncPacket) - { - MS_DEBUG_TAG( - rtp, - "sending sync packet [ssrc:%" PRIu32 ", seq:%" PRIu16 ", ts:%" PRIu32 - "] from original [seq:%" PRIu16 "]", - packet->GetSsrc(), - packet->GetSequenceNumber(), - packet->GetTimestamp(), - origSeq); - } - - const RTC::RTP::RtpStreamSend::ReceivePacketResult result = - this->rtpStream->ReceivePacket(packet, sharedPacket); - - if (result != RTC::RTP::RtpStreamSend::ReceivePacketResult::DISCARDED) - { - // Send the packet. - this->listener->OnConsumerSendRtpPacket(this, packet); - - // May emit 'trace' event. - EmitTraceEventRtpAndKeyFrameTypes(packet); - } - else - { - MS_WARN_TAG( - rtp, - "failed to send packet [ssrc:%" PRIu32 ", seq:%" PRIu16 ", ts:%" PRIu32 - "] from original [seq:%" PRIu16 "]", - packet->GetSsrc(), - packet->GetSequenceNumber(), - packet->GetTimestamp(), - origSeq); - -#ifdef MS_RTC_LOGGER_RTP - packet->logger.Discarded(RTC::RtcLogger::RtpPacket::DiscardReason::SEND_RTP_STREAM_DISCARDED); -#endif - } - - // Restore packet fields. - packet->SetSsrc(origSsrc); - packet->SetSequenceNumber(origSeq); - - // If sharedPacket doesn't have a packet inside and it has been stored we - // need to clone the packet into it. - if (!sharedPacket.HasPacket() && result == RTC::RTP::RtpStreamSend::ReceivePacketResult::ACCEPTED_AND_STORED) - { - sharedPacket.Assign(packet); - } - - // If sent packet was the first packet of a key frame, let's send buffered - // packets belonging to the same key frame that arrived earlier due to - // packet misorder. - if (sendPacketsInTargetLayerRetransmissionBuffer) - { - // NOTE: Only send buffered packets if the first packet containing the key - // frame was sent. - if (result != RTC::RTP::RtpStreamSend::ReceivePacketResult::DISCARDED) - { - for (auto& kv : this->targetLayerRetransmissionBuffer) - { - auto& bufferedSharedPacket = kv.second; - auto* bufferedPacket = bufferedSharedPacket.GetPacket(); - - if (bufferedPacket->GetSequenceNumber() > origSeq) - { - MS_DEBUG_DEV( - "sending packet buffered in the target layer retransmission buffer [ssrc:%" PRIu32 - ", seq:%" PRIu16 ", ts:%" PRIu32 - "] after sending first packet of the key frame [ssrc:%" PRIu32 ", seq:%" PRIu16 - ", ts:%" PRIu32 "]", - bufferedPacket->GetSsrc(), - bufferedPacket->GetSequenceNumber(), - bufferedPacket->GetTimestamp(), - packet->GetSsrc(), - packet->GetSequenceNumber(), - packet->GetTimestamp()); - - SendRtpPacket(bufferedPacket, bufferedSharedPacket); - - // Be sure that the target layer retransmission buffer has not been - // emptied as a result of sending this packet. If so, exit the loop. - if (this->targetLayerRetransmissionBuffer.empty()) - { - MS_DEBUG_DEV( - "target layer retransmission buffer emptied while iterating it, exiting the loop"); - - break; - } - } - } - } - - this->targetLayerRetransmissionBuffer.clear(); - } - } - - bool SimpleConsumer::GetRtcp(RTC::RTCP::CompoundPacket* packet, uint64_t nowMs) - { - MS_TRACE(); - - if (static_cast((nowMs - this->lastRtcpSentTime) * 1.15) < this->maxRtcpInterval) - { - return true; - } - - auto* senderReport = this->rtpStream->GetRtcpSenderReport(nowMs); - - if (!senderReport) - { - return true; - } - - // Build SDES chunk for this sender. - auto* sdesChunk = this->rtpStream->GetRtcpSdesChunk(); - - auto* delaySinceLastRrSsrcInfo = this->rtpStream->GetRtcpXrDelaySinceLastRrSsrcInfo(nowMs); - - // RTCP Compound packet buffer cannot hold the data. - if (!packet->Add(senderReport, sdesChunk, delaySinceLastRrSsrcInfo)) - { - return false; - } - - this->lastRtcpSentTime = nowMs; - - return true; - } - - void SimpleConsumer::NeedWorstRemoteFractionLost( - uint32_t /*mappedSsrc*/, uint8_t& worstRemoteFractionLost) - { - MS_TRACE(); - - if (!IsActive()) - { - return; - } - - auto fractionLost = this->rtpStream->GetFractionLost(); - - // If our fraction lost is worse than the given one, update it. - worstRemoteFractionLost = std::max(fractionLost, worstRemoteFractionLost); - } - - void SimpleConsumer::ReceiveNack(RTC::RTCP::FeedbackRtpNackPacket* nackPacket) - { - MS_TRACE(); - - if (!IsActive()) - { - return; - } - - // May emit 'trace' event. - EmitTraceEventNackType(); - - this->rtpStream->ReceiveNack(nackPacket); - } - - void SimpleConsumer::ReceiveKeyFrameRequest( - RTC::RTCP::FeedbackPs::MessageType messageType, uint32_t ssrc) - { - MS_TRACE(); - - switch (messageType) - { - case RTC::RTCP::FeedbackPs::MessageType::PLI: - { - EmitTraceEventPliType(ssrc); - - break; - } - - case RTC::RTCP::FeedbackPs::MessageType::FIR: - { - EmitTraceEventFirType(ssrc); - - break; - } - - default:; - } - - this->rtpStream->ReceiveKeyFrameRequest(messageType); - - if (IsActive()) - { - RequestKeyFrame(); - } - } - - void SimpleConsumer::ReceiveRtcpReceiverReport(RTC::RTCP::ReceiverReport* report) - { - MS_TRACE(); - - this->rtpStream->ReceiveRtcpReceiverReport(report); - } - - void SimpleConsumer::ReceiveRtcpXrReceiverReferenceTime(RTC::RTCP::ReceiverReferenceTime* report) - { - MS_TRACE(); - - this->rtpStream->ReceiveRtcpXrReceiverReferenceTime(report); - } - - uint32_t SimpleConsumer::GetTransmissionRate(uint64_t nowMs) - { - MS_TRACE(); - - if (!IsActive()) - { - return 0u; - } - - return this->rtpStream->GetBitrate(nowMs); - } - - float SimpleConsumer::GetRtt() const - { - MS_TRACE(); - - return this->rtpStream->GetRtt(); - } - - void SimpleConsumer::UserOnTransportConnected() - { - MS_TRACE(); - - this->syncRequired = true; - - if (IsActive()) - { - RequestKeyFrame(); - } - } - - void SimpleConsumer::UserOnTransportDisconnected() - { - MS_TRACE(); - - this->rtpStream->Pause(); - this->targetLayerRetransmissionBuffer.clear(); - } - - void SimpleConsumer::UserOnPaused() - { - MS_TRACE(); - - this->rtpStream->Pause(); - this->targetLayerRetransmissionBuffer.clear(); - - if (this->externallyManagedBitrate && this->kind == RTC::Media::Kind::VIDEO) - { - this->listener->OnConsumerNeedZeroBitrate(this); - } - } - - void SimpleConsumer::UserOnResumed() - { - MS_TRACE(); - - this->syncRequired = true; - - if (IsActive()) - { - RequestKeyFrame(); - } - } - - void SimpleConsumer::CreateRtpStream() - { - MS_TRACE(); - - auto& encoding = this->rtpParameters.encodings[0]; - const auto* mediaCodec = this->rtpParameters.GetCodecForEncoding(encoding); - - MS_DEBUG_TAG( - rtp, "[ssrc:%" PRIu32 ", payloadType:%" PRIu8 "]", encoding.ssrc, mediaCodec->payloadType); - - // Set stream params. - RTC::RTP::RtpStream::Params params; - - params.ssrc = encoding.ssrc; - params.payloadType = mediaCodec->payloadType; - params.mimeType = mediaCodec->mimeType; - params.clockRate = mediaCodec->clockRate; - params.cname = this->rtpParameters.rtcp.cname; - - // Check in band FEC in codec parameters. - if (mediaCodec->parameters.HasInteger("useinbandfec") && mediaCodec->parameters.GetInteger("useinbandfec") == 1) - { - MS_DEBUG_TAG(rtp, "in band FEC enabled"); - - params.useInBandFec = true; - } - - // Check DTX in codec parameters. - if (mediaCodec->parameters.HasInteger("usedtx") && mediaCodec->parameters.GetInteger("usedtx") == 1) - { - MS_DEBUG_TAG(rtp, "DTX enabled"); - - params.useDtx = true; - } - - // Check DTX in the encoding. - if (encoding.dtx) - { - MS_DEBUG_TAG(rtp, "DTX enabled"); - - params.useDtx = true; - } - - for (const auto& fb : mediaCodec->rtcpFeedback) - { - if (!params.useNack && fb.type == "nack" && fb.parameter.empty()) - { - MS_DEBUG_2TAGS(rtp, rtcp, "NACK supported"); - - params.useNack = true; - } - else if (!params.usePli && fb.type == "nack" && fb.parameter == "pli") - { - MS_DEBUG_2TAGS(rtp, rtcp, "PLI supported"); - - params.usePli = true; - } - else if (!params.useFir && fb.type == "ccm" && fb.parameter == "fir") - { - MS_DEBUG_2TAGS(rtp, rtcp, "FIR supported"); - - params.useFir = true; - } - } - - this->rtpStream = new RTC::RTP::RtpStreamSend(this, params, this->rtpParameters.mid); - this->rtpStreams.push_back(this->rtpStream); - - // If the Consumer is paused, tell the RtpStreamSend. - if (IsPaused() || IsProducerPaused()) - { - this->rtpStream->Pause(); - } - - const auto* rtxCodec = this->rtpParameters.GetRtxCodecForEncoding(encoding); - - if (rtxCodec && encoding.hasRtx) - { - this->rtpStream->SetRtx(rtxCodec->payloadType, encoding.rtx.ssrc); - } - } - - void SimpleConsumer::RequestKeyFrame() - { - MS_TRACE(); - - if (this->kind != RTC::Media::Kind::VIDEO) - { - return; - } - - auto mappedSsrc = this->consumableRtpEncodings[0].ssrc; - - this->listener->OnConsumerKeyFrameRequested(this, mappedSsrc); - } - - void SimpleConsumer::StorePacketInTargetLayerRetransmissionBuffer( - RTC::RTP::Packet* packet, RTC::RTP::SharedPacket& sharedPacket) - { - MS_TRACE(); - - MS_DEBUG_DEV( - "storing packet in target layer retransmission buffer [ssrc:%" PRIu32 ", seq:%" PRIu16 - ", ts:%" PRIu32 "]", - packet->GetSsrc(), - packet->GetSequenceNumber(), - packet->GetTimestamp()); - - // Store original packet into the buffer. Only clone once and only if - // necessary. - if (!sharedPacket.HasPacket()) - { - sharedPacket.Assign(packet); - } - // Assert that, if sharedPacket was already filled, both packet and - // sharedPacket are the very same RTP packet. - else - { - sharedPacket.AssertSamePacket(packet); - } - - this->targetLayerRetransmissionBuffer[packet->GetSequenceNumber()] = sharedPacket; - - if (this->targetLayerRetransmissionBuffer.size() > TargetLayerRetransmissionBufferSize) - { - this->targetLayerRetransmissionBuffer.erase(this->targetLayerRetransmissionBuffer.begin()); - } - } - - void SimpleConsumer::EmitScore() const - { - MS_TRACE(); - - auto scoreOffset = FillBufferScore(this->shared->channelNotifier->GetBufferBuilder()); - - auto notificationOffset = FBS::Consumer::CreateScoreNotification( - this->shared->channelNotifier->GetBufferBuilder(), scoreOffset); - - this->shared->channelNotifier->Emit( - this->id, - FBS::Notification::Event::CONSUMER_SCORE, - FBS::Notification::Body::Consumer_ScoreNotification, - notificationOffset); - } - - void SimpleConsumer::OnRtpStreamScore( - RTC::RTP::RtpStream* /*rtpStream*/, uint8_t /*score*/, uint8_t /*previousScore*/) - { - MS_TRACE(); - - // Emit the score event. - EmitScore(); - } - - void SimpleConsumer::OnRtpStreamRetransmitRtpPacket( - RTC::RTP::RtpStreamSend* /*rtpStream*/, RTC::RTP::Packet* packet) - { - MS_TRACE(); - - this->listener->OnConsumerRetransmitRtpPacket(this, packet); - - // May emit 'trace' event. - EmitTraceEventRtpAndKeyFrameTypes(packet, this->rtpStream->HasRtx()); - } -} // namespace RTC diff --git a/worker/src/RTC/Transport.cpp b/worker/src/RTC/Transport.cpp index ec9dac0038..643c8efe14 100644 --- a/worker/src/RTC/Transport.cpp +++ b/worker/src/RTC/Transport.cpp @@ -12,6 +12,7 @@ #include "FBS/transport.h" #include "RTC/BweType.hpp" #include "RTC/Consts.hpp" +#include "RTC/MultiStreamConsumer.hpp" #include "RTC/PipeConsumer.hpp" #include "RTC/RTCP/FeedbackPs.hpp" #include "RTC/RTCP/FeedbackPsAfb.hpp" @@ -20,8 +21,6 @@ #include "RTC/RTCP/FeedbackRtpTransport.hpp" #include "RTC/RTCP/XrDelaySinceLastRr.hpp" #include "RTC/RtpDictionaries.hpp" -#include "RTC/SimpleConsumer.hpp" -#include "RTC/SimulcastConsumer.hpp" #include "RTC/SvcConsumer.hpp" #ifdef MS_RTC_LOGGER_RTP #include "RTC/RtcLogger.hpp" @@ -806,7 +805,8 @@ namespace RTC case RTC::RtpParameters::Type::SIMPLE: { // This may throw. - consumer = new RTC::SimpleConsumer(this->shared, consumerId, producerId, this, body); + consumer = new RTC::MultiStreamConsumer( + this->shared, consumerId, producerId, RTC::RtpParameters::Type::SIMPLE, this, body); break; } @@ -814,7 +814,8 @@ namespace RTC case RTC::RtpParameters::Type::SIMULCAST: { // This may throw. - consumer = new RTC::SimulcastConsumer(this->shared, consumerId, producerId, this, body); + consumer = new RTC::MultiStreamConsumer( + this->shared, consumerId, producerId, RTC::RtpParameters::Type::SIMULCAST, this, body); break; } diff --git a/worker/test/src/RTC/TestSimpleConsumer.cpp b/worker/test/src/RTC/TestMultiStreamConsumer.cpp similarity index 90% rename from worker/test/src/RTC/TestSimpleConsumer.cpp rename to worker/test/src/RTC/TestMultiStreamConsumer.cpp index 5a23c943d1..f01250be4a 100644 --- a/worker/test/src/RTC/TestSimpleConsumer.cpp +++ b/worker/test/src/RTC/TestMultiStreamConsumer.cpp @@ -3,13 +3,13 @@ #include "Channel/ChannelSocket.hpp" #include "FBS/rtpParameters.h" #include "FBS/transport.h" +#include "RTC/MultiStreamConsumer.hpp" #include "RTC/RTP/Packet.hpp" #include "RTC/RTP/RtpStream.hpp" #include "RTC/RTP/RtpStreamRecv.hpp" #include "RTC/RTP/SharedPacket.hpp" #include "RTC/RtpDictionaries.hpp" #include "RTC/Shared.hpp" -#include "RTC/SimpleConsumer.hpp" #include namespace @@ -113,7 +113,7 @@ namespace return rtpParameters.FillBuffer(builder); }; - std::unique_ptr createConsumer(ConsumerListener* listener) + std::unique_ptr createConsumer(ConsumerListener* listener) { flatbuffers::FlatBufferBuilder bufferBuilder; @@ -141,10 +141,11 @@ namespace const auto* consumeRequest = flatbuffers::GetRoot(buf); - return std::make_unique( + return std::make_unique( &shared, consumeRequest->consumerId()->str(), consumeRequest->producerId()->str(), + RTC::RtpParameters::Type::SIMPLE, listener, consumeRequest); } @@ -168,26 +169,33 @@ namespace rtpStream(createRtpStreamRecv()) { // Set producer scores and producer stream. - const std::vector scores{ 10 }; + // NOTE: This must be static because the Consumer stores the given vector + // pointer which is supposed to exist in the associated Producer (but here + // there is no associated Producer). + static const std::vector scores{ 10 }; consumer->ProducerRtpStreamScores(&scores); - consumer->ProducerNewRtpStream(rtpStream.get(), 1234); + // NOTE: mappedSsrc here MUST be 1234567890 (otherwise Consumer will crash). + // This is guaranteed by Producer class, however here we must do it manually. + consumer->ProducerNewRtpStream(rtpStream.get(), 1234567890); } std::unique_ptr listener; - std::unique_ptr consumer; + std::unique_ptr consumer; std::unique_ptr rtpStream; }; } // namespace -SCENARIO("SimpleConsumer", "[rtp][consumer]") +SCENARIO("MultiStreamConsumer", "[rtp][consumer]") { + // TODO: We should NOT parse RTP packets for tests anymore. We should use + // RTC::RTP::Packet::Factory() instead. // clang-format off uint8_t buffer[] = { 0x80, 0x01, 0x00, 0x08, 0x00, 0x00, 0x00, 0x04, - 0x00, 0x00, 0x00, 0x05, + 0x49, 0x96, 0x02, 0xD2, // SSRC: 1234567890 (must be this exact value). // Payload (4 bytes). 0xFF, 0xFF, 0xFF, 0xFF, // From here this is just buffer enough for the variable length payload so