std::placeholders;
55shared_ptr<CPSGS_Request> request,
56shared_ptr<CPSGS_Reply> reply,
63this, _1, _2, _3, _4),
70this, _1, _2, _3, _4, _5),
72this, _1, _2, _3, _4, _5)),
87shared_ptr<CPSGS_Reply> reply)
const 96 if(valid_annots.empty())
100 autostartup_data_state = app->GetStartupDataState();
102 if(request->NeedTrace()) {
104 " request because Cassandra DB is not available.\n"+
106request->GetStartTimestamp());
117shared_ptr<CPSGS_Reply> reply)
const 125shared_ptr<CPSGS_Reply> reply,
138vector<string> valid_annots;
139 for(
const auto& name :
names) {
141valid_annots.push_back(name);
183 const string& message)
219message, status,
code,
240vector<SSatInfoEntry> bioseq_na_keyspaces = app->GetBioseqNAKeyspaces();
242 for(
const auto& bioseq_na_keyspace : bioseq_na_keyspaces) {
243unique_ptr<CCassNamedAnnotFetch> details;
252bioseq_na_keyspace.keyspace,
254bioseq_resolution.GetBioseqInfo().GetVersion(),
255bioseq_resolution.GetOriginalSeqIdType(),
258details->SetLoader(fetch_task);
264 this, _1, _2, _3, _4),
265details.get(), bioseq_na_keyspace.sat));
269 this, _1, _2, _3, _4, _5),
285 if(!fetch_details->ReadFinished()) {
286fetch_details->GetLoader()->Wait();
304}
else if(other_proc_priority <
m_Priority) {
309 "Bioseq info has already been sent by the other processor. " 310 "The data are to be sent because the other processor priority ("+
311to_string(other_proc_priority) +
") is lower than mine ("+
319 "Skip sending bioseq info because the other processor with priority "+
320to_string(other_proc_priority) +
" has already sent it " 321 "(my priority is "+ to_string(
m_Priority) +
")",
365 "Named annotation no-more-data callback",
369 "Named annotation data received",
385 "while the output has finished, ignoring");
435 boolannot_was_sent =
false;
444annot_was_sent =
true;
445}
else if(other_proc_priority <
m_Priority) {
450 "The NA name "+ annot_record.GetAnnotName() +
" has already " 451 "been processed by the other processor. The data are to be sent" 452 " because the other processor priority ("+
453to_string(other_proc_priority) +
") is lower than mine ("+
460annot_was_sent =
true;
465 "Skip sending NA name "+ annot_record.GetAnnotName() +
466 " because the other processor with priority "+
467to_string(other_proc_priority) +
" has already processed it " 468 "(my priority is "+ to_string(
m_Priority) +
")",
473 if(annot_was_sent) {
477annot_record.GetModified());
487 const string& message)
496 boolis_error =
IsError(severity);
524 if(fetch_completion_status < best_status) {
525best_status = fetch_completion_status;
567 "Retrieve blob props for "+ to_string(sat) +
"."+ to_string(sat_key) +
568 " to check if the blob size is small (if so to send it right away).",
578app->GetCounters().Increment(
this,
582to_string(blob_id.
m_Sat) +
583 " to a Cassandra keyspace while requesting the blob props";
602 boolneed_to_check_blob_exclude_cache =
610 if(need_to_check_blob_exclude_cache) {
611 boolcompleted =
true;
614 if(app->GetExcludeBlobCache()->IsInCache(
617completed, completed_time)) {
619 boolfinish_processing =
true;
629sent_mks_ago < m_AnnotRequest->m_ResendTimeoutMks) {
641app->GetExcludeBlobCache()->Remove(
644finish_processing =
false;
651 if(finish_processing) {
658unique_ptr<CCassBlobFetch> fetch_details;
661unique_ptr<CBlobRecord> blob_record(
new CBlobRecord);
664 autoblob_prop_cache_lookup_result =
676std::move(blob_record),
678fetch_details->SetLoader(load_task);
685trace_msg =
"Blob properties are not found";
687trace_msg =
"Blob properties are not found (cache lookup error)";
693fetch_details->RemoveFromExcludeBlobCache();
704fetch_details->SetLoader(load_task);
711 this, _1, _2, _3, _4, _5),
712fetch_details.get()));
724 "Cassandra request: "+
754 "Blob properties are not found",
767 unsigned intmax_to_send =
max(app->Settings().m_SendBlobIfSmall,
769 if(blob.
GetSize() > max_to_send) {
773 "Blob size is too large ("+ to_string(blob.
GetSize()) +
774 " > "+ to_string(max_to_send) +
" max allowed to send)",
815 const string& message)
841 const unsigned char* chunk_data,
842 unsigned intdata_size,
846chunk_data, data_size, chunk_no);
894 booloverall_final_state =
false;
901 if(details->InPeek()) {
904details->SetInPeek(
true);
905overall_final_state |=
x_Peek(details, need_wait);
906details->SetInPeek(
false);
922details->SetExcludeBlobCacheCompleted();
929 if(overall_final_state) {
942 if(!fetch_details->GetLoader())
945 boolfinal_state =
false;
947 if(!fetch_details->ReadFinished()) {
948final_state = fetch_details->GetLoader()->Wait();
952 if(fetch_details->GetLoader()->HasError() &&
956 string error= fetch_details->GetLoader()->LastError();
970fetch_details->GetLoader()->ClearError();
971fetch_details->SetReadFinished();
static const string kAnnotProcessorName
static CRegexp kAnnotNameRegexp("^NA\\d+\\.\\d+$", CRegexp::fCompile_ignore_case)
const string kCassandraProcessorGroupName
const string kCassandraProcessorEvent
CCassBlobTaskLoadBlob * GetLoader(void)
void SetDataReadyCB(shared_ptr< CCassDataCallbackReceiver > callback)
void SetPropsCallback(TBlobPropsCallback callback)
void SetErrorCB(TDataErrorCallback error_cb)
void SetReadFinished(void)
void SetDataReadyCB(shared_ptr< CCassDataCallbackReceiver > callback)
void SetConsumeCallback(TNAnnotConsumeCallback callback)
CCassNAnnotTaskFetch * GetLoader(void)
EPSGS_CacheLookupResult LookupBlobProp(IPSGS_Processor *processor, int sat, int sat_key, int64_t &last_modified, CBlobRecord &blob_record)
@ ePSGS_ServerSatToSatNameError
void Increment(IPSGS_Processor *processor, EPSGS_CounterType counter)
static vector< string > x_FilterNames(const vector< string > &names)
static bool x_IsNameValid(const string &name)
void x_Peek(bool need_wait)
vector< CRequestStatus::ECode > m_AnnotFetchCompletions
void x_OnSeqIdResolveFinished(SBioseqResolution &&bioseq_resolution)
virtual IPSGS_Processor * CreateProcessor(shared_ptr< CPSGS_Request > request, shared_ptr< CPSGS_Reply > reply, TProcessorPriority priority) const
Create processor to fulfil PSG request using the data source.
bool x_OnNamedAnnotData(CNAnnotRecord &&annot_record, bool last, CCassNamedAnnotFetch *fetch_details, int32_t sat)
void OnGetBlobError(CCassBlobFetch *fetch_details, CRequestStatus::ECode status, int code, EDiagSev severity, const string &message)
SPSGS_AnnotRequest * m_AnnotRequest
bool x_NeedToRequestBlobProp(void)
virtual void Process(void)
Main processing function.
void OnGetBlobProp(CCassBlobFetch *fetch_details, CBlobRecord const &blob, bool is_found)
vector< string > m_ValidNames
void x_RequestBlobProp(int32_t sat, int32_t sat_key, int64_t last_modified)
virtual string GetGroupName(void) const
Tells the processor group name.
virtual EPSGS_Status GetStatus(void)
Tells the processor status (if it has finished or in progress)
virtual void ProcessEvent(void)
Called when an event happened which may require to have some processing.
virtual ~CPSGS_AnnotProcessor()
virtual vector< string > WhatCanProcess(shared_ptr< CPSGS_Request > request, shared_ptr< CPSGS_Reply > reply) const
Needs to be implemented only for the ID/get_na requests.
void x_SendAnnotDataToClient(CNAnnotRecord &&annot_record, int32_t sat)
void x_OnNamedAnnotError(CCassNamedAnnotFetch *fetch_details, CRequestStatus::ECode status, int code, EDiagSev severity, const string &message)
void OnAnnotBlobProp(CCassBlobFetch *fetch_details, CBlobRecord const &blob, bool is_found)
void x_OnSeqIdResolveError(CRequestStatus::ECode status, int code, EDiagSev severity, const string &message)
void x_OnResolutionGoodData(void)
virtual string GetName(void) const
Tells the processor name (used in logging and tracing)
void x_SendBioseqInfo(SBioseqResolution &bioseq_resolution)
virtual bool CanProcess(shared_ptr< CPSGS_Request > request, shared_ptr< CPSGS_Reply > reply) const
Tells if processor can process the given request.
void OnGetBlobChunk(CCassBlobFetch *fetch_details, CBlobRecord const &blob, const unsigned char *chunk_data, unsigned int data_size, int chunk_no)
EPSGS_AccessionAdjustmentResult AdjustBioseqAccession(SBioseqResolution &bioseq_resolution)
void OnGetBlobChunk(bool cancelled, CCassBlobFetch *fetch_details, const unsigned char *chunk_data, unsigned int data_size, int chunk_no)
void OnGetBlobProp(CCassBlobFetch *fetch_details, CBlobRecord const &blob, bool is_found)
void OnGetBlobError(CCassBlobFetch *fetch_details, CRequestStatus::ECode status, int code, EDiagSev severity, const string &message)
CRequestStatus::ECode CountError(CCassFetch *fetch_details, CRequestStatus::ECode status, int code, EDiagSev severity, const string &message, EPSGS_LoggingFlag logging_flag, EPSGS_StatusUpdateFlag status_update_flag)
void UpdateOverallStatus(CRequestStatus::ECode status)
bool IsCassandraProcessorEnabled(shared_ptr< CPSGS_Request > request) const
CRequestStatus::ECode m_Status
bool IsError(EDiagSev severity) const
list< unique_ptr< CCassFetch > > m_FetchDetails
bool AreAllFinishedRead(void) const
IPSGS_Processor::EPSGS_Status GetStatus(void) override
Tells the processor status (if it has finished or in progress)
void SignalFinishProcessing(void)
@ ePSGS_AnnotationRequest
void ResolveInputSeqId(void)
CPSGSCounters & GetCounters(void)
static CPubseqGatewayApp * GetInstance(void)
Interface class (and self-factory) for request processor objects that can retrieve data from a given ...
shared_ptr< CPSGS_Reply > m_Reply
EPSGS_Status
The GetStatus() method returns a processor current status.
shared_ptr< CPSGS_Request > m_Request
TProcessorPriority m_Priority
iterator_bool insert(const value_type &val)
const_iterator find(const key_type &key) const
const_iterator end() const
static const struct name_t names[]
static DLIST_TYPE *DLIST_NAME() last(DLIST_LIST_TYPE *list)
EDiagSev
Severity level for the posted diagnostics.
@ eDiag_Error
Error message.
@ e500_InternalServerError
bool IsMatch(CTempString str, TMatch flags=fMatch_default)
Check existence substring which match a specified pattern.
string StripTrailingVerticalBars(const string &seq_id)
string ToJsonString(const CBioseqInfoRecord &bioseq_info, SPSGS_ResolveRequest::TPSGS_BioseqIncludeData include_data_flags, const string &custom_blob_id)
#define PSG_ERROR(message)
#define PSG_WARNING(message)
@ ePSGS_UnknownResolvedSatellite
const int kUnknownPriority
psg_clock_t::time_point psg_time_point_t
unsigned long GetTimespanToNowMks(const psg_time_point_t &t_point)
string GetCassStartupDataStateMessage(EPSGS_StartupDataState state)
EPSGS_ResolutionResult m_ResolutionResult
CBioseqInfoRecord & GetBioseqInfo(void)
bool AdjustName(shared_ptr< CPSGS_Request > request, shared_ptr< CPSGS_Reply > reply)
string ToString(void) const
CBioseqInfoRecord::TSat m_Sat
optional< SSatInfoEntry > m_Keyspace
bool MapSatToKeyspace(void)
CBioseqInfoRecord::TSatKey m_SatKey
TProcessorPriority RegisterProcessedName(TProcessorPriority priority, const string &name)
vector< string > GetNotProcessedName(TProcessorPriority priority)
TProcessorPriority RegisterBioseqInfo(TProcessorPriority priority)
void ReportBlobError(TProcessorPriority priority, EPSGS_ResultStatus rs)
unsigned long m_ResendTimeoutMks
void ReportResultStatus(const string &annot_name, EPSGS_ResultStatus rs)
EPSGS_CacheAndDbUse m_UseCache
unsigned long m_SendBlobIfSmall
EPSGS_TSEOption m_TSEOption
C++ wrappers for the Perl-compatible regular expression (PCRE) library.
RetroSearch is an open source project built by @garambo | Open a GitHub Issue
Search and Browse the WWW like it's 1997 | Search results from DuckDuckGo
HTML:
3.2
| Encoding:
UTF-8
| Version:
0.7.4