@@ -551,7 +551,7 @@ void srt::CUDT::getOpt(SRT_SOCKOPT optName, void *optval, int &optlen)
551
551
else
552
552
{
553
553
enterCS(m_RecvLock);
554
-
if (m_pRcvBuffer && m_pRcvBuffer->isRcvDataReady())
554
+
if (m_pRcvBuffer && isRcvBufferReady())
555
555
event |= SRT_EPOLL_IN;
556
556
leaveCS(m_RecvLock);
557
557
if (m_pSndBuffer && (m_config.iSndBufSize > m_pSndBuffer->getCurrBufSize()))
@@ -5211,7 +5211,6 @@ void * srt::CUDT::tsbpd(void *param)
5211
5211
{
5212
5212
int32_t skiptoseqno = SRT_SEQNO_NONE;
5213
5213
bool passack = true; // Get next packet to wait for even if not acked
5214
-
5215
5214
rxready = self->m_pRcvBuffer->getRcvFirstMsg((tsbpdtime), (passack), (skiptoseqno), (current_pkt_seq), rcv_base_seq);
5216
5215
5217
5216
HLOGC(tslog.Debug,
@@ -6052,7 +6051,7 @@ int srt::CUDT::receiveBuffer(char *data, int len)
6052
6051
6053
6052
UniqueLock recvguard(m_RecvLock);
6054
6053
6055
-
if ((m_bBroken || m_bClosing) && !m_pRcvBuffer->isRcvDataReady())
6054
+
if ((m_bBroken || m_bClosing) && !isRcvBufferReady())
6056
6055
{
6057
6056
if (m_bShutdown)
6058
6057
{
@@ -6084,7 +6083,7 @@ int srt::CUDT::receiveBuffer(char *data, int len)
6084
6083
6085
6084
CSync rcond (m_RecvDataCond, recvguard);
6086
6085
CSync tscond (m_RcvTsbPdCond, recvguard);
6087
-
if (!m_pRcvBuffer->isRcvDataReady())
6086
+
if (!isRcvBufferReady())
6088
6087
{
6089
6088
if (!m_config.bSynRecving)
6090
6089
{
@@ -6096,7 +6095,7 @@ int srt::CUDT::receiveBuffer(char *data, int len)
6096
6095
if (m_config.iRcvTimeOut < 0)
6097
6096
{
6098
6097
THREAD_PAUSED();
6099
-
while (stillConnected() && !m_pRcvBuffer->isRcvDataReady())
6098
+
while (stillConnected() && !isRcvBufferReady())
6100
6099
{
6101
6100
// Do not block forever, check connection status each 1 sec.
6102
6101
rcond.wait_for(seconds_from(1));
@@ -6108,7 +6107,7 @@ int srt::CUDT::receiveBuffer(char *data, int len)
6108
6107
const steady_clock::time_point exptime =
6109
6108
steady_clock::now() + milliseconds_from(m_config.iRcvTimeOut);
6110
6109
THREAD_PAUSED();
6111
-
while (stillConnected() && !m_pRcvBuffer->isRcvDataReady())
6110
+
while (stillConnected() && !isRcvBufferReady())
6112
6111
{
6113
6112
if (!rcond.wait_until(exptime)) // NOT means "not received a signal"
6114
6113
break; // timeout
@@ -6122,7 +6121,7 @@ int srt::CUDT::receiveBuffer(char *data, int len)
6122
6121
if (!m_bConnected)
6123
6122
throw CUDTException(MJ_CONNECTION, MN_NOCONN, 0);
6124
6123
6125
-
if ((m_bBroken || m_bClosing) && !m_pRcvBuffer->isRcvDataReady())
6124
+
if ((m_bBroken || m_bClosing) && !isRcvBufferReady())
6126
6125
{
6127
6126
// See at the beginning
6128
6127
if (!m_config.bMessageAPI && m_bShutdown)
@@ -6152,7 +6151,7 @@ int srt::CUDT::receiveBuffer(char *data, int len)
6152
6151
HLOGP(tslog.Debug, "NOT pinging TSBPD - not set");
6153
6152
}
6154
6153
6155
-
if (!m_pRcvBuffer->isRcvDataReady())
6154
+
if (!isRcvBufferReady())
6156
6155
{
6157
6156
// read is not available any more
6158
6157
uglobal().m_EPoll.update_events(m_SocketID, m_sPollID, SRT_EPOLL_IN, false);
@@ -6607,6 +6606,23 @@ int srt::CUDT::recvmsg2(char* data, int len, SRT_MSGCTRL& w_mctrl)
6607
6606
return receiveBuffer(data, len);
6608
6607
}
6609
6608
6609
+
size_t srt::CUDT::getAvailRcvBufferSizeLock() const
6610
+
{
6611
+
ScopedLock lck(m_RcvBufferLock);
6612
+
return m_pRcvBuffer->getAvailBufSize();
6613
+
}
6614
+
6615
+
size_t srt::CUDT::getAvailRcvBufferSizeNoLock() const
6616
+
{
6617
+
return m_pRcvBuffer->getAvailBufSize();
6618
+
}
6619
+
6620
+
bool srt::CUDT::isRcvBufferReady() const
6621
+
{
6622
+
ScopedLock lck(m_RcvBufferLock);
6623
+
return m_pRcvBuffer->isRcvDataReady();
6624
+
}
6625
+
6610
6626
// int by_exception: accepts values of CUDTUnited::ErrorHandling:
6611
6627
// - 0 - by return value
6612
6628
// - 1 - by exception
@@ -6647,7 +6663,7 @@ int srt::CUDT::receiveMessage(char* data, int len, SRT_MSGCTRL& w_mctrl, int by_
6647
6663
{
6648
6664
HLOGC(arlog.Debug, log << CONID() << "receiveMessage: CONNECTION BROKEN - reading from recv buffer just for formality");
6649
6665
enterCS(m_RcvBufferLock);
6650
-
int res = m_pRcvBuffer->readMsg(data, len);
6666
+
const int res = m_pRcvBuffer->readMsg(data, len);
6651
6667
leaveCS(m_RcvBufferLock);
6652
6668
w_mctrl.srctime = 0;
6653
6669
@@ -6662,7 +6678,7 @@ int srt::CUDT::receiveMessage(char* data, int len, SRT_MSGCTRL& w_mctrl, int by_
6662
6678
HLOGP(tslog.Debug, "NOT pinging TSBPD - not set");
6663
6679
}
6664
6680
6665
-
if (!m_pRcvBuffer->isRcvDataReady())
6681
+
if (!isRcvBufferReady())
6666
6682
{
6667
6683
// read is not available any more
6668
6684
uglobal().m_EPoll.update_events(m_SocketID, m_sPollID, SRT_EPOLL_IN, false);
@@ -6713,7 +6729,7 @@ int srt::CUDT::receiveMessage(char* data, int len, SRT_MSGCTRL& w_mctrl, int by_
6713
6729
throw CUDTException(MJ_AGAIN, MN_RDAVAIL, 0);
6714
6730
}
6715
6731
6716
-
if (!m_pRcvBuffer->isRcvDataReady())
6732
+
if (!isRcvBufferReady())
6717
6733
{
6718
6734
// Kick TsbPd thread to schedule next wakeup (if running)
6719
6735
if (m_bTsbPd)
@@ -6792,12 +6808,12 @@ int srt::CUDT::receiveMessage(char* data, int len, SRT_MSGCTRL& w_mctrl, int by_
6792
6808
{
6793
6809
HLOGP(tslog.Debug, "receiveMessage: DATA COND: KICKED.");
6794
6810
}
6795
-
} while (stillConnected() && !timeout && (!m_pRcvBuffer->isRcvDataReady()));
6811
+
} while (stillConnected() && !timeout && (!isRcvBufferReady()));
6796
6812
THREAD_RESUMED();
6797
6813
6798
6814
HLOGC(tslog.Debug,
6799
6815
log << CONID() << "receiveMessage: lock-waiting loop exited: stillConntected=" << stillConnected()
6800
-
<< " timeout=" << timeout << " data-ready=" << m_pRcvBuffer->isRcvDataReady());
6816
+
<< " timeout=" << timeout << " data-ready=" << isRcvBufferReady());
6801
6817
}
6802
6818
6803
6819
/* XXX DEBUG STUFF - enable when required
@@ -6829,7 +6845,7 @@ int srt::CUDT::receiveMessage(char* data, int len, SRT_MSGCTRL& w_mctrl, int by_
6829
6845
}
6830
6846
} while ((res == 0) && !timeout);
6831
6847
6832
-
if (!m_pRcvBuffer->isRcvDataReady())
6848
+
if (!isRcvBufferReady())
6833
6849
{
6834
6850
// Falling here means usually that res == 0 && timeout == true.
6835
6851
// res == 0 would repeat the above loop, unless there was also a timeout.
@@ -6990,7 +7006,7 @@ int64_t srt::CUDT::recvfile(fstream &ofs, int64_t &offset, int64_t size, int blo
6990
7006
{
6991
7007
if (!m_bConnected || !m_CongCtl.ready())
6992
7008
throw CUDTException(MJ_CONNECTION, MN_NOCONN, 0);
6993
-
else if ((m_bBroken || m_bClosing) && !m_pRcvBuffer->isRcvDataReady())
7009
+
else if ((m_bBroken || m_bClosing) && !isRcvBufferReady())
6994
7010
{
6995
7011
if (!m_config.bMessageAPI && m_bShutdown)
6996
7012
return 0;
@@ -7072,14 +7088,14 @@ int64_t srt::CUDT::recvfile(fstream &ofs, int64_t &offset, int64_t size, int blo
7072
7088
CSync rcond (m_RecvDataCond, recvguard);
7073
7089
7074
7090
THREAD_PAUSED();
7075
-
while (stillConnected() && !m_pRcvBuffer->isRcvDataReady())
7091
+
while (stillConnected() && !isRcvBufferReady())
7076
7092
rcond.wait();
7077
7093
THREAD_RESUMED();
7078
7094
}
7079
7095
7080
7096
if (!m_bConnected)
7081
7097
throw CUDTException(MJ_CONNECTION, MN_NOCONN, 0);
7082
-
else if ((m_bBroken || m_bClosing) && !m_pRcvBuffer->isRcvDataReady())
7098
+
else if ((m_bBroken || m_bClosing) && !isRcvBufferReady())
7083
7099
{
7084
7100
if (!m_config.bMessageAPI && m_bShutdown)
7085
7101
return 0;
@@ -7098,7 +7114,7 @@ int64_t srt::CUDT::recvfile(fstream &ofs, int64_t &offset, int64_t size, int blo
7098
7114
}
7099
7115
}
7100
7116
7101
-
if (!m_pRcvBuffer->isRcvDataReady())
7117
+
if (!isRcvBufferReady())
7102
7118
{
7103
7119
// read is not available any more
7104
7120
uglobal().m_EPoll.update_events(m_SocketID, m_sPollID, SRT_EPOLL_IN, false);
@@ -7239,7 +7255,7 @@ void srt::CUDT::bstats(CBytePerfMon *perf, bool clear, bool instantaneous)
7239
7255
7240
7256
if (m_pRcvBuffer)
7241
7257
{
7242
-
perf->byteAvailRcvBuf = m_pRcvBuffer->getAvailBufSize() * m_config.iMSS;
7258
+
perf->byteAvailRcvBuf = getAvailRcvBufferSizeLock() * m_config.iMSS;
7243
7259
if (instantaneous) // no need for historical API for Rcv side
7244
7260
{
7245
7261
perf->pktRcvBuf = m_pRcvBuffer->getRcvDataSize(perf->byteRcvBuf, perf->msRcvBuf);
@@ -7489,7 +7505,7 @@ void srt::CUDT::releaseSynch()
7489
7505
// [[using locked(m_RcvBufferLock)]];
7490
7506
int32_t srt::CUDT::ackDataUpTo(int32_t ack)
7491
7507
{
7492
-
int acksize = CSeqNo::seqoff(m_iRcvLastSkipAck, ack);
7508
+
const int acksize = CSeqNo::seqoff(m_iRcvLastSkipAck, ack);
7493
7509
7494
7510
HLOGC(xtlog.Debug, log << "ackDataUpTo: %" << ack << " vs. current %" << m_iRcvLastSkipAck
7495
7511
<< " (signing off " << acksize << " packets)");
@@ -7691,7 +7707,7 @@ void srt::CUDT::sendCtrl(UDTMessageType pkttype, const int32_t* lparam, void* rp
7691
7707
int srt::CUDT::sendCtrlAck(CPacket& ctrlpkt, int size)
7692
7708
{
7693
7709
SRT_ASSERT(ctrlpkt.getMsgTimeStamp() != 0);
7694
-
int32_t ack;
7710
+
int32_t ack; // First unacknowledged packet seqnuence number (acknowledge up to ack).
7695
7711
int nbsent = 0;
7696
7712
int local_prevack = 0;
7697
7713
@@ -7881,7 +7897,7 @@ int srt::CUDT::sendCtrlAck(CPacket& ctrlpkt, int size)
7881
7897
data[ACKD_RCVLASTACK] = m_iRcvLastAck;
7882
7898
data[ACKD_RTT] = m_iSRTT;
7883
7899
data[ACKD_RTTVAR] = m_iRTTVar;
7884
-
data[ACKD_BUFFERLEFT] = m_pRcvBuffer->getAvailBufSize();
7900
+
data[ACKD_BUFFERLEFT] = getAvailRcvBufferSizeNoLock();
7885
7901
// a minimum flow window of 2 is used, even if buffer is full, to break potential deadlock
7886
7902
if (data[ACKD_BUFFERLEFT] < 2)
7887
7903
data[ACKD_BUFFERLEFT] = 2;
@@ -9666,7 +9682,7 @@ int srt::CUDT::processData(CUnit* in_unit)
9666
9682
continue;
9667
9683
}
9668
9684
9669
-
const int avail_bufsize = m_pRcvBuffer->getAvailBufSize();
9685
+
const int avail_bufsize = getAvailRcvBufferSizeNoLock();
9670
9686
if (offset >= avail_bufsize)
9671
9687
{
9672
9688
// This is already a sequence discrepancy. Probably there could be found
@@ -9773,7 +9789,7 @@ int srt::CUDT::processData(CUnit* in_unit)
9773
9789
LOGC(qrlog.Debug, log << CONID() << "RECEIVED: seq=" << rpkt.m_iSeqNo
9774
9790
<< " offset=" << offset
9775
9791
<< " BUFr=" << avail_bufsize
9776
-
<< " avail=" << m_pRcvBuffer->getAvailBufSize()
9792
+
<< " avail=" << getAvailRcvBufferSizeNoLock()
9777
9793
<< " buffer=(" << m_iRcvLastSkipAck
9778
9794
<< ":" << m_iRcvCurrSeqNo // -1 = size to last index
9779
9795
<< "+" << CSeqNo::incseq(m_iRcvLastSkipAck, m_pRcvBuffer->capacity()-1)
@@ -11123,7 +11139,7 @@ void srt::CUDT::addEPoll(const int eid)
11123
11139
return;
11124
11140
11125
11141
enterCS(m_RecvLock);
11126
-
if (m_pRcvBuffer->isRcvDataReady())
11142
+
if (isRcvBufferReady())
11127
11143
{
11128
11144
uglobal().m_EPoll.update_events(m_SocketID, m_sPollID, SRT_EPOLL_IN, true);
11129
11145
}
RetroSearch is an open source project built by @garambo | Open a GitHub Issue
Search and Browse the WWW like it's 1997 | Search results from DuckDuckGo
HTML:
3.2
| Encoding:
UTF-8
| Version:
0.7.4