s_True =
"TRUE\n";
59 static strings_False =
"FALSE\n";
63 "OK: RECEPIENT ADDRESS: "+
68 buffer+=
"OK: ANY JOB: ";
75 buffer+=
"OK: EXPLICIT AFFINITIES: n/a (available in VERBOSE mode)\n";
79 buffer+=
"OK: EXPLICIT AFFINITIES: CLIENT NOT FOUND\n";
85 if(wait_aff.
any()) {
86 buffer+=
"OK: EXPLICIT AFFINITIES:\n";
89 for( ; en.
valid(); ++en)
94 buffer+=
"OK: EXPLICIT AFFINITIES: NONE\n";
99 buffer+=
"OK: USE PREFERRED AFFINITIES: ";
113 if(pref_aff.
any()) {
114 buffer+=
"OK: USE PREFERRED AFFINITIES:\n";
117 for( ; en.
valid(); ++en)
122 buffer+=
"OK: USE PREFERRED AFFINITIES: NONE\n";
125 buffer+=
"OK: USE PREFERRED AFFINITIES: FALSE\n";
128 buffer+=
"OK: EXCLUSIVE NEW AFFINITY: ";
135 buffer+=
"OK: GROUPS: NONE\n";
138 buffer+=
"OK: GROUPS:\n";
140 for( ; en.
valid(); ++en) {
143 buffer+=
"OK: '"+ token +
"'\n";
144}
catch(
constexception & ex) {
145 ERR_POST(
"Error resolving group number while printing " 146 "the notification registry: "<< ex.what());
148 ERR_POST(
"Unknown resolving group number error while " 149 "printing the notification registry");
153 buffer+=
"OK: GROUPS: n/a (available in VERBOSE mode)\n";
156 buffer+=
"OK: REASON: ";
162 buffer+=
"OK: ACTIVE: ";
169 buffer+=
"OK: HIGH FREQUENCY LIFE TIME: "+
172 buffer+=
"OK: HIGH FREQUENCY LIFE TIME: n/a\n";
174 buffer+=
"OK: SLOW RATE ACTIVE: ";
185 const string& ns_node,
186 const string& qname) :
187m_JobChangeNotifConstPart(
"ns_node="+ ns_node +
"&job_key="),
191 "reason=get&ns_node=%s&queue=%s",
192ns_node.c_str(), qname.c_str()) + 1;
194 "reason=read&ns_node=%s&queue=%s",
195ns_node.c_str(), qname.c_str()) + 1;
199 "NCBI_JSQ_%s", qname.c_str()) + 1;
207 unsigned inttimeout,
210 boolexclusive_new_affinity,
215 unsigned intaddress =
client.GetAddress();
216list<SNSNotificationAttributes>::iterator found;
225found->m_ClientNode =
client.GetNode();
226found->m_WnodeAff = wnode_aff;
227found->m_AnyJob = any_job;
228found->m_ExclusiveNewAff = exclusive_new_affinity;
229found->m_NewFormat = new_format;
230found->m_Groups = groups;
231found->m_HifreqNotifyLifetime =
kTimeZero;
232found->m_SlowRate =
false;
233found->m_SlowRateCount = 0;
234found->m_Reason = reason;
240 if(found->m_ClientNode.empty())
241found->m_AnyJob =
true;
264 attributes.m_ExclusiveNewAff = exclusive_new_affinity;
293list<SNSNotificationAttributes>::iterator found;
310 const string& notification)
314notification.size() + 1,
322 const string& job_key,
327notification.reserve(2048);
338notification +=
"status";
341notification +=
"stolen";
344notification +=
"progress";
347notification +=
"unknown";
352 size_tencoded_msg_size = url_encoded_msg.size();
353 const size_tsize_limit = 768;
355 if(encoded_msg_size > size_limit) {
356 size_tmsg_size = progress_msg.size();
357 size_ttruncate_count = encoded_msg_size - size_limit;
360progress_msg.substr(0, msg_size - truncate_count)) +
361 "&msg_truncated="+ to_string(truncate_count);
363notification +=
"&msg="+ url_encoded_msg;
377list<SNSNotificationAttributes>::iterator rec;
397 if(cmd_group == rec->m_Reason) {
411 unsigned intnotif_lofreq_mult,
423 if(k->m_SlowRate || current_time > k->m_HifreqNotifyLifetime) {
424k->m_SlowRate =
true;
427k->m_SlowRateCount += 1;
428 if(k->m_SlowRateCount > notif_lofreq_mult) {
429k->m_SlowRateCount = 0;
433k->m_NewFormat, k->m_Reason);
435k->m_NewFormat, k->m_Reason);
441k->m_NewFormat, k->m_Reason);
461 if(k->m_ExclusiveNewAff) {
462candidates = outdated_jobs;
465 if(candidates.
any()) {
467k->m_NewFormat, k->m_Reason);
470k->m_HifreqNotifyLifetime = current_time +
471notif_highfreq_period;
502 Notify(jobs, aff_ids, aff_id == 0,
503clients_registry, aff_registry, group_registry, scope_registry,
504notif_highfreq_period, notif_handicap, reason);
530vector<SNSNotificationAttributes*> targets;
531 boolbe_random = (notif_handicap.tv_sec != 0 ||
532notif_handicap.tv_nsec != 0);
542 if(reason != k->m_Reason) {
548 boolshould_send =
false;
555 if(candidates.
any() ==
false) {
561 if(k->m_Groups.any()) {
563 if(candidates.
any() ==
false) {
570 stringvirtual_scope;
571clients_registry.
GetScopes(k->m_ClientNode, scope, virtual_scope);
573 if(!virtual_scope.empty()) {
579scope_registry.
GetJobs(virtual_scope));
582candidates &= (scope_registry.
GetJobs(scope) |
583scope_registry.
GetJobs(virtual_scope));
586 if(candidates.
any() ==
false) {
599 if(affinities.
any())
603k->m_WnodeAff, reason);
605 if(should_send ==
false) {
606 if(k->m_ExclusiveNewAff) {
610 if(affinities.
any())
611should_send = (affinities -
612all_preferred_affs).any();
617k->m_HifreqNotifyLifetime = current_time + notif_highfreq_period;
623k->m_NewFormat, k->m_Reason);
632 if(be_random && !targets.empty()) {
633shuffle(targets.begin(), targets.end(), default_random_engine());
637targets[0]->m_NewFormat,
638targets[0]->m_Reason);
640 for(
size_tj(1); j < targets.size(); ++j)
644targets[j]->m_NewFormat,
645targets[j]->m_Reason);
663 if(k->m_Reason ==
eGet)
665k->m_NewFormat, k->m_Reason);
673k->m_NewFormat,
eGet);
691list<SNSNotificationAttributes>::const_iterator current;
701 buffer+= current->Print(clients_registry, aff_registry,
702group_registry,
true,
verbose);
706 buffer+= current->Print(clients_registry, aff_registry,
707group_registry,
false,
verbose);
715 unsigned intaddress,
733notification.
m_Port= port;
747list<SExactTimeNotification>::iterator k =
750 if( k->m_Reason ==
eGet)
769list<SExactTimeNotification>::iterator
first=
771 if(
first->m_TimeToSend > current)
772 return first->m_TimeToSend;
792 for(list<SQueueResumeNotification>::iterator
795 if(k->m_Address == address && k->m_Port == port)
814 for(list<SNSNotificationAttributes>::const_iterator
817 if(k->m_Address == address &&
819k->m_Reason == cmd_group)
820 returnk->m_Lifetime;
828 unsigned intaddress,
860list<SNSNotificationAttributes> & container,
861list<SNSNotificationAttributes>::iterator & record)
863 if(current_time > record->m_Lifetime) {
866 if(!record->m_ClientNode.empty())
870record->m_Reason,
false);
871record = container.erase(record);
881 unsigned shortport)
const 884 for(list<SExactTimeNotification>::const_iterator
888 if(k->m_Address == address &&
899 unsigned intsec_delay,
900 unsigned intnanosec_delay,
901 const bool& logging) :
902m_QueueDB(qdb), m_NotifLogging(logging),
903m_Period(sec_delay, nanosec_delay),
904m_StopSignal(0, 10000000),
949}
while(next_exact <= current);
953delay = next_exact - current;
976 catch(exception & ex) {
978 ERR_POST(
"Error during sending exact time scheduled notifications: " 979<< ex.what() <<
". Notification thread has been stopped.");
983 ERR_POST(
"Unknown error during sending exact time scheduled " 984 "notifications. Notification thread has been stopped.");
997 ctx->SetRequestID();
1000.Print(
"_type",
"get_job_notification_thread");
1008 catch(exception & ex) {
1010 ERR_POST(
"Error during notification: "<< ex.what() <<
1011 " notification thread has been stopped.");
1013 ctx->SetRequestStatus(
1018 ERR_POST(
"Unknown error during notification. " 1019 "Notification thread has been stopped.");
1021 ctx->SetRequestStatus(
1034list<SNSNotificationAttributes>::iterator
1036list<SNSNotificationAttributes> & container,
1037 unsigned intaddress,
1038 unsigned shortport,
1041 for(list<SNSNotificationAttributes>::iterator k = container.begin();
1042k != container.end(); ++k) {
1043 if(k->m_Address == address &&
1044k->m_Port == port &&
1045k->m_Reason == cmd_group)
1048 returncontainer.end();
virtual void * Main(void)
Derived (user-created) class must provide a real thread function.
CNSPreciseTime m_NextScheduled
const bool & m_NotifLogging
CQueueDataBase & m_QueueDB
CNSPreciseTime x_ProcessExactTimeNotifications(void)
CGetJobNotificationThread(CQueueDataBase &qdb, unsigned int sec_delay, unsigned int nanosec_delay, const bool &logging)
~CGetJobNotificationThread()
size_t GetLastEventIndex(void) const
const string & GetProgressMsg() const
string GetTokenByID(unsigned int aff_id) const
void SubtractBlacklistedJobs(const CNSClientId &client, ECommandGroup cmd_group, TNSBitVector &bv) const
TNSBitVector GetAllPreferredAffinities(ECommandGroup cmd_group) const
bool CancelWaiting(CNSClient &client, ECommandGroup cmd_group, bool touch_notif_registry=true)
TNSBitVector GetWaitAffinities(const CNSClientId &client, ECommandGroup cmd_group) const
void GetScopes(const string &client_node, string &scope, string &virtual_scope)
bool IsRequestedAffinity(const string &name, const TNSBitVector &aff, bool use_preferred, ECommandGroup cmd_group) const
TNSBitVector GetPreferredAffinities(const CNSClientId &client, ECommandGroup cmd_group) const
unsigned int ResolveGroup(const string &group)
void RestrictByGroup(const string &group, TNSBitVector &bv) const
string BuildJobChangedNotification(const CJob &job, const string &job_key, TJobStatus job_status, ENotificationReason reason)
CFastMutex m_GetAndReadNotificationSocketLock
void x_SendNotificationPacket(unsigned int address, unsigned short port, bool new_format, ECommandGroup reason)
list< SExactTimeNotification > m_ExactTimeNotifications
char m_GetMsgBufferObsoleteVersion[k_MessageBufferSize]
CDatagramSocket m_GetAndReadNotificationSocket
void onQueueResumed(bool any_pending)
CNSPreciseTime GetPassiveNotificationLifetime(unsigned int address, unsigned short port, ECommandGroup cmd_group) const
CMutex m_ExactTimeNotifLock
CMutex m_QueueResumeNotifLock
char m_ReadMsgBuffer[k_MessageBufferSize]
void Notify(unsigned int job_id, unsigned int aff_id, CNSClientsRegistry &clients_registry, CNSAffinityRegistry &aff_registry, CNSGroupsRegistry &group_registry, CNSScopeRegistry &scope_registry, const CNSPreciseTime ¬if_highfreq_period, const CNSPreciseTime ¬if_handicap, ECommandGroup cmd_group)
void AddToQueueResumedNotifications(unsigned int address, unsigned short port, bool new_format)
list< SNSNotificationAttributes > m_PassiveListeners
void x_AddToExactNotifications(unsigned int address, unsigned short port, const CNSPreciseTime &when, bool new_format, ECommandGroup reason)
void NotifyJobChanges(unsigned int address, unsigned short port, const string ¬ification)
void UnregisterListener(const CNSClientId &client, unsigned short port, ECommandGroup cmd_group)
void CheckOutdatedJobs(const TNSBitVector &outdated_jobs, CNSClientsRegistry &clients_registry, const CNSPreciseTime ¬if_highfreq_period, ECommandGroup cmd_group)
void ClearExactGetNotifications(void)
list< SQueueResumeNotification > m_QueueResumeNotifications
CQueueDataBase & m_QueueDB
bool x_TestTimeout(const CNSPreciseTime ¤t_time, CNSClientsRegistry &clients_registry, list< SNSNotificationAttributes > &container, list< SNSNotificationAttributes >::iterator &record)
void CheckTimeout(const CNSPreciseTime ¤t_time, CNSClientsRegistry &clients_registry, ECommandGroup cmd_group)
void NotifyPeriodically(const CNSPreciseTime ¤t_time, unsigned int notif_lofreq_mult, CNSClientsRegistry &clients_registry)
CFastMutex m_StatusNotificationSocketLock
CDatagramSocket m_StatusNotificationSocket
string m_JobChangeNotifConstPart
list< SNSNotificationAttributes > m_ActiveListeners
char m_GetMsgBuffer[k_MessageBufferSize]
list< SNSNotificationAttributes >::iterator x_FindListener(list< SNSNotificationAttributes > &container, unsigned int address, unsigned short port, ECommandGroup cmd_group)
size_t m_GetMsgLengthObsoleteVersion
void RegisterListener(const CNSClientId &client, unsigned short port, unsigned int timeout, bool wnode_aff, bool any_job, bool exclusive_new_affinity, bool new_format, const TNSBitVector &groups, ECommandGroup cmd_group)
string Print(const CNSClientsRegistry &clients_registry, const CNSAffinityRegistry &aff_registry, const CNSGroupsRegistry &group_registry, bool verbose) const
CNSPreciseTime NotifyExactListeners(void)
CNSNotificationList(CQueueDataBase &qdb, const string &ns_node, const string &qname)
bool x_IsInExactList(unsigned int address, unsigned short port) const
static CNSPreciseTime Never(void)
static CNSPreciseTime Current(void)
TNSBitVector GetAllJobsInScopes(void) const
TNSBitVector GetJobs(const string &scope) const
@ eStatus_OK
Command is ok and execution is good.
@ eStatus_ServerError
Internal server error.
void WakeupNotifThread(void)
CNSPreciseTime SendExactNotifications(void)
void NotifyListeners(void)
Constant iterator designed to enumerate "ON" bits.
bool valid() const noexcept
Checks if iterator is still valid.
Bitvector Bit-vector container with runtime compression of bits.
bool any() const noexcept
Returns true if any bits in this bitset are set, and otherwise returns false.
bool set_bit(size_type n, bool val=true)
Sets bit n.
enumerator first() const
Returns enumerator pointing on the first non-zero bit.
static DLIST_TYPE *DLIST_NAME() first(DLIST_LIST_TYPE *list)
static const struct attribute attributes[]
void PrintRequestStop(void)
Print request stop message (for request-driven applications)
CDiagContext & GetDiagContext(void)
Get diag context instance.
static void SetRequestContext(CRequestContext *ctx)
Shortcut to CDiagContextThreadData::GetThreadData().SetRequestContext()
void PrintRequestStart(const string &message)
Print request start message (for request-driven applications)
#define ERR_POST(message)
Error posting with file, line number information but without error codes.
EJobStatus
Job status codes.
static string StatusToString(EJobStatus status)
Printable status type.
#define END_NCBI_SCOPE
End previously defined NCBI scope.
#define BEGIN_NCBI_SCOPE
Define ncbi namespace.
EIO_Status Send(const void *data, size_t datalen, const string &host=string(), unsigned short port=0)
static string gethostbyaddr(const CNCBI_IPAddr &addr, ESwitch log=eOff)
Return empty string on error.
static string ntoa(const CNCBI_IPAddr &addr)
BSD-like API.
static string URLEncode(const CTempString str, EUrlEncode flag=eUrlEnc_SkipMarkChars)
URL-encode string.
unsigned short m_Port
TCP port to listen on.
static void SetCurrentThreadName(const CTempString &)
Set name for the current thread.
bool TryWait(unsigned int timeout_sec=0, unsigned int timeout_nsec=0)
Timed wait.
void Post(unsigned int count=1)
Increment the semaphore by "count".
const size_t k_MessageBufferSize
const CNSPreciseTime kTimeZero
const CNSPreciseTime kTimeNever
string NS_FormatPreciseTime(const CNSPreciseTime &t)
const string kNoScopeOnly
@ eProgressMessageChanged
Defines CRequestContext class for NCBI C++ diagnostic API.
static CNamedPipeClient * client
CNSPreciseTime m_TimeToSend
CNSPreciseTime m_HifreqNotifyLifetime
CNSPreciseTime m_Lifetime
string Print(const CNSClientsRegistry &clients_registry, const CNSAffinityRegistry &aff_registry, const CNSGroupsRegistry &group_registry, bool is_active, bool verbose) const
RetroSearch is an open source project built by @garambo | Open a GitHub Issue
Search and Browse the WWW like it's 1997 | Search results from DuckDuckGo
HTML:
3.2
| Encoding:
UTF-8
| Version:
0.7.4