A RetroSearch Logo

Home - News ( United States | United Kingdom | Italy | Germany ) - Football scores

Search Query:

Showing content from http://www.ncbi.nlm.nih.gov/IEB/ToolBox/CPP_DOC/doxyhtml/psg__client__transport_8cpp_source.html below:

NCBI C++ ToolKit: src/objtools/pubseq_gateway/client/psg_client_transport.cpp Source File

34 #ifdef HAVE_PSG_CLIENT 44 #include <unordered_map> 45 #include <type_traits> 51 #define __STDC_FORMAT_MACROS 91 NCBI_PARAM_DEF

(

unsigned

, PSG, throttle_by_consecutive_connection_failures, 0);

153  for

(

const auto

& cookie : *m_Cookies) {

154  if

(cookie.GetName() == name) {

164  auto

combine = [](

auto

p,

auto

v) {

return

v.empty()? v : p.Get() +

'='

+

NStr::URLEncode

(v); };

166  auto

admin_cookie = combine(admin_auth_token_name, admin_auth_token.Get());

167  auto

hup_cookie = combine(auth_token_name, auth_token.Get().empty() ? get_auth_token() : auth_token.Get());

169  return

admin_cookie.empty() ? hup_cookie : hup_cookie.empty() ? admin_cookie : admin_cookie +

"; "

+ hup_cookie;

174  auto value

= TPSG_RequestTimeout::GetDefault();

176  if

(

value

< io_timer_period) {

178  " was increased to the minimum allowed value ('"

<< io_timer_period <<

"')"

);

179  value

= io_timer_period;

182  return static_cast<unsigned>

(

value

/ io_timer_period);

187  auto value

= TPSG_CompetitiveAfter::GetDefault();

188

timeout *= io_timer_period;

190  if

((

value

> 0.0) && (

value

< io_timer_period)) {

192  " was increased to the minimum allowed value ('"

<< io_timer_period <<

"')"

);

193  value

= io_timer_period;

196  if

(

value

>= timeout) {

198  "as it was greater or equal to request timeout ('"

<< timeout <<

"')"

);

199

}

else if

(

value

> 0.0) {

203  return static_cast<unsigned>

(timeout / io_timer_period);

210  if

(!

ip

.empty()) os <<

";IP="

<<

ip

;

211  if

(port) os <<

";PORT="

<< port;

212  if

(m_Params.proxy) os <<

";PROXY="

<< m_Params.proxy;

214  ERR_POST

(Message <<

id

<<

": "

<< address << path <<

";SID="

<< sid <<

";PHID="

<< phid << os.str());

227

os <<

"<BINARY DATA OF "

<< chunk.size() <<

" BYTES>"

;

240  ERR_POST

(Message <<

id

<<

": Retrying ("

<< retries <<

" retries remaining) after "

<<

error

);

253  for

(

const auto

& event : m_Events) {

254  auto ms

= get<0>(event);

255  auto type

= get<1>(event);

256  auto

thread_id = get<2>(event);

257

os << fixed <<

id

<<

'\t'

<<

ms

<<

'\t'

<<

type

<<

'\t'

<< thread_id <<

'\n'

;

260

cout << os.str() << flush;

269  static

constexpr

auto

prefix =

"\trequest\ttype="

;

302  static

constexpr

auto

prefix =

"\treply_item\ttype="

;

345  static

constexpr

auto

prefix =

"\tskipped_blob\treason="

;

374  static

constexpr

auto

prefix =

"\treply_item_status\tstatus="

;

407  static

constexpr

auto

prefix =

"\tmessage\tseverity="

;

445  static

constexpr

auto

prefix =

"\tretries\tevent="

;

465 template

<SPSG_Stats::EGroup group>

470  for

(

auto

& counter :

data

.back()) {

475 template

<SPSG_Stats::EGroup group>

481  const auto

&

g

=

data

[group];

484  for

(

auto i

: TGroup::values) {

485  auto n

=

g

[

static_cast<size_t>

(

i

)].load();

486  if

(

n

)

ERR_POST

(

Note

<< prefix << report << TGroup::prefix << TGroup::ValueName(

i

) <<

"&count="

<<

n

);

492

Apply<SInit>(eRequest, m_Data);

495 template

<

class

TWhat,

class

...

TArgs

>

500  switch

(start_with) {

501  case

eRequest: TWhat::template Func<eRequest> (std::forward<TArgs>(args)...);

502  case

eReplyItem: TWhat::template Func<eReplyItem> (std::forward<TArgs>(args)...);

503  case

eSkippedBlob: TWhat::template Func<eSkippedBlob> (std::forward<TArgs>(args)...);

504  case

eReplyItemStatus: TWhat::template Func<eReplyItemStatus> (std::forward<TArgs>(args)...);

505  case eMessage

: TWhat::template Func<eMessage> (std::forward<TArgs>(args)...);

506  case

eRetries: TWhat::template Func<eRetries> (std::forward<TArgs>(args)...);

510 template

<

class

...

TArgs

>

513

Apply<SReport>(eRequest, m_Data, std::forward<TArgs>(args)...);

517

m_Data(eTimeUntilResend + 1)

538  auto

v =

data

.first.load();

539  auto n

=

data

.second.load();

540  if

(

n

)

ERR_POST

(

Note

<< prefix << report <<

'\t'

<<

GetName

(

i

) <<

"\taverage="

<<

double

(v /

n

) / milli::den);

544 template

<

class

TDataId>

548  static auto

Tuple(

const CPSG_BlobId

&

id

) {

return

tie(

id

.

GetId

(),

id

.GetLastModified()); }

549  static auto

Tuple(

const CPSG_ChunkId

&

id

) {

return

tuple<int, const string&>(

id

.GetId2Chunk(),

id

.GetId2Info()); }

550  bool

operator()(

const

TDataId& lhs,

const

TDataId& rhs)

const

{

return

Tuple(lhs) < Tuple(rhs); }

556  if

(

auto

locked = m_Ids.GetLock()) {

557

total = locked->

size

();

561  for

(

const auto

& data_id : *locked) {

562  auto created

= unique_ids.emplace(data_id, 1);

567  ERR_POST

(

Note

<< prefix << report << data_prefix <<

"\ttotal="

<< total <<

"&unique="

<< unique_ids.

size

());

569  auto

received = m_Received.load();

570  auto

read = m_Read.load();

571  if

(received)

ERR_POST

(

Note

<< prefix << report << data_prefix <<

"_data\treceived="

<< received <<

"&read="

<< read);

575  for

(

const auto

& p : unique_ids) {

576  auto created

= group_by_count.emplace(p.second, 1);

580  for

(

const auto

& p : group_by_count) {

581  ERR_POST

(

Note

<< prefix << report << data_prefix <<

"_retrievals\tnumber="

<< p.first <<

"&unique_ids="

<< p.second);

594  return SecondsToMs

(TPSG_StatsPeriod::GetDefault());

606  const auto

prefix =

"PSG_STATS\t"

;

615  for

(

const auto

& server : *servers_locked) {

616  auto n

= server.stats.load();

617  if

(

n

)

ERR_POST

(

Note

<< prefix << report <<

"\tserver\tname="

<< server.address <<

"&requests_sent="

<<

n

);

624  if

(m && ((min_severity ==

eDiag_Trace

) || ((m.severity !=

eDiag_Trace

) && (m.severity >= min_severity)))) {

634

m_InProgress.store(

true

);

675  const auto

message =

"Protocol error: received less than expected"

;

676  bool

missing =

false

;

678  if

(

auto

items_locked =

items

.GetLock()) {

679  for

(

auto

& item : *items_locked) {

680  if

(item->state.InProgress()) {

681

item.GetLock()->state.AddError(message);

682

item->state.SetComplete();

689  if

(missing || reply_item_locked->expected.Cmp<greater>(reply_item_locked->received)) {

690

reply_item_locked->state.AddError(message);

693

reply_item_locked->state.SetComplete();

702  if

(

auto

items_locked =

items

.GetLock()) {

703  for

(

auto

& item : *items_locked) {

704  if

(item->state.InProgress()) {

705

item.GetLock()->state.AddError(message);

706

item->state.SetComplete();

712

reply_item_locked->state.AddError(message, status);

713

reply_item_locked->state.SetComplete();

723  bool

was_in_progress =

reply_item

->state.InProgress();

725  if

(

auto

new_items_locked =

new_items

.GetLock()) {

726  if

(!new_items_locked->empty()) {

727  auto

rv = new_items_locked->front();

728

new_items_locked->pop_front();

734  if

(!was_in_progress) {

745  items

.GetLock()->clear();

751  auto

guard = m_ExistingGuard.lock();

756

m_ExistingGuard = guard;

763

full_path(std::move(p)),

785  static const string kPrefix

=

"\n\nPSG-Reply-Chunk: "

;

795  if

(++index ==

kPrefix

.size()) {

803  if

(

reply

->raw && !index) {

810  const auto

message =

"Protocol error: prefix mismatch"

;

812  if

(

Retry

(message)) {

816  reply

->reply_item.GetLock()->state.AddError(message);

823  while

(*

data

!=

'\n'

) {

833  const auto

& size_str = args.

GetValue

(

"size"

);

834  const auto size

= size_str.empty() ? 0ul : stoul(size_str);

858

chunk.append(

data

, data_size);

887  return code

.empty() ? optional<int>{} : atoi(

code

.c_str());

898  auto

& reply_item_ts =

reply

->reply_item;

901  if

(

auto

item_locked = reply_item_ts.GetLock()) {

902  if

(

auto

update_result =

UpdateItem

(item_type, *item_locked, args); update_result ==

eRetry503

) {

904

}

else if

(update_result ==

eNewItem

) {

911

reply_item_ts.NotifyOne();

914  if

(

auto

reply_item_locked = reply_item_ts.GetLock()) {

915  auto

& reply_item = *reply_item_locked;

916

++reply_item.received;

918  if

(reply_item.expected.Cmp<

less

>(reply_item.received)) {

919

reply_item.state.AddError(

"Protocol error: received more than expected"

);

923  auto

item_id = args.GetValue(

"item_id"

);

925  bool

to_create = !item_by_id;

928  if

(

auto

items_locked =

reply

->items.GetLock()) {

929

items_locked->emplace_back();

930

item_by_id = &items_locked->back();

934  if

(

auto

item_locked = item_by_id->GetLock()) {

935  auto

update_result =

UpdateItem

(item_type, *item_locked, args);

942

item_locked->args = std::move(args);

949  reply

->new_items.GetLock()->emplace_back(item_by_id);

952

reply_item_ts.NotifyOne();

956

item_by_id->NotifyOne();

959  reply

->queue->NotifyOne();

976  auto

n_chunks = args.

GetValue

(

"n_chunks"

);

978  if

(!n_chunks.empty()) {

982

item.

state

.

AddError

(

"Protocol error: contradicting n_chunks"

);

988  if

(

const auto

status = get_status(); can_retry_503(status,

"Server returned a meta with status 503"

)) {

999  static

atomic_bool reported(

false

);

1001  if

(!reported.exchange(

true

)) {

1002  ERR_POST

(

"Received unknown chunk type: "

<< chunk_type.second.get());

1005  if

(TPSG_FailOnUnknownChunks::GetDefault()) {

1006

item.

state

.

AddError

(

"Protocol error: unknown chunk type '"

+ chunk_type.second +

'\''

);

1018

}

else if

(

const auto

status = get_status(); can_retry_503(status, chunk.c_str())) {

1027  auto

blob_chunk = args.

GetValue

(

"blob_chunk"

);

1028  auto

index = blob_chunk.empty() ? 0 : stoul(blob_chunk);

1041  if

(item.

chunks

.size() <= index) item.

chunks

.resize(index + 1);

1043

item.

chunks

[index] = std::move(chunk);

1049

item.

state

.

AddError

(

"Protocol error: received more than expected"

);

1053  reply

->reply_item.GetLock()->state.AddError(

"Protocol error: received more than expected"

);

1069  if

(

auto

tci = TUvNgHttp2_TestIdentity::GetDefault(); tci.empty()) {

1071

}

else if

(tci.starts_with(

"peer_id_"

)) {

1080  static const string

ncbi_peer_id(

Init

());

1081  return

ncbi_peer_id;

1086 #define HTTP_STATUS_HEADER ":status" 1091 template

<

class

... TNgHttp2Cbs>

1098

TPSG_Https::GetDefault(),

1100

std::forward<TNgHttp2Cbs>(callbacks)...,

1105

{

":method"

,

"GET"

},

1106

{

":scheme"

, TPSG_Https::GetDefault() ?

"https"

:

"http"

},

1107

{

":authority"

, m_Authority },

1111

{

"http_ncbi_sid"

},

1112

{

"http_ncbi_phid"

},

1114

{

"x-forwarded-for"

}

1126  if

(

auto

[processor_id, req] = it->second.Get(); req) {

1127  auto result

= req->OnReplyData(processor_id, (

const char

*)

data

,

len

);

1130

it->second.ResetTime();

1139

req->reply->SetComplete();

1156  reply

->debug_printout << retries <<

error

<< endl;

1179  m_Buffer

.

args

=

"item_id=1&item_type=unknown&chunk_type=data_and_meta&n_chunks=1"

s;

1181  m_Buffer

.

args

=

"item_id=0&item_type=reply&chunk_type=meta&n_chunks=2"

s;

1188  auto

context_guard = req->context.Set();

1189  auto

rv = req->Fail(processor_id,

error

, refused_stream);

1195

req->reply->debug_printout.id <<

"', "

<<

error

);

1206  if

(

auto

[processor_id, req] = it->second.Get(); req) {

1207  auto

context_guard = req->context.Set();

1208  auto

& debug_printout = req->reply->debug_printout;

1209

debug_printout << error_code << endl;

1215  if

(

RetryFail

(processor_id, req,

error

, error_code == NGHTTP2_REFUSED_STREAM)) {

1219  if

(req->reply->raw) {

1223

req->OnReplyDone(processor_id)->SetComplete();

1226

debug_printout.id <<

"' successfully"

);

1239  if

((frame->hd.type == NGHTTP2_HEADERS) && (frame->headers.cat == NGHTTP2_HCAT_RESPONSE) &&

1242  auto

stream_id = frame->hd.stream_id;

1243  auto

status_str =

reinterpret_cast<const char

*

>

(

value

);

1252  if

(

auto

[processor_id, req] = it->second.Get(); req) {

1254

req->OnReplyDone(processor_id)->SetFailed(

error

, status);

1269  auto

context_guard = req->context.Set();

1272  const auto

& path = req->full_path;

1273  const auto

& session_id =

context

.GetSessionID();

1274  const auto

& sub_hit_id =

context

.GetNextSubHitID();

1276  const auto

& client_ip =

context

.GetClientIP();

1284  if

(!client_ip.empty()) {

1292  if

(stream_id < 0) {

1310 template

<

class

TOnRetry,

class

TOnFail>

1313  auto

[processor_id, req] =

Get

();

1323  if

(req->Retry(

error

)) {

1331

on_fail(processor_id, req);

1344  auto

on_fail = [&](

auto

processor_id,

auto

req) {

Fail

(processor_id, req,

error

); };

1347  if

(it->second.CheckExpiration(

m_Params

,

error

, on_retry, on_fail)) {

1357  bool

some_requests_failed =

false

;

1360  if

(

auto

[processor_id, req] = pair.second.Get(); req) {

1362

some_requests_failed =

true

;

1367  if

(some_requests_failed) {

1376  switch

(frame->hd.type) {

1377  case

NGHTTP2_DATA:

return "DATA"

sv;

1378  case

NGHTTP2_HEADERS:

return "HEADERS"

sv;

1379  case

NGHTTP2_PRIORITY:

return "PRIORITY"

sv;

1380  case

NGHTTP2_RST_STREAM:

return "RST_STREAM"

sv;

1381  case

NGHTTP2_SETTINGS:

return "SETTINGS"

sv;

1382  case

NGHTTP2_PUSH_PROMISE:

return "PUSH_PROMISE"

sv;

1383  case

NGHTTP2_PING:

return "PING"

sv;

1384  case

NGHTTP2_GOAWAY:

return "GOAWAY"

sv;

1385  case

NGHTTP2_WINDOW_UPDATE:

return "WINDOW_UPDATE"

sv;

1386  case

NGHTTP2_CONTINUATION:

return "CONTINUATION"

sv;

1387  case

NGHTTP2_ALTSVC:

return "ALTSVC"

sv;

1388  case

NGHTTP2_ORIGIN:

return "ORIGIN"

sv;

1399  auto

it =

m_Requests

.find(frame->hd.stream_id);

1400  auto id

= it !=

m_Requests

.end() ? it->second.GetId() :

"*"

sv;

1410  if

(error_rate.empty())

return

;

1412  string

numerator_str, denominator_str;

1414  if

(!

NStr::SplitInTwo

(error_rate,

"/"

, numerator_str, denominator_str))

return

;

1421  if

(

n

> 0)

numerator

=

static_cast<size_t>

(

n

);

1443

m_Stats(std::move(p)),

1445

m_Timer(this, s_OnTimer, Configured(), 0)

1465  auto

stats_locked =

m_Stats

.GetLock();

1486  "' reached the maximum number of failures in a row ("

<<

params

.

max_failures

<<

')'

);

1500  ERR_POST

(

Warning

<<

"Server '"

<< address <<

"' is considered bad/overloaded (" 1516

threshold_reg.first.reset();

1526  for

(

auto

& server : m_Sessions) {

1527  for

(

auto

& session : server.sessions) {

1535  auto

servers_locked = m_Servers.GetLock();

1536  auto

& servers = *servers_locked;

1538  for

(

auto

& server : servers) {

1539

server.throttling.StartClose();

1547  auto

servers_locked = m_Servers.GetLock();

1548  auto

& servers = *servers_locked;

1550  for

(

auto

& server : servers) {

1551

server.throttling.FinishClose();

1558  auto

servers_locked = m_Servers.GetLock();

1559  auto

& servers = *servers_locked;

1562  const auto

servers_size = m_Servers->size();

1563  const auto

sessions_size = m_Sessions.size();

1565  _ASSERT

(servers_size >= sessions_size);

1567  for

(

auto

new_servers = servers_size - sessions_size; new_servers; --new_servers) {

1568  auto

& server = servers[servers_size - new_servers];

1569

m_Sessions.emplace_back().sessions.emplace_back(server, m_Params, m_Queue, handle->loop);

1570  PSG_IO_TRACE

(

"Session for server '"

<< server.address <<

"' was added"

);

1576  auto

available_servers = 0;

1578  for

(

auto

& server : m_Sessions) {

1579

server.current_rate = server->rate.load();

1581  if

(server.current_rate) {

1582

++available_servers;

1586  auto

remaining_submits = m_Params.max_concurrent_submits.Get();

1588  auto i

= m_Sessions.begin();

1589  auto

[timed_req, processor_id, req] = decltype(m_Queue.Pop()){};

1590  auto

d = m_Random.first;

1591  auto

request_rate = 0.0;

1592  auto

target_rate = 0.0;

1596  auto

get_request = [&, &timed_req = timed_req, &processor_id = processor_id, &req = req]() {

1597

tie(timed_req, processor_id, req) = m_Queue.Pop();

1605

target_rate = d(m_Random.second);

1606  _DEBUG_CODE

(req_id = req->reply->debug_printout.id;);

1607  PSG_IO_TRACE

(

"Ready to submit request '"

<< req_id <<

'\''

);

1611  auto

next_server = [&]() {

1612  if

(++

i

== m_Sessions.end())

i

= m_Sessions.begin();

1615  auto

ignore_server = [&]() {

1616  if

(--available_servers) {

1617

d = uniform_real_distribution<>(0.0, d.max() -

i

->current_rate);

1618  i

->current_rate = 0.0;

1623  auto

find_session = [&]() {

1624  auto

s =

i

->sessions.begin();

1625  for

(; (s !=

i

->sessions.end()) && s->IsFull(); ++s);

1626  return

make_pair(s !=

i

->sessions.end(), s);

1629  while

(available_servers && remaining_submits) {

1631  if

(!req && !get_request()) {

1636  for

(; request_rate +=

i

->current_rate, request_rate < target_rate; next_server());

1638  auto

& server = *

i

;

1641  if

(server->throttling.Active()) {

1642  PSG_IO_TRACE

(

"Server '"

<< server->address <<

"' is throttled, ignoring"

);

1646

}

else if

(server->available_streams <= 0) {

1647  PSG_IO_TRACE

(

"Server '"

<< server->address <<

"' is at request limit, ignoring"

);

1651

}

else if

(

auto

[found, session] = find_session(); !found) {

1652  PSG_IO_TRACE

(

"Server '"

<< server->address <<

"' has no sessions available, ignoring"

);

1656

}

else if

(!session->CanProcessRequest(req)) {

1657  PSG_IO_TRACE

(

"Server '"

<< session->GetId() <<

"' already working/worked on the request, trying to find another one"

);

1659

req->submitted_by.Reset();

1663

}

else if

(!session->ProcessRequest(*std::move(timed_req), processor_id, std::move(req))) {

1664  PSG_IO_TRACE

(

"Server '"

<< session->GetId() <<

"' failed to get request '"

<< req_id <<

"' with rate = "

<< target_rate);

1669  PSG_IO_TRACE

(

"Server '"

<< session->GetId() <<

"' got request '"

<< req_id <<

"' with rate = "

<< target_rate);

1670

--remaining_submits;

1674  if

(session->IsFull() && (distance(session, server.sessions.end()) == 1)) {

1678  if

(server.sessions.size() >= max_sessions) {

1679  PSG_IO_TRACE

(

"Server '"

<< server->address <<

"' reached session limit"

);

1682

server.sessions.emplace_back(*server, m_Params, m_Queue, handle->loop);

1683  PSG_IO_TRACE

(

"Additional session for server '"

<< server->address <<

"' was added"

);

1691

m_Queue.Emplace(*std::move(timed_req));

1694  if

(!remaining_submits) {

1695  PSG_IO_TRACE

(

"Max concurrent submits reached, submitted: "

<< m_Params.max_concurrent_submits);

1698  PSG_IO_TRACE

(

"No sessions available [anymore], submitted: "

<< m_Params.max_concurrent_submits - remaining_submits);

1704  const auto

kRegularRate = nextafter(0.009, 1.0);

1705  const auto

kStandbyRate = 0.001;

1707  const auto

& service_name = m_Service.GetServiceName();

1708  auto

discovered = m_Service();

1710  auto

total_preferred_regular_rate = 0.0;

1711  auto

total_preferred_standby_rate = 0.0;

1712  auto

total_regular_rate = 0.0;

1713  auto

total_standby_rate = 0.0;

1716  for

(

auto

& server : discovered) {

1719  if

(server.second >= kRegularRate) {

1720  if

(is_server_preferred) {

1721

total_preferred_regular_rate += server.second;

1724

total_regular_rate += server.second;

1726

}

else if

(server.second >= kStandbyRate) {

1727  if

(is_server_preferred) {

1728

total_preferred_standby_rate += server.second;

1731

total_standby_rate += server.second;

1735  if

(m_NoServers(total_regular_rate || total_standby_rate, SUv_Timer::GetThat<SUv_Timer>(handle))) {

1736  ERR_POST

(

"No servers in service '"

<< service_name <<

'\''

);

1740  const auto

localhost_preference = TPSG_LocalhostPreference::GetDefault();

1741  const auto

total_preferred_standby_percentage = localhost_preference ? 1.0 - 1.0 / localhost_preference : 0.0;

1742  const auto

have_any_regular_servers = total_regular_rate > 0.0;

1743  const auto

have_nonpreferred_regular_servers = total_regular_rate > total_preferred_regular_rate;

1744  const auto

have_nonpreferred_standby_servers = total_standby_rate > total_preferred_standby_rate;

1745  const auto

regular_rate_adjustment =

1746

(localhost_preference <= 1) ||

1747

(total_preferred_regular_rate != 0.0) ||

1748

(total_preferred_standby_rate == 0.0) ||

1749

(!have_nonpreferred_regular_servers && !have_nonpreferred_standby_servers);

1750  auto

rate_total = 0.0;

1753  for

(

auto

& server : discovered) {

1755  const auto

old_rate = server.second;

1757  if

(server.second >= kRegularRate) {

1758  if

(is_server_preferred) {

1759  if

(regular_rate_adjustment) {

1760

server.second *= localhost_preference;

1761

}

else if

(have_nonpreferred_regular_servers) {

1762

server.second = 0.0;

1763

}

else if

(have_nonpreferred_standby_servers) {

1764

server.second = 0.0;

1767  if

(regular_rate_adjustment) {

1769

}

else if

(have_nonpreferred_regular_servers) {

1770

server.second *= (1 - total_preferred_standby_percentage) / total_regular_rate;

1771

}

else if

(have_nonpreferred_standby_servers) {

1772

server.second = 0.0;

1775

}

else if

(server.second >= kStandbyRate) {

1776  if

(is_server_preferred) {

1777  if

(regular_rate_adjustment && have_any_regular_servers) {

1778

server.second = 0.0;

1779

}

else if

(regular_rate_adjustment) {

1780

server.second *= localhost_preference;

1781

}

else if

(have_nonpreferred_regular_servers) {

1782

server.second *= total_preferred_standby_percentage / total_preferred_standby_rate;

1783

}

else if

(have_nonpreferred_standby_servers) {

1784

server.second *= total_preferred_standby_percentage / total_preferred_standby_rate;

1787  if

(regular_rate_adjustment && have_any_regular_servers && (localhost_preference || have_nonpreferred_regular_servers)) {

1788

server.second = 0.0;

1789

}

else if

(regular_rate_adjustment) {

1791

}

else if

(have_nonpreferred_regular_servers) {

1792

server.second = 0.0;

1793

}

else if

(have_nonpreferred_standby_servers) {

1794

server.second *= (1 - total_preferred_standby_percentage) / (total_standby_rate - total_preferred_standby_rate);

1799

rate_total += server.second;

1801  if

(old_rate != server.second) {

1802  PSG_DISCOVERY_TRACE

(

"Rate for '"

<< server.first <<

"' adjusted from "

<< old_rate <<

" to "

<< server.second);

1806  auto

servers_locked = m_Servers.GetLock();

1807  auto

& servers = *servers_locked;

1810  for

(

auto

& server : servers) {

1812  auto

it = find_if(discovered.begin(), discovered.end(), address_same);

1816  PSG_DISCOVERY_TRACE

(

"Server '"

<< server.address <<

"' disabled in service '"

<< service_name <<

'\''

);

1819

server.throttling.Discovered();

1820  auto

rate = it->second / rate_total;

1822  if

(server.rate != rate) {

1825

(server.rate ?

"' updated in service '"

:

"' enabled in service '"

) <<

1826

service_name <<

"' with rate = "

<< rate);

1837  for

(

auto

& server : discovered) {

1839  auto

rate = server.second / rate_total;

1840

servers.emplace_back(server.first, rate, m_Params.max_concurrent_requests_per_server, m_ThrottleParams, handle->loop);

1843

service_name <<

"' with rate = "

<< rate);

1847

m_QueuesRef.SignalAll();

1852  if

(m_Servers->fail_requests) {

1855

CheckRequestExpiration();

1858  for

(

auto

& server : m_Sessions) {

1859  for

(

auto

& session : server.sessions) {

1860

session.CheckRequestExpiration();

1867  auto

queue_locked = m_Queue.GetLockedQueue();

1868

list<SPSG_TimedRequest> retries;

1871  auto

on_retry = [&](

auto

req) { retries.emplace_back(req); m_Queue.Signal(); };

1872  auto

on_fail = [&](

auto

processor_id,

auto

req) { req->Fail(processor_id,

error

); };

1874  for

(

auto

it = queue_locked->begin(); it != queue_locked->end(); ) {

1875  if

(it->CheckExpiration(m_Params,

error

, on_retry, on_fail)) {

1876

it = queue_locked->erase(it);

1882

queue_locked->splice(queue_locked->end(), retries);

1887  auto

queue_locked = m_Queue.GetLockedQueue();

1890  for

(

auto

& timed_req : *queue_locked) {

1891  if

(

auto

[processor_id, req] = timed_req.Get(); req) {

1892  auto

context_guard = req->context.Set();

1893  auto

& debug_printout = req->reply->debug_printout;

1894

debug_printout <<

error

<< endl;

1895

req->OnReplyDone(processor_id)->SetFailed(

error

);

1896  PSG_IO_TRACE

(

"No servers to process request '"

<< debug_printout.id <<

'\''

);

1900

queue_locked->clear();

1905

m_Queue.Init(

this

, &loop, s_OnQueue);

1916

m_RetryDelay(

SecondsToMs

(TPSG_NoServersRetryDelay::GetDefault())),

1917

m_Timeout(

SecondsToMs

((params.request_timeout + params.competitive_after * params.request_retries) * params.io_timer_period)),

1918

m_FailRequests(const_cast<atomic_bool&>(servers->fail_requests))

1935  const auto

timeout_expired = m_Passed >= m_Timeout;

1936

m_FailRequests = timeout_expired;

1940

}

else if

(!timeout_expired) {

1954  if

(TPSG_Stats::GetDefault()) {

1955  return

make_shared<SPSG_Stats>(servers);

1968

m_StartBarrier(

TPSG_NumIo

::GetDefault() + 2),

1969

m_StopBarrier(

TPSG_NumIo

::GetDefault() + 1),

1971

m_RequestCounter(0),

1980

}

catch

(

const

std::system_error& e) {

1981  ERR_POST

(

Fatal

<<

"Failed to create I/O threads: "

<< e.what());

1992  for

(

auto

& io :

m_Io

) {

1999  if

(

m_Io

.size() == 0) {

2004  m_Queues

[idx].Emplace(std::move(req));

@ eEndOfReply

No more items expected in the (overall!) reply.

bool IsSingleServer() const

pair< SSocketAddress, double > TServer

static DLIST_TYPE *DLIST_NAME() first(DLIST_LIST_TYPE *list)

static const char * expected[]

#define _DEBUG_CODE(code)

string GetStringUID(TUID uid=0) const

Return string representation of UID.

CDiagContext & GetDiagContext(void)

Get diag context instance.

static void SetRequestContext(CRequestContext *ctx)

Shortcut to CDiagContextThreadData::GetThreadData().SetRequestContext()

static string GetStdStatusMessage(ECode code)

static CRequestContext & GetRequestContext(void)

Shortcut to CDiagContextThreadData::GetThreadData().GetRequestContext()

#define ERR_POST(message)

Error posting with file, line number information but without error codes.

EDiagSev

Severity level for the posted diagnostics.

@ eDiag_Trace

Trace message.

@ eDiag_Info

Informational message.

@ eDiag_Error

Error message.

@ eDiag_Warning

Warning message.

@ eDiag_Fatal

Fatal error – guarantees exit(or abort)

@ eDiag_Critical

Critical error message.

@ e503_ServiceUnavailable

@ e451_Unavailable_For_Legal_Reasons

void Warning(CExceptionArgs_Base &args)

void Fatal(CExceptionArgs_Base &args)

const CSeq_id & GetId(const CSeq_loc &loc, CScope *scope)

If all CSeq_ids embedded in CSeq_loc refer to the same CBioseq, returns the first CSeq_id found,...

@ eParam_Default

Default flags.

#define END_NCBI_SCOPE

End previously defined NCBI scope.

#define BEGIN_NCBI_SCOPE

Define ncbi namespace.

static CNCBI_IPAddr GetLocalHostAddress(ESwitch reget=eDefault)

Local host address in network byte order (cached for faster retrieval)

static string PrintableString(const CTempString str, TPrintableMode mode=fNewLine_Quote|fNonAscii_Passthru)

Get a printable version of the specified string.

static int StringToInt(const CTempString str, TStringToNumFlags flags=0, int base=10)

Convert string to int.

static string URLDecode(const CTempString str, EUrlDecode flag=eUrlDec_All)

URL-decode string.

static bool SplitInTwo(const CTempString str, const CTempString delim, string &str1, string &str2, TSplitFlags flags=0)

Split a string into two pieces using the specified delimiters.

static string URLEncode(const CTempString str, EUrlEncode flag=eUrlEnc_SkipMarkChars)

URL-encode string.

@ fAllowTrailingSpaces

Ignore trailing whitespace characters.

@ fConvErr_NoThrow

Do not throw an exception on error.

@ fAllowLeadingSpaces

Ignore leading whitespace characters in converted string.

string GetQueryString(EAmpEncoding amp_enc, NStr::EUrlEncode encode) const

Construct and return complete query string.

@ eAmp_Char

Use & to separate arguments.

where both of them are integers Note

NCBI_PARAM_TYPE(PSG, throttle_by_connection_error_rate) TPSG_ThrottleThreshold

NCBI_PARAM_TYPE(PSG, throttle_relaxation_period) TPSG_ThrottlePeriod

const TYPE & Get(const CNamedParameterList *param)

const struct ncbi::grid::netcache::search::fields::SIZE size

const struct ncbi::grid::netcache::search::fields::CREATED created

const GenericPointer< typename T::ValueType > T2 value

int strcmp(const char *str1, const char *str2)

double r(size_t dimension_, const Int4 *score_, const double *prob_, double theta_)

static bool less(const CSeq_feat *A, const CSeq_feat *B)

EPSG_Status

Retrieval result.

@ eSuccess

Successfully retrieved.

@ eInProgress

Retrieval is not finalized yet, more info may come.

@ eForbidden

User is not authorized for the retrieval.

@ eCanceled

Request canceled.

@ eError

An error was encountered while trying to send request or to read and to process the reply.

uint64_t s_GetStatsPeriod()

NCBI_PARAM_ENUM_DEF(EPSG_DebugPrintout, PSG, debug_printout, EPSG_DebugPrintout::eNone)

auto s_GetCode(const string &code)

NCBI_PARAM_DEF(double, PSG, request_timeout, 10.0)

auto s_GetFrameName(const nghttp2_frame *frame)

#define HTTP_STATUS_HEADER

EPSG_StatsCountersRetries

@ ePSG_StatsCountersRetries_Retry

@ ePSG_StatsCountersRetries_Timeout

uint64_t s_GetDiscoveryRepeat(const CServiceDiscovery &service)

EDiagSev s_GetSeverity(const string &severity)

PSG_PARAM_VALUE_DEF_MIN(unsigned, PSG, rd_buf_size, 64 *1024, 1024)

shared_ptr< SPSG_Stats > s_GetStats(SPSG_Servers::TTS &servers)

SPSG_IoCoordinator.

NCBI_PARAM_DEF_EX(string, PSG, service, "PSG2", eParam_Default, NCBI_PSG_SERVICE)

NCBI_PARAM_ENUM_ARRAY(EPSG_DebugPrintout, PSG, debug_printout)

#define PSG_IO_SESSION_TRACE(message)

#define PSG_THROTTLING_TRACE(message)

#define PSG_IO_TRACE(message)

#define PSG_DISCOVERY_TRACE(message)

uint64_t SecondsToMs(double seconds)

Defines CRequestStatus class for NCBI C++ diagnostic API.

static SLJIT_INLINE sljit_ins ms(sljit_gpr r, sljit_s32 d, sljit_gpr x, sljit_gpr b)

static SLJIT_INLINE sljit_ins l(sljit_gpr r, sljit_s32 d, sljit_gpr x, sljit_gpr b)

void Print(SSocketAddress address, const string &path, const string &sid, const string &phid, const string &ip, SUv_Tcp::TPort port)

int32_t Submit(const nghttp2_nv *nva, size_t nvlen, nghttp2_data_provider *data_prd=nullptr)

void Emplace(TArgs &&... args)

void NotifyOne() volatile

bool WaitUntil(TArgs &&... args) volatile

SNoServers(const SPSG_Params &params, SPSG_Servers::TTS &servers)

bool operator()(bool discovered, SUv_Timer *timer)

void OnShutdown(uv_async_t *)

void OnTimer(uv_timer_t *handle)

SPSG_Servers::TTS & m_Servers

string GetCookie(const string &name) const

bool AddRequest(shared_ptr< SPSG_Request > req, const atomic_bool &stopped, const CDeadline &deadline)

SPSG_Servers::TTS m_Servers

SUv_Barrier m_StartBarrier

SPSG_IoCoordinator(CServiceDiscovery service)

atomic< size_t > m_RequestCounter

SUv_Barrier m_StopBarrier

TPSG_AsyncQueues m_Queues

SPSG_Thread< SPSG_DiscoveryImpl > m_Discovery

vector< unique_ptr< SPSG_Thread< SPSG_IoImpl > > > m_Io

void OnTimer(uv_timer_t *handle)

void OnQueue(uv_async_t *handle)

void CheckRequestExpiration()

void OnShutdown(uv_async_t *handle)

SPSG_IoImpl.

void OnExecute(uv_loop_t &loop)

void AddNewServers(uv_async_t *handle)

int OnStreamClose(nghttp2_session *session, int32_t stream_id, uint32_t error_code)

bool Fail(SPSG_Processor::TId processor_id, shared_ptr< SPSG_Request > req, const SUvNgHttp2_Error &error, bool refused_stream=false)

bool RetryFail(SPSG_Processor::TId processor_id, shared_ptr< SPSG_Request > req, const SUvNgHttp2_Error &error, bool refused_stream=false)

SPSG_AsyncQueue & m_Queue

SPSG_Requests< SPSG_IoSession > m_Requests

SPSG_Submitter::TId GetInternalId() const

SPSG_IoSession(SPSG_Server &s, const SPSG_Params &params, SPSG_AsyncQueue &queue, uv_loop_t *loop, TNgHttp2Cbs &&... callbacks)

SPSG_IoSession.

void CheckRequestExpiration()

bool ProcessRequest(SPSG_TimedRequest timed_req, SPSG_Processor::TId processor_id, shared_ptr< SPSG_Request > req)

int OnHeader(nghttp2_session *session, const nghttp2_frame *frame, const uint8_t *name, size_t namelen, const uint8_t *value, size_t valuelen, uint8_t flags)

void OnReset(SUvNgHttp2_Error error) override

array< SNgHttp2_Header< NGHTTP2_NV_FLAG_NO_COPY_NAME >, eSize > m_Headers

int OnData(nghttp2_session *session, uint8_t flags, int32_t stream_id, const uint8_t *data, size_t len)

int OnFrameRecv(nghttp2_session *session, const nghttp2_frame *frame)

static const string & Get()

static TValue GetDefault()

TPSG_RequestsPerIo requests_per_io

string GetCookie(function< string()> get_auth_token)

static unsigned s_GetCompetitiveAfter(double io_timer_period, double timeout)

const unsigned competitive_after

static unsigned s_GetRequestTimeout(double io_timer_period)

TPSG_IoTimerPeriod io_timer_period

const unsigned request_timeout

SPSG_Nullable< size_t > expected

vector< SPSG_Chunk > chunks

static int From(EPSG_Status status)

void SetComplete() volatile

void SetStatus(SStatus status, bool reset) volatile

void AddMessage(string message, EDiagSev severity, optional< int > code)

void AddError(string message, SStatus status=EPSG_Status::eError, EDiagSev severity=eDiag_Error, optional< int > code=nullopt)

deque< SPSG_Message > m_Messages

SPSG_Message GetMessage(EDiagSev min_severity)

shared_ptr< TPSG_Queue > queue

void SetFailed(string message, SState::SStatus status=EPSG_Status::eError)

SThreadSafe< list< SItem::TTS * > > new_items

SThreadSafe< list< SItem::TTS > > items

optional< SItem::TTS * > GetNextItem(CDeadline deadline)

unsigned GetRetries(SPSG_Retries::EType type, bool refused_stream)

unordered_map< string, SPSG_Reply::SItem::TTS * > m_ItemsByID

SPSG_Processor processed_by

bool Retry(const SUvNgHttp2_Error &error, bool refused_stream=false)

EStateResult StateData(const char *&data, size_t &len)

shared_ptr< SPSG_Reply > reply

SPSG_Request(string p, shared_ptr< SPSG_Reply > r, CRef< CRequestContext > c, const SPSG_Params &params)

bool Fail(SPSG_Processor::TId processor_id, const SUvNgHttp2_Error &error, bool refused_stream=false)

EStateResult StatePrefix(const char *&data, size_t &len)

auto & OnReplyDone(SPSG_Processor::TId processor_id)

EUpdateResult UpdateItem(SPSG_Args::EItemType item_type, SPSG_Reply::SItem &item, const SPSG_Args &args)

EStateResult StateArgs(const char *&data, size_t &len)

auto emplace(TArgs &&... args)

SPSG_Throttling throttling

const SSocketAddress address

vector< pair< atomic_uint64_t, atomic_uint > > m_Data

void Report(const char *prefix, unsigned report)

static const char * GetName(EAvgTime avg_time)

static const char * ValueName(type value)

static const char * ValueName(type value)

static const char * ValueName(type value)

static const char * ValueName(type value)

static const char * ValueName(type value)

static const char * ValueName(type value)

static void Func(TData &data)

static void Func(const TData &data, const char *prefix, unsigned report)

void Apply(EGroup start_with, TArgs &&... args)

void Report(TArgs &&... args)

vector< vector< atomic_uint > > TData

void Report(const char *prefix, unsigned report, const char *name)

SData< CPSG_BlobId > m_Blobs

void Report(const char *prefix, unsigned report)

SThreadSafe< unordered_set< string > > m_TSEs

SData< CPSG_ChunkId > m_Chunks

SPSG_Servers::TTS & m_Servers

SPSG_Stats(SPSG_Servers::TTS &servers)

SThreshold(string error_rate)

SPSG_ThrottleParams.

constexpr static size_t kMaxDenominator

TPSG_ThrottleUntilDiscovery until_discovery

const volatile uint64_t period

TPSG_ThrottleMaxFailures max_failures

pair< bitset< SPSG_ThrottleParams::SThreshold::kMaxDenominator >, size_t > threshold_reg

bool Adjust(const SSocketAddress &address, bool result)

SPSG_ThrottleParams params

SPSG_Throttling(const SSocketAddress &address, SPSG_ThrottleParams p, uv_loop_t *l)

SPSG_Throttling.

atomic< EThrottling > m_Active

SThreadSafe< SStats > m_Stats

static void s_OnSignal(uv_async_t *handle)

const SSocketAddress & m_Address

bool CheckExpiration(const SPSG_Params &params, const SUvNgHttp2_Error &error, TOnRetry on_retry, TOnFail on_fail)

static SUvNgHttp2_Error FromNgHttp2(T e, const char *w)

static const char * NgHttp2Str(T e)

SNgHttp2_Session m_Session

void Reset(SUvNgHttp2_Error error, SUv_Tcp::ECloseType close_type=SUv_Tcp::eCloseReset, bool shutdown=false)

pair< string, string > TCred

static const string & Get()

void Init(void *d, uv_loop_t *l, uv_async_cb cb)

TPort GetLocalPort() const

uint64_t GetDefaultRepeat() const

void SetRepeat(uint64_t r)

int g(Seg_Gsm *spe, Seq_Mtf *psm, Thd_Gsm *tdg)

static CS_CONTEXT * context


RetroSearch is an open source project built by @garambo | Open a GitHub Issue

Search and Browse the WWW like it's 1997 | Search results from DuckDuckGo

HTML: 3.2 | Encoding: UTF-8 | Version: 0.7.4