@@ -2195,6 +2195,193 @@ static bool isValidSeqno(int32_t iBaseSeqno, int32_t iPktSeqno)
2195
2195
return false;
2196
2196
}
2197
2197
2198
+
#ifdef ENABLE_NEW_RCVBUFFER
2199
+
int CUDTGroup::recv(char* buf, int len, SRT_MSGCTRL& w_mc)
2200
+
{
2201
+
// First, acquire GlobControlLock to make sure all member sockets still exist
2202
+
enterCS(m_Global.m_GlobControlLock);
2203
+
ScopedLock guard(m_GroupLock);
2204
+
2205
+
if (m_bClosing)
2206
+
{
2207
+
// The group could be set closing in the meantime, but if
2208
+
// this is only about to be set by another thread, this thread
2209
+
// must fist wait for being able to acquire this lock.
2210
+
// The group will not be deleted now because it is added usage counter
2211
+
// by this call, but will be released once it exits.
2212
+
leaveCS(m_Global.m_GlobControlLock);
2213
+
throw CUDTException(MJ_CONNECTION, MN_CONNLOST, 0);
2214
+
}
2215
+
2216
+
// Now, still under lock, check if all sockets still can be dispatched
2217
+
send_CheckValidSockets();
2218
+
leaveCS(m_Global.m_GlobControlLock);
2219
+
2220
+
if (m_bClosing)
2221
+
throw CUDTException(MJ_CONNECTION, MN_CONNLOST, 0);
2222
+
2223
+
// Later iteration over it might be less efficient than
2224
+
// by vector, but we'll also often try to check a single id
2225
+
// if it was ever seen broken, so that it's skipped.
2226
+
set<CUDTSocket*> broken;
2227
+
2228
+
for (;;)
2229
+
{
2230
+
if (!m_bOpened || !m_bConnected)
2231
+
{
2232
+
LOGC(grlog.Error,
2233
+
log << boolalpha << "grp/recv: $" << id() << ": ABANDONING: opened=" << m_bOpened
2234
+
<< " connected=" << m_bConnected);
2235
+
throw CUDTException(MJ_CONNECTION, MN_NOCONN, 0);
2236
+
}
2237
+
2238
+
vector<CUDTSocket*> aliveMembers;
2239
+
recv_CollectAliveAndBroken(aliveMembers, broken);
2240
+
if (aliveMembers.empty())
2241
+
{
2242
+
LOGC(grlog.Error, log << "grp/recv: ALL LINKS BROKEN, ABANDONING.");
2243
+
m_Global.m_EPoll.update_events(id(), m_sPollID, SRT_EPOLL_IN, false);
2244
+
throw CUDTException(MJ_CONNECTION, MN_NOCONN, 0);
2245
+
}
2246
+
2247
+
vector<CUDTSocket*> readySockets;
2248
+
if (m_bSynRecving)
2249
+
readySockets = recv_WaitForReadReady(aliveMembers, broken);
2250
+
else
2251
+
readySockets = aliveMembers;
2252
+
2253
+
if (m_bClosing)
2254
+
{
2255
+
HLOGC(grlog.Debug, log << "grp/recv: $" << id() << ": GROUP CLOSED, ABANDONING.");
2256
+
throw CUDTException(MJ_CONNECTION, MN_CONNLOST, 0);
2257
+
}
2258
+
2259
+
// Find the first readable packet among all member sockets.
2260
+
CUDTSocket* socketToRead = NULL;
2261
+
CRcvBufferNew::PacketInfo infoToRead = {-1, false, time_point()};
2262
+
for (vector<CUDTSocket*>::const_iterator si = readySockets.begin(); si != readySockets.end(); ++si)
2263
+
{
2264
+
CUDTSocket* ps = *si;
2265
+
2266
+
ScopedLock lg(ps->core().m_RcvBufferLock);
2267
+
if (m_RcvBaseSeqNo != SRT_SEQNO_NONE)
2268
+
{
2269
+
// Drop here to make sure the getFirstReadablePacketInfo() below return fresher packet.
2270
+
int cnt = ps->core().dropTooLateUpTo(CSeqNo::incseq(m_RcvBaseSeqNo));
2271
+
if (cnt > 0)
2272
+
{
2273
+
HLOGC(grlog.Debug,
2274
+
log << "grp/recv: $" << id() << ": @" << ps->m_SocketID << ": dropped " << cnt
2275
+
<< " packets before reading: m_RcvBaseSeqNo=" << m_RcvBaseSeqNo);
2276
+
}
2277
+
}
2278
+
2279
+
const CRcvBufferNew::PacketInfo info =
2280
+
ps->core().m_pRcvBuffer->getFirstReadablePacketInfo(steady_clock::now());
2281
+
if (info.seqno == SRT_SEQNO_NONE)
2282
+
{
2283
+
HLOGC(grlog.Debug, log << "grp/recv: $" << id() << ": @" << ps->m_SocketID << ": Nothing to read.");
2284
+
continue;
2285
+
}
2286
+
// We need to qualify the sequence, just for a case.
2287
+
if (m_RcvBaseSeqNo != SRT_SEQNO_NONE && !isValidSeqno(m_RcvBaseSeqNo, info.seqno))
2288
+
{
2289
+
LOGC(grlog.Error,
2290
+
log << "grp/recv: $" << id() << ": @" << ps->m_SocketID << ": SEQUENCE DISCREPANCY: base=%"
2291
+
<< m_RcvBaseSeqNo << " vs pkt=%" << info.seqno << ", setting ESECFAIL");
2292
+
ps->core().m_bBroken = true;
2293
+
broken.insert(ps);
2294
+
continue;
2295
+
}
2296
+
if (socketToRead == NULL || CSeqNo::seqcmp(info.seqno, infoToRead.seqno) < 0)
2297
+
{
2298
+
socketToRead = ps;
2299
+
infoToRead = info;
2300
+
}
2301
+
}
2302
+
2303
+
if (socketToRead == NULL)
2304
+
{
2305
+
if (m_bSynRecving)
2306
+
{
2307
+
HLOGC(grlog.Debug,
2308
+
log << "grp/recv: $" << id() << ": No links reported any fresher packet, re-polling.");
2309
+
continue;
2310
+
}
2311
+
else
2312
+
{
2313
+
HLOGC(grlog.Debug,
2314
+
log << "grp/recv: $" << id() << ": No links reported any fresher packet, clearing readiness.");
2315
+
m_Global.m_EPoll.update_events(id(), m_sPollID, SRT_EPOLL_IN, false);
2316
+
throw CUDTException(MJ_AGAIN, MN_RDAVAIL, 0);
2317
+
}
2318
+
}
2319
+
else
2320
+
{
2321
+
HLOGC(grlog.Debug,
2322
+
log << "grp/recv: $" << id() << ": Found first readable packet from @" << socketToRead->m_SocketID
2323
+
<< ": seq=" << infoToRead.seqno << " gap=" << infoToRead.seq_gap
2324
+
<< " time=" << FormatTime(infoToRead.tsbpd_time));
2325
+
}
2326
+
2327
+
const int res = socketToRead->core().receiveMessage((buf), len, (w_mc), CUDTUnited::ERH_RETURN);
2328
+
HLOGC(grlog.Debug,
2329
+
log << "grp/recv: $" << id() << ": @" << socketToRead->m_SocketID << ": Extracted data with %"
2330
+
<< w_mc.pktseq << " #" << w_mc.msgno << ": " << (res <= 0 ? "(NOTHING)" : BufferStamp(buf, res)));
2331
+
if (res == 0)
2332
+
{
2333
+
LOGC(grlog.Warn,
2334
+
log << "grp/recv: $" << id() << ": @" << socketToRead->m_SocketID << ": Retrying next socket...");
2335
+
// This socket will not be socketToRead in the next turn because receiveMessage() return 0 here.
2336
+
continue;
2337
+
}
2338
+
if (res == SRT_ERROR)
2339
+
{
2340
+
LOGC(grlog.Warn,
2341
+
log << "grp/recv: $" << id() << ": @" << socketToRead->m_SocketID << ": " << srt_getlasterror_str()
2342
+
<< ". Retrying next socket...");
2343
+
broken.insert(socketToRead);
2344
+
continue;
2345
+
}
2346
+
fillGroupData((w_mc), w_mc);
2347
+
2348
+
HLOGC(grlog.Debug,
2349
+
log << "grp/recv: $" << id() << ": Update m_RcvBaseSeqNo: %" << m_RcvBaseSeqNo << " -> %" << w_mc.pktseq);
2350
+
m_RcvBaseSeqNo = w_mc.pktseq;
2351
+
2352
+
// Update stats as per delivery
2353
+
m_stats.recv.count(res);
2354
+
updateAvgPayloadSize(res);
2355
+
2356
+
for (vector<CUDTSocket*>::const_iterator si = aliveMembers.begin(); si != aliveMembers.end(); ++si)
2357
+
{
2358
+
CUDTSocket* ps = *si;
2359
+
ScopedLock lg(ps->core().m_RcvBufferLock);
2360
+
if (m_RcvBaseSeqNo != SRT_SEQNO_NONE)
2361
+
{
2362
+
int cnt = ps->core().dropTooLateUpTo(CSeqNo::incseq(m_RcvBaseSeqNo));
2363
+
if (cnt > 0)
2364
+
{
2365
+
HLOGC(grlog.Debug,
2366
+
log << "grp/recv: $" << id() << ": @" << ps->m_SocketID << ": dropped " << cnt
2367
+
<< " packets after reading: m_RcvBaseSeqNo=" << m_RcvBaseSeqNo);
2368
+
}
2369
+
}
2370
+
}
2371
+
for (vector<CUDTSocket*>::const_iterator si = aliveMembers.begin(); si != aliveMembers.end(); ++si)
2372
+
{
2373
+
CUDTSocket* ps = *si;
2374
+
if (!ps->core().isRcvBufferReady())
2375
+
m_Global.m_EPoll.update_events(ps->m_SocketID, ps->core().m_sPollID, SRT_EPOLL_IN, false);
2376
+
}
2377
+
2378
+
return res;
2379
+
}
2380
+
LOGC(grlog.Error, log << "grp/recv: UNEXPECTED RUN PATH, ABANDONING.");
2381
+
m_Global.m_EPoll.update_events(id(), m_sPollID, SRT_EPOLL_IN, false);
2382
+
throw CUDTException(MJ_AGAIN, MN_RDAVAIL, 0);
2383
+
}
2384
+
#else
2198
2385
// The "app reader" version of the reading function.
2199
2386
// This reads the packets from every socket treating them as independent
2200
2387
// and prepared to work with the application. Then packets are sorted out
@@ -2731,6 +2918,7 @@ int CUDTGroup::recv(char* buf, int len, SRT_MSGCTRL& w_mc)
2731
2918
}
2732
2919
}
2733
2920
}
2921
+
#endif
2734
2922
2735
2923
// [[using locked(m_GroupLock)]]
2736
2924
CUDTGroup::ReadPos* CUDTGroup::checkPacketAhead()
RetroSearch is an open source project built by @garambo | Open a GitHub Issue
Search and Browse the WWW like it's 1997 | Search results from DuckDuckGo
HTML:
3.2
| Encoding:
UTF-8
| Version:
0.7.4