A RetroSearch Logo

Home - News ( United States | United Kingdom | Italy | Germany ) - Football scores

Search Query:

Showing content from https://github.com/Haivision/srt/commit/86d1eb2d98491c71313e887c1bedcecce0932f90 below:

[core] Added CUDT::isRcvReady() with mutex lock. · Haivision/srt@86d1eb2 · GitHub

@@ -551,7 +551,7 @@ void srt::CUDT::getOpt(SRT_SOCKOPT optName, void *optval, int &optlen)

551 551

else

552 552

{

553 553

enterCS(m_RecvLock);

554 -

if (m_pRcvBuffer && m_pRcvBuffer->isRcvDataReady())

554 +

if (m_pRcvBuffer && isRcvBufferReady())

555 555

event |= SRT_EPOLL_IN;

556 556

leaveCS(m_RecvLock);

557 557

if (m_pSndBuffer && (m_config.iSndBufSize > m_pSndBuffer->getCurrBufSize()))

@@ -5211,7 +5211,6 @@ void * srt::CUDT::tsbpd(void *param)

5211 5211

{

5212 5212

int32_t skiptoseqno = SRT_SEQNO_NONE;

5213 5213

bool passack = true; // Get next packet to wait for even if not acked

5214 - 5215 5214

rxready = self->m_pRcvBuffer->getRcvFirstMsg((tsbpdtime), (passack), (skiptoseqno), (current_pkt_seq), rcv_base_seq);

5216 5215 5217 5216

HLOGC(tslog.Debug,

@@ -6052,7 +6051,7 @@ int srt::CUDT::receiveBuffer(char *data, int len)

6052 6051 6053 6052

UniqueLock recvguard(m_RecvLock);

6054 6053 6055 -

if ((m_bBroken || m_bClosing) && !m_pRcvBuffer->isRcvDataReady())

6054 +

if ((m_bBroken || m_bClosing) && !isRcvBufferReady())

6056 6055

{

6057 6056

if (m_bShutdown)

6058 6057

{

@@ -6084,7 +6083,7 @@ int srt::CUDT::receiveBuffer(char *data, int len)

6084 6083 6085 6084

CSync rcond (m_RecvDataCond, recvguard);

6086 6085

CSync tscond (m_RcvTsbPdCond, recvguard);

6087 -

if (!m_pRcvBuffer->isRcvDataReady())

6086 +

if (!isRcvBufferReady())

6088 6087

{

6089 6088

if (!m_config.bSynRecving)

6090 6089

{

@@ -6096,7 +6095,7 @@ int srt::CUDT::receiveBuffer(char *data, int len)

6096 6095

if (m_config.iRcvTimeOut < 0)

6097 6096

{

6098 6097

THREAD_PAUSED();

6099 -

while (stillConnected() && !m_pRcvBuffer->isRcvDataReady())

6098 +

while (stillConnected() && !isRcvBufferReady())

6100 6099

{

6101 6100

// Do not block forever, check connection status each 1 sec.

6102 6101

rcond.wait_for(seconds_from(1));

@@ -6108,7 +6107,7 @@ int srt::CUDT::receiveBuffer(char *data, int len)

6108 6107

const steady_clock::time_point exptime =

6109 6108

steady_clock::now() + milliseconds_from(m_config.iRcvTimeOut);

6110 6109

THREAD_PAUSED();

6111 -

while (stillConnected() && !m_pRcvBuffer->isRcvDataReady())

6110 +

while (stillConnected() && !isRcvBufferReady())

6112 6111

{

6113 6112

if (!rcond.wait_until(exptime)) // NOT means "not received a signal"

6114 6113

break; // timeout

@@ -6122,7 +6121,7 @@ int srt::CUDT::receiveBuffer(char *data, int len)

6122 6121

if (!m_bConnected)

6123 6122

throw CUDTException(MJ_CONNECTION, MN_NOCONN, 0);

6124 6123 6125 -

if ((m_bBroken || m_bClosing) && !m_pRcvBuffer->isRcvDataReady())

6124 +

if ((m_bBroken || m_bClosing) && !isRcvBufferReady())

6126 6125

{

6127 6126

// See at the beginning

6128 6127

if (!m_config.bMessageAPI && m_bShutdown)

@@ -6152,7 +6151,7 @@ int srt::CUDT::receiveBuffer(char *data, int len)

6152 6151

HLOGP(tslog.Debug, "NOT pinging TSBPD - not set");

6153 6152

}

6154 6153 6155 -

if (!m_pRcvBuffer->isRcvDataReady())

6154 +

if (!isRcvBufferReady())

6156 6155

{

6157 6156

// read is not available any more

6158 6157

uglobal().m_EPoll.update_events(m_SocketID, m_sPollID, SRT_EPOLL_IN, false);

@@ -6607,6 +6606,23 @@ int srt::CUDT::recvmsg2(char* data, int len, SRT_MSGCTRL& w_mctrl)

6607 6606

return receiveBuffer(data, len);

6608 6607

}

6609 6608 6609 +

size_t srt::CUDT::getAvailRcvBufferSizeLock() const

6610 +

{

6611 +

ScopedLock lck(m_RcvBufferLock);

6612 +

return m_pRcvBuffer->getAvailBufSize();

6613 +

}

6614 + 6615 +

size_t srt::CUDT::getAvailRcvBufferSizeNoLock() const

6616 +

{

6617 +

return m_pRcvBuffer->getAvailBufSize();

6618 +

}

6619 + 6620 +

bool srt::CUDT::isRcvBufferReady() const

6621 +

{

6622 +

ScopedLock lck(m_RcvBufferLock);

6623 +

return m_pRcvBuffer->isRcvDataReady();

6624 +

}

6625 + 6610 6626

// int by_exception: accepts values of CUDTUnited::ErrorHandling:

6611 6627

// - 0 - by return value

6612 6628

// - 1 - by exception

@@ -6647,7 +6663,7 @@ int srt::CUDT::receiveMessage(char* data, int len, SRT_MSGCTRL& w_mctrl, int by_

6647 6663

{

6648 6664

HLOGC(arlog.Debug, log << CONID() << "receiveMessage: CONNECTION BROKEN - reading from recv buffer just for formality");

6649 6665

enterCS(m_RcvBufferLock);

6650 -

int res = m_pRcvBuffer->readMsg(data, len);

6666 +

const int res = m_pRcvBuffer->readMsg(data, len);

6651 6667

leaveCS(m_RcvBufferLock);

6652 6668

w_mctrl.srctime = 0;

6653 6669

@@ -6662,7 +6678,7 @@ int srt::CUDT::receiveMessage(char* data, int len, SRT_MSGCTRL& w_mctrl, int by_

6662 6678

HLOGP(tslog.Debug, "NOT pinging TSBPD - not set");

6663 6679

}

6664 6680 6665 -

if (!m_pRcvBuffer->isRcvDataReady())

6681 +

if (!isRcvBufferReady())

6666 6682

{

6667 6683

// read is not available any more

6668 6684

uglobal().m_EPoll.update_events(m_SocketID, m_sPollID, SRT_EPOLL_IN, false);

@@ -6713,7 +6729,7 @@ int srt::CUDT::receiveMessage(char* data, int len, SRT_MSGCTRL& w_mctrl, int by_

6713 6729

throw CUDTException(MJ_AGAIN, MN_RDAVAIL, 0);

6714 6730

}

6715 6731 6716 -

if (!m_pRcvBuffer->isRcvDataReady())

6732 +

if (!isRcvBufferReady())

6717 6733

{

6718 6734

// Kick TsbPd thread to schedule next wakeup (if running)

6719 6735

if (m_bTsbPd)

@@ -6792,12 +6808,12 @@ int srt::CUDT::receiveMessage(char* data, int len, SRT_MSGCTRL& w_mctrl, int by_

6792 6808

{

6793 6809

HLOGP(tslog.Debug, "receiveMessage: DATA COND: KICKED.");

6794 6810

}

6795 -

} while (stillConnected() && !timeout && (!m_pRcvBuffer->isRcvDataReady()));

6811 +

} while (stillConnected() && !timeout && (!isRcvBufferReady()));

6796 6812

THREAD_RESUMED();

6797 6813 6798 6814

HLOGC(tslog.Debug,

6799 6815

log << CONID() << "receiveMessage: lock-waiting loop exited: stillConntected=" << stillConnected()

6800 -

<< " timeout=" << timeout << " data-ready=" << m_pRcvBuffer->isRcvDataReady());

6816 +

<< " timeout=" << timeout << " data-ready=" << isRcvBufferReady());

6801 6817

}

6802 6818 6803 6819

/* XXX DEBUG STUFF - enable when required

@@ -6829,7 +6845,7 @@ int srt::CUDT::receiveMessage(char* data, int len, SRT_MSGCTRL& w_mctrl, int by_

6829 6845

}

6830 6846

} while ((res == 0) && !timeout);

6831 6847 6832 -

if (!m_pRcvBuffer->isRcvDataReady())

6848 +

if (!isRcvBufferReady())

6833 6849

{

6834 6850

// Falling here means usually that res == 0 && timeout == true.

6835 6851

// res == 0 would repeat the above loop, unless there was also a timeout.

@@ -6990,7 +7006,7 @@ int64_t srt::CUDT::recvfile(fstream &ofs, int64_t &offset, int64_t size, int blo

6990 7006

{

6991 7007

if (!m_bConnected || !m_CongCtl.ready())

6992 7008

throw CUDTException(MJ_CONNECTION, MN_NOCONN, 0);

6993 -

else if ((m_bBroken || m_bClosing) && !m_pRcvBuffer->isRcvDataReady())

7009 +

else if ((m_bBroken || m_bClosing) && !isRcvBufferReady())

6994 7010

{

6995 7011

if (!m_config.bMessageAPI && m_bShutdown)

6996 7012

return 0;

@@ -7072,14 +7088,14 @@ int64_t srt::CUDT::recvfile(fstream &ofs, int64_t &offset, int64_t size, int blo

7072 7088

CSync rcond (m_RecvDataCond, recvguard);

7073 7089 7074 7090

THREAD_PAUSED();

7075 -

while (stillConnected() && !m_pRcvBuffer->isRcvDataReady())

7091 +

while (stillConnected() && !isRcvBufferReady())

7076 7092

rcond.wait();

7077 7093

THREAD_RESUMED();

7078 7094

}

7079 7095 7080 7096

if (!m_bConnected)

7081 7097

throw CUDTException(MJ_CONNECTION, MN_NOCONN, 0);

7082 -

else if ((m_bBroken || m_bClosing) && !m_pRcvBuffer->isRcvDataReady())

7098 +

else if ((m_bBroken || m_bClosing) && !isRcvBufferReady())

7083 7099

{

7084 7100

if (!m_config.bMessageAPI && m_bShutdown)

7085 7101

return 0;

@@ -7098,7 +7114,7 @@ int64_t srt::CUDT::recvfile(fstream &ofs, int64_t &offset, int64_t size, int blo

7098 7114

}

7099 7115

}

7100 7116 7101 -

if (!m_pRcvBuffer->isRcvDataReady())

7117 +

if (!isRcvBufferReady())

7102 7118

{

7103 7119

// read is not available any more

7104 7120

uglobal().m_EPoll.update_events(m_SocketID, m_sPollID, SRT_EPOLL_IN, false);

@@ -7239,7 +7255,7 @@ void srt::CUDT::bstats(CBytePerfMon *perf, bool clear, bool instantaneous)

7239 7255 7240 7256

if (m_pRcvBuffer)

7241 7257

{

7242 -

perf->byteAvailRcvBuf = m_pRcvBuffer->getAvailBufSize() * m_config.iMSS;

7258 +

perf->byteAvailRcvBuf = getAvailRcvBufferSizeLock() * m_config.iMSS;

7243 7259

if (instantaneous) // no need for historical API for Rcv side

7244 7260

{

7245 7261

perf->pktRcvBuf = m_pRcvBuffer->getRcvDataSize(perf->byteRcvBuf, perf->msRcvBuf);

@@ -7489,7 +7505,7 @@ void srt::CUDT::releaseSynch()

7489 7505

// [[using locked(m_RcvBufferLock)]];

7490 7506

int32_t srt::CUDT::ackDataUpTo(int32_t ack)

7491 7507

{

7492 -

int acksize = CSeqNo::seqoff(m_iRcvLastSkipAck, ack);

7508 +

const int acksize = CSeqNo::seqoff(m_iRcvLastSkipAck, ack);

7493 7509 7494 7510

HLOGC(xtlog.Debug, log << "ackDataUpTo: %" << ack << " vs. current %" << m_iRcvLastSkipAck

7495 7511

<< " (signing off " << acksize << " packets)");

@@ -7691,7 +7707,7 @@ void srt::CUDT::sendCtrl(UDTMessageType pkttype, const int32_t* lparam, void* rp

7691 7707

int srt::CUDT::sendCtrlAck(CPacket& ctrlpkt, int size)

7692 7708

{

7693 7709

SRT_ASSERT(ctrlpkt.getMsgTimeStamp() != 0);

7694 -

int32_t ack;

7710 +

int32_t ack; // First unacknowledged packet seqnuence number (acknowledge up to ack).

7695 7711

int nbsent = 0;

7696 7712

int local_prevack = 0;

7697 7713

@@ -7881,7 +7897,7 @@ int srt::CUDT::sendCtrlAck(CPacket& ctrlpkt, int size)

7881 7897

data[ACKD_RCVLASTACK] = m_iRcvLastAck;

7882 7898

data[ACKD_RTT] = m_iSRTT;

7883 7899

data[ACKD_RTTVAR] = m_iRTTVar;

7884 -

data[ACKD_BUFFERLEFT] = m_pRcvBuffer->getAvailBufSize();

7900 +

data[ACKD_BUFFERLEFT] = getAvailRcvBufferSizeNoLock();

7885 7901

// a minimum flow window of 2 is used, even if buffer is full, to break potential deadlock

7886 7902

if (data[ACKD_BUFFERLEFT] < 2)

7887 7903

data[ACKD_BUFFERLEFT] = 2;

@@ -9666,7 +9682,7 @@ int srt::CUDT::processData(CUnit* in_unit)

9666 9682

continue;

9667 9683

}

9668 9684 9669 -

const int avail_bufsize = m_pRcvBuffer->getAvailBufSize();

9685 +

const int avail_bufsize = getAvailRcvBufferSizeNoLock();

9670 9686

if (offset >= avail_bufsize)

9671 9687

{

9672 9688

// This is already a sequence discrepancy. Probably there could be found

@@ -9773,7 +9789,7 @@ int srt::CUDT::processData(CUnit* in_unit)

9773 9789

LOGC(qrlog.Debug, log << CONID() << "RECEIVED: seq=" << rpkt.m_iSeqNo

9774 9790

<< " offset=" << offset

9775 9791

<< " BUFr=" << avail_bufsize

9776 -

<< " avail=" << m_pRcvBuffer->getAvailBufSize()

9792 +

<< " avail=" << getAvailRcvBufferSizeNoLock()

9777 9793

<< " buffer=(" << m_iRcvLastSkipAck

9778 9794

<< ":" << m_iRcvCurrSeqNo // -1 = size to last index

9779 9795

<< "+" << CSeqNo::incseq(m_iRcvLastSkipAck, m_pRcvBuffer->capacity()-1)

@@ -11123,7 +11139,7 @@ void srt::CUDT::addEPoll(const int eid)

11123 11139

return;

11124 11140 11125 11141

enterCS(m_RecvLock);

11126 -

if (m_pRcvBuffer->isRcvDataReady())

11142 +

if (isRcvBufferReady())

11127 11143

{

11128 11144

uglobal().m_EPoll.update_events(m_SocketID, m_sPollID, SRT_EPOLL_IN, true);

11129 11145

}


RetroSearch is an open source project built by @garambo | Open a GitHub Issue

Search and Browse the WWW like it's 1997 | Search results from DuckDuckGo

HTML: 3.2 | Encoding: UTF-8 | Version: 0.7.4