*
data=
static_cast<const char*
>(
buffer);
110 if(
data.GetData_compression() ==
data.eData_compression_gzip ) {
173 static interror_rate =
NCBI_PARAM_TYPE(SNP_PROCESSOR, ERROR_RATE)::GetDefault();
174 if( error_rate > 0 ) {
175 static interror_counter = 0;
176 if( ++error_counter >= error_rate ) {
190 #define PARAM_GC_CACHE_SIZE "gc_cache_size" 191 #define PARAM_MISSING_GC_SIZE "missing_gc_size" 192 #define PARAM_FILE_REOPEN_TIME "file_reopen_time" 193 #define PARAM_FILE_RECHECK_TIME "file_recheck_time" 194 #define PARAM_FILE_OPEN_RETRY "file_open_retry" 195 #define PARAM_SPLIT "split" 196 #define PARAM_VDB_FILES "vdb_files" 197 #define PARAM_ANNOT_NAME "annot_name" 198 #define PARAM_ADD_PTIS "add_ptis" 199 #define PARAM_ALLOW_NON_REFSEQ "allow_non_refseq" 200 #define PARAM_SNP_SCALE_LIMIT "snp_scale_limit" 202 #define DEFAULT_GC_CACHE_SIZE 10 203 #define DEFAULT_MISSING_GC_SIZE 10000 204 #define DEFAULT_FILE_REOPEN_TIME 3600 205 #define DEFAULT_FILE_RECHECK_TIME 600 206 #define DEFAULT_FILE_OPEN_RETRY 3 207 #define DEFAULT_SPLIT true 208 #define DEFAULT_VDB_FILES "" 209 #define DEFAULT_ANNOT_NAME "" 210 #define DEFAULT_ADD_PTIS true 211 #define DEFAULT_ALLOW_NON_REFSEQ false 212 #define DEFAULT_SNP_SCALE_LIMIT "" 218m_PreResolving(
false),
219m_ScaleLimit(
CSeq_id::eSNPScaleLimit_Default)
227shared_ptr<CPSGS_Request> request,
228shared_ptr<CPSGS_Reply> reply,
233this, placeholders::_1),
235this, placeholders::_1, placeholders::_2, placeholders::_3, placeholders::_4),
237m_Config(parent.m_Config),
238m_Client(parent.m_Client),
239m_ThreadPool(parent.m_ThreadPool),
241m_Status(ePSGS_InProgress),
243m_PreResolving(
false),
244m_ScaleLimit(
CSeq_id::eSNPScaleLimit_Default)
277 PSG_ERROR(
"CSNPClient: SNP primary track is disabled due to lack of GRPC support");
291 min(3u, max_conn), max_conn)));
298 boolenabled = app->Settings().m_SNPProcessorsEnabled;
326shared_ptr<CPSGS_Reply> reply)
const 329 if(!
x_IsEnabled(*request))
returnvector<string>();
334 catch( exception& exc ) {
335 x_SendError(reply,
"Exception in WhatCanProcess: ", exc);
342shared_ptr<CPSGS_Reply> reply)
const 348 if(!
m_Client->CanProcessRequest(*request, 0))
return false;
351 catch( exception& exc ) {
352 x_SendError(reply,
"Exception in CanProcess: ", exc);
359shared_ptr<CPSGS_Request> request,
360shared_ptr<CPSGS_Reply> reply,
370 catch( exception& exc ) {
371 x_SendError(reply,
"Exception in CreateProcessor: ", exc);
392reply->PrepareProcessorMessage(reply->GetItemId(),
"SNP",
msg,
406 const string&
msg,
constexception& exc)
429 autoreq_type =
GetRequest()->GetRequestType();
445 catch(exception& exc) {
446 x_SendError(
"Exception when handling a request: ", exc);
471 if(!annot_request.
m_SeqId.empty()) {
475 catch(exception& e) {
483 for(
auto&
id: annot_request.
m_SeqIds) {
487 catch(exception& e) {
507 boolneed_resolve =
true;
511need_resolve =
false;
527 if(!
m_Config->m_AllowNonRefSeq) {
546 if(!
m_Client->IsValidSeqId(
id))
continue;
556 catch(exception& exc) {
559 data.m_Error =
"Exception when handling get_na request: "+
string(exc.what());
581s.m_Error =
"simulated SNP processor error";
609 if( !
data.m_Error.empty() ) {
618 catch(exception& exc) {
630 if( has_error.count(name) ) {
633 else if( !has_data.count(name) ) {
804GetTiming().Register(
this,
operation, status, start, blob_size);
812 size_tblob_size = 0;
813 for(
auto& chunk :
data.GetData() ) {
814blob_size += chunk->size();
830COSSWriter writer(
data.SetData());
844 kSNPProcessorName+
" processor stops processing request because a higher priority processor has already processed it",
852ostringstream annot_str;
853 for(
auto& it :
data.m_AnnotInfo) {
929 size_titem_id = reply.GetItemId();
931reply.PrepareBlobPropData(item_id,
GetName(), psg_blob_id, data_to_send);
932reply.PrepareBlobPropCompletion(item_id,
GetName(), 2);
938 size_titem_id =
GetReply()->GetItemId();
940 for(
auto& chunk :
data.GetData() ) {
942(
const unsigned char*)chunk->data(), chunk->size(), chunk_no++);
950 size_titem_id =
GetReply()->GetItemId();
952 GetReply()->PrepareTSEBlobPropData(item_id,
GetName(), chunk_id, id2_info, data_to_send);
959 size_titem_id =
GetReply()->GetItemId();
961 for(
auto& chunk :
data.GetData() ) {
963(
const unsigned char*)chunk->data(), chunk->size(), chunk_no++,
966 GetReply()->PrepareTSEBlobCompletion(item_id,
GetName(), chunk_no+1);
1050 auto&
info= bioseq_resolution.GetBioseqInfo();
1051 if(
m_Client->IsValidSeqId(
info.GetAccession(),
info.GetSeqIdType(),
info.GetVersion())) {
1056 for(
auto& it :
info.GetSeqIds()) {
1058 if(!
m_Client->IsValidSeqId(get<1>(it), get<0>(it)))
continue;
1060 PSG_ERROR(
"Error parsing seq-id: "<< (err.empty() ? get<1>(it) : err));
1078 const string& message)
1135 if(details->InPeek()) {
1138details->SetInPeek(
true);
1139 x_Peek(details, need_wait);
1140details->SetInPeek(
false);
1157details->SetExcludeBlobCacheCompleted();
1166 if(!fetch_details->GetLoader())
return true;
1168 boolfinal_state =
false;
1170 if(!fetch_details->ReadFinished()) {
1171final_state = fetch_details->GetLoader()->Wait();
1172 if(final_state) fetch_details->SetReadFinished();
1175 if(fetch_details->GetLoader()->HasError() &&
1178 string error= fetch_details->GetLoader()->LastError();
1183fetch_details->GetLoader()->ClearError();
1184fetch_details->SetReadFinished();
User-defined methods of the data storage class.
User-defined methods of the data storage class.
User-defined methods of the data storage class.
CBlobRecord & SetNChunks(int32_t value)
CBlobRecord & SetId2Info(string const &value)
CBlobRecord & SetGzip(bool value)
string Repr(TReprFlags flags=0) const
Return a string representation of this node.
void SetString(const string &key, const string &value)
Set a JSON object element to the specified string value.
static CJsonNode NewObjectNode()
Create a new JSON object node.
list< TOctetString * > TOctetStringSequence
virtual ERW_Result Flush(void)
Flush pending data (if any) down to the output device.
COSSWriter(TOctetStringSequence &out)
TOctetStringSequence & m_Output
vector< char > TOctetString
virtual ERW_Result Write(const void *buffer, size_t count, size_t *written)
Write up to "count" bytes from the buffer pointed to by the "buf" argument onto the output device.
CObjectOStreamAsnBinary --.
void UpdateOverallStatus(CRequestStatus::ECode status)
list< unique_ptr< CCassFetch > > m_FetchDetails
bool AreAllFinishedRead(void) const
void SignalFinishProcessing(void)
virtual void Cancel(void) override
The infrastructure request to cancel processing.
@ ePSGS_AnnotationRequest
@ ePSGS_BlobBySatSatKeyRequest
TRequest & GetRequest(void)
void ResolveInputSeqId(void)
void x_ProcessAnnotationRequest(void)
IPSGS_Processor * CreateProcessor(shared_ptr< CPSGS_Request > request, shared_ptr< CPSGS_Reply > reply, TProcessorPriority priority) const override
Create processor to fulfil PSG request using the data source.
shared_ptr< CSNPClient > m_Client
void x_UnlockRequest(void)
void x_RegisterTimingFound(psg_time_point_t start, EPSGOperation operation, const objects::CID2_Reply_Data &data)
void x_RegisterTiming(psg_time_point_t start, EPSGOperation operation, EPSGOperationStatus status, size_t size)
void x_SendAnnotInfo(void)
void x_WriteData(objects::CID2_Reply_Data &data, const CSerialObject &obj) const
string GetName(void) const override
Tells the processor name (used in logging and tracing)
bool x_IsEnabled(CPSGS_Request &request) const
void x_Finish(EPSGS_Status status)
void x_Peek(bool need_wait)
void x_RegisterTimingNotFound(EPSGOperation operation)
void x_SendChunkBlobProps(const string &id2_info, int chunk_id, CBlobRecord &blob_props)
bool CanProcess(shared_ptr< CPSGS_Request > request, shared_ptr< CPSGS_Reply > reply) const override
Tells if processor can process the given request.
bool x_SignalStartProcessing()
void ProcessEvent(void) override
Called when an event happened which may require to have some processing.
~CPSGS_SNPProcessor(void) override
shared_ptr< ncbi::CThreadPool > m_ThreadPool
void x_InitClient(void) const
void x_ReportResultStatusForAllNA(SPSGS_AnnotRequest::EPSGS_ResultStatus status)
shared_ptr< SSNPProcessor_Config > m_Config
static void x_SendError(shared_ptr< CPSGS_Reply > reply, const string &msg)
void x_SendBlobData(const string &psg_blob_id, const objects::CID2_Reply_Data &data)
void x_SendChunkBlobData(const string &id2_info, int chunk_id, const objects::CID2_Reply_Data &data)
vector< SSNPData > m_SNPData
void x_OnSeqIdResolveFinished(SBioseqResolution &&bioseq_resolution)
EPSGS_Status GetStatus(void) override
Tells the processor status (if it has finished or in progress)
string GetGroupName(void) const override
Tells the processor group name.
void x_ProcessBlobBySatSatKeyRequest(void)
vector< CSeq_id_Handle > m_SeqIds
vector< string > WhatCanProcess(shared_ptr< CPSGS_Request > request, shared_ptr< CPSGS_Reply > reply) const override
Needs to be implemented only for the ID/get_na requests.
void OnGotBlobByBlobId(void)
void x_SendBlobProps(const string &psg_blob_id, CBlobRecord &blob_props)
void OnGotAnnotation(void)
void Cancel(void) override
The infrastructure request to cancel processing.
void x_ProcessTSEChunkRequest(void)
vector< string > m_ProcessNAs
void Process(void) override
Main processing function.
void GetBlobByBlobId(void)
void x_OnSeqIdResolveError(CRequestStatus::ECode status, int code, EDiagSev severity, const string &message)
void x_SendMainEntry(const SSNPData &data)
void x_SendSplitInfo(const SSNPData &data)
objects::CSeq_id::ESNPScaleLimit m_ScaleLimit
void x_OnResolutionGoodData(void)
static CPubseqGatewayApp * GetInstance(void)
virtual EStatus Execute(void) override
Do the actual job.
CSNPThreadPoolTask_GetAnnotation(CPSGS_SNPProcessor &processor)
CPSGS_SNPProcessor & m_Processor
virtual EStatus Execute(void) override
Do the actual job.
CSNPThreadPoolTask_GetBlobByBlobId(CPSGS_SNPProcessor &processor)
CPSGS_SNPProcessor & m_Processor
virtual EStatus Execute(void) override
Do the actual job.
CPSGS_SNPProcessor & m_Processor
CSNPThreadPoolTask_GetChunk(CPSGS_SNPProcessor &processor)
Abstract class for representing single task executing in pool of threads To use this class in applica...
Main class implementing functionality of pool of threads.
Writer-based output stream.
Interface class (and self-factory) for request processor objects that can retrieve data from a given ...
shared_ptr< CPSGS_Reply > GetReply(void) const
Provides the reply wrapper.
shared_ptr< CPSGS_Request > GetRequest(void) const
Provides the user request.
bool IsUVThreadAssigned(void) const
Tells if a libuv thread id has been assigned to the processor.
shared_ptr< CPSGS_Reply > m_Reply
TProcessorPriority GetPriority(void) const
Provides the processor priority.
EPSGS_Status
The GetStatus() method returns a processor current status.
void PostponeInvoke(CPSGS_UvLoopBinder::TProcessorCB cb, void *user_data)
The provided callback will be called from the libuv loop assigned to the processor.
EPSGS_StartProcessing SignalStartProcessing(void)
A processor should call the method when it decides that it successfully started processing the reques...
shared_ptr< CPSGS_Request > m_Request
EPSGS_SeqIdParsingResult ParseInputSeqId(objects::CSeq_id &seq_id, const string &request_seq_id, int request_seq_id_type, string *err_msg=nullptr)
Parse seq-id from a string and type representation.
TProcessorPriority m_Priority
A very basic data-write interface.
iterator_bool insert(const value_type &val)
std::ofstream out("events_result.xml")
main entry point for tests
void swap(NCBI_NS_NCBI::pair_base_member< T1, T2 > &pair1, NCBI_NS_NCBI::pair_base_member< T1, T2 > &pair2)
EDiagSev
Severity level for the posted diagnostics.
@ eDiag_Error
Error message.
@ e500_InternalServerError
#define MSerial_AsnBinary
static ESNPScaleLimit GetSNPScaleLimit_Value(const string &name)
static CSeq_id_Handle GetHandle(const CSeq_id &id)
Normal way of getting a handle, works for any seq-id.
#define NCBI_PARAM_TYPE(section, name)
Generate typename for a parameter from its {section, name} attributes.
virtual bool GetBool(const string &section, const string &name, bool default_value, TFlags flags=0, EErrAction err_action=eThrow) const
Get boolean value of specified parameter name.
virtual int GetInt(const string &section, const string &name, int default_value, TFlags flags=0, EErrAction err_action=eThrow) const
Get integer value of specified parameter name.
virtual string GetString(const string &section, const string &name, const string &default_value, TFlags flags=0) const
Get the parameter string value.
ERW_Result
Result codes for I/O operations.
@ eRW_Success
Everything is okay, I/O completed.
static string Base64Encode(const CTempString str, size_t line_len=0)
Base64-encode string.
static bool EqualNocase(const CTempString s1, SIZE_TYPE pos, SIZE_TYPE n, const char *s2)
Case-insensitive equality of a substring with another string.
static enable_if< is_arithmetic< TNumeric >::value||is_convertible< TNumeric, Int8 >::value, string >::type NumericToString(TNumeric value, TNumToStringFlags flags=0, int base=10)
Convert numeric value to string.
EStatus
Status of the task.
@ eCompleted
executed successfully
#define DEFINE_STATIC_FAST_MUTEX(id)
Define static fast mutex and initialize it.
operation
Bit operations.
@ eData_format_asn_binary
E_Choice
Choice variants.
const int64_t kSplitInfoChunk
Pool of generic task-executing threads.
string ToJsonString(const CBioseqInfoRecord &bioseq_info, SPSGS_ResolveRequest::TPSGS_BioseqIncludeData include_data_flags, const string &custom_blob_id)
#define PSG_ERROR(message)
chrono::steady_clock psg_clock_t
psg_clock_t::time_point psg_time_point_t
Reader-writer based streams.
static SLJIT_INLINE sljit_ins msg(sljit_gpr r, sljit_s32 d, sljit_gpr x, sljit_gpr b)
NCBI_PARAM_DEF(int, SNP_PROCESSOR, ERROR_RATE, 0)
#define DEFAULT_FILE_REOPEN_TIME
static bool s_SimulateError()
#define PARAM_FILE_RECHECK_TIME
static void s_OnGotAnnotation(void *data)
#define DEFAULT_SNP_SCALE_LIMIT
#define PARAM_ALLOW_NON_REFSEQ
#define DEFAULT_FILE_RECHECK_TIME
NCBI_PARAM_DECL(int, SNP_PROCESSOR, ERROR_RATE)
#define DEFAULT_GC_CACHE_SIZE
static const string kSNPProcessorGroupName
#define PARAM_GC_CACHE_SIZE
#define PARAM_FILE_REOPEN_TIME
#define PARAM_MISSING_GC_SIZE
#define PARAM_FILE_OPEN_RETRY
#define DEFAULT_ANNOT_NAME
#define DEFAULT_FILE_OPEN_RETRY
static const string kParamMaxConn
#define DEFAULT_MISSING_GC_SIZE
#define PARAM_SNP_SCALE_LIMIT
#define DEFAULT_ALLOW_NON_REFSEQ
static const string kSNPProcessorSection
static const int kDefaultMaxConn
static const string kSNPProcessorName
static void s_OnGotBlobByBlobId(void *data)
static void s_OnGotChunk(void *data)
void s_SetBlobDataProps(CBlobRecord &blob_props, const CID2_Reply_Data &data)
const string kSNPProcessorEvent
TProcessorPriority RegisterProcessedName(TProcessorPriority priority, const string &name)
optional< CSeq_id::ESNPScaleLimit > m_SNPScaleLimit
vector< string > m_SeqIds
void ReportResultStatus(const string &annot_name, EPSGS_ResultStatus rs)
vector< string > m_DisabledProcessors
vector< string > m_EnabledProcessors
RetroSearch is an open source project built by @garambo | Open a GitHub Issue
Search and Browse the WWW like it's 1997 | Search results from DuckDuckGo
HTML:
3.2
| Encoding:
UTF-8
| Version:
0.7.4