A RetroSearch Logo

Home - News ( United States | United Kingdom | Italy | Germany ) - Football scores

Search Query:

Showing content from https://github.com/Haivision/srt/commit/8f68f613c5546aca16b301ab0dceae6db28bb6c3 below:

[core] refactor Group::recv() base on new rcv buffer to support messa… · Haivision/srt@8f68f61 · GitHub

@@ -2195,6 +2195,193 @@ static bool isValidSeqno(int32_t iBaseSeqno, int32_t iPktSeqno)

2195 2195

return false;

2196 2196

}

2197 2197 2198 +

#ifdef ENABLE_NEW_RCVBUFFER

2199 +

int CUDTGroup::recv(char* buf, int len, SRT_MSGCTRL& w_mc)

2200 +

{

2201 +

// First, acquire GlobControlLock to make sure all member sockets still exist

2202 +

enterCS(m_Global.m_GlobControlLock);

2203 +

ScopedLock guard(m_GroupLock);

2204 + 2205 +

if (m_bClosing)

2206 +

{

2207 +

// The group could be set closing in the meantime, but if

2208 +

// this is only about to be set by another thread, this thread

2209 +

// must fist wait for being able to acquire this lock.

2210 +

// The group will not be deleted now because it is added usage counter

2211 +

// by this call, but will be released once it exits.

2212 +

leaveCS(m_Global.m_GlobControlLock);

2213 +

throw CUDTException(MJ_CONNECTION, MN_CONNLOST, 0);

2214 +

}

2215 + 2216 +

// Now, still under lock, check if all sockets still can be dispatched

2217 +

send_CheckValidSockets();

2218 +

leaveCS(m_Global.m_GlobControlLock);

2219 + 2220 +

if (m_bClosing)

2221 +

throw CUDTException(MJ_CONNECTION, MN_CONNLOST, 0);

2222 + 2223 +

// Later iteration over it might be less efficient than

2224 +

// by vector, but we'll also often try to check a single id

2225 +

// if it was ever seen broken, so that it's skipped.

2226 +

set<CUDTSocket*> broken;

2227 + 2228 +

for (;;)

2229 +

{

2230 +

if (!m_bOpened || !m_bConnected)

2231 +

{

2232 +

LOGC(grlog.Error,

2233 +

log << boolalpha << "grp/recv: $" << id() << ": ABANDONING: opened=" << m_bOpened

2234 +

<< " connected=" << m_bConnected);

2235 +

throw CUDTException(MJ_CONNECTION, MN_NOCONN, 0);

2236 +

}

2237 + 2238 +

vector<CUDTSocket*> aliveMembers;

2239 +

recv_CollectAliveAndBroken(aliveMembers, broken);

2240 +

if (aliveMembers.empty())

2241 +

{

2242 +

LOGC(grlog.Error, log << "grp/recv: ALL LINKS BROKEN, ABANDONING.");

2243 +

m_Global.m_EPoll.update_events(id(), m_sPollID, SRT_EPOLL_IN, false);

2244 +

throw CUDTException(MJ_CONNECTION, MN_NOCONN, 0);

2245 +

}

2246 + 2247 +

vector<CUDTSocket*> readySockets;

2248 +

if (m_bSynRecving)

2249 +

readySockets = recv_WaitForReadReady(aliveMembers, broken);

2250 +

else

2251 +

readySockets = aliveMembers;

2252 + 2253 +

if (m_bClosing)

2254 +

{

2255 +

HLOGC(grlog.Debug, log << "grp/recv: $" << id() << ": GROUP CLOSED, ABANDONING.");

2256 +

throw CUDTException(MJ_CONNECTION, MN_CONNLOST, 0);

2257 +

}

2258 + 2259 +

// Find the first readable packet among all member sockets.

2260 +

CUDTSocket* socketToRead = NULL;

2261 +

CRcvBufferNew::PacketInfo infoToRead = {-1, false, time_point()};

2262 +

for (vector<CUDTSocket*>::const_iterator si = readySockets.begin(); si != readySockets.end(); ++si)

2263 +

{

2264 +

CUDTSocket* ps = *si;

2265 + 2266 +

ScopedLock lg(ps->core().m_RcvBufferLock);

2267 +

if (m_RcvBaseSeqNo != SRT_SEQNO_NONE)

2268 +

{

2269 +

// Drop here to make sure the getFirstReadablePacketInfo() below return fresher packet.

2270 +

int cnt = ps->core().dropTooLateUpTo(CSeqNo::incseq(m_RcvBaseSeqNo));

2271 +

if (cnt > 0)

2272 +

{

2273 +

HLOGC(grlog.Debug,

2274 +

log << "grp/recv: $" << id() << ": @" << ps->m_SocketID << ": dropped " << cnt

2275 +

<< " packets before reading: m_RcvBaseSeqNo=" << m_RcvBaseSeqNo);

2276 +

}

2277 +

}

2278 + 2279 +

const CRcvBufferNew::PacketInfo info =

2280 +

ps->core().m_pRcvBuffer->getFirstReadablePacketInfo(steady_clock::now());

2281 +

if (info.seqno == SRT_SEQNO_NONE)

2282 +

{

2283 +

HLOGC(grlog.Debug, log << "grp/recv: $" << id() << ": @" << ps->m_SocketID << ": Nothing to read.");

2284 +

continue;

2285 +

}

2286 +

// We need to qualify the sequence, just for a case.

2287 +

if (m_RcvBaseSeqNo != SRT_SEQNO_NONE && !isValidSeqno(m_RcvBaseSeqNo, info.seqno))

2288 +

{

2289 +

LOGC(grlog.Error,

2290 +

log << "grp/recv: $" << id() << ": @" << ps->m_SocketID << ": SEQUENCE DISCREPANCY: base=%"

2291 +

<< m_RcvBaseSeqNo << " vs pkt=%" << info.seqno << ", setting ESECFAIL");

2292 +

ps->core().m_bBroken = true;

2293 +

broken.insert(ps);

2294 +

continue;

2295 +

}

2296 +

if (socketToRead == NULL || CSeqNo::seqcmp(info.seqno, infoToRead.seqno) < 0)

2297 +

{

2298 +

socketToRead = ps;

2299 +

infoToRead = info;

2300 +

}

2301 +

}

2302 + 2303 +

if (socketToRead == NULL)

2304 +

{

2305 +

if (m_bSynRecving)

2306 +

{

2307 +

HLOGC(grlog.Debug,

2308 +

log << "grp/recv: $" << id() << ": No links reported any fresher packet, re-polling.");

2309 +

continue;

2310 +

}

2311 +

else

2312 +

{

2313 +

HLOGC(grlog.Debug,

2314 +

log << "grp/recv: $" << id() << ": No links reported any fresher packet, clearing readiness.");

2315 +

m_Global.m_EPoll.update_events(id(), m_sPollID, SRT_EPOLL_IN, false);

2316 +

throw CUDTException(MJ_AGAIN, MN_RDAVAIL, 0);

2317 +

}

2318 +

}

2319 +

else

2320 +

{

2321 +

HLOGC(grlog.Debug,

2322 +

log << "grp/recv: $" << id() << ": Found first readable packet from @" << socketToRead->m_SocketID

2323 +

<< ": seq=" << infoToRead.seqno << " gap=" << infoToRead.seq_gap

2324 +

<< " time=" << FormatTime(infoToRead.tsbpd_time));

2325 +

}

2326 + 2327 +

const int res = socketToRead->core().receiveMessage((buf), len, (w_mc), CUDTUnited::ERH_RETURN);

2328 +

HLOGC(grlog.Debug,

2329 +

log << "grp/recv: $" << id() << ": @" << socketToRead->m_SocketID << ": Extracted data with %"

2330 +

<< w_mc.pktseq << " #" << w_mc.msgno << ": " << (res <= 0 ? "(NOTHING)" : BufferStamp(buf, res)));

2331 +

if (res == 0)

2332 +

{

2333 +

LOGC(grlog.Warn,

2334 +

log << "grp/recv: $" << id() << ": @" << socketToRead->m_SocketID << ": Retrying next socket...");

2335 +

// This socket will not be socketToRead in the next turn because receiveMessage() return 0 here.

2336 +

continue;

2337 +

}

2338 +

if (res == SRT_ERROR)

2339 +

{

2340 +

LOGC(grlog.Warn,

2341 +

log << "grp/recv: $" << id() << ": @" << socketToRead->m_SocketID << ": " << srt_getlasterror_str()

2342 +

<< ". Retrying next socket...");

2343 +

broken.insert(socketToRead);

2344 +

continue;

2345 +

}

2346 +

fillGroupData((w_mc), w_mc);

2347 + 2348 +

HLOGC(grlog.Debug,

2349 +

log << "grp/recv: $" << id() << ": Update m_RcvBaseSeqNo: %" << m_RcvBaseSeqNo << " -> %" << w_mc.pktseq);

2350 +

m_RcvBaseSeqNo = w_mc.pktseq;

2351 + 2352 +

// Update stats as per delivery

2353 +

m_stats.recv.count(res);

2354 +

updateAvgPayloadSize(res);

2355 + 2356 +

for (vector<CUDTSocket*>::const_iterator si = aliveMembers.begin(); si != aliveMembers.end(); ++si)

2357 +

{

2358 +

CUDTSocket* ps = *si;

2359 +

ScopedLock lg(ps->core().m_RcvBufferLock);

2360 +

if (m_RcvBaseSeqNo != SRT_SEQNO_NONE)

2361 +

{

2362 +

int cnt = ps->core().dropTooLateUpTo(CSeqNo::incseq(m_RcvBaseSeqNo));

2363 +

if (cnt > 0)

2364 +

{

2365 +

HLOGC(grlog.Debug,

2366 +

log << "grp/recv: $" << id() << ": @" << ps->m_SocketID << ": dropped " << cnt

2367 +

<< " packets after reading: m_RcvBaseSeqNo=" << m_RcvBaseSeqNo);

2368 +

}

2369 +

}

2370 +

}

2371 +

for (vector<CUDTSocket*>::const_iterator si = aliveMembers.begin(); si != aliveMembers.end(); ++si)

2372 +

{

2373 +

CUDTSocket* ps = *si;

2374 +

if (!ps->core().isRcvBufferReady())

2375 +

m_Global.m_EPoll.update_events(ps->m_SocketID, ps->core().m_sPollID, SRT_EPOLL_IN, false);

2376 +

}

2377 + 2378 +

return res;

2379 +

}

2380 +

LOGC(grlog.Error, log << "grp/recv: UNEXPECTED RUN PATH, ABANDONING.");

2381 +

m_Global.m_EPoll.update_events(id(), m_sPollID, SRT_EPOLL_IN, false);

2382 +

throw CUDTException(MJ_AGAIN, MN_RDAVAIL, 0);

2383 +

}

2384 +

#else

2198 2385

// The "app reader" version of the reading function.

2199 2386

// This reads the packets from every socket treating them as independent

2200 2387

// and prepared to work with the application. Then packets are sorted out

@@ -2731,6 +2918,7 @@ int CUDTGroup::recv(char* buf, int len, SRT_MSGCTRL& w_mc)

2731 2918

}

2732 2919

}

2733 2920

}

2921 +

#endif

2734 2922 2735 2923

// [[using locked(m_GroupLock)]]

2736 2924

CUDTGroup::ReadPos* CUDTGroup::checkPacketAhead()


RetroSearch is an open source project built by @garambo | Open a GitHub Issue

Search and Browse the WWW like it's 1997 | Search results from DuckDuckGo

HTML: 3.2 | Encoding: UTF-8 | Version: 0.7.4