<
typenameTRequest>
161 unsigned inttimeout_sec = 0,
162 unsigned inttimeout_nsec = 0);
172 unsigned inttimeout_nsec = 0)
const;
182 unsigned inttimeout_nsec = 0)
const;
194 unsigned inttimeout_nsec = 0);
207 unsigned inttimeout_nsec = 0);
281{
return!q.
empty(); }
289 unsigned inttimeout_nsec)
const;
304 template<
typenameTRequest>
385 template<
typenameTRequest>
408 unsigned intspawn_threshold = 1,
409 unsigned intmax_urgent_threads =
kMax_UInt,
410 const string& thread_name =
kEmptyStr);
419 void Spawn(
unsigned intnum_threads);
430 unsigned inttimeout_sec = 0,
431 unsigned inttimeout_nsec = 0);
440 unsigned inttimeout_sec = 0,
441 unsigned inttimeout_nsec = 0);
451 unsigned inttimeout_nsec = 0);
526 unsigned inttimeout_sec = 0,
527 unsigned inttimeout_nsec = 0);
563 EStatusold_status = GetStatus();
565m_Request->OnStatusChange(old_status, new_status);
599{ TParent::ProcessRequest(handle); }
624 unsigned intspawn_threshold = 1,
625 unsigned intmax_urgent_threads =
kMax_UInt,
627:
TParent(max_threads, queue_size, spawn_threshold, max_urgent_threads,
652{ KillAllThreads(wait ? (fKill_Wait | fKill_Reopen) : fKill_Reopen); }
658 virtual voidRegister(TThread& thread);
664 virtual voidUnRegister(TThread& thread);
681 typedef NCBI_PARAM_TYPE(ThreadPool, Catch_Unhandled_Exceptions) TParamThreadPoolCatchExceptions;
696 template<
typenameTRequest>
699 unsigned inttimeout_sec,
700 unsigned inttimeout_nsec)
706timeout_sec, timeout_nsec) ) {
708 "CBlockingQueue<>::Put: " 709 "attempt to insert into a full queue");
711 if(m_RequestCounter == 0) {
712m_RequestCounter = 0xFFFFFF;
715 val.m_Priority = (
val.m_Priority & 0xFF000000) | m_RequestCounter--;
723 TPriorityreal_priority = (priority << 24) | m_RequestCounter--;
728 if(q.
size() == m_MaxSize) {
735 template<
typenameTRequest>
737 unsigned inttimeout_nsec)
const 742timeout_sec, timeout_nsec)) {
746 "CBlockingQueue<>::WaitForRoom: timed out");
750 template<
typenameTRequest>
752 unsigned inttimeout_nsec)
const 756timeout_sec, timeout_nsec)) {
760 "CBlockingQueue<>::WaitForHunger: timed out");
765 template<
typenameTRequest>
768 unsigned inttimeout_nsec)
776m_HungerSem.TryWait();
780guard, timeout_sec, timeout_nsec);
782 if(--m_HungerCnt <= q.
size()) {
783m_HungerSem.TryWait();
788 "CBlockingQueue<>::Get[Handle]: timed out");
794 if(m_HungerCnt > q.
size()) {
795m_HungerSem.TryWait();
798 if( ! q.
empty() ) {
810handle->x_SetStatus(CQueueItem::eActive);
814 template<
typenameTRequest>
816 unsigned inttimeout_nsec)
818 TItemHandlehandle = GetHandle(timeout_sec, timeout_nsec);
819handle->MarkAsComplete();
820 returnhandle->GetRequest();
824 template<
typenameTRequest>
832 template<
typenameTRequest>
836 if(handle->GetUserPriority() == priority
837|| handle->GetStatus() != CQueueItem::ePending) {
846 if(it != q.
end() && *it == handle) {
848 TPrioritycounter = handle->m_Priority & 0xFFFFFF;
849handle->m_Priority = (priority << 24) | counter;
855 template<
typenameTRequest>
858 if(handle->GetStatus() != CQueueItem::ePending) {
868 if(it != q.
end() && *it == handle) {
880handle->x_SetStatus(CQueueItem::eWithdrawn);
883 template<
typenameTRequest>
887 unsigned inttimeout_sec,
888 unsigned inttimeout_nsec)
892 if( !(this->*pred)(q) ) {
893 #if SIZEOF_INT == SIZEOF_LONG 899 if(timeout_sec >=
kMax_Int- extra_sec) {
902timeout_sec += extra_sec;
906 CTimeSpanspan(timeout_sec, timeout_nsec);
913guard.
Guard(m_Mutex);
918 return(this->*pred)(q);
925 template<
typenameTRequest>
933 template<
typenameTRequest>
936 if(m_Counter !=
NULL) {
941 template<
typenameTRequest>
946 template<
typenameTRequest>
949m_Thread->x_UnregisterThread();
953 template<
typenameTRequest>
963 template<
typenameTRequest>
983}
catch(std::exception& e) {
984handle->MarkAsForciblyCaught();
986 "Exception from thread in pool: ", e);
989handle->MarkAsForciblyCaught();
1002 template<
typenameTRequest>
1009 if(!name.empty()) {
1016 ERR_POST(
Warning<<
"New worker thread blocked at the last minute.");
1022 boolcatch_all = TParamThreadPoolCatchExceptions::GetDefault();
1034 template<
typenameTRequest>
1042 template<
typenameTRequest>
1054 template<
typenameTRequest>
1056 unsigned intqueue_size,
1057 unsigned intspawn_threshold,
1058 unsigned intmax_urgent_threads,
1059 const string& thread_name)
1060:
m_MaxThreads(max_threads), m_MaxUrgentThreads(max_urgent_threads),
1061m_Threshold(spawn_threshold), m_Delta(0),
1062m_Queue(queue_size > 0 ? queue_size : max_threads),
1063m_QueuingForbidden(queue_size == 0),
1064m_ThreadName(thread_name)
1069 template<
typenameTRequest>
1075 Warning<<
"CPoolOfThreads<>::~CPoolOfThreads: " 1076<<
n<<
" thread(s) still active");
1080 template<
typenameTRequest>
1083 for(
unsigned int i= 0;
i< num_threads;
i++)
1085x_RunNewThread(TThread::eNormal, &m_ThreadCount);
1090 template<
typenameTRequest>
1095 unsigned inttimeout_sec,
1096 unsigned inttimeout_nsec)
1098 returnx_AcceptRequest(req, priority,
false, timeout_sec, timeout_nsec);
1101 template<
typenameTRequest>
1105 unsigned inttimeout_sec,
1106 unsigned inttimeout_nsec)
1108 returnx_AcceptRequest(req, 0xFF,
true, timeout_sec, timeout_nsec);
1111 template<
typenameTRequest>
1117 if(m_Queue.IsFull()) {
1119}
else if(m_Delta.load() < 0) {
1121}
else if(m_ThreadCount.Get() <
m_MaxThreads.Get()) {
1124&& m_UrgentThreadCount.Get() < m_MaxUrgentThreads.Get()) {
1128m_Queue.WaitForHunger(0);
1132 "Possible thread pool bug. delta: "<< m_Delta.load()
1133<<
"; hunger: "<< m_Queue.GetHunger());
1141 template<
typenameTRequest>
1144 unsigned inttimeout_nsec)
1146 if(HasImmediateRoom()) {
1148}
else if(m_QueuingForbidden) {
1149m_Queue.WaitForHunger(timeout_sec, timeout_nsec);
1151m_Queue.WaitForRoom(timeout_sec, timeout_nsec);
1155 template<
typenameTRequest>
1161 unsigned inttimeout_sec,
1162 unsigned inttimeout_nsec)
1164 boolnew_thread =
false;
1169 if( priority == 0xFF && !urgent )
1171 if(m_QueuingForbidden && !HasImmediateRoom(urgent) ) {
1173 "CPoolOfThreads<>::x_AcceptRequest: " 1174 "attempt to insert into a full queue");
1176handle = m_Queue.Put(req, priority, timeout_sec, timeout_nsec);
1177 if(++m_Delta >= m_Threshold
1182&& m_UrgentThreadCount.Get() >= m_MaxUrgentThreads.Get()) {
1190x_RunNewThread(TThread::eNormal, &m_ThreadCount);
1191}
else if(urgent) {
1192x_RunNewThread(TThread::eRunOnce, &m_UrgentThreadCount);
1198 template<
typenameTRequest>
1205 thr->CountSelf(counter);
1210 Critical<<
"Ignoring error while starting new thread: " 1215 template<
typenameTRequest>
1221 if(handle->GetUserPriority() == 0xFF) {
1223}
else if(priority == 0xFF) {
1226m_Queue.SetUserPriority(handle, priority);
CAtomicCounter_WithAutoInit –.
It may be desirable to store handles obtained from GetHandle() in instances of CCompletingHandle to e...
CBlockingQueue<> – queue of requests, with efficiently blocking Get()
void Guard(resource_type &resource)
Manually force the guard to protect some other resource.
void Release()
Manually force the resource to be released.
CThreadInPool<> – abstract request-handling thread.
CQueueItemBase – skeleton blocking-queue item, sans actual request.
iterator_bool insert(const value_type &val)
const_iterator begin() const
parent_type::iterator iterator
const_iterator find(const key_type &key) const
const_iterator end() const
Include a standard set of the NCBI C++ Toolkit most basic headers.
#define NON_CONST_ITERATE(Type, Var, Cont)
Non constant version of ITERATE macro.
@ ePositive
Value is positive.
TNCBIAtomicValue TValue
Alias TValue for TNCBIAtomicValue.
TValue Add(int delta) THROWS_NONE
Atomically add value (=delta), and return new counter value.
#define ERR_POST(message)
Error posting with file, line number information but without error codes.
#define ERR_POST_XX(error_name, err_subcode, message)
Error posting with error code having given name and with given error subcode.
#define STD_CATCH_ALL_XX(err_name, err_subcode, message)
Standard handling of "exception"-derived exceptions; catches non-standard exceptions and generates "u...
void Critical(CExceptionArgs_Base &args)
#define NCBI_THROW(exception_class, err_code, message)
Generic macro to throw an exception, given the exception class, error code and message string.
void Warning(CExceptionArgs_Base &args)
#define NCBI_REPORT_EXCEPTION_XX(err_name, err_subcode, title, ex)
Generate a report on the exception with default error code and given subcode.
void Reset(void)
Reset reference object.
bool NotEmpty(void) const THROWS_NONE
Check if CRef is not empty – pointing to an object and has a non-null value.
TObjectType & GetObject(void)
Get object.
uint8_t Uint1
1-byte (8-bit) unsigned integer
uint32_t Uint4
4-byte (32-bit) unsigned integer
#define END_NCBI_SCOPE
End previously defined NCBI scope.
#define BEGIN_NCBI_SCOPE
Define ncbi namespace.
CMutex m_Mutex
Guards access to queue.
CQueueItemBase::TPriority TPriority
TPool * m_Pool
The pool that holds this thread.
size_t m_HungerCnt
Number of threads waiting for data.
const TPriority & GetPriority(void) const
virtual void * Main(void)
Derived (user-created) class must provide a real thread function.
bool IsEmpty(void) const
Check if the queue is empty.
bool(CBlockingQueue::* TQueuePredicate)(const TRealQueue &q) const
const EStatus & GetStatus(void) const
void MarkAsComplete(void)
void WaitForRoom(unsigned int timeout_sec=kMax_UInt, unsigned int timeout_nsec=0) const
Wait for room in the queue for up to timeout_sec + timeout_nsec/1E9 seconds.
bool operator()(const TItemHandle &i1, const TItemHandle &i2) const
TQueue::TUserPriority TUserPriority
virtual TThread * NewThread(ERunMode mode)=0
Create a new thread.
virtual void ProcessRequest(TItemHandle handle)
CAtomicCounter * m_Counter
virtual ~CStdRequest(void)
Destructor.
CSemaphore m_HungerSem
Raised if Get[Handle] has to wait.
CThreadInPool< CRef< CStdRequest > > TParent
virtual void x_OnExit(void)
Clean up. It is called by OnExit()
virtual void UnRegister(TThread &)
Unregister a thread.
TItemHandle Put(const TRequest &request, TUserPriority priority=0, unsigned int timeout_sec=0, unsigned int timeout_nsec=0)
Put a request into the queue.
TUserPriority GetUserPriority(void) const
CMutex m_Mutex
The guard for m_MaxThreads, m_MaxUrgentThreads, and m_Delta.
atomic< int > m_Delta
The difference between the number of unfinished requests and the total number of threads in the pool.
bool x_HungerSemPred(const TRealQueue &q) const
size_t GetSize(void) const
Get the number of requests in the queue.
void Withdraw(TItemHandle handle)
Withdraw a pending request from consideration.
void WaitForHunger(unsigned int timeout_sec=kMax_UInt, unsigned int timeout_nsec=0) const
Wait for the queue to have waiting readers, for up to timeout_sec + timeout_nsec/1E9 seconds.
virtual ~CPoolOfThreads(void)
Destructor.
void x_SetStatus(EStatus new_status)
friend class CAutoUnregGuard
void WaitForRoom(unsigned int timeout_sec=kMax_UInt, unsigned int timeout_nsec=0)
Wait for the room in the queue up to timeout_sec + timeout_nsec/1E9 seconds.
size_t GetHunger(void) const
Get the number of threads waiting for requests, for debugging purposes only.
TThread::ERunMode ERunMode
CAutoUnregGuard(TThread *thr)
TItemHandle AcceptRequest(const TRequest &request, TUserPriority priority=0, unsigned int timeout_sec=0, unsigned int timeout_nsec=0)
Put a request in the queue with a given priority.
bool HasImmediateRoom(bool urgent=false) const
Check whether a new request could be immediately processed.
CQueueItemBase::TUserPriority TUserPriority
CBlockingQueue(size_t max_size=kMax_UInt)
Constructor.
CAtomicCounter::TValue TACValue
bool IsEmpty(void) const
Check if the queue is empty.
size_t GetMaxSize(void) const
Get the maximum number of requests that can be put into the queue.
ERunMode m_RunMode
How long to keep running.
bool IsFull(void) const
Check if the queue is full.
void x_HandleOneRequest(bool catch_all)
virtual ~CThreadInPool(void)
Destructor.
size_t m_MaxSize
The maximum size of the queue.
size_t GetQueueSize(void) const
Get the number of requests in the queue.
Uint4 TPriority
Every request has an associated 32-bit priority field, but only the top eight bits are under direct u...
virtual void ProcessRequest(const TRequest &req)=0
Older interface (still delegated to by default)
TRequest Get(unsigned int timeout_sec=kMax_UInt, unsigned int timeout_nsec=0)
Get the first available request from the queue, and return just the request.
bool x_GetSemPred(const TRealQueue &q) const
TItemHandle x_AcceptRequest(const TRequest &req, TUserPriority priority, bool urgent, unsigned int timeout_sec=0, unsigned int timeout_nsec=0)
TQueue m_Queue
The request queue.
virtual TThread * NewThread(TThread::ERunMode mode)
Create a new thread.
virtual void Init(void)
Init this thread. It is called at the beginning of Main()
CStdThreadInPool(TPool *pool, ERunMode mode=eNormal)
Constructor.
bool x_PutSemPred(const TRealQueue &q) const
CStdPoolOfThreads(unsigned int max_threads, unsigned int queue_size, unsigned int spawn_threshold=1, unsigned int max_urgent_threads=kMax_UInt, const string &thread_name=kEmptyStr)
Constructor.
virtual void OnStatusChange(EStatus, EStatus)
Callback for status changes.
CBlockingQueue< TRequest > TQueue
CBlockingQueue & operator=(const CBlockingQueue &)
volatile TRealQueue m_Queue
The queue.
list< CRef< TThread > > TThreads
CBlockingQueue(const CBlockingQueue &)
forbidden
CQueueItemBase::EStatus EStatus
bool x_WaitForPredicate(TQueuePredicate pred, CSemaphore &sem, CMutexGuard &guard, unsigned int timeout_sec, unsigned int timeout_nsec) const
void Spawn(unsigned int num_threads)
Start processing threads.
CCompletingHandle(const TItemHandle &h)
virtual void Register(TThread &thread)
Register a thread.
CQueueItem(Uint4 priority, TRequest request)
CSemaphore m_PutSem
Raised if the queue has room.
TItemHandle AcceptUrgentRequest(const TRequest &request, unsigned int timeout_sec=0, unsigned int timeout_nsec=0)
Puts a request in the queue with the highest priority. It will run a new thread even if the maximum of...
virtual void ProcessRequest(TItemHandle handle)
Process a request.
bool IsFull(void) const
Check if the queue is full.
void Withdraw(TItemHandle handle)
Withdraw a pending request from consideration.
int m_Threshold
for delta
void x_UnregisterThread(void)
void MarkAsForciblyCaught(void)
void x_RunNewThread(ERunMode mode, CAtomicCounter *counter)
CThreadInPool(TPool *pool, ERunMode mode=eNormal)
Constructor.
void SetUserPriority(TItemHandle handle, TUserPriority priority)
Adjust a pending request's priority.
int TKillFlags
binary OR of EKillFlags
const TRequest & GetRequest(void) const
CAtomicCounter_WithAutoInit m_ThreadCount
The current number of threads in the pool.
CThreadInPool< TRequest > TThread
ERunMode GetRunMode(void) const
Get run mode.
virtual void OnExit(void)
Override this to execute finalization code.
virtual void ProcessRequest(const CRef< CStdRequest > &req)
Process a request.
CBlockingQueue< TRequest >::TItemHandle TItemHandle
CAtomicCounter_WithAutoInit m_MaxThreads
The maximum number of threads the pool can hold.
virtual void KillAllThreads(bool wait)
Causes all threads in the pool to exit cleanly after finishing all pending requests,...
CQueueItemBase(TPriority priority)
NCBI_PARAM_DECL(bool, ThreadPool, Catch_Unhandled_Exceptions)
CPoolOfThreads< CRef< CStdRequest > > TParent
virtual void x_SetStatus(EStatus new_status)
CRef< CQueueItem > TItemHandle
TRequest & SetRequest(void)
typedef NCBI_PARAM_TYPE(ThreadPool, Catch_Unhandled_Exceptions) TParamThreadPoolCatchExceptions
CThreadInPool< TRequest > TThread
const string m_ThreadName
CAtomicCounter_WithAutoInit m_MaxUrgentThreads
The maximum number of urgent threads running simultaneously.
CPoolOfThreads< TRequest > TPool
TItemHandle GetHandle(unsigned int timeout_sec=kMax_UInt, unsigned int timeout_nsec=0)
Get the first available request from the queue, and return a handle to it.
CBlockingQueue< TRequest >::CCompletingHandle TCompletingHandle
bool operator>(const CQueueItemBase &item) const
virtual void Process(void)=0
Do the actual job. Called by whichever thread handles this request.
CAtomicCounter_WithAutoInit m_UrgentThreadCount
The current number of urgent threads running now.
CSemaphore m_GetSem
Raised if the queue contains data.
TQueue::TItemHandle TItemHandle
set< TItemHandle, SItemHandleGreater > TRealQueue
The type of the queue.
void CountSelf(CAtomicCounter *counter)
CPoolOfThreads(unsigned int max_threads, unsigned int queue_size, unsigned int spawn_threshold=1, unsigned int max_urgent_threads=kMax_UInt, const string &thread_name=kEmptyStr)
Constructor.
void SetUserPriority(TItemHandle handle, TUserPriority priority)
Adjust a pending request's priority.
ERunMode
Thread run mode.
@ eComplete
extracted and released
@ eForciblyCaught
let an exception escape
@ ePending
still in the queue
@ eWithdrawn
dropped by submitter's request
@ eActive
extracted but not yet released
@ eNormal
Process request and stay in the pool.
@ eRunOnce
Process request and die.
unsigned int m_MaxThreads
Maximum simultaneous threads.
ERunMode
Which mode should the thread run in.
void Detach(void)
Inform the thread that user does not need to wait for its termination.
static void SetCurrentThreadName(const CTempString &)
Set name for the current thread.
bool TryWait(unsigned int timeout_sec=0, unsigned int timeout_nsec=0)
Timed wait.
long GetNanoSecondsAfterSecond(void) const
Get number of nanoseconds.
ESign GetSign(void) const
Get sign of time span.
CTime CurrentTime(CTime::ETimeZone tz=CTime::eLocal, CTime::ETimeZonePrecision tzp=CTime::eTZPrecisionDefault)
long GetCompleteSeconds(void) const
Get number of complete seconds.
const long kNanoSecondsPerSecond
Number of nanoseconds in one second.
@ eCurrent
Use current time. See also CCurrentTime.
@ eGmt
GMT (Greenwich Mean Time)
Definition of all error codes used in util (xutil.lib).
const struct ncbi::grid::netcache::search::fields::SIZE size
Multi-threading – classes, functions, and features.
Defines: CTimeFormat - storage class for time format.
NCBI_XUTIL_EXPORT
Parameter to control printing diagnostic message about conversion of static array data from a differe...
CRef< CTestThread > thr[k_NumThreadsMax]
RetroSearch is an open source project built by @garambo | Open a GitHub Issue
Search and Browse the WWW like it's 1997 | Search results from DuckDuckGo
HTML:
3.2
| Encoding:
UTF-8
| Version:
0.7.4