m_LastID(0), m_BlacklistTimeout(), m_ReadBlacklistTimeout()
62 bool& client_was_found,
63 bool& session_was_reset,
65 bool& had_wn_pref_affs,
66 bool& had_reader_pref_affs)
68client_was_found =
false;
69session_was_reset =
false;
71had_wn_pref_affs =
false;
72had_reader_pref_affs =
false;
75 if(!
client.IsComplete())
88new_ns_client.
SetID(client_id);
95client_was_found =
true;
98 client.SetID(known_client->second.GetID());
99old_session = known_client->second.GetSession();
100 if(
client.GetSession() != old_session) {
101session_was_reset =
true;
103client_was_found, old_session, had_wn_pref_affs,
104had_reader_pref_affs);
107known_client->second.Touch(
client);
119 if(!
client.IsComplete())
125cl->second.RegisterSocketWriteError();
132 if(!
client.IsComplete())
138cl->second.SetLastScope(
client.GetScope());
144 unsigned inttype_to_append)
147 if(!
client.IsComplete())
153cl->second.AppendType(type_to_append);
165k->second.GCBlacklistedJobs(
tracker, cmd_group);
171 const string&
data,
intdata_version)
174 if(!
client.IsComplete())
176 "only non-anonymous clients may set their data");
184 "Cannot find client '"+
client.GetNode() +
185 "' to set client data");
187 returnfound->second.SetClientData(
data, data_version);
194 if(!
client.IsComplete())
202admin->second.MarkAsAdmin();
212 if(!
client.IsComplete())
220submitter->second.RegisterSubmittedJobs(
count);
231 if(!
client.IsComplete())
240 "Cannot find client '"+
client.GetNode() +
241 "' to register a job");
243cl->second.RegisterJob(job_id, cmd_group);
254 if(!
client.IsComplete())
263 "Cannot find client '"+
client.GetNode() +
264 "' to set blacklisted job");
266cl->second.RegisterBlacklistedJob(job_id, cmd_group);
277 if(!
client.IsComplete())
285cl->second.UnregisterJob(job_id, cmd_group);
301k->second.UnregisterJob(job_id, cmd_group);
312 if(!
client.IsComplete())
320cl->second.MoveJobToBlacklist(job_id, cmd_group);
336 if(k->second.MoveJobToBlacklist(job_id, cmd_group))
351 bool& client_was_found,
352 string& old_session,
353 bool& had_wn_pref_affs,
354 bool& had_reader_pref_affs)
356client_was_found =
false;
359 if(
client.IsComplete())
361client_was_found, old_session, had_wn_pref_affs,
362had_reader_pref_affs);
370 bool& client_was_found,
371 string& old_session,
372 bool& had_wn_pref_affs,
373 bool& had_reader_pref_affs)
375client_was_found =
false;
381client_was_found =
true;
382old_session = cl->second.GetSession();
383cl->second.SetSession(
"");
387had_wn_pref_affs,
eGet);
389had_reader_pref_affs,
eRead);
403 const string& client_node,
408 boolhad_pref_affs =
client.HasPreferredAffinities(cmd_group);
412 client.GetPreferredAffinities(cmd_group),
414 client.ClearPreferredAffinities(cmd_group);
417 stringaff_part =
"get";
418 if(cmd_group ==
eRead)
423 ctx->SetRequestID();
426.Print(
"_type",
"client_watch")
427.Print(
"client_node", client_node)
428.Print(
"client_session",
client.GetSession())
429.Print(aff_part +
"_preferred_affinities_reset",
435 client.SetAffinityReset(
true, cmd_group);
455 while(en.
valid()) {
459 if(batch.
count() >= batch_size) {
465 if(batch.
count() > 0)
483 if(batch.
get_bit(k->second.GetID())) {
488 if(printed >= batch.
count())
504 if(!
client.IsComplete())
513 "Cannot find client '"+
client.GetNode() +
514 "' to set waiting attributes");
516node->second.SetWaitPort(port, cmd_group);
517node->second.SetWaitAffinities(aff_ids, cmd_group);
530 booltouch_notif_registry)
532 boolret_val =
false;
534 if(
client.HasWaitAffinities(cmd_group))
537 client.GetWaitAffinities(cmd_group),
541 unsigned shortport =
client.GetWaitPort(cmd_group);
548 if(touch_notif_registry)
554 client.CancelWaiting(cmd_group);
564 if(
client.IsComplete())
573 booltouch_notif_registry)
575 if(node_name.empty())
582 return CancelWaiting(cl->second, cmd_group, touch_notif_registry);
593 if(
client.IsComplete())
600 const string& client_node,
609found->second.SubtractBlacklistedJobs(cmd_group, bv);
615 string& scope,
string& virtual_scope)
618virtual_scope.clear();
625scope = found->second.GetLastScope();
626virtual_scope = found->second.GetVirtualScope(client_node);
637 if(
client.IsComplete())
644 const string& client_node,
653found->second.AddBlacklistedJobs(cmd_group, bv);
661 if(!
client.IsComplete())
679 returnfound->second.GetPreferredAffinities(cmd_group);
688 if(cmd_group ==
eGet)
698 if(!
client.IsComplete())
716 returnfound->second.GetWaitAffinities(cmd_group);
733 if(!
client.IsComplete())
742 "Cannot find client '"+
client.GetNode() +
743 "' to update preferred affinities");
749found->second.AddPreferredAffinities(aff_to_add, cmd_group);
750found->second.RemovePreferredAffinities(aff_to_del, cmd_group);
753 if(aff_to_del.
any())
756 if(aff_to_add.
any()) {
757 if(cmd_group ==
eGet)
768 unsigned intaff_to_add,
769 unsigned intaff_to_del,
772 if(aff_to_add + aff_to_del == 0)
775 if(!
client.IsComplete())
778 boolaff_added =
false;
785 "Cannot find client '"+
client.GetNode() +
786 "' to update preferred affinities");
788 if(aff_to_add != 0) {
791aff_added = found->second.AddPreferredAffinity(aff_to_add,
795 if(aff_to_del != 0) {
798found->second.RemovePreferredAffinity(aff_to_del, cmd_group);
806 if(cmd_group ==
eGet)
821 if(!
client.IsComplete())
824 stringclient_name =
client.GetNode();
831 "Cannot find client '"+
client.GetNode() +
832 "' to update preferred affinities");
834 TNSBitVectorcurr_affs = found->second.GetPreferredAffinities(cmd_group);
838 if(aff_to_add.
any())
842 if(aff_to_del.
any())
846found->second.SetPreferredAffinities(aff_to_set, cmd_group);
849 if(aff_to_del.
any())
852 if(aff_to_add.
any()) {
853 if(cmd_group ==
eGet)
862 if(cmd_group ==
eGet)
886 returnnode->second.IsRequestedAffinity(aff, use_preferred, cmd_group);
895 if(cmd_group ==
eGet)
905 if(!
client.IsComplete())
915 returnfound->second.GetAffinityReset(cmd_group);
925 if(k->second.GetID() ==
id)
938 if(current_time -
client.GetLastAccess() <= timeout)
943 unsigned shortwait_port =
client.GetWaitPort(cmd_group);
944 unsigned intwait_address =
client.GetPeerAddress();
945 if(wait_port != 0) {
947GetPassiveNotificationLifetime(wait_address,
950 if(current_time <= get_lifetime)
970 state= k->second.GetState();
974 type= k->second.GetType();
995k->second,
eRead)) {
1009 unsigned intmin_worker_nodes,
1011 unsigned intmin_admins,
1013 unsigned intmin_submitters,
1015 unsigned intmin_readers,
1017 unsigned intmin_unknowns,
1023timeout_reader, min_readers, is_log);
1024 x_PurgeAdmins(current_time, timeout_admin, min_admins, is_log);
1026 x_PurgeUnknowns(current_time, timeout_unknown, min_unknowns, is_log);
1045 if(cmd_group ==
eRead) {
1065 if(!
client.IsComplete())
1069 if(cmd_group ==
eGet)
1082 bool& had_pref_affs,
1085jobs =
client.GetJobs(cmd_group);
1086 client.ClearJobs(cmd_group);
1089had_pref_affs =
client.HasPreferredAffinities(cmd_group);
1090 if(had_pref_affs) {
1093 client.GetPreferredAffinities(cmd_group),
1095 client.ClearPreferredAffinities(cmd_group);
1104 if(cmd_group ==
eGet)
1121 return m_Clients[ lhs ].GetLastAccess() <
1135 unsigned intmin_worker_nodes,
1137 unsigned intmin_readers,
1141list< string > inactive_wns;
1142list< string > inactive_readers;
1143 unsigned inttotal_wn_count = 0;
1144 unsigned inttotal_reader_count = 0;
1150 type= k->second.GetType();
1155++total_reader_count;
1159 state= k->second.GetState();
1172 if(current_time - k->second.GetLastAccess() >
1173timeout_worker_node)
1174inactive_wns.push_back(k->first);
1185 if(current_time - k->second.GetLastAccess() >
1187inactive_readers.push_back(k->first);
1196 if(current_time - k->second.GetLastAccess() >
1197timeout_worker_node &&
1198current_time - k->second.GetLastAccess() >
1200inactive_wns.push_back(k->first);
1201inactive_readers.push_back(k->first);
1207 if(total_wn_count > min_worker_nodes && ! inactive_wns.empty()) {
1210 unsigned intactive_count = total_wn_count - inactive_wns.
size();
1211 unsigned intremove_count = 0;
1213 if(active_count >= min_worker_nodes)
1214remove_count = inactive_wns.size();
1216remove_count = total_wn_count - min_worker_nodes;
1222 for(list<string>::iterator j = inactive_wns.begin();
1223j != inactive_wns.end() && remove_count > 0; ++j, --remove_count) {
1236 ERR_POST(
"Garbage collected worker node list exceeds 100000 " 1237 "records. There are currently "<<
1240 ERR_POST(
"Garbage collected reader list exceeds 100000 " 1241 "records. There are currently "<<
1244list<string>::iterator found = find(inactive_readers.begin(),
1245inactive_readers.end(), *j);
1246 if(found != inactive_readers.end()) {
1248inactive_readers.erase(found);
1249--total_reader_count;
1258 if(total_reader_count > min_readers && ! inactive_readers.empty()) {
1261 unsigned intactive_count = total_reader_count -
1262inactive_readers.size();
1263 unsigned intremove_count = 0;
1265 if(active_count >= min_readers)
1266remove_count = inactive_readers.size();
1268remove_count = total_reader_count - min_readers;
1274 for(list<string>::iterator j = inactive_readers.begin();
1275j != inactive_readers.end() && remove_count > 0; ++j) {
1277 if(find(inactive_wns.begin(), inactive_wns.end(), *j) !=
1291 ERR_POST(
"Garbage collected reader list exceeds 100000 " 1292 "records. There are currently "<<
1306 unsigned intmin_admins,
1318 unsigned intmin_submitters,
1330 unsigned intmin_unknowns,
1341 unsigned intmin_clients,
1342 unsigned intclient_type,
1346list< string > inactive;
1347 unsigned intinactive_count = 0;
1353 type= k->second.GetType();
1355 if(client_type == 0) {
1359 if((
type& client_type) == 0)
1367 if(current_time - k->second.GetLastAccess() > timeout) {
1369inactive.push_back(k->first);
1373 if(
total_count<= min_clients || inactive_count == 0)
1377 unsigned intactive_count =
total_count- inactive_count;
1378 unsigned intremove_count = 0;
1379 if(active_count >= min_clients)
1380remove_count = inactive_count;
1388 for(list<string>::iterator j = inactive.begin();
1389j != inactive.end() && remove_count > 0; ++j, --remove_count) {
map< string, CNSClient > & m_Clients
bool operator()(const string &lhs, const string &rhs)
AgeFunctor(map< string, CNSClient > &clients)
void SetWaitClientForAffinities(unsigned int client_id, const TNSBitVector &aff_ids, ECommandGroup cmd_group)
void AddClientToAffinities(unsigned int client_id, const TNSBitVector &aff_ids, ECommandGroup cmd_group)
size_t RemoveClientFromAffinities(unsigned int client_id, const TNSBitVector &aff_ids, ECommandGroup cmd_group)
void AddClientToAffinity(unsigned int client_id, unsigned int aff_id, ECommandGroup cmd_group)
size_t RemoveWaitClientFromAffinities(unsigned int client_id, const TNSBitVector &aff_ids, ECommandGroup cmd_group)
void SetID(unsigned int id)
void x_ClearClient(const string &node_name, CNSClient &client, TNSBitVector &jobs, bool &had_pref_affs, ECommandGroup cmd_group)
void AppendType(const CNSClientId &client, unsigned int type_to_append)
bool IsPreferredByAny(unsigned int aff_id, ECommandGroup cmd_group) const
CNSAffinityRegistry * m_AffRegistry
void RegisterJob(const CNSClientId &client, unsigned int job_id, ECommandGroup cmd_group)
void x_BuildAffinities(ECommandGroup cmd_group)
void SetPreferredAffinities(const CNSClientId &client, const TNSBitVector &aff_to_set, ECommandGroup cmd_group)
set< string > m_GCReaderClients
CNSPreciseTime m_ReadBlacklistTimeout
bool x_CouldBeStale(const CNSPreciseTime ¤t_time, const CNSPreciseTime &timeout, const CNSClient &client, ECommandGroup cmd_group)
void SubtractBlacklistedJobs(const CNSClientId &client, ECommandGroup cmd_group, TNSBitVector &bv) const
void Purge(const CNSPreciseTime ¤t_time, const CNSPreciseTime &timeout_worker_node, unsigned int min_worker_nodes, const CNSPreciseTime &timeout_admin, unsigned int min_admins, const CNSPreciseTime &timeout_submitter, unsigned int min_submitters, const CNSPreciseTime &timeout_reader, unsigned int min_readers, const CNSPreciseTime &timeout_unknown, unsigned int min_unknowns, bool is_log)
set< string > m_GCWNodeClients
void StaleNodes(const CNSPreciseTime ¤t_time, const CNSPreciseTime &wn_timeout, const CNSPreciseTime &reader_timeout, bool is_log)
TNSBitVector GetRegisteredClients(void) const
string x_PrintSelected(const TNSBitVector &batch, const CQueue *queue, bool verbose) const
void AddToSubmitted(const CNSClientId &client, size_t count)
void x_PurgeWNodesAndReaders(const CNSPreciseTime ¤t_time, const CNSPreciseTime &timeout_worker_node, unsigned int min_worker_nodes, const CNSPreciseTime &timeout_reader, unsigned int min_readers, bool is_log)
CNSNotificationList * m_NotifRegistry
string PrintClientsList(const CQueue *queue, size_t batch_size, bool verbose) const
void UnregisterJob(const CNSClientId &client, unsigned int job_id, ECommandGroup cmd_group)
void ClearOnTimeout(CNSClient &client, const string &client_node, bool is_log, ECommandGroup cmd_group)
TNSBitVector GetAllPreferredAffinities(ECommandGroup cmd_group) const
void x_PurgeInactiveClients(const CNSPreciseTime ¤t_time, const CNSPreciseTime &timeout, unsigned int min_clients, unsigned int client_type, bool is_log)
void RegisterBlacklistedJob(const CNSClientId &client, unsigned int job_id, ECommandGroup cmd_group)
bool CancelWaiting(CNSClient &client, ECommandGroup cmd_group, bool touch_notif_registry=true)
map< string, CNSClient > m_Clients
TNSBitVector GetWaitAffinities(const CNSClientId &client, ECommandGroup cmd_group) const
void GetScopes(const string &client_node, string &scope, string &virtual_scope)
bool IsRequestedAffinity(const string &name, const TNSBitVector &aff, bool use_preferred, ECommandGroup cmd_group) const
void MoveJobToBlacklist(const CNSClientId &client, unsigned int job_id, ECommandGroup cmd_group)
void ClearClient(const CNSClientId &client, TNSBitVector &running_jobs, TNSBitVector &reading_jobs, bool &client_was_found, string &old_session, bool &had_wn_pref_affs, bool &had_reader_pref_affs)
string GetNodeName(unsigned int id) const
CNSPreciseTime m_BlacklistTimeout
void GCBlacklistedJobs(const CJobStatusTracker &tracker, ECommandGroup cmd_group)
void x_PurgeAdmins(const CNSPreciseTime ¤t_time, const CNSPreciseTime &timeout_admin, unsigned int min_admins, bool is_log)
void UpdatePreferredAffinities(const CNSClientId &client, const TNSBitVector &aff_to_add, const TNSBitVector &aff_to_del, ECommandGroup cmd_group)
TNSBitVector m_WNodeAffinities
void Touch(CNSClientId &client, TNSBitVector &running_jobs, TNSBitVector &reading_jobs, bool &client_was_found, bool &session_was_reset, string &old_session, bool &had_wn_pref_affs, bool &had_reader_pref_affs)
bool GetAffinityReset(const CNSClientId &client, ECommandGroup cmd_group) const
void SetLastScope(const CNSClientId &client)
TNSBitVector m_RegisteredClients
void MarkAsAdmin(const CNSClientId &client)
void x_PurgeSubmitters(const CNSPreciseTime ¤t_time, const CNSPreciseTime &timeout_submitter, unsigned int min_submitters, bool is_log)
void RegisterSocketWriteError(const CNSClientId &client)
int SetClientData(const CNSClientId &client, const string &data, int data_version)
unsigned int x_GetNextID(void)
TNSBitVector m_ReaderAffinities
void SetRegistries(CNSAffinityRegistry *aff_registry, CNSNotificationList *notif_registry)
void SetNodeWaiting(const CNSClientId &client, unsigned short port, const TNSBitVector &aff_ids, ECommandGroup cmd_group)
TNSBitVector GetPreferredAffinities(const CNSClientId &client, ECommandGroup cmd_group) const
bool WasGarbageCollected(const CNSClientId &client, ECommandGroup cmd_group) const
void x_PurgeUnknowns(const CNSPreciseTime ¤t_time, const CNSPreciseTime &timeout_unknown, unsigned int min_unknowns, bool is_log)
void AddBlacklistedJobs(const CNSClientId &client, ECommandGroup cmd_group, TNSBitVector &bv) const
void UnregisterListener(const CNSClientId &client, unsigned short port, ECommandGroup cmd_group)
NetSchedule internal exception.
@ eStatus_OK
Command is ok and execution is good.
Constant iterator designed to enumerate "ON" bits.
bool valid() const noexcept
Checks if iterator is still valid.
Bitvector Bit-vector container with runtime compression of bits.
bool get_bit(size_type n) const noexcept
returns true if bit n is set and false is bit n is 0.
bool any() const noexcept
Returns true if any bits in this bitset are set, and otherwise returns false.
bool set_bit(size_type n, bool val=true)
Sets bit n.
enumerator first() const
Returns enumerator pointing on the first non-zero bit.
void clear(const size_type *ids, size_type ids_size, bm::sort_order so=bm::BM_UNKNOWN)
clear list of bits in this bitset
size_type count() const noexcept
population count (count of ON bits)
const_iterator begin() const
const_iterator end() const
const_iterator find(const key_type &key) const
iterator_bool insert(const value_type &val)
const_iterator find(const key_type &key) const
const_iterator end() const
void PrintRequestStop(void)
Print request stop message (for request-driven applications)
CDiagContext & GetDiagContext(void)
Get diag context instance.
static void SetRequestContext(CRequestContext *ctx)
Shortcut to CDiagContextThreadData::GetThreadData().SetRequestContext()
void PrintRequestStart(const string &message)
Print request start message (for request-driven applications)
#define ERR_POST(message)
Error posting with file, line number information but without error codes.
#define NCBI_THROW(exception_class, err_code, message)
Generic macro to throw an exception, given the exception class, error code and message string.
#define END_NCBI_SCOPE
End previously defined NCBI scope.
#define BEGIN_NCBI_SCOPE
Define ncbi namespace.
const TNSBitVector kEmptyBitVector
static unsigned long int total_count
Defines CRequestContext class for NCBI C++ diagnostic API.
static CNamedPipeClient * client
RetroSearch is an open source project built by @garambo | Open a GitHub Issue
Search and Browse the WWW like it's 1997 | Search results from DuckDuckGo
HTML:
3.2
| Encoding:
UTF-8
| Version:
0.7.4