full_message =
string(
"simulated exception: ")+message;
140: m_EventLoop(service_name,
141bind(&
CPSGL_Queue::ProcessItemCallback, this, placeholders::_1, placeholders::_2),
142bind(&
CPSGL_Queue::ProcessReplyCallback, this, placeholders::_1, placeholders::_2)),
178 autoiter =
m_TrackerMap.find(reply->GetRequest().get());
182 return Ref(iter->second);
195shared_ptr<CPSG_Request> request =
tracker->GetRequest();
204 constshared_ptr<CPSG_ReplyItem>& item)
207 tracker->ProcessItemCallback(status, item);
213 constshared_ptr<CPSG_Reply>& reply)
216 tracker->ProcessReplyCallback(status, reply);
227 constshared_ptr<CPSG_Request>& request,
230: m_QueueGuard(queue_guard),
231m_ThreadPool(queue_guard.m_ThreadPool),
233m_Processor(processor),
237m_NeedsFinalization(
false),
240m_BackgroundItemTaskCount(0),
291 void*
operator new(size_t) =
delete;
292 void*
operator new[](size_t) =
delete;
298 if( --
tracker->m_InCallbackCount == 0 ) {
299 tracker->m_InCallbackSemaphore.Post();
316++
tracker->m_InCallbackCount;
331 constshared_ptr<CPSG_ReplyItem>& item)
366 if( !callback_guard ) {
373 return tracker->BackgroundProcessReplyCallback(
this);
464 if( task->m_Item ) {
473 constshared_ptr<CPSG_ReplyItem>& item)
475 _TRACE(
"CPSGL_RequestTracker("<<
this<<
", "<<
m_Processor<<
")::StartProcessItemInBackground()");
483 _TRACE(
"CPSGL_RequestTracker("<<
this<<
", "<<
m_Processor<<
")::StartProcessReplyInBackground()");
496 if( status > old_status ) {
535 constshared_ptr<CPSG_ReplyItem>& item)
537 _TRACE(
"CPSGL_RequestTracker("<<
this<<
", "<<
m_Processor<<
")::ProcessItemCallback()");
540 _TRACE(
"CPSGL_RequestTracker("<<
this<<
", "<<
m_Processor<<
")::ProcessItemCallback() - canceled");
552 _TRACE(
"CPSGDataLoader: failed processing reply item: "<<
result);
556 catch( exception& exc ) {
557 _TRACE(
"CPSGDataLoader: exception while processing reply item: "<<exc.what());
565 constshared_ptr<CPSG_Reply>& reply)
567 _TRACE(
"CPSGL_RequestTracker("<<
this<<
", "<<
m_Processor<<
")::ProcessReplyCallback()");
570 _TRACE(
"CPSGL_RequestTracker("<<
this<<
", "<<
m_Processor<<
")::ProcessReplyCallback() - canceled");
588 _TRACE(
"CPSGL_RequestTracker("<<
this<<
", "<<
m_Processor<<
")::ProcessReplyCallback(): ProcessReplyFast()");
591 _TRACE(
"CPSGL_RequestTracker("<<
this<<
", "<<
m_Processor<<
")::ProcessReplyCallback(): ProcessReplyFast(): "<<
result);
597 _TRACE(
"CPSGDataLoader: failed processing reply: "<<
result);
604 catch( exception& exc ) {
605 _TRACE(
"CPSGDataLoader: exception while processing reply: "<<exc.what());
615 constshared_ptr<CPSG_ReplyItem>& item)
617 _TRACE(
"CPSGL_RequestTracker("<<
this<<
", "<<
m_Processor<<
")::BackgroundProcessItemCallback()");
620 _TRACE(
"CPSGL_RequestTracker("<<
this<<
", "<<
m_Processor<<
")::BackgroundProcessItemCallback() - finished");
634 _TRACE(
"CPSGL_RequestTracker("<<
this<<
", "<<
m_Processor<<
")::BackgroundProcessItemCallback(): ProcessItemSlow()");
638 _TRACE(
"CPSGDataLoader: failed processing reply item: "<<
result);
661 _TRACE(
"CPSGL_RequestTracker("<<
this<<
", "<<
m_Processor<<
")::BackgroundProcessItemCallback(): ProcessReplyFast()");
664 _TRACE(
"CPSGL_RequestTracker("<<
this<<
", "<<
m_Processor<<
")::BackgroundProcessItemCallback(): ProcessReplyFast(): "<<
result);
670 _TRACE(
"CPSGDataLoader: failed processing reply: "<<
result);
677 _TRACE(
"CPSGL_RequestTracker("<<
this<<
", "<<
m_Processor<<
")::BackgroundProcessItemCallback(): ProcessReply()");
680 _TRACE(
"CPSGL_RequestTracker("<<
this<<
", "<<
m_Processor<<
")::BackgroundProcessItemCallback(): ProcessReplySlow(): "<<
result);
686 _TRACE(
"CPSGDataLoader: failed processing reply: "<<
result);
696 catch( exception& exc ) {
697 _TRACE(
"CPSGDataLoader: exception while processing reply item: "<<exc.what());
708 _TRACE(
"CPSGL_RequestTracker("<<
this<<
", "<<
m_Processor<<
")::BackgroundProcessReplyCallback()");
711 _TRACE(
"CPSGL_RequestTracker("<<
this<<
", "<<
m_Processor<<
")::BackgroundProcessReplyCallback() - finished");
732 _TRACE(
"CPSGDataLoader: failed processing reply: "<<
result);
741 catch( exception& exc ) {
742 _TRACE(
"CPSGDataLoader: exception while processing reply: "<<exc.what());
755 _TRACE(
"CPSGL_RequestTracker("<<
this<<
", "<<
m_Processor<<
")::FinalizeResult(): calling");
759 _TRACE(
"CPSGL_RequestTracker("<<
this<<
", "<<
m_Processor<<
")::FinalizeResult(): finalized: "<<
result);
764 catch( exception& exc ) {
765 _TRACE(
"CPSGDataLoader: exception while processing reply: "<<exc.what());
786: m_ThreadPool(thread_pool),
840 _TRACE(
"CPSGL_QueueGuard::AddRequest(): CPSGL_RequestTracker("<<
tracker<<
", "<<
tracker->m_Processor<<
") for requst "<<
s_GetRequestTypeName(request->GetType())<<
" "<<request->GetId());
847 ERR_POST(
"CPSGDataLoader: cannot send request");
855 _TRACE(
"CPSGL_QueueGuard::MarkAsFinished(): tracker: "<<
tracker);
885 _TRACE(
"CPSGL_QueueGuard::GetNextResult(): tracker: "<<
tracker);
886 return tracker->FinalizeResult();
Data loader exceptions, used by GenBank loader.
virtual EProcessResult ProcessItemSlow(EPSG_Status status, const shared_ptr< CPSG_ReplyItem > &item)
virtual EProcessResult ProcessReplyFast(EPSG_Status status, const shared_ptr< CPSG_Reply > &reply)
virtual EProcessResult ProcessReplySlow(EPSG_Status status, const shared_ptr< CPSG_Reply > &reply)
virtual EProcessResult ProcessItemFast(EPSG_Status status, const shared_ptr< CPSG_ReplyItem > &item)
void AddError(const string &message)
virtual EProcessResult ProcessReplyFinal()
CRef< CPSGL_Queue > m_Queue
void AddRequest(const shared_ptr< CPSG_Request > &request, const CRef< CPSGL_Processor > &processor, size_t index=0)
set< CRef< CPSGL_RequestTracker > > m_QueuedRequests
void MarkAsFinished(const CRef< CPSGL_RequestTracker > &request_processor)
list< CRef< CPSGL_RequestTracker > > m_CompleteRequests
CFastMutex m_CompleteMutex
friend class CPSGL_RequestTracker
CRef< CPSGL_RequestTracker > GetQueuedRequest()
CPSGL_ResultGuard GetNextResult()
CSemaphore m_CompleteSemaphore
CPSGL_QueueGuard(CThreadPool &thread_pool, CPSGL_Queue &queue)
void ProcessItemCallback(EPSG_Status status, const shared_ptr< CPSG_ReplyItem > &item)
bool SendRequest(const CRef< CPSGL_RequestTracker > &tracker)
CRef< CRequestContext > m_RequestContext
void SetRequestContext(const CRef< CRequestContext > &context)
void DeregisterRequest(const CPSGL_RequestTracker *tracker)
void ProcessReplyCallback(EPSG_Status status, const shared_ptr< CPSG_Reply > &reply)
CPSG_EventLoop m_EventLoop
CRef< CPSGL_RequestTracker > GetTracker(const shared_ptr< CPSG_Reply > &reply)
unordered_map< const CPSG_Request *, CPSGL_RequestTracker * > m_TrackerMap
CFastMutex m_TrackerMapMutex
void RegisterRequest(CPSGL_RequestTracker *tracker)
CPSGL_Queue(const string &service_name)
CBackgroundTask(CPSGL_RequestTracker *tracker, EPSG_Status item_status, const shared_ptr< CPSG_ReplyItem > &item)
shared_ptr< CPSG_ReplyItem > m_Item
CBackgroundTask(CPSGL_RequestTracker *tracker)
EStatus Execute() override
Do the actual job.
CRef< CObjectFor< CFastMutex > > m_TrackerMutex
CFastMutex & GetTrackerMutex()
CRef< CPSGL_RequestTracker > m_Tracker
void OnStatusChange(EStatus old_task_status) override
Callback to notify on changes in the task status.
void DisconnectFromTracker()
CCallbackGuard & operator=(const CCallbackGuard &)=delete
CPSGL_RequestTracker * m_Tracker
void Set(CPSGL_RequestTracker *tracker)
void SetNoLock(CPSGL_RequestTracker *tracker)
CCallbackGuard(const CCallbackGuard &)=delete
CCallbackGuard(CPSGL_RequestTracker *tracker)
CPSGL_RequestTracker(CPSGL_QueueGuard &queue_guard, const shared_ptr< CPSG_Request > &request, const CRef< CPSGL_Processor > &processor, size_t index=0)
void MarkAsNeedsFinalization()
CThreadPool_Task::EStatus BackgroundProcessItemCallback(CBackgroundTask *task, EPSG_Status status, const shared_ptr< CPSG_ReplyItem > &item)
void StartProcessReplyInBackground()
void MarkAsFinished(CThreadPool_Task::EStatus status)
void WaitForBackgroundTasks()
unsigned m_InCallbackCount
TBackgroundTasks m_BackgroundTasks
EPSG_Status m_ReplyStatus
void CancelBackgroundTasks()
CRef< CPSGL_Processor > m_Processor
shared_ptr< CPSG_Reply > m_Reply
void QueueInBackground(const CRef< CBackgroundTask > &task)
CPSGL_QueueGuard & m_QueueGuard
void StartProcessItemInBackground(EPSG_Status status, const shared_ptr< CPSG_ReplyItem > &item)
CSemaphore m_InCallbackSemaphore
CThreadPool_Task::EStatus BackgroundProcessReplyCallback(CBackgroundTask *task)
shared_ptr< CPSG_Request > m_Request
void ProcessItemCallback(EPSG_Status status, const shared_ptr< CPSG_ReplyItem > &item)
atomic< CThreadPool_Task::EStatus > m_Status
unsigned m_BackgroundItemTaskCount
CPSGL_ResultGuard FinalizeResult()
CFastMutex & GetTrackerMutex()
void ProcessReplyCallback(EPSG_Status status, const shared_ptr< CPSG_Reply > &reply)
CThreadPool & m_ThreadPool
CThreadPool_Task::EStatus GetStatus() const
CPSGL_ResultGuard & operator=(CPSGL_ResultGuard &&)
A class derived from the queue class that additionally allows to run event loop.
bool SendRequest(shared_ptr< CPSG_Request > request, CDeadline deadline)
Push request into the queue.
void Reset()
Stop accepting new requests and cancel all requests whose replies have not been returned yet.
Abstract class for representing single task executing in pool of threads. To use this class in applica...
Main class implementing functionality of pool of threads.
iterator_bool insert(const value_type &val)
const_iterator begin() const
const_iterator find(const key_type &key) const
const_iterator end() const
void swap(NCBI_NS_NCBI::pair_base_member< T1, T2 > &pair1, NCBI_NS_NCBI::pair_base_member< T1, T2 > &pair2)
#define ERR_POST(message)
Error posting with file, line number information but without error codes.
#define NCBI_THROW(exception_class, err_code, message)
Generic macro to throw an exception, given the exception class, error code and message string.
TObjectType * GetNCPointer(void) const THROWS_NONE
Get pointer.
CRef< C > Ref(C *object)
Helper functions to get CRef<> and CConstRef<> objects.
void Reset(void)
Reset reference object.
EStatus GetStatus(void) const
Get status of the task.
EStatus
Status of the task.
void RequestToCancel(void)
Cancel the task.
void AddTask(CThreadPool_Task *task, const CTimeSpan *timeout=NULL)
Add task to the pool for execution.
@ eFailed
failure during execution
@ eCompleted
executed successfully
@ eCanceled
canceled - possible only if canceled before processing was started or if method Execute() returns res...
void Run(void)
Enter the main loop.
void Wait(void)
Wait on semaphore.
void Post(unsigned int count=1)
Increment the semaphore by "count".
@ eInfinite
Infinite deadline.
Pool of generic task-executing threads.
void SleepMilliSec(unsigned long ml_sec, EInterruptOnSignal onsignal=eRestartOnSignal)
Multi-threading -- classes, functions, and features.
EPSG_Status
Retrieval result.
@ eInProgress
Retrieval is not finalized yet, more info may come.
@ eError
An error was encountered while trying to send request or to read and to process the reply.
static void s_SimulateFailure(CPSGL_Processor *processor, const char *message)
static const int kDestructionDelay
static const char * s_GetRequestTypeName(CPSG_Request::EType type)
static const int kFailureRate
static void s_SimulateDelay()
static bool s_IsFinished(CThreadPool_Task::EStatus status)
static bool s_IsAborted(CThreadPool_Task::EStatus status)
static CS_CONTEXT * context
RetroSearch is an open source project built by @garambo | Open a GitHub Issue
Search and Browse the WWW like it's 1997 | Search results from DuckDuckGo
HTML:
3.2
| Encoding:
UTF-8
| Version:
0.7.4