(
const string& psg_blob_id)
146: m_AddWGSMasterDescr(add_wgs_master),
147m_DataSource(data_source),
171 _ASSERT(!tse_slot->m_DL_Blob_id);
172tse_slot->m_DL_Blob_id = dl_blob_id;
202tse_slot->m_LockedDelayedChunkInfo = &locked_chunk_info;
210 autodl_blob_id = locked_chunk_info.
GetBlobId();
214 static_cast<SChunkSlot*
>(chunk_slot)->m_LockedChunkInfo = &locked_chunk_info;
223 return&iter->second;
232 auto[ iter, inserted ] =
m_TSEBlobMap.try_emplace(blob_id, blob_id);
234 _TRACE(
Descr()<<
": TSE slot for blob_id="<<blob_id);
236 _ASSERT(iter->second.m_PSG_Blob_id == blob_id);
237 return&iter->second;
246 return&iter->second;
255 auto[ iter, inserted ] =
m_SplitBlobMap.try_emplace(id2_info, id2_info);
257 _TRACE(
Descr()<<
": Split slot for id2_info="<<id2_info);
259 _ASSERT(iter->second.m_Id2Info == id2_info);
260 return&iter->second;
272 autoiter2 = split_slot->m_ChunkMap.find(chunk_id);
273 if( iter2 != split_slot->m_ChunkMap.end() ) {
274 return&iter2->second;
289 auto[ iter, inserted ] = split_slot->m_ChunkMap.try_emplace(chunk_id, split_slot);
291 _TRACE(
Descr()<<
": Blob slot for id2_info="<<id2_info<<
" chunk="<<chunk_id);
293 return&iter->second;
336 else if( chunk_id ) {
350 else if( chunk_id ) {
360 if(
autotse_slot =
GetTSESlot(psg_blob_id) ) {
361 if( tse_slot->m_PsgBlobInfo ) {
362 state|= tse_slot->m_PsgBlobInfo->blob_state_flags;
372 autoblob_id =
dynamic_cast<const CPSG_BlobId*
>(id);
373 autochunk_id = blob_id?
nullptr:
dynamic_cast<const CPSG_ChunkId*
>(id);
374 returnmake_tuple(blob_id, chunk_id);
381 returnblob_id->
GetId();
394 constshared_ptr<CPSG_ReplyItem>& item)
397 switch(item->GetType()) {
402 if(
autoblob_info = dynamic_pointer_cast<CPSG_BlobInfo>(item) ) {
403 auto[ blob_id, chunk_id ] =
ParseId(blob_info->GetId());
404 boolready_to_OM =
false;
407 autotse_slot =
SetTSESlot(blob_id->GetId());
409 autoparsed_info = make_shared<SPsgBlobInfo>(*blob_info);
414 if( !parsed_info->id2_info.empty() ) {
416 if( !split_slot->m_TSESlot ) {
417 _TRACE(
Descr()<<
": link TSE "<<blob_id->GetId()<<
" to split "<<parsed_info->id2_info);
418split_slot->m_TSESlot = tse_slot;
419tse_slot->m_SplitSlot = split_slot;
420tse_slot->m_Skipped = std::move(split_slot->m_Skipped);
421tse_slot->m_SkippedWaitDeadline = std::move(split_slot->m_SkippedWaitDeadline);
423 _ASSERT(split_slot->m_TSESlot == tse_slot);
424 _ASSERT(tse_slot->m_SplitSlot == split_slot);
426ready_to_OM =
GetDLBlobId(tse_slot) && split_slot->m_BlobObject;
429tse_slot->m_PsgBlobInfo = parsed_info;
449 string msg= item->GetNextMessage();
450 if(
msg.empty() ) {
456 if(
auto data= dynamic_pointer_cast<CPSG_BlobData>(item) ) {
457 auto[ blob_id, chunk_id ] =
ParseId(
data->GetId());
458 if(
autoslot =
SetBlobSlot(blob_id, chunk_id) ) {
460slot->m_BlobDataStatus = status;
461slot->m_BlobData =
data;
463 if( slot->IsReadyToDeserialize() ) {
472 if(
autoskipped = dynamic_pointer_cast<CPSG_SkippedBlob>(item) ) {
474 auto[ blob_id, chunk_id ] =
ParseId(skipped->GetId());
476 if(
autotse_slot =
SetTSESlot(*blob_id) ) {
477tse_slot->m_Skipped = skipped;
486 if(
autosplit_slot =
SetSplitSlot(chunk_id->GetId2Info()) ) {
487 if(
autotse_slot = split_slot->m_TSESlot ) {
488tse_slot->m_Skipped = skipped;
497split_slot->m_Skipped = skipped;
509 if(
autoprocessor = dynamic_pointer_cast<CPSG_Processor>(item) ) {
524 constshared_ptr<CPSG_ReplyItem>& item)
526 switch(item->GetType()) {
528 if(
autoblob_info = dynamic_pointer_cast<CPSG_BlobInfo>(item) ) {
533 if(
auto data= dynamic_pointer_cast<CPSG_BlobData>(item) ) {
539 if(
autoskipped = dynamic_pointer_cast<CPSG_SkippedBlob>(item) ) {
556 auto[ blob_id, chunk_id ] =
ParseId(
id);
569 if( ready_data && !
ParseTSE(blob_id, slot) ) {
572 return TSE_ToOM(blob_id, chunk_id, slot);
578 return TSE_ToOM(blob_id, chunk_id, slot);
581 _ASSERT(ready_object || ready_data);
582 if( ready_data && !
ParseChunk(chunk_id, slot) ) {
595shared_ptr<CPSG_BlobInfo> blob_info;
596shared_ptr<CPSG_BlobData> blob_data;
609 LOG_POST(
"PSGBlobProcessor("<<
this<<
"): cannot open data stream for "<<
631shared_ptr<CPSG_BlobInfo> blob_info;
632shared_ptr<CPSG_BlobData> blob_data;
644 LOG_POST(
"PSGBlobProcessor("<<
this<<
"): cannot open data stream for "<<
666shared_ptr<CPSG_BlobInfo> blob_info;
667shared_ptr<CPSG_BlobData> blob_data;
679 LOG_POST(
"PSGBlobProcessor("<<
this<<
"): cannot open data stream for "<<
700 _ASSERT(blob_id || split_info_id);
716tse_slot =
static_cast<STSESlot*
>(data_slot);
724split_slot =
static_cast<SSplitSlot*
>(data_slot);
738 if( !entry && split_slot ) {
745 if( !entry && !split_info ) {
750 return x_Failed(
"GetBlobByIdShouldFail=true for: "+dl_blob_id->
ToString());
753 LOG_POST(
Info<<
"PSGBlobProcessor("<<
this<<
"): getting TSE load lock: "<<dl_blob_id->
ToString());
757 LOG_POST(
Info<<
"PSGBlobProcessor("<<
this<<
"): got TSE load lock: "<<dl_blob_id->
ToString());
762 autoblob_state = tse_slot->
m_PsgBlobInfo->blob_state_flags;
777 if( !delayed_main_chunk ) {
779 if( !delayed_main_chunk->
IsLoaded() ) {
782 if( !delayed_main_chunk_load_lock.
get() || !*delayed_main_chunk_load_lock.
get() ) {
784delayed_main_chunk_load_lock.
reset();
785delayed_main_chunk =
nullptr;
789 if( delayed_main_chunk && delayed_main_chunk->
IsLoaded() ) {
791delayed_main_chunk_load_lock.
reset();
792delayed_main_chunk =
nullptr;
795 if( !load_lock.
IsLoaded() || delayed_main_chunk ) {
822 if( delayed_main_chunk ) {
825 "calling delayed_main_chunk->SetLoaded() for "<<dl_blob_id->
ToString());
830 "delayed TSE chunk loaded: "<<dl_blob_id->
ToString());
836 "calling SetLoaded() for "<<dl_blob_id->
ToString());
841 "TSE loaded: "<<dl_blob_id->
ToString());
882 if(
autosplit_slot = chunk_slot->
m_SplitSlot) {
883 if(
autotse_slot = split_slot->m_TSESlot ) {
884tse_lock = tse_slot->m_TSE_Lock;
917vector<pair<TChunkId, CRef<CID2S_Chunk>>> ready_chunks;
924 for(
auto& [ chunk_id, chunk_slot ] : split_slot->m_ChunkMap ) {
925 if( chunk_slot.m_BlobObject ) {
928ready_chunks.push_back(make_pair(chunk_id, chunk));
929chunk_slot.m_BlobObject =
null;
936 for(
auto[
id, chunk ] : ready_chunks ) {
939 if( chunk_load_lock.
get() && *chunk_load_lock.
get() ) {
951 auto[ blob_id, chunk_id ] =
ParseId(
id);
959 autosplit_slot =
GetSplitSlot(chunk_id->GetId2Info());
960tse_slot = split_slot->m_TSESlot;
982 if( psg_blob_id.GetId2Info().empty() ) {
985 if(
autosplit_slot =
GetSplitSlot(psg_blob_id.GetId2Info()) ) {
986 for(
auto& [ chunk_id, chunk_slot ] : split_slot->m_ChunkMap ) {
988 if( chunk_slot.m_BlobObject ) {
1004 if( !dl_blob_id ) {
1010 LOG_POST(
Info<<
"PSGBlobProcessor("<<
this<<
"): " 1011 "getting loaded TSE lock: "<<dl_blob_id->ToString()<<
1019 LOG_POST(
Info<<
"PSGBlobProcessor("<<
this<<
"): " 1020 "getting loaded TSE lock: "<<dl_blob_id->ToString());
1027 LOG_POST(
Info<<
"PSGBlobProcessor("<<
this<<
"): " 1028 "didn't get loaded TSE lock: "<<dl_blob_id->ToString());
1033 LOG_POST(
Info<<
"PSGBlobProcessor("<<
this<<
"): " 1034 "got loaded TSE lock: "<<dl_blob_id->ToString());
1069 returnmake_unique<CDeadline>(
CTimeout(timeout));
1077 return "in progress";
1093 if( blob_id.empty() ) {
1095 return x_Failed(
"ProcessReply(): empty blob id");
1104 _TRACE(
Descr()<<
": ProcessReply(): processed w/o TSE");
1107 if( !tse_slot->m_TSE_Lock && tse_slot->m_Skipped ) {
1112 _TRACE(
Descr()<<
": ProcessReply(): couldn't get TSE lock");
1115 _ASSERT(tse_slot->m_TSE_Lock);
1117 if( tse_slot->m_TSE_Lock ) {
1119tse_lock = tse_slot->m_TSE_Lock;
1126pair<CTSE_Chunk_Info*, CRef<CID2S_Chunk>>
1140 if( chunk_info && chunk_object ) {
1141 returnmake_pair(chunk_info, chunk_object);
1146 returnmake_pair(
nullptr,
null);
User-defined methods of the data storage class.
User-defined methods of the data storage class.
CTSE_LoadLock GetTSE_LoadLock(const TBlobId &blob_id)
CTSE_LoadLock GetTSE_LoadLockIfLoaded(const TBlobId &blob_id)
CTSE_LoadLock GetLoadedTSE_Lock(const TBlobId &blob_id, const CDeadline &deadline)
void Add(const TKey &key, const TValue &value)
CPSGBlobInfoCache m_BlobInfoCache
static bool GetGetBlobByIdShouldFail()
EProcessResult AssignChunks(STSESlot *tse_slot)
EProcessResult ObtainSkippedTSE_Lock(STSESlot *slot, EWaitForLock wait_for_lock=eNoWaitForLock)
void SetDLBlobId(const string &psg_blob_id, const CConstRef< CPsgBlobId > &dl_blob_id)
bool ParseSplitInfo(const CPSG_ChunkId *split_info_id, SBlobSlot *data_slot)
void SetLockedDelayedChunkInfo(const string &psg_blob_id, CTSE_Chunk_Info &locked_chunk_info)
EProcessResult ProcessItemSlow(EPSG_Status status, const shared_ptr< CPSG_ReplyItem > &item) override
EProcessResult ProcessItemFast(EPSG_Status status, const shared_ptr< CPSG_ReplyItem > &item) override
EProcessResult ProcessTSE_Lock(const string &blob_id, CTSE_Lock &tse_lock, EWaitForLock wait_for_lock=eNoWaitForLock)
static const char * GetSkippedType(const CPSG_SkippedBlob &skipped)
unique_ptr< CDeadline > GetWaitDeadline(const CPSG_SkippedBlob &skipped) const
static const TChunkId kSplitInfoChunkId
SBlobSlot * GetBlobSlot(const CPSG_BlobId *blob_id, const CPSG_ChunkId *chunk_id)
STSESlot * SetTSESlot(const string &blob_id)
pair< CTSE_Chunk_Info *, CRef< CID2S_Chunk > > GetNextLoadedChunk()
CFastMutex m_BlobProcessorMutex
bool HasChunksToAssign(const CTSE_Lock &tse)
void SetLockedChunkInfo(CTSE_Chunk_Info &locked_chunk_info)
bool ParseChunk(const CPSG_ChunkId *chunk_id, SBlobSlot *data_slot)
SSplitSlot * GetSplitSlot(const string &id2_info)
CPSGL_Blob_Processor(CDataSource *data_source, CPSGCaches *caches=nullptr, bool add_wgs_master=false)
TSplitBlobMap m_SplitBlobMap
SBlobSlot * GetChunkSlot(const string &id2_info, TChunkId chunk_id)
SBlobSlot * SetBlobSlot(const CPSG_BlobId *blob_id, const CPSG_ChunkId *chunk_id)
~CPSGL_Blob_Processor() override
EProcessResult PostProcessBlob(const CPSG_DataId *id)
EProcessResult Chunk_ToOM(const CPSG_ChunkId *chunk_id, SChunkSlot *chunk_slot)
int GetBlobInfoState(const string &psg_blob_id)
EProcessResult PostProcessSkippedBlob(const CPSG_DataId *id)
bool ParseTSE(const CPSG_BlobId *blob_id, SBlobSlot *data_slot)
STSESlot * GetTSESlot(const string &blob_id)
EProcessResult TSE_ToOM(const CPSG_BlobId *blob_id, const CPSG_ChunkId *split_info_id, SBlobSlot *data_slot)
CConstRef< CPsgBlobId > GetDLBlobId(STSESlot *tse_slot)
CDataSource * m_DataSource
static tuple< const CPSG_BlobId *, const CPSG_ChunkId * > ParseId(const CPSG_DataId *id)
SSplitSlot * SetSplitSlot(const string &id2_info)
virtual CConstRef< CPsgBlobId > CreateDLBlobId(STSESlot *tse_slot)
SBlobSlot * SetChunkSlot(const string &id2_info, TChunkId chunk_id)
EProcessResult x_Failed(const string &message)
SProcessorDescrPrinter Descr() const
const string & GetId() const
Get ID.
const string & GetId2Info() const
Get ID2 info.
int GetId2Chunk() const
Get ID2 chunk number.
virtual string Repr() const =0
Get tilde-separated string representation of this data ID (e.g. for logging)
EReason GetReason() const
const TSeconds & GetTimeUntilResend() const
Seconds before the blob will be sent to the client.
const string & GetId2Info() const
bool HasBioseqIsDead() const
virtual string ToString(void) const override
Get string representation of blob id.
static void Attach(CTSE_Info &tse, const CID2S_Split_Info &split)
static void Load(CTSE_Chunk_Info &chunk, const CID2S_Chunk &data)
TBlobId GetBlobId(void) const
void SetLoaded(CObject *obj=0)
TChunkId GetChunkId(void) const
CInitGuard * GetLoadInitGuard(void)
bool IsLoaded(void) const
void SetBlobState(TBlobState state)
const TBlobId & GetBlobId(void) const
CTSE_Split_Info & GetSplitInfo(void)
bool x_NeedsDelayedMainChunk(void) const
void SetSeq_entry(CSeq_entry &entry, CTSE_SetObjectInfo *set_info=0)
bool HasSplitInfo(void) const
void SetBlobVersion(TBlobVersion version)
bool IsLoaded(void) const
const CTSE_Info * GetPointerOrNull(void) const
CTSE_Chunk_Info & GetChunk(TChunkId chunk_id)
CTimeout -- Timeout interval.
static void AddWGSMaster(CTSE_LoadLock &lock)
const_iterator begin() const
const_iterator end() const
const_iterator find(const key_type &key) const
void reset(element_type *p=0, EOwnership ownership=eTakeOwnership)
Reset will delete the old pointer (if owned), set content to the new value, and assume the ownership ...
bool IsNull(void) const
Check if the object is unassigned.
element_type * get(void) const
Get pointer.
void swap(NCBI_NS_NCBI::pair_base_member< T1, T2 > &pair1, NCBI_NS_NCBI::pair_base_member< T1, T2 > &pair2)
const TValue & GetValue(void) const
Get a const reference to the current value.
#define LOG_POST(message)
This macro is deprecated and it's strongly recommended to move in all projects (except tests) to macro...
void Info(CExceptionArgs_Base &args)
#define MSerial_AsnText
I/O stream manipulators --.
CConstRef< C > ConstRef(const C *object)
Template function for conversion of const object pointer to CConstRef.
void Reset(void)
Reset reference object.
TObjectType * GetPointerOrNull(void) THROWS_NONE
Get pointer value.
static enable_if< is_arithmetic< TNumeric >::value||is_convertible< TNumeric, Int8 >::value, string >::type NumericToString(TNumeric value, TNumToStringFlags flags=0, int base=10)
Convert numeric value to string.
std::istream & in(std::istream &in_, double &x_)
double r(size_t dimension_, const Int4 *score_, const double *prob_, double theta_)
static string ToString(const CPSG_BlobId *blob_id)
unsigned s_GetDebugLevel()
CObjectIStream * GetBlobDataStream(const CPSG_BlobInfo &blob_info, const CPSG_BlobData &blob_data)
void UpdateOMBlobId(CTSE_LoadLock &load_lock, const CConstRef< CPsgBlobId > &dl_blob_id)
EPSG_Status
Retrieval result.
@ eSuccess
Successfully retrieved.
@ eForbidden
User is not authorized for the retrieval.
static SLJIT_INLINE sljit_ins msg(sljit_gpr r, sljit_s32 d, sljit_gpr x, sljit_gpr b)
shared_ptr< CPSG_BlobInfo > m_BlobInfo
bool IsReadyToDeserialize() const
EPSG_Status m_BlobDataStatus
CRef< CSerialObject > m_BlobObject
shared_ptr< CPSG_BlobData > m_BlobData
EPSG_Status m_BlobInfoStatus
SChunkSlot(SSplitSlot *split_slot)
CTSE_Chunk_Info * m_LockedChunkInfo
unique_ptr< CDeadline > m_SkippedWaitDeadline
shared_ptr< CPSG_SkippedBlob > m_Skipped
map< TChunkId, SChunkSlot > TChunkMap
SSplitSlot(const string &id2_info)
STSESlot(const string &psg_blob_id)
unique_ptr< CDeadline > m_SkippedWaitDeadline
shared_ptr< SPsgBlobInfo > m_PsgBlobInfo
CConstRef< CPsgBlobId > m_DL_Blob_id
shared_ptr< CPSG_SkippedBlob > m_Skipped
CTSE_Chunk_Info * m_LockedDelayedChunkInfo
RetroSearch is an open source project built by @garambo | Open a GitHub Issue
Search and Browse the WWW like it's 1997 | Search results from DuckDuckGo
HTML:
3.2
| Encoding:
UTF-8
| Version:
0.7.4