@@ -11,12 +11,13 @@ class CRcvBufferReadMsg
11
11
: public ::testing::Test
12
12
{
13
13
protected:
14
-
CRcvBufferReadMsg()
14
+
CRcvBufferReadMsg(bool message_api = true)
15
+
: m_use_message_api(message_api)
15
16
{
16
17
// initialization code here
17
18
}
18
19
19
-
~CRcvBufferReadMsg()
20
+
virtual ~CRcvBufferReadMsg()
20
21
{
21
22
// cleanup any pending stuff, but no exceptions allowed
22
23
}
@@ -31,7 +32,7 @@ class CRcvBufferReadMsg
31
32
m_unit_queue->init(m_buff_size_pkts, 1500, AF_INET);
32
33
33
34
#if ENABLE_NEW_RCVBUFFER
34
-
const bool enable_msg_api = true;
35
+
const bool enable_msg_api = m_use_message_api;
35
36
const bool enable_peer_rexmit = true;
36
37
m_rcv_buffer = unique_ptr<CRcvBufferNew>(new CRcvBufferNew(m_init_seqno, m_buff_size_pkts, m_unit_queue.get(), enable_peer_rexmit, enable_msg_api));
37
38
#else
@@ -161,6 +162,7 @@ class CRcvBufferReadMsg
161
162
const int m_init_seqno = 1000;
162
163
int m_first_unack_seqno = m_init_seqno;
163
164
static const size_t m_payload_sz = 1456;
165
+
const bool m_use_message_api;
164
166
165
167
const sync::steady_clock::time_point m_tsbpd_base = sync::steady_clock::now(); // now() - HS.timestamp, microseconds
166
168
const sync::steady_clock::duration m_delay = sync::milliseconds_from(200);
@@ -834,4 +836,112 @@ TEST_F(CRcvBufferReadMsg, TSBPDGapBeforeValid)
834
836
EXPECT_EQ(m_unit_queue->size(), m_unit_queue->capacity());
835
837
}
836
838
839
+
840
+
class CRcvBufferReadStream
841
+
: public CRcvBufferReadMsg
842
+
{
843
+
protected:
844
+
CRcvBufferReadStream()
845
+
: CRcvBufferReadMsg(false)
846
+
{}
847
+
848
+
virtual ~CRcvBufferReadStream() { }
849
+
};
850
+
851
+
852
+
// Add ten packets to the buffer in stream mode, read some of them.
853
+
// Try to add packets to occupied positions.
854
+
TEST_F(CRcvBufferReadStream, ReadSinglePackets)
855
+
{
856
+
const int num_pkts = 10;
857
+
ASSERT_LT(num_pkts, m_buff_size_pkts);
858
+
for (int i = 0; i < num_pkts; ++i)
859
+
{
860
+
EXPECT_EQ(addPacket(CSeqNo::incseq(m_init_seqno, i), false, false), 0);
861
+
}
862
+
863
+
// The available buffer size remains the same
864
+
// The value is reported by SRT receiver like this:
865
+
// data[ACKD_BUFFERLEFT] = m_pRcvBuffer->getAvailBufSize();
866
+
EXPECT_EQ(getAvailBufferSize(), m_buff_size_pkts - 1);
867
+
EXPECT_TRUE(hasAvailablePackets());
868
+
869
+
// Now acknowledge two packets
870
+
const int ack_pkts = 2;
871
+
ackPackets(2);
872
+
EXPECT_EQ(getAvailBufferSize(), m_buff_size_pkts - 1 - ack_pkts);
873
+
EXPECT_TRUE(hasAvailablePackets());
874
+
875
+
std::array<char, m_payload_sz> buff;
876
+
for (int i = 0; i < ack_pkts; ++i)
877
+
{
878
+
const size_t res = m_rcv_buffer->readBuffer(buff.data(), buff.size());
879
+
EXPECT_TRUE(size_t(res) == m_payload_sz);
880
+
EXPECT_EQ(getAvailBufferSize(), m_buff_size_pkts - ack_pkts + i);
881
+
EXPECT_TRUE(verifyPayload(buff.data(), res, CSeqNo::incseq(m_init_seqno, i)));
882
+
}
883
+
884
+
// Add packet to the position of oackets already read.
885
+
// Can't check the old buffer, as it does not handle a negative offset.
886
+
EXPECT_EQ(addPacket(m_init_seqno), -2);
887
+
888
+
// Add packet to a non-empty position.
889
+
EXPECT_EQ(addPacket(CSeqNo::incseq(m_init_seqno, ack_pkts)), -1);
890
+
891
+
const int num_pkts_left = num_pkts - ack_pkts;
892
+
ackPackets(num_pkts_left);
893
+
for (int i = 0; i < num_pkts_left; ++i)
894
+
{
895
+
const int res = m_rcv_buffer->readBuffer(buff.data(), buff.size());
896
+
EXPECT_TRUE(size_t(res) == m_payload_sz);
897
+
EXPECT_EQ(getAvailBufferSize(), m_buff_size_pkts - num_pkts_left + i);
898
+
EXPECT_TRUE(verifyPayload(buff.data(), res, CSeqNo::incseq(m_init_seqno, ack_pkts + i)));
899
+
}
900
+
EXPECT_EQ(m_unit_queue->size(), m_unit_queue->capacity());
901
+
}
902
+
903
+
904
+
// Add packets to the buffer in stream mode. Read fractional number of packets
905
+
// to confirm a partially read packet stays in the buffer and is read properly afterwards.
906
+
TEST_F(CRcvBufferReadStream, ReadFractional)
907
+
{
908
+
const int num_pkts = 10;
909
+
ASSERT_LT(num_pkts, m_buff_size_pkts);
910
+
for (int i = 0; i < num_pkts; ++i)
911
+
{
912
+
EXPECT_EQ(addPacket(CSeqNo::incseq(m_init_seqno, i), false, false), 0);
913
+
}
914
+
915
+
// The available buffer size remains the same
916
+
// The value is reported by SRT receiver like this:
917
+
// data[ACKD_BUFFERLEFT] = m_pRcvBuffer->getAvailBufSize();
918
+
EXPECT_EQ(getAvailBufferSize(), m_buff_size_pkts - 1);
919
+
EXPECT_TRUE(hasAvailablePackets());
920
+
921
+
array<char, m_payload_sz * num_pkts> buff;
922
+
923
+
const size_t nfull_pkts = 2;
924
+
const size_t num_bytes1 = nfull_pkts * m_payload_sz + m_payload_sz / 2;
925
+
const int res1 = m_rcv_buffer->readBuffer(buff.data(), num_bytes1);
926
+
EXPECT_TRUE(size_t(res1) == num_bytes1);
927
+
EXPECT_EQ(getAvailBufferSize(), m_buff_size_pkts - nfull_pkts - 1);
928
+
EXPECT_TRUE(hasAvailablePackets());
929
+
930
+
const size_t num_bytes2 = m_payload_sz * (num_pkts - nfull_pkts - 1) + m_payload_sz / 2;
931
+
932
+
const int res2 = m_rcv_buffer->readBuffer(buff.data() + num_bytes1, buff.size() - num_bytes1);
933
+
EXPECT_TRUE(size_t(res2) == num_bytes2);
934
+
EXPECT_EQ(getAvailBufferSize(), m_buff_size_pkts - num_pkts - 1);
935
+
EXPECT_FALSE(hasAvailablePackets());
936
+
ackPackets(num_pkts); // Move the reference ACK position.
937
+
EXPECT_EQ(getAvailBufferSize(), m_buff_size_pkts - 1);
938
+
939
+
for (int i = 0; i < num_pkts; ++i)
940
+
{
941
+
EXPECT_TRUE(verifyPayload(buff.data() + i * m_payload_sz, m_payload_sz, CSeqNo::incseq(m_init_seqno, i))) << "i = " << i;
942
+
}
943
+
944
+
EXPECT_EQ(m_unit_queue->size(), m_unit_queue->capacity());
945
+
}
946
+
837
947
#endif // ENABEL_NEW_RCVBUFFER
RetroSearch is an open source project built by @garambo | Open a GitHub Issue
Search and Browse the WWW like it's 1997 | Search results from DuckDuckGo
HTML:
3.2
| Encoding:
UTF-8
| Version:
0.7.4