// From request_timer_cb(uv_timer_t * handle):
    size_t      request_id = (size_t)(handle->data);
    app->GetProcessorDispatcher()->OnRequestTimer(request_id);

// From request_timer_close_cb(uv_handle_t * handle):
    size_t      request_id = (size_t)(handle->data);
    app->GetProcessorDispatcher()->OnRequestTimerClose(request_id);

// From erase_processor_group_cb(void * user_data):
    size_t      request_id = (size_t)(user_data);
    app->GetProcessorDispatcher()->EraseProcessorGroup(request_id);
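The callbacks above all recover the request id that was stashed in the libuv handle's user-data pointer and route the event back to the dispatcher. A minimal, self-contained sketch of that pattern (the `Dispatcher` type here is a stand-in, not the server's CPSGS_Dispatcher):

    #include <uv.h>
    #include <cstdio>

    struct Dispatcher {
        void OnRequestTimer(size_t request_id) {
            std::printf("request %zu timed out\n", request_id);
        }
    };

    static Dispatcher s_Dispatcher;

    static void request_timer_cb(uv_timer_t *handle) {
        // Recover the request id stored in handle->data when the timer was armed,
        // then notify the dispatcher.
        size_t request_id = (size_t)(handle->data);
        s_Dispatcher.OnRequestTimer(request_id);
    }

    int main() {
        uv_loop_t *loop = uv_default_loop();
        uv_timer_t timer;
        uv_timer_init(loop, &timer);
        timer.data = (void *)(size_t)42;                    // request id 42
        uv_timer_start(&timer, request_timer_cb, 100, 0);   // fire once after 100 ms
        return uv_run(loop, UV_RUN_DEFAULT);
    }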
78 "has been reached. Please increase the MAX_PROCESSOR_GROUPS " 84 stringprocessor_group_name = processor->GetGroupName();
88processor_group_name);
93 "The group '"<< processor_group_name <<
"' is tried to be " 94 "registered more than once. Exiting.");
99 size_tlimit = app->GetProcessorMaxConcurrency(processor_group_name);
127ret[
proc->GetGroupName()] = index;
// From PreliminaryDispatchRequest(shared_ptr<CPSGS_Request> request,
//                                 shared_ptr<CPSGS_Reply> reply):
    if (proc->CanProcess(request, reply)) {
        ret.push_back(proc->GetName());
        if (request->NeedTrace()) {
            reply->SendTrace(proc->GetName() +
                             " processor reports it can process request",
                             request->GetStartTimestamp(), false);
        }
    } else {
        if (request->NeedTrace()) {
            reply->SendTrace(proc->GetName() +
                             " processor reports it cannot process request",
                             request->GetStartTimestamp(), false);
        }
    }
    // ...
    string      msg = "No matching processors found";
    // ...
        request->GetStartTimestamp());
    // ...
    reply->SetCompleted();
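A simplified, self-contained sketch of the polling step above (the class and processor names are assumptions, not the server's types): every registered processor factory is asked whether it can handle the request, and the names of those that answer "yes" are collected; an empty result means the request is answered with an error.

    #include <iostream>
    #include <list>
    #include <memory>
    #include <string>
    #include <vector>

    struct Request { std::string type; };

    struct IProcessor {
        virtual ~IProcessor() = default;
        virtual std::string GetName() const = 0;
        virtual bool CanProcess(const Request &req) const = 0;
    };

    struct ResolveProcessor : IProcessor {
        std::string GetName() const override { return "RESOLVE"; }
        bool CanProcess(const Request &req) const override { return req.type == "resolve"; }
    };

    struct GetBlobProcessor : IProcessor {
        std::string GetName() const override { return "GETBLOB"; }
        bool CanProcess(const Request &req) const override { return req.type == "getblob"; }
    };

    // Analogue of PreliminaryDispatchRequest(): names of processors that can serve the request.
    std::list<std::string> PreliminaryDispatch(
            const std::vector<std::unique_ptr<IProcessor>> &registered,
            const Request &req) {
        std::list<std::string> ret;
        for (const auto &proc : registered)
            if (proc->CanProcess(req))
                ret.push_back(proc->GetName());
        return ret;
    }

    int main() {
        std::vector<std::unique_ptr<IProcessor>> registered;
        registered.push_back(std::make_unique<ResolveProcessor>());
        registered.push_back(std::make_unique<GetBlobProcessor>());

        auto names = PreliminaryDispatch(registered, Request{"resolve"});
        if (names.empty())
            std::cout << "No matching processors found\n";   // would end the reply with an error
        for (const auto &n : names)
            std::cout << n << " can process the request\n";
    }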
// From DispatchRequest(shared_ptr<CPSGS_Request> request,
//                      shared_ptr<CPSGS_Reply> reply,
//                      const list<string> & processor_names):
list<shared_ptr<IPSGS_Processor>>
    // ...
                 shared_ptr<CPSGS_Reply>  reply,
                 const list<string> &  processor_names)
{
    if (processor_names.empty()) {
        // ...
            "Try to dispatch empty list of processors");
    }
    // ...
    list<shared_ptr<IPSGS_Processor>>   ret;
    // ...
    auto        request_id = request->GetRequestId();
    // ...
    if (find(processor_names.begin(), processor_names.end(),
             proc->GetName()) == processor_names.end()) {
        // ...
    }
    // ...
    size_t      proc_index = proc_count - priority;
    // ...
    if (current_count >= limit) {
        request->AddLimitedProcessor(proc->GetName(), limit);
        // ...
        if (request->NeedTrace()) {
            reply->SendTrace("Processor: " + proc->GetName() +
                             " will not be tried to create because"
                             " the processor group limit has been exceeded."
                             " Limit: " + to_string(limit) +
                             " Current count: " + to_string(current_count),
                             request->GetStartTimestamp(), false);
        }
        // ...
    }
    // ...
    for (const auto &  name : proc->WhatCanProcess(request, reply)) {
        // ...
    }
    // ...
    if (request->NeedTrace()) {
        reply->SendTrace("Try to create processor: " + proc->GetName(),
                         request->GetStartTimestamp(), false);
    }
    // ...
    shared_ptr<IPSGS_Processor>     p(proc->CreateProcessor(request, reply, /* ... */));
    // ...
    p->SetUVThreadId(uv_thread_self());
    // ...
    procs->m_Processors.emplace_back(p, ePSGS_Up, /* ... */);
    // ...
    if (request->NeedTrace()) {
        reply->SendTrace("Processor " + p->GetName() +
                         " has been created successfully (priority: " +
                         to_string(priority) + ")",
                         request->GetStartTimestamp(), false);
    }
    // ...
    if (request->NeedTrace()) {
        reply->SendTrace("Processor " + proc->GetName() +
                         " has not been created",
                         request->GetStartTimestamp(), false);
    }
    // ...
    size_t      limited_processor_count = request->GetLimitedProcessorCount();
    // ...
    request->SetRequestContext();
    // ...
    if (limited_processor_count == 0) {
        msg = "No matching processors found";
        // ...
    } else {
        msg = to_string(limited_processor_count) + " of " +
              to_string(processor_names.size()) +
              " processor(s) were not instantiated due to the limit on "
              "concurrent processors is exceeded (" +
              request->GetLimitedProcessorsMessage() + ")";
    }
    // ...
    reply->PrepareReplyMessage(msg, status_code, /* ... */);
    // ...
    reply->PrepareReplyCompletion(status_code,
                                  request->GetStartTimestamp());
    // ...
    reply->SetCompleted();
    // ...
    auto *      grp = procs.release();
    // ...
         unique_ptr<SProcessorGroup>>   req_grp = make_pair(request_id,
                                                            unique_ptr<SProcessorGroup>(grp));
}
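A sketch of the concurrency gate used above, with assumed, simplified names: each processor group keeps a current-count/limit pair, and a processor is only instantiated when its group's counter is below the configured limit; otherwise the request records the processor as "limited" so the final reply can explain the skip.

    #include <cstdio>
    #include <map>
    #include <string>
    #include <vector>

    struct SConcurrency {
        size_t current_count;
        size_t limit;
    };

    int main() {
        std::map<std::string, SConcurrency> per_group = {
            {"CASSANDRA", {2, 2}},   // limit already reached
            {"OSG",       {0, 5}},
        };
        std::vector<std::pair<std::string, size_t>> limited;  // group name -> limit

        for (auto &entry : per_group) {
            SConcurrency &c = entry.second;
            if (c.current_count >= c.limit) {
                // Mirrors request->AddLimitedProcessor(name, limit): remember why
                // the processor was skipped.
                limited.emplace_back(entry.first, c.limit);
                continue;
            }
            ++c.current_count;                     // a processor will be created
            std::printf("creating a processor from group %s\n", entry.first.c_str());
        }

        for (const auto &l : limited)
            std::printf("%s skipped: concurrency limit %zu reached\n",
                        l.first.c_str(), l.second);
    }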
// From SignalStartProcessing(IPSGS_Processor * processor):
    bool        need_trace = processor->GetRequest()->NeedTrace();
    size_t      request_id = processor->GetRequest()->GetRequestId();
    // ...
    vector<IPSGS_Processor *>   to_be_canceled;
    // ...
        "Processor: " + processor->GetName() + " (priority: " +
        // ...
        ") signalled start dealing with data for the client",
        processor->GetRequest()->GetStartTimestamp(),
    // ...
    for (const auto &  proc : procs->second->m_Processors) {
        if (proc.m_Processor.get() == processor) {
            // ...
            PSG_ERROR("Logic error: the processor dispatcher received "
                      "'start processing' when its dispatch status is not UP (i.e. FINISHED)");
            // ...
        }
    }
    // ...
    for (auto &  proc : procs->second->m_Processors) {
        current_proc = proc.m_Processor.get();
        // ...
        if (current_proc != processor) {
            // ...
            to_be_canceled.push_back(current_proc);
        }
    }
    // ...
    procs->second->m_StartedProcessing = processor;
    // ...
    for (auto &  proc : to_be_canceled) {
        // ...
            "Invoking Cancel() for the processor: " + proc->GetName() +
            " (priority: " + to_string(proc->GetPriority()) + ")",
            processor->GetRequest()->GetStartTimestamp(), false);
        // ...
    }
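A self-contained sketch of the "first to signal wins" rule shown above (the types and names are assumptions): the first processor that reports it will send data is recorded as the started one, and every other processor in the group is collected and then canceled.

    #include <cstdio>
    #include <string>
    #include <vector>

    struct Processor {
        std::string name;
        bool        canceled;
        void Cancel() { canceled = true; }
    };

    struct Group {
        std::vector<Processor>  processors;
        Processor              *started_processing = nullptr;
    };

    // Analogue of SignalStartProcessing(): returns false if another processor already won.
    bool SignalStart(Group &group, Processor *processor) {
        if (group.started_processing != nullptr)
            return group.started_processing == processor;

        group.started_processing = processor;
        std::vector<Processor *> to_be_canceled;
        for (auto &proc : group.processors)
            if (&proc != processor)
                to_be_canceled.push_back(&proc);    // everyone else loses the race
        for (auto *proc : to_be_canceled)
            proc->Cancel();
        return true;
    }

    int main() {
        Group group;
        group.processors = {{"CDD", false}, {"CASSANDRA", false}, {"WGS", false}};
        Processor *winner = &group.processors[1];

        std::printf("CASSANDRA may continue: %d\n", (int)SignalStart(group, winner));
        for (const auto &proc : group.processors)
            std::printf("%s canceled: %d\n", proc.name.c_str(), (int)proc.canceled);
    }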
// From SignalFinishProcessing(IPSGS_Processor * processor, EPSGS_SignalSource source):
    PSG_ERROR("SignalFinishProcessing() is called not from an assigned "
              "thread (and libuv loop). "
              "Current thread: " << uv_thread_self() <<
              // ...
              " Processor: " << processor->GetName() <<
    // ...
    auto        reply = processor->GetReply();
    // ...
    size_t      request_id = request->GetRequestId();
    // ...
    bool        need_trace = request->NeedTrace();
    bool        started_processor_finished = false;
    vector<IPSGS_Processor::EPSGS_Status>   proc_statuses;
    // ...
    size_t      finishing_count = 0;
    size_t      finished_count = 0;
    // ...
        "Dispatcher received signal (from " +
        // ...
        ") that the processor " + processor->GetName() +
        " (priority: " + to_string(processor->GetPriority()) +
        ") finished while the reported status is " +
        // ...
        ". Ignore this call and continue.",
    // ...
    for (auto &  proc : procs->second->m_Processors) {
        // ...
        proc_statuses.push_back(proc.m_Processor->GetStatus());
        // ...
        if (proc.m_Processor.get() == processor) {
            // ...
            if (proc.m_DoneStatusRegistered == false) {
                // ...
                proc.m_DoneStatusRegistered = true;
            }
            // ...
            if (proc.m_ProcPerformanceRegistered == false) {
                // ...
                proc.m_ProcPerformanceRegistered = true;
            }
            // ...
            if (processor_status != proc.m_LastReportedTraceStatus) {
                proc.m_LastReportedTraceStatus = processor_status;
                // ...
                    "Dispatcher received signal (from framework)"
                    " that the processor " + processor->GetName() +
                    " (priority: " + to_string(processor->GetPriority()) +
                    ") has changed the status to " +
                    // ...
            }
            // ...
            switch (proc.m_DispatchStatus) {
                // ...
            }
            // ...
            switch (proc.m_DispatchStatus) {
                // ...
                    "Dispatcher received signal (from processor)"
                    " that the processor " + processor->GetName() +
                    " (priority: " + to_string(processor->GetPriority()) +
                    ") finished with status " +
                    // ...
                    " when the dispatcher already knows that the "
                    "processor finished (registered status: " +
                    // ...
                proc.m_FinishStatus = processor_status;
                // ...
                    "Dispatcher received signal (from processor)"
                    " that the processor " + processor->GetName() +
                    " (priority: " + to_string(processor->GetPriority()) +
                    ") finished with status " +
                    // ...
                    "Dispatcher received signal (from processor)"
                    " that the previously canceled processor " +
                    // ...
                    " (priority: " + to_string(processor->GetPriority()) +
                    ") finished with status " +
                    // ...
            }
            // ...
            if (processor == procs->second->m_StartedProcessing) {
                // ...
                started_processor_finished = true;
            }
        }
        // ...
        switch (proc.m_DispatchStatus) {
            // ...
        }
    }
    // ...
    size_t      total_count = procs->second->m_Processors.size();
    bool        pre_finished = (finished_count + finishing_count) == total_count;
    bool        fully_finished = finished_count == total_count;
    bool        need_unlock = true;
    bool        safe_to_delete = false;
    // ...
    if (fully_finished) {
        procs->second->m_AllProcessorsFinished = true;
        safe_to_delete = procs->second->IsSafeToDelete();
    }
    // ...
    procs->second->StopRequestTimer();
    // ...
    if (procs->second->m_StartedProcessing == nullptr) {
        // ...
        started_processor_finished = true;
    }
    // ...
    if (!reply->IsFinished() && reply->IsOutputReady() && started_processor_finished) {
        // ...
            procs->second->m_LowLevelClose);
        // ...
            procs->second->m_LowLevelClose);
        // ...
            "Dispatcher: request processing finished; "
            "final status: " + to_string(request_status) +
            ". The processors group will be deleted later.",
        // ...
        reply->PrepareReplyCompletion(request_status,
                                      request->GetStartTimestamp());
        procs->second->m_FinallyFlushed = true;
        // ...
        procs->second->m_RequestStopPrinted = true;
        // ...
        need_unlock = false;
        // ...
        reply->SetCompleted();
        // ...
    }
    // ...
    if (safe_to_delete) {
        // ...
        app->GetUvLoopBinder(processor->GetUVThreadId()).PostponeInvoke(
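A sketch of the bookkeeping above, using assumed, simplified types: each finish signal updates the group's per-processor dispatch status, and only when the number of finished processors reaches the group's size is the group marked as fully finished and the reply flushed.

    #include <cstdio>
    #include <vector>

    enum EDispatchStatus { eUp, eFinished };

    struct ProcData { EDispatchStatus dispatch_status = eUp; };

    struct Group {
        std::vector<ProcData> processors;
        bool all_finished = false;
    };

    // Analogue of the tail of SignalFinishProcessing().
    void SignalFinish(Group &group, size_t proc_index) {
        group.processors[proc_index].dispatch_status = eFinished;

        size_t finished_count = 0;
        for (const auto &proc : group.processors)
            if (proc.dispatch_status == eFinished)
                ++finished_count;

        size_t total_count = group.processors.size();
        bool fully_finished = (finished_count == total_count);
        if (fully_finished) {
            group.all_finished = true;
            std::printf("all %zu processors finished; the reply can be flushed\n",
                        total_count);
        }
    }

    int main() {
        Group group;
        group.processors.resize(3);
        SignalFinish(group, 0);
        SignalFinish(group, 2);
        SignalFinish(group, 1);   // triggers the final flush
    }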
    vector<IPSGS_Processor *>   to_be_canceled;
    // ...
    for (auto &  proc : procs->second->m_Processors) {
        // ...
        to_be_canceled.push_back(proc.m_Processor.get());
        // ...
    }
    // ...
    for (auto &  proc : to_be_canceled) {
        // ...
    }

    auto        request_type = request->GetRequestType();
    // ...
    switch (request_type) {
        // ...
        counters.Increment(nullptr, /* ... */);
        // the same Increment() call is repeated in each request-type case
    }
    // ...
    request->SetRequestContext();
// From x_ConcludeRequestStatus():
                        shared_ptr<CPSGS_Reply>  reply,
                        vector<IPSGS_Processor::EPSGS_Status>  proc_statuses,
                        bool  low_level_close)
{
    if (low_level_close) {
        // ...
    }
    // ...
    size_t      count_200 = 0;
    size_t      count_404_or_cancel = 0;
    size_t      count_timeout = 0;
    size_t      count_unauthorized = 0;
    for (const auto  status : proc_statuses) {
        // ...
        ++count_404_or_cancel;
        // ...
        ++count_unauthorized;
        // ...
    }
    // ...
    size_t      count_limited_procs = request->GetLimitedProcessorCount();
    if (count_404_or_cancel == proc_statuses.size() &&
        count_limited_procs == 0) {
        // ...
    }
    // ...
    if (count_unauthorized > 0) {
        // ...
    }
    // ...
    if (count_timeout > 0) {
        // ...
    }
    // ...
    if (count_limited_procs > 0) {
        // ...
        string      msg = "Instantiated processors found nothing and there were " +
                          to_string(count_limited_procs) +
                          " processor(s) which have not been tried to be instantiated "
                          "due to their concurrency limit has been exceeded (" +
                          request->GetLimitedProcessorsMessage() + ")";
    }
// From x_ConcludeIDGetNARequestStatus():
                        shared_ptr<CPSGS_Request>  request,
                        shared_ptr<CPSGS_Reply>  reply,
                        bool  low_level_close)
{
    if (low_level_close) {
        // ...
    }
    // ...
    for (const auto &  item : processed_names) {
        not_found_names.erase(item.second);
        error_names.erase(item.second);
    }
    // ...
    int         overall_status = 200;
    // ...
    size_t      count_200 = 0;
    for (const auto &  name : annot_request->m_Names) {
        if (annot_request->WasSent(name)) {
            result_per_na[name] = 200;
            // ...
        }
        // ...
        auto    it = error_names.find(name);
        if (not_found_names.find(name) != not_found_names.end() &&
            it == error_names.end()) {
            // ...
            result_per_na[name] = 404;
            // ...
        }
        if (it != error_names.end()) {
            // ...
            result_per_na[name] = it->second;
            overall_status = max(overall_status, it->second);
            // ...
        }
        // ...
        result_per_na[name] = 404;
        // ...
    }
    if (!low_level_close) {
        // ...
        reply->SendPerNamedAnnotationResults(ToJsonString(result_per_na));
    }
    // ...
    if (overall_status == 200) {
        if (count_200 == 0) {
            overall_status = 404;
        }
    }
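A self-contained sketch of the per-annotation accounting above (the container names are assumptions): every requested annotation name ends up with its own status, 200 if it was sent, an error code if a processor reported one, 404 otherwise, and the overall status is the worst of them, downgraded to 404 if nothing was sent at all.

    #include <algorithm>
    #include <cstdio>
    #include <map>
    #include <set>
    #include <string>
    #include <vector>

    int main() {
        std::vector<std::string>   requested   = {"NA001", "NA002", "NA003"};
        std::set<std::string>      sent        = {"NA001"};           // WasSent(name)
        std::map<std::string, int> error_names = {{"NA003", 500}};    // reported errors

        std::map<std::string, int> result_per_na;
        int overall_status = 200;
        size_t count_200 = 0;

        for (const auto &name : requested) {
            if (sent.count(name)) {
                result_per_na[name] = 200;
                ++count_200;
                continue;
            }
            auto it = error_names.find(name);
            if (it != error_names.end()) {
                result_per_na[name] = it->second;
                overall_status = std::max(overall_status, it->second);
            } else {
                result_per_na[name] = 404;
            }
        }
        if (overall_status == 200 && count_200 == 0)
            overall_status = 404;

        for (const auto &item : result_per_na)
            std::printf("%s -> %d\n", item.first.c_str(), item.second);
        std::printf("overall: %d\n", overall_status);
    }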
// From x_SendTrace(const string & msg,
//                  shared_ptr<CPSGS_Request> request,
//                  shared_ptr<CPSGS_Reply> reply):
    reply->SendTrace(msg, request->GetStartTimestamp(), false);
// From x_SendProgressMessage(IPSGS_Processor::EPSGS_Status finish_status,
//                            IPSGS_Processor * processor,
//                            shared_ptr<CPSGS_Request> request,
//                            shared_ptr<CPSGS_Reply> reply):
    if (/* ... */ request->NeedProcessorEvents()) {
        reply->PrepareProcessorProgressMessage(/* ... */);
        // ...
    }
// From SignalConnectionCanceled(size_t request_id):
    vector<IPSGS_Processor *>   to_be_canceled;
    // ...
    size_t      total_procs = 0;
    size_t      cancel_count = 0;
    // ...
    if (!procs->second->m_FinallyFlushed) {
        // ...
        first_proc = procs->second->m_Processors[0].m_Processor.get();
        total_procs = procs->second->m_Processors.size();
        // ...
        for (auto &  proc : procs->second->m_Processors) {
            // ...
            if (!proc_names.empty())
                proc_names += ", ";
            proc_names += proc.m_Processor->GetName() +
            // ...
            to_be_canceled.push_back(proc.m_Processor.get());
            // ...
        }
        // ...
        procs->second->m_LowLevelClose = true;
        // ...
    }
    // ...
    if (total_procs > 0) {
        // ...
        first_proc->GetRequest()->SetRequestContext();
        // ...
        string      request_name = first_proc->GetRequest()->GetName();
        // ...
        if (cancel_count > 0) {
            PSG_WARNING("The client connection has been closed. " +
                        to_string(cancel_count) + " out of " +
                        to_string(total_procs) +
                        " processors were in progress and will be canceled "
                        "(connection id: " + to_string(connection_id) +
                        "; request: " + request_name + "[" + to_string(request_id) +
                        "]; processor(s): " + proc_names + ").");
        } else {
            PSG_WARNING("The client connection has been closed. "
                        "There were 0 processors in progress (out of " +
                        to_string(total_procs) +
                        ") so no processors will be canceled "
                        "(connection id: " + to_string(connection_id) +
                        "; request: " + request_name + "[" + to_string(request_id) +
                        "]; processor(s): " + proc_names + ").");
        }
    }
    // ...
    for (auto &  proc : to_be_canceled) {
        // ...
    }
    // ...
    if (first_proc != nullptr) {
        // ...
        app->GetUvLoopBinder(first_proc->GetUVThreadId()).PostponeInvoke(
    vector<IPSGS_Processor *>   to_be_canceled;
    // ...
    for (size_t  bucket_index = 0; bucket_index < PROC_BUCKETS; ++bucket_index) {
        // ...
        for (auto &  proc : procs.second->m_Processors) {
            // ...
            to_be_canceled.push_back(proc.m_Processor.get());
            // ...
        }
        // ...
        for (auto &  proc : to_be_canceled) {
            // ...
        }
        // ...
        to_be_canceled.clear();
    }
// From OnRequestTimer(size_t request_id):
    vector<IPSGS_Processor *>   to_be_canceled;
    // ...
    vector<SProcessorData> &    processors = procs->second->m_Processors;
    auto *      first_processor = processors.front().m_Processor.get();
    auto        reply = first_processor->GetReply();
    auto        request = first_processor->GetRequest();
    unsigned long   from_last_activity_to_now_ms =
                        reply->GetTimespanFromLastActivityToNowMks() / 1000;
    // ...
    if (request->NeedTrace()) {
        // ...
        reply->SendTrace("The request timer of " +
                         // ...
                         " ms triggered however the last activity with the "
                         "reply was " + to_string(from_last_activity_to_now_ms) +
                         " ms ago. The request timer will be restarted.",
                         request->GetStartTimestamp(), false);
    }
    // ...
    procs->second->RestartTimer(timeout);
    // ...
    if (request->NeedTrace()) {
        // ...
        reply->SendTrace("The request timer of " +
                         // ...
                         " ms is over. All the not canceled yet processors "
                         "will receive the Cancel() call",
                         request->GetStartTimestamp(), false);
    }
    // ...
    reply->PrepareRequestTimeoutMessage(
        "Timed out due to prolonged backend(s) inactivity. No response for " +
        // ...
    // ...
    for (auto &  proc : procs->second->m_Processors) {
        // ...
        to_be_canceled.push_back(proc.m_Processor.get());
        // ...
    }
    // ...
    for (auto &  proc : to_be_canceled) {
        // ...
    }
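A small sketch of the timer decision above, with assumed millisecond bookkeeping: if the reply has seen activity more recently than the configured timeout, the timer is simply restarted; only a genuinely idle request gets its processors canceled.

    #include <cstdio>

    // Returns true when the request should be timed out, false when the timer is
    // to be restarted because the backends were recently active.
    bool OnRequestTimer(unsigned long from_last_activity_to_now_ms,
                        unsigned long timeout_ms) {
        if (from_last_activity_to_now_ms < timeout_ms) {
            std::printf("timer of %lu ms triggered, but last activity was %lu ms ago; "
                        "restarting the timer\n", timeout_ms, from_last_activity_to_now_ms);
            return false;
        }
        std::printf("timer of %lu ms is over; canceling the remaining processors\n",
                    timeout_ms);
        return true;
    }

    int main() {
        OnRequestTimer(250, 30000);    // recent activity: restart
        OnRequestTimer(31000, 30000);  // idle: time the request out
    }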
// From OnLibh2oFinished(size_t request_id):
    procs->second->m_Libh2oFinished = true;
    // ...
    if (procs->second->IsSafeToDelete()) {
        auto    processor = procs->second->m_Processors[0].m_Processor;
        // ...
        if (processor->IsUVThreadAssigned()) {
            // ...
            app->GetUvLoopBinder(processor->GetUVThreadId()).PostponeInvoke(
// From EraseProcessorGroup(size_t request_id):
    if (!procs->second->IsSafeToDelete()) {
        // ...
    }
    // ...
    if (!procs->second->m_RequestStopPrinted) {
        if (procs->second->m_LowLevelClose) {
            // ...
        }
        // ...
        auto    processor = procs->second->m_Processors.begin()->m_Processor;
        auto    reply = processor->GetReply();
        auto    request = processor->GetRequest();
        // ...
            procs->second->m_LowLevelClose);
        // ...
        vector<IPSGS_Processor::EPSGS_Status>   proc_statuses;
        for (auto &  proc : procs->second->m_Processors) {
            proc_statuses.push_back(proc.m_Processor->GetStatus());
        }
        // ...
            procs->second->m_LowLevelClose);
        // ...
        procs->second->m_RequestStopPrinted = true;
    }
    // ...
    for (const auto &  proc_data : procs->second->m_Processors) {
        if (!per_proc.empty())
            // ...
        per_proc += "processor: " + proc_data.m_Processor->GetName() +
                    " [dispatch status: " + to_string(proc_data.m_DispatchStatus) +
                    "; finish status: " + to_string(proc_data.m_FinishStatus) +
                    "; done status registered: " + to_string(proc_data.m_DoneStatusRegistered) +
                    "; proc performance registered: " + to_string(proc_data.m_ProcPerformanceRegistered) +
                    // ...
    }
    // ...
    string      started_proc = "started processor: ";
    if (procs->second->m_StartedProcessing)
        started_proc += procs->second->m_StartedProcessing->GetName();
    else
        started_proc += "none";
    // ...
    PSG_ERROR("Removing a processor group for which "
              "request stop has not been issued "
              "(request id: " + to_string(request_id) +
              "; timer active: " + to_string(procs->second->m_TimerActive) +
              "; timer closed: " + to_string(procs->second->m_TimerClosed) +
              "; request final flush: " + to_string(procs->second->m_FinallyFlushed) +
              "; all processors finished: " + to_string(procs->second->m_AllProcessorsFinished) +
              "; lib h2o finished: " + to_string(procs->second->m_Libh2oFinished) +
              "; low level close: " + to_string(procs->second->m_LowLevelClose) +
              "; number of processors: " + to_string(procs->second->m_Processors.size()) +
              // ...
              "; " + started_proc + ")");
    // ...
    auto    first_processor = procs->second->m_Processors.begin()->m_Processor;
    request_type = first_processor->GetRequest()->GetRequestType();
    group = procs->second.release();
    // ...
    if (group != nullptr) {
        // ...
    }
// From OnRequestTimerClose(size_t request_id):
    procs->second->m_TimerClosed = true;
    // ...
    if (procs->second->IsSafeToDelete()) {
        // ...
    }
// From x_DecrementConcurrencyCounter(IPSGS_Processor * processor):
    size_t      proc_priority = processor->GetPriority();
    size_t      proc_index = proc_count - proc_priority;
    // ...
    size_t      current_count;
    size_t      limit_reached_count;
    // ...
                &limit_reached_count);
    bool        found = false;
    // ...

// From PopulateStatus(CJsonNode & status):
    for (size_t  index = 0; index < PROC_BUCKETS; ++index) {
        // ...
        IPSGS_Processor *   first_proc = processors_group.second->m_Processors[0].m_Processor.get();
        shared_ptr<CPSGS_Request>   request = first_proc->GetRequest();
        // ...
        proc_group.SetByKey("Request details", request->Serialize());
        proc_group.SetBoolean("Timer active", processors_group.second->m_TimerActive);
        proc_group.SetBoolean("Timer handle closed", processors_group.second->m_TimerClosed);
        proc_group.SetBoolean("Finally flushed", processors_group.second->m_FinallyFlushed);
        proc_group.SetBoolean("All processors finished", processors_group.second->m_AllProcessorsFinished);
        proc_group.SetBoolean("Libh2o finished", processors_group.second->m_Libh2oFinished);
        proc_group.SetBoolean("Low level close", processors_group.second->m_LowLevelClose);
        proc_group.SetBoolean("Is safe to delete", processors_group.second->IsSafeToDelete());
        // ...
        if (processors_group.second->m_StartedProcessing == nullptr)
            proc_group.SetNull("Signal start processor");
        else
            proc_group.SetString("Signal start processor",
                                 processors_group.second->m_StartedProcessing->GetName());
        // ...
        auto    now = psg_clock_t::now();
        // ...
        for (const auto &  processor : processors_group.second->m_Processors) {
            // ...
            timestamp = processor.m_Processor->GetProcessInvokeTimestamp(is_valid);
            // ...
            mks = chrono::duration_cast<chrono::microseconds>(now - timestamp).count();
            proc.SetInteger("processor started ago mks", mks);
            // ...
            proc.SetString("processor started ago mks", "Not happened yet");
            // ...
            timestamp = processor.m_Processor->GetSignalStartTimestamp(is_valid);
            // ...
            mks = chrono::duration_cast<chrono::microseconds>(now - timestamp).count();
            proc.SetInteger("processor signal start ago mks", mks);
            // ...
            proc.SetString("processor signal start ago mks", "Not happened yet");
            // ...
            timestamp = processor.m_Processor->GetSignalFinishTimestamp(is_valid);
            // ...
            mks = chrono::duration_cast<chrono::microseconds>(now - timestamp).count();
            proc.SetInteger("processor signal finish ago mks", mks);
            // ...
            proc.SetString("processor signal finish ago mks", "Not happened yet");
            // ...
            proc.SetString("processor started ago mks", "Collecting switched off");
            proc.SetString("processor signal start ago mks", "Collecting switched off");
            proc.SetString("processor signal finish ago mks", "Collecting switched off");
            // ...
            proc.SetString("processor name", processor.m_Processor->GetName());
            proc.SetString("processor group name", processor.m_Processor->GetGroupName());
            proc.SetInteger("processor priority", processor.m_Processor->GetPriority());
            proc.SetString("dispatch status", /* ... */);
            proc.SetString("finish status", /* ... */);
            proc.SetString("processor reported status", /* ... */);
            // ...
        }
        // ...
        proc_group.SetByKey("Processors", processors);
        status.Append(proc_group);
    }
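A minimal sketch of the timestamp arithmetic used above: the status report stores "how long ago" values in microseconds, computed with chrono::duration_cast against a monotonic clock (psg_clock_t in the server; std::chrono::steady_clock is used here as a stand-in).

    #include <chrono>
    #include <cstdio>
    #include <thread>

    int main() {
        using Clock = std::chrono::steady_clock;

        Clock::time_point timestamp = Clock::now();      // e.g. GetProcessInvokeTimestamp()
        std::this_thread::sleep_for(std::chrono::milliseconds(5));

        auto now = Clock::now();
        long long mks = std::chrono::duration_cast<std::chrono::microseconds>(
                            now - timestamp).count();
        std::printf("processor started ago mks: %lld\n", mks);
    }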
void RegisterProcessorGroupName(const string &group_name, TProcessorPriority priority)
void UnregisterActiveProcGroup(CPSGS_Request::EPSGS_Type request_type, SProcessorGroup *proc_group)
void RegisterActiveProcGroup(CPSGS_Request::EPSGS_Type request_type, SProcessorGroup *proc_group)
static CJsonNode NewArrayNode()
Create a new JSON array node.
void SetString(const string &key, const string &value)
Set a JSON object element to the specified string value.
void SetNull(const string &key)
Set a JSON object element to the specified null value.
void SetBoolean(const string &key, bool value)
Set a JSON object element to the specified boolean value.
void SetByKey(const string &key, CJsonNode::TInstance value)
For a JSON object node, insert a new element or update an existing element.
static CJsonNode NewObjectNode()
Create a new JSON object node.
void Append(CJsonNode::TInstance value)
For an array node, add a new element at the end of the array.
void RegisterProcessorPerformance(IPSGS_Processor *processor, IPSGS_Processor::EPSGS_Status proc_finish_status)
void RegisterProcessorDone(CPSGS_Request::EPSGS_Type request_type, IPSGS_Processor *processor)
void RegisterForTimeSeries(CPSGS_Request::EPSGS_Type request_type, CRequestStatus::ECode status)
void IncrementRequestStopCounter(int status)
@ ePSGS_GetBlobBySeqIdRequest
@ ePSGS_AccessionVersionHistory
@ ePSGS_GetBlobBySatSatKeyRequest
@ ePSGS_GetNamedAnnotations
void Increment(IPSGS_Processor *processor, EPSGS_CounterType counter)
void SignalFinishProcessing(IPSGS_Processor *processor, EPSGS_SignalSource source)
The processor signals that it finished one way or another, including when it was canceled.
bool IsGroupAlive(size_t request_id)
map< string, size_t > GetConcurrentCounters(void)
void OnRequestTimer(size_t request_id)
void AddProcessor(unique_ptr< IPSGS_Processor > processor)
Register a processor (one that serves as a processor factory)
vector< string > m_RegisteredProcessorGroups
void PopulateStatus(CJsonNode &status)
CRequestStatus::ECode x_ConcludeRequestStatus(shared_ptr< CPSGS_Request > request, shared_ptr< CPSGS_Reply > reply, vector< IPSGS_Processor::EPSGS_Status > proc_statuses, bool low_level_close)
uint64_t m_RequestTimeoutMillisec
map< string, size_t > GetProcessorGroupToIndexMap(void) const
Provides a map between a processor group name and a unique zero-based index of the group.
SProcessorConcurrency m_ProcessorConcurrency[16]
void NotifyRequestFinished(size_t request_id)
void OnRequestTimerClose(size_t request_id)
CRequestStatus::ECode x_MapProcessorFinishToStatus(IPSGS_Processor::EPSGS_Status status) const
void OnLibh2oFinished(size_t request_id)
CRequestStatus::ECode x_ConcludeIDGetNARequestStatus(shared_ptr< CPSGS_Request > request, shared_ptr< CPSGS_Reply > reply, bool low_level_close)
void x_PrintRequestStop(shared_ptr< CPSGS_Request > request, CRequestStatus::ECode status, size_t bytes_sent)
void StartRequestTimer(size_t request_id)
void x_SendProgressMessage(IPSGS_Processor::EPSGS_Status finish_status, IPSGS_Processor *processor, shared_ptr< CPSGS_Request > request, shared_ptr< CPSGS_Reply > reply)
static string SignalSourceToString(EPSGS_SignalSource source)
void x_SendTrace(const string &msg, shared_ptr< CPSGS_Request > request, shared_ptr< CPSGS_Reply > reply)
list< string > PreliminaryDispatchRequest(shared_ptr< CPSGS_Request > request, shared_ptr< CPSGS_Reply > reply)
Return a list of processor names that reported they can process the request.
void x_DecrementConcurrencyCounter(IPSGS_Processor *processor)
static string ProcessorStatusToString(EPSGS_ProcessorStatus st)
void EraseProcessorGroup(size_t request_id)
size_t x_GetBucketIndex(size_t request_id) const
unordered_map< size_t, unique_ptr< SProcessorGroup > > m_ProcessorGroups[100]
void RegisterProcessorsForMomentousCounters(void)
void SignalConnectionCanceled(size_t request_id)
An HTTP connection can be canceled, so this method is invoked in that case.
list< shared_ptr< IPSGS_Processor > > DispatchRequest(shared_ptr< CPSGS_Request > request, shared_ptr< CPSGS_Reply > reply, const list< string > &processor_names)
Return a list of processors that can be used to process the request.
IPSGS_Processor::EPSGS_StartProcessing SignalStartProcessing(IPSGS_Processor *processor)
The processor signals that it is going to provide data to the client.
list< unique_ptr< IPSGS_Processor > > m_RegisteredProcessors
@ ePSGS_AnnotationRequest
@ ePSGS_BlobBySatSatKeyRequest
@ ePSGS_IPGResolveRequest
@ ePSGS_AccessionVersionHistoryRequest
@ ePSGS_BlobBySeqIdRequest
CPSGSCounters & GetCounters(void)
COperationTiming & GetTiming(void)
static CPubseqGatewayApp * GetInstance(void)
Interface class (and self-factory) for request processor objects that can retrieve data from a given ...
virtual string GetName(void) const =0
Tells the processor name (used in logging and tracing)
static string StatusToString(EPSGS_Status status)
Converts the processor status to a string for tracing and logging purposes.
virtual EPSGS_Status GetStatus(void)=0
Tells the processor status (whether it has finished or is in progress)
shared_ptr< CPSGS_Reply > GetReply(void) const
Provides the reply wrapper.
uv_thread_t GetUVThreadId(void) const
Provides the libuv thread id which runs the processor.
shared_ptr< CPSGS_Request > GetRequest(void) const
Provides the user request.
bool IsUVThreadAssigned(void) const
Tells if a libuv thread id has been assigned to the processor.
EPSGS_StartProcessing
Tells whether to continue after a processor has called the SignalStartProcessing() method.
TProcessorPriority GetPriority(void) const
Provides the processor priority.
EPSGS_Status
The GetStatus() method returns the processor's current status.
static string StatusToProgressMessage(EPSGS_Status status)
Converts the processor status to a string for protocol message.
const_iterator end() const
const_iterator find(const key_type &key) const
void PrintRequestStop(void)
Print request stop message (for request-driven applications)
void SetBytesWr(Int8 bytes)
CDiagContext & GetDiagContext(void)
Get diag context instance.
static void SetRequestContext(CRequestContext *ctx)
Shortcut to CDiagContextThreadData::GetThreadData().SetRequestContext()
static CRequestContext & GetRequestContext(void)
Shortcut to CDiagContextThreadData::GetThreadData().GetRequestContext()
void SetRequestStatus(int status)
#define ERR_POST(message)
Error posting with file, line number information but without error codes.
void Reset(void)
Reset all properties to the initial state.
@ eDiag_Error
Error message.
@ e499_BrokenConnection
Non-standard status code - used to indicate broken connection while serving normal request.
@ e503_ServiceUnavailable
@ e500_InternalServerError
void Critical(CExceptionArgs_Base &args)
#define NCBI_THROW(exception_class, err_code, message)
Generic macro to throw an exception, given the exception class, error code and message string.
void request_timer_cb(uv_timer_t *handle)
bool g_AllowProcessorTiming
void erase_processor_group_cb(void *user_data)
void request_timer_close_cb(uv_handle_t *handle)
#define MAX_PROCESSOR_GROUPS
string ToJsonString(const CBioseqInfoRecord &bioseq_info, SPSGS_ResolveRequest::TPSGS_BioseqIncludeData include_data_flags, const string &custom_blob_id)
#define PSG_ERROR(message)
#define PSG_WARNING(message)
@ ePSGS_NotFoundAndNotInstantiated
psg_clock_t::time_point psg_time_point_t
size_t GetCurrentCount(void) const
void IncrementLimitReachedCount(void)
void IncrementCurrentCount(void)
void DecrementCurrentCount(void)
void GetCurrentAndLimitReachedCounts(size_t *current, size_t *limit_reached)
bool WasSent(const string &annot_name) const
map< string, int > GetErrorNames(void) const
set< string > GetNotFoundNames(void) const
vector< pair< TProcessorPriority, string > > GetProcessedNames(void) const
void ReportResultStatus(const string &annot_name, EPSGS_ResultStatus rs)