A RetroSearch Logo

Home - News ( United States | United Kingdom | Italy | Germany ) - Football scores

Search Query:

Showing content from http://www.ncbi.nlm.nih.gov/IEB/ToolBox/CPP_DOC/doxyhtml/psgs__dispatcher_8cpp_source.html below:

NCBI C++ ToolKit: src/app/pubseq_gateway/server/psgs_dispatcher.cpp Source File

50  size_t

request_id = (size_t)(handle->data);

51

app->GetProcessorDispatcher()->OnRequestTimer(request_id);

58  size_t

request_id = (size_t)(handle->data);

60

app->GetProcessorDispatcher()->OnRequestTimerClose(request_id);

67  size_t

request_id = (size_t)(user_data);

68

app->GetProcessorDispatcher()->EraseProcessorGroup(request_id);

78  "has been reached. Please increase the MAX_PROCESSOR_GROUPS " 84  string

processor_group_name = processor->GetGroupName();

88

processor_group_name);

93  "The group '"

<< processor_group_name <<

"' is tried to be " 94  "registered more than once. Exiting."

);

99  size_t

limit = app->GetProcessorMaxConcurrency(processor_group_name);

127

ret[

proc

->GetGroupName()] = index;

136

shared_ptr<CPSGS_Reply> reply)

141  if

(

proc

->CanProcess(request, reply)) {

142

ret.push_back(

proc

->GetName());

143  if

(request->NeedTrace()) {

145

reply->SendTrace(

proc

->GetName() +

146  " processor reports it can process request"

,

147

request->GetStartTimestamp(),

false

);

150  if

(request->NeedTrace()) {

152

reply->SendTrace(

proc

->GetName() +

153  " processor reports it cannot process request"

,

154

request->GetStartTimestamp(),

false

);

160  string msg

=

"No matching processors found"

;

166

request->GetStartTimestamp());

169

reply->SetCompleted();

176

list<shared_ptr<IPSGS_Processor>>

178

shared_ptr<CPSGS_Reply> reply,

179  const

list<string> & processor_names)

181  if

(processor_names.empty()) {

183  "Try to dispatch empty list of processors"

);

186

list<shared_ptr<IPSGS_Processor>> ret;

189  auto

request_id = request->GetRequestId();

195  if

(find(processor_names.begin(), processor_names.end(),

196  proc

->GetName()) == processor_names.end()) {

202  size_t

proc_index = proc_count - priority;

207  if

(current_count >= limit) {

208

request->AddLimitedProcessor(

proc

->GetName(), limit);

211  if

(request->NeedTrace()) {

213

reply->SendTrace(

"Processor: "

+

proc

->GetName() +

214  " will not be tried to create because" 215  " the processor group limit has been exceeded." 216  " Limit: "

+ to_string(limit) +

217  " Current count: "

+ to_string(current_count),

218

request->GetStartTimestamp(),

false

);

227  for

(

const auto

& name :

proc

->WhatCanProcess(request, reply)) {

236  if

(request->NeedTrace()) {

238

reply->SendTrace(

"Try to create processor: "

+

proc

->GetName(),

239

request->GetStartTimestamp(),

false

);

241

shared_ptr<IPSGS_Processor> p(

proc

->CreateProcessor(request, reply,

248

p->SetUVThreadId(uv_thread_self());

252

procs->m_Processors.emplace_back(p,

ePSGS_Up

,

255  if

(request->NeedTrace()) {

257

reply->SendTrace(

"Processor "

+ p->GetName() +

258  " has been created successfully (priority: "

+

259

to_string(priority) +

")"

,

260

request->GetStartTimestamp(),

false

);

263  if

(request->NeedTrace()) {

265

reply->SendTrace(

"Processor "

+

proc

->GetName() +

266  " has not been created"

,

267

request->GetStartTimestamp(),

false

);

276  size_t

limited_processor_count = request->GetLimitedProcessorCount();

279

request->SetRequestContext();

281  if

(limited_processor_count == 0) {

282  msg

=

"No matching processors found"

;

286  msg

= to_string(limited_processor_count) +

" of "

+

287

to_string(processor_names.size()) +

288  " processor(s) were not instantiated due to the limit on " 289  "concurrent processors is exceeded ("

+

290

request->GetLimitedProcessorsMessage() +

")"

;

295

reply->PrepareReplyMessage(

msg

, status_code,

297

reply->PrepareReplyCompletion(status_code,

298

request->GetStartTimestamp());

301

reply->SetCompleted();

305  auto

* grp = procs.release();

311

unique_ptr<SProcessorGroup>> req_grp = make_pair(request_id,

312

unique_ptr<SProcessorGroup>(grp));

350  bool

need_trace = processor->

GetRequest

()->NeedTrace();

351  size_t

request_id = processor->

GetRequest

()->GetRequestId();

354

vector<IPSGS_Processor *> to_be_canceled;

371  "Processor: "

+ processor->

GetName

() +

" (priority: "

+

373  ") signalled start dealing with data for the client"

,

374

processor->

GetRequest

()->GetStartTimestamp(),

386  for

(

const auto

&

proc

: procs->second->m_Processors) {

387  if

(

proc

.m_Processor.get() == processor) {

394  PSG_ERROR

(

"Logic error: the processor dispatcher received " 395  "'start processing' when its dispatch status is not UP (i.e. FINISHED)"

);

403  for

(

auto

&

proc

: procs->second->m_Processors) {

404

current_proc =

proc

.m_Processor.get();

406  if

(current_proc != processor) {

409

to_be_canceled.push_back(current_proc);

415

procs->second->m_StartedProcessing = processor;

420  for

(

auto

&

proc

: to_be_canceled) {

425  "Invoking Cancel() for the processor: "

+

proc

->GetName() +

426  " (priority: "

+ to_string(

proc

->GetPriority()) +

")"

,

427

processor->

GetRequest

()->GetStartTimestamp(),

false

);

446  PSG_ERROR

(

"SignalFinishProcessing() is called not from an assigned " 447  "thread (and libuv loop). " 448  "Current thread: "

<< uv_thread_self() <<

450  " Processor: "

<< processor->

GetName

() <<

463  auto

reply = processor->

GetReply

();

465  size_t

request_id = request->GetRequestId();

468  bool

need_trace = request->NeedTrace();

469  bool

started_processor_finished =

false

;

470

vector<IPSGS_Processor::EPSGS_Status> proc_statuses;

472  size_t

finishing_count = 0;

473  size_t

finished_count = 0;

480  "Dispatcher received signal (from "

+

482  ") that the processor "

+ processor->

GetName

() +

483  " (priority: "

+ to_string(processor->

GetPriority

()) +

484  ") finished while the reported status is "

+

486  ". Ignore this call and continue."

,

502  for

(

auto

&

proc

: procs->second->m_Processors) {

504

proc_statuses.push_back(

proc

.m_Processor->GetStatus());

506  if

(

proc

.m_Processor.get() == processor) {

510  if

(

proc

.m_DoneStatusRegistered ==

false

) {

514  proc

.m_DoneStatusRegistered =

true

;

519  if

(

proc

.m_ProcPerformanceRegistered ==

false

) {

522  proc

.m_ProcPerformanceRegistered =

true

;

531  if

(processor_status !=

proc

.m_LastReportedTraceStatus) {

532  proc

.m_LastReportedTraceStatus = processor_status;

534  "Dispatcher received signal (from framework)" 535  " that the processor "

+ processor->

GetName

() +

536  " (priority: "

+ to_string(processor->

GetPriority

()) +

537  ") has changed the status to "

+

545  switch

(

proc

.m_DispatchStatus) {

565  switch

(

proc

.m_DispatchStatus) {

571  "Dispatcher received signal (from processor)" 572  " that the processor "

+ processor->

GetName

() +

573  " (priority: "

+ to_string(processor->

GetPriority

()) +

574  ") finished with status "

+

576  " when the dispatcher already knows that the " 577  "processor finished (registered status: "

+

588  proc

.m_FinishStatus = processor_status;

593  "Dispatcher received signal (from processor)" 594  " that the processor "

+ processor->

GetName

() +

595  " (priority: "

+ to_string(processor->

GetPriority

()) +

596  ") finished with status "

+

604  "Dispatcher received signal (from processor)" 605  " that the previously canceled processor "

+

607  " (priority: "

+ to_string(processor->

GetPriority

()) +

608  ") finished with status "

+

623  if

(processor == procs->second->m_StartedProcessing) {

625

started_processor_finished =

true

;

632  switch

(

proc

.m_DispatchStatus) {

654  size_t total_count

= procs->second->m_Processors.size();

655  bool

pre_finished = (finished_count + finishing_count) ==

total_count

;

656  bool

fully_finished = finished_count ==

total_count

;

657  bool

need_unlock =

true

;

658  bool

safe_to_delete =

false

;

660  if

(fully_finished) {

661

procs->second->m_AllProcessorsFinished =

true

;

662

safe_to_delete = procs->second->IsSafeToDelete();

670

procs->second->StopRequestTimer();

673  if

(procs->second->m_StartedProcessing ==

nullptr

) {

677

started_processor_finished =

true

;

680  if

(!reply->IsFinished() && reply->IsOutputReady() && started_processor_finished) {

686

procs->second->m_LowLevelClose);

690

procs->second->m_LowLevelClose);

695  "Dispatcher: request processing finished; " 696  "final status: "

+ to_string(request_status) +

697  ". The processors group will be deleted later."

,

701

reply->PrepareReplyCompletion(request_status,

702

request->GetStartTimestamp());

703

procs->second->m_FinallyFlushed =

true

;

709

procs->second->m_RequestStopPrinted =

true

;

713

need_unlock =

false

;

718

reply->SetCompleted();

728  if

(safe_to_delete) {

736

app->GetUvLoopBinder(processor->

GetUVThreadId

()).PostponeInvoke(

754

vector<IPSGS_Processor *> to_be_canceled;

765  for

(

auto

&

proc

: procs->second->m_Processors) {

768

to_be_canceled.push_back(

proc

.m_Processor.get());

773  for

(

auto

&

proc

: to_be_canceled) {

783  auto

request_type = request->GetRequestType();

790  switch

(request_type) {

792

counters.Increment(

nullptr

,

796

counters.Increment(

nullptr

,

800

counters.Increment(

nullptr

,

804

counters.Increment(

nullptr

,

808

counters.Increment(

nullptr

,

812

counters.Increment(

nullptr

,

816

counters.Increment(

nullptr

,

823

request->SetRequestContext();

857

shared_ptr<CPSGS_Reply> reply,

858

vector<IPSGS_Processor::EPSGS_Status> proc_statuses,

859  bool

low_level_close)

864  if

(low_level_close) {

875  size_t

count_200 = 0;

876  size_t

count_404_or_cancel = 0;

877  size_t

count_timeout = 0;

878  size_t

count_unauthorized = 0;

879  for

(

const auto

status: proc_statuses) {

886

++count_404_or_cancel;

894

++count_unauthorized;

906  size_t

count_limited_procs = request->GetLimitedProcessorCount();

907  if

(count_404_or_cancel == proc_statuses.size() &&

908

count_limited_procs == 0) {

914  if

(count_unauthorized > 0) {

922  if

(count_timeout > 0) {

927  if

(count_limited_procs > 0) {

929  string msg

=

"Instantiated processors found nothing and there were "

+

930

to_string(count_limited_procs) +

931  " processor(s) which have not been tried to be instantiated " 932  "due to their concurrency limit has been exceeded ("

+

933

request->GetLimitedProcessorsMessage() +

")"

;

946

shared_ptr<CPSGS_Request> request,

947

shared_ptr<CPSGS_Reply> reply,

948  bool

low_level_close)

950  if

(low_level_close) {

970  for

(

const auto

& item : processed_names) {

971

not_found_names.

erase

(item.second);

972

error_names.

erase

(item.second);

976  int

overall_status = 200;

979  size_t

count_200 = 0;

980  for

(

const auto

& name : annot_request->

m_Names

) {

981  if

(annot_request->

WasSent

(name)) {

982

result_per_na[name] = 200;

987  auto

it = error_names.

find

(name);

988  if

(not_found_names.

find

(name) != not_found_names.

end

() &&

989

it == error_names.

end

()) {

991

result_per_na[name] = 404;

995  if

(it != error_names.

end

()) {

997

result_per_na[name] = it->second;

998

overall_status =

max

(overall_status, it->second);

1005

result_per_na[name] = 404;

1008  if

(!low_level_close) {

1010

reply->SendPerNamedAnnotationResults(

ToJsonString

(result_per_na));

1013  if

(overall_status == 200) {

1014  if

(count_200 == 0) {

1015

overall_status = 404;

1025

shared_ptr<CPSGS_Request> request,

1026

shared_ptr<CPSGS_Reply> reply)

1029

reply->SendTrace(

msg

, request->GetStartTimestamp(),

false

);

1036

shared_ptr<CPSGS_Request> request,

1037

shared_ptr<CPSGS_Reply> reply)

1042

request->NeedProcessorEvents()) {

1043

reply->PrepareProcessorProgressMessage(

1057

vector<IPSGS_Processor *> to_be_canceled;

1060  size_t

total_procs = 0;

1061  size_t

cancel_count = 0;

1074  if

(!procs->second->m_FinallyFlushed) {

1076

first_proc = procs->second->m_Processors[0].m_Processor.get();

1077

total_procs = procs->second->m_Processors.size();

1082  for

(

auto

&

proc

: procs->second->m_Processors) {

1084  if

(!proc_names.empty())

1085

proc_names +=

", "

;

1086

proc_names +=

proc

.m_Processor->GetName() +

1091

to_be_canceled.push_back(

proc

.m_Processor.get());

1101

procs->second->m_LowLevelClose =

true

;

1107  if

(total_procs > 0) {

1109

first_proc->

GetRequest

()->SetRequestContext();

1111  string

request_name = first_proc->

GetRequest

()->GetName();

1114  if

(cancel_count > 0) {

1115  PSG_WARNING

(

"The client connection has been closed. "

+

1116

to_string(cancel_count) +

" out of "

+

1117

to_string(total_procs) +

1118  " processors were in progress and will be canceled " 1119  "(connection id: "

+ to_string(connection_id) +

1120  "; request: "

+ request_name +

"["

+ to_string(request_id) +

1121  "]; processor(s): "

+ proc_names +

")."

);

1123  PSG_WARNING

(

"The client connection has been closed. " 1124  "There were 0 processors in progress (out of "

+

1125

to_string(total_procs) +

1126  ") so no processors will be canceled " 1127  "(connection id: "

+ to_string(connection_id) +

1128  "; request: "

+ request_name +

"["

+ to_string(request_id) +

1129  "]; processor(s): "

+ proc_names +

")."

);

1133  for

(

auto

&

proc

: to_be_canceled) {

1137  if

(first_proc !=

nullptr

) {

1145

app->GetUvLoopBinder(first_proc->

GetUVThreadId

()).PostponeInvoke(

1159

vector<IPSGS_Processor *> to_be_canceled;

1162  for

(

size_t

bucket_index = 0; bucket_index <

PROC_BUCKETS

; ++bucket_index) {

1166  for

(

auto

&

proc

: procs.second->m_Processors) {

1168

to_be_canceled.push_back(

proc

.m_Processor.get());

1175  for

(

auto

&

proc

: to_be_canceled) {

1178

to_be_canceled.clear();

1186

vector<IPSGS_Processor *> to_be_canceled;

1198

vector<SProcessorData> & processors = procs->second->m_Processors;

1199  auto

* first_processor = processors.front().m_Processor.get();

1200  auto

reply = first_processor->GetReply();

1201  auto

request = first_processor->GetRequest();

1202  unsigned long

from_last_activity_to_now_ms =

1203

reply->GetTimespanFromLastActivityToNowMks() / 1000;

1210  if

(request->NeedTrace()) {

1212

reply->SendTrace(

"The request timer of "

+

1214  " ms triggered however the last activity with the " 1215  "reply was "

+ to_string(from_last_activity_to_now_ms) +

1216  " ms ago. The request timer will be restarted."

,

1217

request->GetStartTimestamp(),

false

);

1221

procs->second->RestartTimer(timeout);

1226  if

(request->NeedTrace()) {

1228

reply->SendTrace(

"The request timer of "

+

1230  " ms is over. All the not canceled yet processors " 1231  "will receive the Cancel() call"

,

1232

request->GetStartTimestamp(),

false

);

1235

reply->PrepareRequestTimeoutMessage(

1236  "Timed out due to prolonged backend(s) inactivity. No response for "

+

1241  for

(

auto

&

proc

: procs->second->m_Processors) {

1243

to_be_canceled.push_back(

proc

.m_Processor.get());

1249  for

(

auto

&

proc

: to_be_canceled) {

1264

procs->second->m_Libh2oFinished =

true

;

1266  if

(procs->second->IsSafeToDelete()) {

1267  auto

processor = procs->second->m_Processors[0].m_Processor;

1269  if

(processor->IsUVThreadAssigned()) {

1272

app->GetUvLoopBinder(processor->GetUVThreadId()).PostponeInvoke(

1296  if

(!procs->second->IsSafeToDelete()) {

1301  if

(!procs->second->m_RequestStopPrinted) {

1302  if

(procs->second->m_LowLevelClose) {

1308  auto

processor = procs->second->m_Processors.begin()->m_Processor;

1309  auto

reply = processor->GetReply();

1310  auto

request = processor->GetRequest();

1317

procs->second->m_LowLevelClose);

1320

vector<IPSGS_Processor::EPSGS_Status> proc_statuses;

1321  for

(

auto

&

proc

: procs->second->m_Processors) {

1322

proc_statuses.push_back(

proc

.m_Processor->GetStatus());

1325

procs->second->m_LowLevelClose);

1328

procs->second->m_RequestStopPrinted =

true

;

1341  for

(

const auto

& proc_data : procs->second->m_Processors) {

1342  if

(!per_proc.empty())

1344

per_proc +=

"processor: "

+ proc_data.m_Processor->GetName() +

1345  " [dispatch status: "

+ to_string(proc_data.m_DispatchStatus) +

1346  "; finish status: "

+ to_string(proc_data.m_FinishStatus) +

1347  "; done status registered: "

+ to_string(proc_data.m_DoneStatusRegistered) +

1348  "; proc performance registered: "

+ to_string(proc_data.m_ProcPerformanceRegistered) +

1352  string

started_proc =

"started processor: "

;

1353  if

(procs->second->m_StartedProcessing)

1354

started_proc += procs->second->m_StartedProcessing->GetName();

1356

started_proc +=

"none"

;

1358  PSG_ERROR

(

"Removing a processor group for which " 1359  "request stop has not been issued " 1360  "(request id: "

+ to_string(request_id) +

1361  "; timer active: "

+ to_string(procs->second->m_TimerActive) +

1362  "; timer closed: "

+ to_string(procs->second->m_TimerClosed) +

1363  "; request final flush: "

+ to_string(procs->second->m_FinallyFlushed) +

1364  "; all processors finished: "

+ to_string(procs->second->m_AllProcessorsFinished) +

1365  "; lib h2o finished: "

+ to_string(procs->second->m_Libh2oFinished) +

1366  "; low level close: "

+ to_string(procs->second->m_LowLevelClose) +

1367  "; number of processors: "

+ to_string(procs->second->m_Processors.size()) +

1369  "; "

+ started_proc +

")"

);

1373  auto

first_processor = procs->second->m_Processors.begin()->m_Processor;

1374

request_type = first_processor->GetRequest()->GetRequestType();

1375

group = procs->second.release();

1384  if

(group !=

nullptr

) {

1402

procs->second->m_TimerClosed =

true

;

1404  if

(procs->second->IsSafeToDelete()) {

1418  size_t

proc_priority = processor->

GetPriority

();

1419  size_t

proc_index = proc_count - proc_priority;

1430  size_t

current_count;

1431  size_t

limit_reached_count;

1433

&limit_reached_count);

1445  bool

found =

false

;

1456  for

(

size_t

index=0; index <

PROC_BUCKETS

; ++index) {

1460  IPSGS_Processor

* first_proc = processors_group.second->m_Processors[0].m_Processor.get();

1461

shared_ptr<CPSGS_Request> request = first_proc->

GetRequest

();

1464

proc_group.

SetByKey

(

"Request details"

, request->Serialize());

1465

proc_group.

SetBoolean

(

"Timer active"

, processors_group.second->m_TimerActive);

1466

proc_group.

SetBoolean

(

"Timer handle closed"

, processors_group.second->m_TimerClosed);

1467

proc_group.

SetBoolean

(

"Finally flushed"

, processors_group.second->m_FinallyFlushed);

1468

proc_group.

SetBoolean

(

"All processors finished"

, processors_group.second->m_AllProcessorsFinished);

1469

proc_group.

SetBoolean

(

"Libh2o finished"

, processors_group.second->m_Libh2oFinished);

1470

proc_group.

SetBoolean

(

"Low level close"

, processors_group.second->m_LowLevelClose);

1471

proc_group.

SetBoolean

(

"Is safe to delete"

, processors_group.second->IsSafeToDelete());

1473  if

(processors_group.second->m_StartedProcessing ==

nullptr

)

1474

proc_group.

SetNull

(

"Signal start processor"

);

1476

proc_group.

SetString

(

"Signal start processor"

, processors_group.second->m_StartedProcessing->GetName());

1479  auto

now = psg_clock_t::now();

1481  for

(

const auto

& processor : processors_group.second->m_Processors) {

1490

timestamp = processor.m_Processor->GetProcessInvokeTimestamp(

is_valid

);

1492

mks = chrono::duration_cast<chrono::microseconds>(now - timestamp).count();

1493  proc

.SetInteger(

"processor started ago mks"

, mks);

1495  proc

.SetString(

"processor started ago mks"

,

"Not happened yet"

);

1498

timestamp = processor.m_Processor->GetSignalStartTimestamp(

is_valid

);

1500

mks = chrono::duration_cast<chrono::microseconds>(now - timestamp).count();

1501  proc

.SetInteger(

"processor signal start ago mks"

, mks);

1503  proc

.SetString(

"processor signal start ago mks"

,

"Not happened yet"

);

1506

timestamp = processor.m_Processor->GetSignalFinishTimestamp(

is_valid

);

1508

mks = chrono::duration_cast<chrono::microseconds>(now - timestamp).count();

1509  proc

.SetInteger(

"processor signal finish ago mks"

, mks);

1511  proc

.SetString(

"processor signal finish ago mks"

,

"Not happened yet"

);

1514  proc

.SetString(

"processor started ago mks"

,

"Collecting switched off"

);

1515  proc

.SetString(

"processor signal start ago mks"

,

"Collecting switched off"

);

1516  proc

.SetString(

"processor signal finish ago mks"

,

"Collecting switched off"

);

1519  proc

.SetString(

"processor name"

, processor.m_Processor->GetName());

1520  proc

.SetString(

"processor group name"

, processor.m_Processor->GetGroupName());

1521  proc

.SetInteger(

"processor priority"

, processor.m_Processor->GetPriority());

1522  proc

.SetString(

"dispatch status"

,

1524  proc

.SetString(

"finish status"

,

1526  proc

.SetString(

"processor reported status"

,

1531

proc_group.

SetByKey

(

"Processors"

, processors);

1532

status.

Append

(proc_group);

void RegisterProcessorGroupName(const string &group_name, TProcessorPriority priority)

void UnregisterActiveProcGroup(CPSGS_Request::EPSGS_Type request_type, SProcessorGroup *proc_group)

void RegisterActiveProcGroup(CPSGS_Request::EPSGS_Type request_type, SProcessorGroup *proc_group)

static CJsonNode NewArrayNode()

Create a new JSON array node.

void SetString(const string &key, const string &value)

Set a JSON object element to the specified string value.

void SetNull(const string &key)

Set a JSON object element to the specified null value.

void SetBoolean(const string &key, bool value)

Set a JSON object element to the specified boolean value.

void SetByKey(const string &key, CJsonNode::TInstance value)

For a JSON object node, insert a new element or update an existing element.

static CJsonNode NewObjectNode()

Create a new JSON object node.

void Append(CJsonNode::TInstance value)

For an array node, add a new element at the end of the array.

void RegisterProcessorPerformance(IPSGS_Processor *processor, IPSGS_Processor::EPSGS_Status proc_finish_status)

void RegisterProcessorDone(CPSGS_Request::EPSGS_Type request_type, IPSGS_Processor *processor)

void RegisterForTimeSeries(CPSGS_Request::EPSGS_Type request_type, CRequestStatus::ECode status)

void IncrementRequestStopCounter(int status)

@ ePSGS_GetBlobBySeqIdRequest

@ ePSGS_AccessionVersionHistory

@ ePSGS_GetBlobBySatSatKeyRequest

@ ePSGS_GetNamedAnnotations

void Increment(IPSGS_Processor *processor, EPSGS_CounterType counter)

void SignalFinishProcessing(IPSGS_Processor *processor, EPSGS_SignalSource source)

The processor signals that it finished one way or another; including when a processor is canceled.

bool IsGroupAlive(size_t request_id)

map< string, size_t > GetConcurrentCounters(void)

void OnRequestTimer(size_t request_id)

void AddProcessor(unique_ptr< IPSGS_Processor > processor)

Register processor (one to serve as a processor factory)

vector< string > m_RegisteredProcessorGroups

void PopulateStatus(CJsonNode &status)

CRequestStatus::ECode x_ConcludeRequestStatus(shared_ptr< CPSGS_Request > request, shared_ptr< CPSGS_Reply > reply, vector< IPSGS_Processor::EPSGS_Status > proc_statuses, bool low_level_close)

uint64_t m_RequestTimeoutMillisec

map< string, size_t > GetProcessorGroupToIndexMap(void) const

Provides a map between a processor group name and a unique zero-based index of the group.

SProcessorConcurrency m_ProcessorConcurrency[16]

void NotifyRequestFinished(size_t request_id)

void OnRequestTimerClose(size_t request_id)

CRequestStatus::ECode x_MapProcessorFinishToStatus(IPSGS_Processor::EPSGS_Status status) const

void OnLibh2oFinished(size_t request_id)

CRequestStatus::ECode x_ConcludeIDGetNARequestStatus(shared_ptr< CPSGS_Request > request, shared_ptr< CPSGS_Reply > reply, bool low_level_close)

void x_PrintRequestStop(shared_ptr< CPSGS_Request > request, CRequestStatus::ECode status, size_t bytes_sent)

void StartRequestTimer(size_t request_id)

void x_SendProgressMessage(IPSGS_Processor::EPSGS_Status finish_status, IPSGS_Processor *processor, shared_ptr< CPSGS_Request > request, shared_ptr< CPSGS_Reply > reply)

static string SignalSourceToString(EPSGS_SignalSource source)

void x_SendTrace(const string &msg, shared_ptr< CPSGS_Request > request, shared_ptr< CPSGS_Reply > reply)

list< string > PreliminaryDispatchRequest(shared_ptr< CPSGS_Request > request, shared_ptr< CPSGS_Reply > reply)

Return list of processor names which reported that they can process the request.

void x_DecrementConcurrencyCounter(IPSGS_Processor *processor)

static string ProcessorStatusToString(EPSGS_ProcessorStatus st)

void EraseProcessorGroup(size_t request_id)

size_t x_GetBucketIndex(size_t request_id) const

unordered_map< size_t, unique_ptr< SProcessorGroup > > m_ProcessorGroups[100]

void RegisterProcessorsForMomentousCounters(void)

void SignalConnectionCanceled(size_t request_id)

An http connection can be canceled so this method will be invoked for such a case.

list< shared_ptr< IPSGS_Processor > > DispatchRequest(shared_ptr< CPSGS_Request > request, shared_ptr< CPSGS_Reply > reply, const list< string > &processor_names)

Return list of processors which can be used to process the request.

IPSGS_Processor::EPSGS_StartProcessing SignalStartProcessing(IPSGS_Processor *processor)

The processor signals that it is going to provide data to the client.

list< unique_ptr< IPSGS_Processor > > m_RegisteredProcessors

@ ePSGS_AnnotationRequest

@ ePSGS_BlobBySatSatKeyRequest

@ ePSGS_IPGResolveRequest

@ ePSGS_AccessionVersionHistoryRequest

@ ePSGS_BlobBySeqIdRequest

CPSGSCounters & GetCounters(void)

COperationTiming & GetTiming(void)

static CPubseqGatewayApp * GetInstance(void)

Interface class (and self-factory) for request processor objects that can retrieve data from a given ...

virtual string GetName(void) const =0

Tells the processor name (used in logging and tracing)

static string StatusToString(EPSGS_Status status)

Converts the processor status to a string for tracing and logging purposes.

virtual EPSGS_Status GetStatus(void)=0

Tells the processor status (if it has finished or in progress)

shared_ptr< CPSGS_Reply > GetReply(void) const

Provides the reply wrapper.

uv_thread_t GetUVThreadId(void) const

Provides the libuv thread id which runs the processor.

shared_ptr< CPSGS_Request > GetRequest(void) const

Provides the user request.

bool IsUVThreadAssigned(void) const

Tells if a libuv thread id has been assigned to the processor.

EPSGS_StartProcessing

Tells wether to continue or not after a processor called SignalStartProcessing() method.

TProcessorPriority GetPriority(void) const

Provides the processor priority.

EPSGS_Status

The GetStatus() method returns a processor current status.

static string StatusToProgressMessage(EPSGS_Status status)

Converts the processor status to a string for protocol message.

const_iterator end() const

const_iterator find(const key_type &key) const

const_iterator find(const key_type &key) const

const_iterator end() const

static bool is_valid(const char *num, int type, CONV_RESULT *cr)

void PrintRequestStop(void)

Print request stop message (for request-driven applications)

void SetBytesWr(Int8 bytes)

CDiagContext & GetDiagContext(void)

Get diag context instance.

static void SetRequestContext(CRequestContext *ctx)

Shortcut to CDiagContextThreadData::GetThreadData().SetRequestContext()

static CRequestContext & GetRequestContext(void)

Shortcut to CDiagContextThreadData::GetThreadData().GetRequestContext()

void SetRequestStatus(int status)

#define ERR_POST(message)

Error posting with file, line number information but without error codes.

void Reset(void)

Reset all properties to the initial state.

@ eDiag_Error

Error message.

@ e499_BrokenConnection

Non-standard status code - used to indicate broken connection while serving normal request.

@ e503_ServiceUnavailable

@ e500_InternalServerError

void Critical(CExceptionArgs_Base &args)

#define NCBI_THROW(exception_class, err_code, message)

Generic macro to throw an exception, given the exception class, error code and message string.

const CharType(& source)[N]

static unsigned long int total_count

void request_timer_cb(uv_timer_t *handle)

bool g_AllowProcessorTiming

void erase_processor_group_cb(void *user_data)

void request_timer_close_cb(uv_handle_t *handle)

#define MAX_PROCESSOR_GROUPS

string ToJsonString(const CBioseqInfoRecord &bioseq_info, SPSGS_ResolveRequest::TPSGS_BioseqIncludeData include_data_flags, const string &custom_blob_id)

#define PSG_ERROR(message)

#define PSG_WARNING(message)

@ ePSGS_NotFoundAndNotInstantiated

psg_clock_t::time_point psg_time_point_t

static SLJIT_INLINE sljit_ins msg(sljit_gpr r, sljit_s32 d, sljit_gpr x, sljit_gpr b)

size_t GetCurrentCount(void) const

void IncrementLimitReachedCount(void)

void IncrementCurrentCount(void)

void DecrementCurrentCount(void)

void GetCurrentAndLimitReachedCounts(size_t *current, size_t *limit_reached)

bool WasSent(const string &annot_name) const

map< string, int > GetErrorNames(void) const

set< string > GetNotFoundNames(void) const

vector< pair< TProcessorPriority, string > > GetProcessedNames(void) const

void ReportResultStatus(const string &annot_name, EPSGS_ResultStatus rs)


RetroSearch is an open source project built by @garambo | Open a GitHub Issue

Search and Browse the WWW like it's 1997 | Search results from DuckDuckGo

HTML: 3.2 | Encoding: UTF-8 | Version: 0.7.4