Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions node/src/test/test-Consumer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -307,7 +307,10 @@ test('transport.consume() succeeds', async () => {
producerScore: 0,
producerScores: [0],
});
expect(audioConsumer.preferredLayers).toBeUndefined();
expect(audioConsumer.preferredLayers).toEqual({
spatialLayer: 0,
temporalLayer: 0,
});
expect(audioConsumer.currentLayers).toBeUndefined();
expect(audioConsumer.appData).toEqual({ baz: 'LOL' });

Expand Down Expand Up @@ -879,7 +882,10 @@ test('consumer.setPreferredLayers() succeed', async () => {

await audioConsumer.setPreferredLayers({ spatialLayer: 1, temporalLayer: 1 });

expect(audioConsumer.preferredLayers).toBeUndefined();
expect(audioConsumer.preferredLayers).toEqual({
spatialLayer: 0,
temporalLayer: 0,
});

await videoConsumer.setPreferredLayers({ spatialLayer: 2, temporalLayer: 3 });

Expand Down
105 changes: 77 additions & 28 deletions node/src/test/test-Producer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@ import * as FbsProducer from '../fbs/producer';

type TestContext = {
mediaCodecs: mediasoup.types.RouterRtpCodecCapability[];
audioProducerOptions: mediasoup.types.ProducerOptions;
audioProducerOptions1: mediasoup.types.ProducerOptions;
audioProducerOptions2: mediasoup.types.ProducerOptions;
videoProducerOptions: mediasoup.types.ProducerOptions;
worker?: mediasoup.types.Worker;
router?: mediasoup.types.Router;
Expand All @@ -33,6 +34,11 @@ const ctx: TestContext = {
foo: '111',
},
},
{
kind: 'audio',
mimeType: 'audio/PCMA',
clockRate: 8000,
},
{
kind: 'video',
mimeType: 'video/VP8',
Expand All @@ -51,7 +57,7 @@ const ctx: TestContext = {
rtcpFeedback: [], // Will be ignored.
},
]),
audioProducerOptions: utils.deepFreeze<mediasoup.types.ProducerOptions>({
audioProducerOptions1: utils.deepFreeze<mediasoup.types.ProducerOptions>({
kind: 'audio',
rtpParameters: {
mid: 'AUDIO',
Expand Down Expand Up @@ -81,11 +87,34 @@ const ctx: TestContext = {
],
// Missing encodings on purpose.
rtcp: {
cname: 'audio-1',
cname: 'audio-cname',
},
},
appData: { foo: 1, bar: '2' },
}),
audioProducerOptions2: utils.deepFreeze<mediasoup.types.ProducerOptions>({
kind: 'audio',
rtpParameters: {
mid: 'AUDIO-2',
codecs: [
{
mimeType: 'audio/PCMA',
payloadType: 1,
clockRate: 8000,
},
],
headerExtensions: [
{
uri: 'urn:ietf:params:rtp-hdrext:sdes:mid',
id: 10,
},
],
encodings: [{ ssrc: 20000000 }],
rtcp: {
cname: 'audio-cname',
},
},
}),
videoProducerOptions: utils.deepFreeze<mediasoup.types.ProducerOptions>({
kind: 'video',
rtpParameters: {
Expand Down Expand Up @@ -129,7 +158,7 @@ const ctx: TestContext = {
{ ssrc: 22222228, rtx: { ssrc: 22222229 } },
],
rtcp: {
cname: 'video-1',
cname: 'video-cname',
},
},
appData: { foo: 1, bar: '2' },
Expand Down Expand Up @@ -158,33 +187,53 @@ afterEach(async () => {
test('webRtcTransport1.produce() succeeds', async () => {
const onObserverNewProducer = jest.fn();

ctx.webRtcTransport1!.observer.once('newproducer', onObserverNewProducer);
ctx.webRtcTransport1!.observer.on('newproducer', onObserverNewProducer);

const audioProducer = await ctx.webRtcTransport1!.produce(
ctx.audioProducerOptions
const audioProducer1 = await ctx.webRtcTransport1!.produce(
ctx.audioProducerOptions1
);

expect(onObserverNewProducer).toHaveBeenCalledTimes(1);
expect(onObserverNewProducer).toHaveBeenCalledWith(audioProducer);
expect(typeof audioProducer.id).toBe('string');
expect(audioProducer.closed).toBe(false);
expect(audioProducer.kind).toBe('audio');
expect(typeof audioProducer.rtpParameters).toBe('object');
expect(audioProducer.type).toBe('simple');
const audioProducer2 = await ctx.webRtcTransport1!.produce(
ctx.audioProducerOptions2
);

expect(onObserverNewProducer).toHaveBeenCalledTimes(2);
expect(onObserverNewProducer).toHaveBeenCalledWith(audioProducer1);
expect(onObserverNewProducer).toHaveBeenCalledWith(audioProducer2);

expect(typeof audioProducer1.id).toBe('string');
expect(audioProducer1.closed).toBe(false);
expect(audioProducer1.kind).toBe('audio');
expect(typeof audioProducer1.rtpParameters).toBe('object');
expect(audioProducer1.type).toBe('simple');
// Private API.
expect(typeof audioProducer.consumableRtpParameters).toBe('object');
expect(audioProducer.paused).toBe(false);
expect(audioProducer.score).toEqual([]);
expect(audioProducer.appData).toEqual({ foo: 1, bar: '2' });
expect(typeof audioProducer1.consumableRtpParameters).toBe('object');
expect(audioProducer1.paused).toBe(false);
expect(audioProducer1.score).toEqual([]);
expect(audioProducer1.appData).toEqual({ foo: 1, bar: '2' });

expect(typeof audioProducer2.id).toBe('string');
expect(audioProducer2.closed).toBe(false);
expect(audioProducer2.kind).toBe('audio');
expect(typeof audioProducer2.rtpParameters).toBe('object');
expect(audioProducer2.type).toBe('simple');
// Private API.
expect(typeof audioProducer2.consumableRtpParameters).toBe('object');
expect(audioProducer2.paused).toBe(false);
expect(audioProducer2.score).toEqual([]);
expect(audioProducer2.appData).toEqual({});

await expect(ctx.router!.dump()).resolves.toMatchObject({
mapProducerIdConsumerIds: [{ key: audioProducer.id, values: [] }],
mapProducerIdConsumerIds: expect.arrayContaining([
{ key: audioProducer1.id, values: [] },
{ key: audioProducer2.id, values: [] },
]),
mapConsumerIdProducerId: [],
});

await expect(ctx.webRtcTransport1!.dump()).resolves.toMatchObject({
id: ctx.webRtcTransport1!.id,
producerIds: [audioProducer.id],
producerIds: expect.arrayContaining([audioProducer1.id, audioProducer2.id]),
consumerIds: [],
});
}, 2000);
Expand Down Expand Up @@ -489,7 +538,7 @@ test('transport.produce() with no MID and with single encoding without RID or SS

test('producer.dump() succeeds', async () => {
const audioProducer = await ctx.webRtcTransport1!.produce(
ctx.audioProducerOptions
ctx.audioProducerOptions1
);

const dump1 = await audioProducer.dump();
Expand Down Expand Up @@ -599,7 +648,7 @@ test('producer.dump() succeeds', async () => {

test('producer.getStats() succeeds', async () => {
const audioProducer = await ctx.webRtcTransport1!.produce(
ctx.audioProducerOptions
ctx.audioProducerOptions1
);

const videoProducer = await ctx.webRtcTransport2!.produce(
Expand All @@ -613,7 +662,7 @@ test('producer.getStats() succeeds', async () => {

test('producer.pause() and resume() succeed', async () => {
const audioProducer = await ctx.webRtcTransport1!.produce(
ctx.audioProducerOptions
ctx.audioProducerOptions1
);

const onObserverPause = jest.fn();
Expand Down Expand Up @@ -647,7 +696,7 @@ test('producer.pause() and resume() succeed', async () => {

test('producer.pause() and resume() emit events', async () => {
const audioProducer = await ctx.webRtcTransport1!.produce(
ctx.audioProducerOptions
ctx.audioProducerOptions1
);

const promises = [];
Expand All @@ -672,7 +721,7 @@ test('producer.pause() and resume() emit events', async () => {

test('producer.enableTraceEvent() succeed', async () => {
const audioProducer = await ctx.webRtcTransport1!.produce(
ctx.audioProducerOptions
ctx.audioProducerOptions1
);

await audioProducer.enableTraceEvent(['rtp', 'pli']);
Expand Down Expand Up @@ -705,7 +754,7 @@ test('producer.enableTraceEvent() succeed', async () => {

test('producer.enableTraceEvent() with wrong arguments rejects with TypeError', async () => {
const audioProducer = await ctx.webRtcTransport1!.produce(
ctx.audioProducerOptions
ctx.audioProducerOptions1
);

// @ts-expect-error --- Testing purposes.
Expand Down Expand Up @@ -776,7 +825,7 @@ test('Producer emits "score"', async () => {

test('producer.close() succeeds', async () => {
const audioProducer = await ctx.webRtcTransport1!.produce(
ctx.audioProducerOptions
ctx.audioProducerOptions1
);

const onObserverClose = jest.fn();
Expand All @@ -801,7 +850,7 @@ test('producer.close() succeeds', async () => {

test('Producer methods reject if closed', async () => {
const audioProducer = await ctx.webRtcTransport1!.produce(
ctx.audioProducerOptions
ctx.audioProducerOptions1
);

audioProducer.close();
Expand Down
16 changes: 14 additions & 2 deletions rust/tests/integration/consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -474,7 +474,13 @@ fn consume_succeeds() {
producer_scores: vec![0]
}
);
assert_eq!(audio_consumer.preferred_layers(), None);
assert_eq!(
audio_consumer.preferred_layers(),
Some(ConsumerLayers {
spatial_layer: 0,
temporal_layer: Some(0)
})
);
assert_eq!(audio_consumer.current_layers(), None);
assert_eq!(
audio_consumer
Expand Down Expand Up @@ -1320,7 +1326,13 @@ fn set_preferred_layers_succeeds() {
.await
.expect("Failed to set preferred layers consumer");

assert_eq!(audio_consumer.preferred_layers(), None);
assert_eq!(
audio_consumer.preferred_layers(),
Some(ConsumerLayers {
spatial_layer: 0,
temporal_layer: Some(0)
})
);
}

{
Expand Down
3 changes: 2 additions & 1 deletion worker/include/RTC/Consumer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
#include "Channel/ChannelRequest.hpp"
#include "Channel/ChannelSocket.hpp"
#include "FBS/consumer.h"
#include "FBS/transport.h"
#include "RTC/ConsumerTypes.hpp"
#include "RTC/RTCP/CompoundPacket.hpp"
#include "RTC/RTCP/FeedbackRtpNack.hpp"
Expand Down Expand Up @@ -132,7 +133,7 @@ namespace RTC
RTC::RTP::RtpStreamRecv* rtpStream, uint8_t score, uint8_t previousScore) = 0;
virtual void ProducerRtcpSenderReport(RTC::RTP::RtpStreamRecv* rtpStream, bool first) = 0;
void ProducerClosed();
void SetExternallyManagedBitrate()
virtual void SetExternallyManagedBitrate()
{
this->externallyManagedBitrate = true;
}
Expand Down
3 changes: 3 additions & 0 deletions worker/include/RTC/ConsumerTypes.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ namespace RTC
}

VideoLayers(const VideoLayers& other) = default;

bool operator==(const VideoLayers& other) const
{
return spatial == other.spatial && temporal == other.temporal;
Expand All @@ -29,6 +30,8 @@ namespace RTC
return !(*this == other);
}

void Dump(int indentation) const;

void Reset()
{
spatial = -1;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
#ifndef MS_RTC_SIMULCAST_CONSUMER_HPP
#define MS_RTC_SIMULCAST_CONSUMER_HPP
#ifndef MS_RTC_MULTI_STREAM_CONSUMER_HPP
#define MS_RTC_MULTI_STREAM_CONSUMER_HPP

#include "FBS/consumer.h"
#include "RTC/Consumer.hpp"
Expand All @@ -10,16 +10,17 @@

namespace RTC
{
class SimulcastConsumer : public RTC::Consumer, public RTC::RTP::RtpStreamSend::Listener
class MultiStreamConsumer : public RTC::Consumer, public RTC::RTP::RtpStreamSend::Listener
{
public:
SimulcastConsumer(
MultiStreamConsumer(
RTC::Shared* shared,
const std::string& id,
const std::string& producerId,
RTC::RtpParameters::Type type,
RTC::Consumer::Listener* listener,
const FBS::Transport::ConsumeRequest* data);
~SimulcastConsumer() override;
~MultiStreamConsumer() override;

public:
flatbuffers::Offset<FBS::Consumer::DumpResponse> FillBuffer(
Expand Down Expand Up @@ -60,6 +61,14 @@ namespace RTC
void ProducerRtpStreamScore(
RTC::RTP::RtpStreamRecv* rtpStream, uint8_t score, uint8_t previousScore) override;
void ProducerRtcpSenderReport(RTC::RTP::RtpStreamRecv* rtpStream, bool first) override;
void SetExternallyManagedBitrate() override
{
// Only allow externally managed bitrate video.
if (this->kind == RTC::Media::Kind::VIDEO)
{
this->externallyManagedBitrate = true;
}
}
uint8_t GetBitratePriority() const override;
uint32_t IncreaseLayer(uint32_t bitrate, bool considerLoss) override;
void ApplyLayers() override;
Expand Down Expand Up @@ -116,6 +125,7 @@ namespace RTC
absl::flat_hash_map<uint32_t, int16_t> mapMappedSsrcSpatialLayer;
std::vector<RTC::RTP::RtpStreamSend*> rtpStreams;
std::vector<RTC::RTP::RtpStreamRecv*> producerRtpStreams; // Indexed by spatial layer.
bool keyFrameSupported{ false };
bool syncRequired{ false };
int16_t spatialLayerToSync{ -1 };
bool lastSentPacketHasMarker{ false };
Expand Down
Loading
Loading