max_queues,
64: m_Host(server->GetBackgroundHost()),
65m_MaxQueues(max_queues),
68m_FreeStatusMemCnt(0),
69m_LastFreeMem(time(0)),
72m_PurgeStatusIndex(0),
86vector<string> refuse_submit_queues =
135 if(data_dir.
Exists()) {
137 "value set to true but the previous run " 138 "directory is on the disk. It will be removed.");
139 if(!data_dir.
Remove()) {
143 "Error removing the previous run data " 145 " (due to the cofiguration file sets " 146 "the [server]/diskless value to true)");
149}
else if(!data_dir.
Exists()) {
170 stringlast_queue_load_error;
171 size_tqueue_load_error_count = 0;
175 size_tfinal_dynamic_count = 0;
177k = dump_dynamic_queues.begin();
178k != dump_dynamic_queues.end(); ++k)
179 if(config_static_queues.
find(k->first) == config_static_queues.
end())
180++final_dynamic_count;
182 size_ttotal_queues = final_dynamic_count +
183config_static_queues.
size();
185 string msg=
"The initial number of queues on the server exceeds the " 186 "configured max number of queues. Configured: "+
188to_string(total_queues) +
". The limit will " 189 "be extended to accomodate all the queues.";
213k != dump_queue_classes.
end(); ++k) {
227k = dump_dynamic_queues.begin();
228k != dump_dynamic_queues.end(); ++k) {
229 stringqname = k->first;
230 if(config_static_queues.
find(qname) != config_static_queues.
end())
235 stringqclass = k->second;
255 unsigned intrecords =
258.
Print(
"_type",
"startup")
259.
Print(
"_queue", k->first)
260.
Print(
"info",
"load_from_dump")
261.
Print(
"records", records);
262}
catch(
constexception & ex) {
264last_queue_load_error = ex.what();
265++queue_load_error_count;
267last_queue_load_error =
"Unknown error loading queue "+
268k->first +
" from dump";
270++queue_load_error_count;
274}
catch(
constexception & ex) {
276last_queue_load_error = ex.what();
277++queue_load_error_count;
279last_queue_load_error =
"Unknown error loading queues from dump";
281++queue_load_error_count;
295 if(queue_load_error_count > 0) {
297 "There were error(s) loading the previous " 298 "instance dump. Number of errors: "+
299to_string(queue_load_error_count) +
300 ". See log for all the loading errors.\n" 301 "Last error: "+ last_queue_load_error);
318list<string> sections;
322 ITERATE(list<string>, it, sections) {
325 const string& section_name = *it;
330 if(queue_class.empty())
339vector<string> warnings;
343queue_classes[queue_class] = params;
347 returnqueue_classes;
358list<string> sections;
361 ITERATE(list<string>, it, sections) {
364 const string& section_name = *it;
369 if(queue_name.empty())
378vector<string> warnings;
380 if(params.
ReadQueue(reg, section_name, classes, warnings)) {
382queues[queue_name] = params;
394 typedefmap< string, map< string, string > > section_container;
395section_container new_values;
396list<string> sections;
399 ITERATE(list<string>, it, sections) {
400 stringqueue_or_class;
402 const string& section_name = *it;
405 if(queue_or_class.empty())
417 const string& entry = *k;
418 stringref_section = reg.
GetString(section_name,
424 if(entry ==
"linked_section_")
427 if(ref_section.empty())
430 if(find(sections.begin(), sections.end(), ref_section) ==
434 if(new_values.find(ref_section) != new_values.end())
438list<string> linked_section_entries;
441 for(list<string>::const_iterator j = linked_section_entries.begin();
442j != linked_section_entries.end(); ++j)
445new_values[ref_section] = values;
452vector<string> deleted;
455 if(new_values.find(k->first) == new_values.end())
456deleted.push_back(k->first);
458 if(!deleted.empty()) {
460 for(vector<string>::const_iterator k(deleted.begin());
461k != deleted.end(); ++k)
463diff.
SetByKey(
"linked_section_deleted", deletedSections );
467vector<string> added;
468 for(section_container::const_iterator k(new_values.begin());
469k != new_values.end(); ++k)
471added.push_back(k->first);
473 if(!added.empty()) {
475 for(vector<string>::const_iterator k(added.begin());
476k != added.end(); ++k)
478diff.
SetByKey(
"linked_section_added", addedSections );
482vector<string> changed;
483 for(section_container::const_iterator k(new_values.begin());
484k != new_values.end(); ++k) {
485 if(find(added.begin(), added.end(), k->first) != added.end())
489changed.push_back(k->first);
492 if(!changed.empty()) {
494 for(vector<string>::const_iterator k(changed.begin());
495k != changed.end(); ++k)
499diff.
SetByKey(
"linked_section_changed", changedSections );
515vector<string> deleted;
517k != old_values.
end(); ++k)
518 if(new_values.
find(k->first) == new_values.
end())
519deleted.push_back(k->first);
520 if(!deleted.empty()) {
522 for(vector<string>::const_iterator k(deleted.begin());
523k != deleted.end(); ++k)
525diff.
SetByKey(
"deleted", deletedValues );
529vector<string> added;
531k != new_values.
end(); ++k)
532 if(old_values.
find(k->first) == old_values.
end())
533added.push_back(k->first);
534 if(!added.empty()) {
536 for(vector<string>::const_iterator k(added.begin());
537k != added.end(); ++k)
539diff.
SetByKey(
"added", addedValues );
543vector<string> changed;
545k != new_values.
end(); ++k) {
546 if(old_values.
find(k->first) == old_values.
end())
548 if(old_values.
find(k->first)->second ==
549new_values.
find(k->first)->second)
551changed.push_back(k->first);
553 if(!changed.empty()) {
555 for(vector<string>::const_iterator k(changed.begin());
556k != changed.end(); ++k) {
560changedValues.
SetByKey( *k, values );
562diff.
SetByKey(
"changed", changedValues );
577k != queues_from_ini.
end(); ++k) {
584 "Configuration error. The queue '"+ k->first +
585 "' clashes with a currently existing " 586 "dynamic queue of the same name.");
596 unsigned intadd_count = 0;
599k != queues_from_ini.
end(); ++k) {
615 boolhas_changes =
false;
616vector<string> classes;
621 stringold_queue_class = k->first;
623 if(classes_from_ini.
find(old_queue_class) != classes_from_ini.
end())
632 if(k->second.delete_request)
635k->second.delete_request =
true;
636classes.push_back(old_queue_class);
639 if(!classes.empty()) {
642 for(vector<string>::const_iterator k = classes.begin();
643k != classes.end(); ++k)
645diff.
SetByKey(
"deleted_queue_classes", deleted_classes );
656 stringqueue_class = k->first;
658classes_from_ini.
find(queue_class);
660 if(new_class == classes_from_ini.
end())
664 if(k->second.delete_request) {
667k->second = new_class->second;
668classes.push_back(queue_class);
675 CJsonNodeclass_diff = k->second.Diff(new_class->second,
678 if(class_diff.
GetSize() > 0) {
680k->second = new_class->second;
682class_changes.
SetByKey(queue_class, class_diff);
686 if(class_changes.
GetSize() > 0)
687diff.
SetByKey(
"queue_class_changes", class_changes);
691k != classes_from_ini.
end(); ++k) {
692 stringnew_queue_class = k->first;
696classes.push_back(new_queue_class);
700 if(!classes.empty()) {
703 for(vector<string>::const_iterator k = classes.begin();
704k != classes.end(); ++k)
706diff.
SetByKey(
"added_queue_classes", added_classes);
719 boolhas_changes =
false;
720vector<string> deleted_queues;
729 stringold_queue = k->first;
730 if(queues_from_ini.
find(old_queue) != queues_from_ini.
end())
738 if(k->second.first.delete_request)
742queue->SetRefuseSubmits(
true);
744k->second.first.delete_request =
true;
745deleted_queues.push_back(k->first);
748 if(!deleted_queues.empty()) {
751 for(vector<string>::const_iterator k = deleted_queues.begin();
752k != deleted_queues.end(); ++k)
754diff.
SetByKey(
"deleted_queues", deleted);
760 string> > added_queues;
768k->second.second->UpdatePerfLoggingSettings(k->second.first.qclass);
775 stringqueue_name = k->first;
777queues_from_ini.
find(queue_name);
779 if(new_queue == queues_from_ini.
end())
784 if(k->second.first.delete_request) {
788queue->SetParameters(new_queue->second);
789queue->SetRefuseSubmits(
false);
793k->second.first = new_queue->second;
794added_queues.push_back(make_pair(queue_name,
795k->second.first.qclass));
803 CJsonNodequeue_diff = k->second.first.Diff(new_queue->second,
806 if(queue_diff.
GetSize() > 0) {
809queue->SetParameters(new_queue->second);
813k->second.first = new_queue->second;
815section_changes.
SetByKey(queue_name, queue_diff);
826 if(k->second.first.delete_request ==
true)
834 ERR_POST(
"Cannot find class '"+ k->second.first.qclass +
835 "' for dynamic queue '"+ k->first +
836 "'. Unexpected internal data inconsistency.");
845 CJsonNodeclass_diff = k->second.first.Diff(queue_class->second,
847 if(class_diff.
GetSize() > 0) {
850 stringold_class = k->second.first.qclass;
851 stringold_description = k->second.first.description;
854queue->SetParameters(queue_class->second);
856k->second.first = queue_class->second;
857k->second.first.qclass = old_class;
858k->second.first.description = old_description;
861section_changes.
SetByKey(k->first, class_diff);
866 if(section_changes.
GetSize() > 0)
867diff.
SetByKey(
"queue_changes", section_changes);
872k != queues_from_ini.
end(); ++k) {
873 stringnew_queue_name = k->first;
877added_queues.push_back(make_pair(new_queue_name, k->second.qclass));
881 if(!added_queues.empty()) {
884 for(vector< pair<string, string> >::const_iterator
885k = added_queues.begin();
886k != added_queues.end(); ++k)
888diff.
SetByKey(
"added_queues", added);
917 if(to_add_count > available_count)
919 "New configuration slots requirement: "+
920to_string(to_add_count) +
921 ". Number of available slots: "+
922to_string(available_count) +
".");
939min_precision =
std::min(min_precision,
940k->second.CalculateRuntimePrecision());
943min_precision =
std::min(min_precision,
944k->second.first.CalculateRuntimePrecision());
945 returnmin_precision;
951 unsigned int cnt= 0;
956 cnt+= k->second.second->CountActiveJobs();
964 unsigned int cnt= 0;
969 cnt+= k->second.second->CountAllJobs();
981 if(k->second.second->AnyJobs())
995 "Queue '"+ name +
"' is not found.");
997 returnfound->second.second;
1008q->SetParameters(params);
1010 m_Queues[qname] = make_pair(params, q.release());
1013.
Print(
"_type",
"startup")
1014.
Print(
"_queue", qname)
1016.
Print(
"info",
"mount");
1028 const string& qname,
1029 const string& qclass,
1030 const string& description)
1037 "Queue '"+ qname +
"' already exists.");
1043 "Queue class '"+ qclass +
1044 "' for queue '"+ qname +
"' is not found.");
1047 if(queue_class->second.delete_request)
1049 "Queue class '"+ qclass +
1050 "' for queue '"+ qname +
"' is marked for deletion.");
1055 "Cannot allocate queue '"+ qname +
1056 "'. max_queues limit reached.");
1060 if(!
client.IsAdmin()) {
1061 if(!queue_class->second.subm_hosts.empty()) {
1063acl.
SetHosts(queue_class->second.subm_hosts);
1066 " to create a dynamic queue");
1068 "Access denied: submitter privileges required");
1071 if(!queue_class->second.program_name.empty()) {
1089 "to create a dynamic queue");
1091 "Access denied: program privileges required");
1110 const string& qname)
1117 "Queue '"+ qname +
"' is not found.");
1121 "Queue '"+ qname +
"' is static and cannot be deleted.");
1125 if(!
client.IsAdmin()) {
1126 if(!queue->IsSubmitAllowed(
client.GetAddress()))
1128 "Access denied: submitter privileges required");
1129 if(!queue->IsProgramAllowed(
client.GetProgramName()))
1131 "Access denied: program privileges required");
1134found_queue->second.first.delete_request =
true;
1135queue->SetRefuseSubmits(
true);
1146 "Queue '"+ qname +
"' is not found.");
1160params.
refuse_submits= found->second.second->GetRefuseSubmits();
1161params.
pause_status= found->second.second->GetPauseStatus();
1163params.
aff_slots_used= found->second.second->GetAffSlotsUsed();
1168params.
clients= found->second.second->GetClientsCount();
1169params.
groups= found->second.second->GetGroupsCount();
1170params.
gc_backlog= found->second.second->GetGCBacklogCount();
1171params.
notif_count= found->second.second->GetNotifCount();
1183 names+= k->first + sep;
1198 size_taff_count = 0;
1207 LOG_POST(
"Drained shutdown: the DB has been successfully drained");
1215 "Drained shutdown: DB draining has not been completed " 1216 "when a hard shutdown is received. " 1217 "Shutting down immediately.");
1243 result+=
"OK:[queue "+ k->first +
"]\n"+
1244k->second.second->PrintTransitionCounters();
1252vector<string> warnings;
1258 result+=
"OK:[queue "+ k->first +
"]\n"+
1259k->second.second->PrintJobsStat(
client,
"",
"", warnings);
1278 output.append(
"OK:[qclass ")
1282.append(k->second.GetPrintableParameters(
false,
false));
1285j = k->second.linked_sections.begin();
1286j != k->second.linked_sections.end(); ++j) {
1287 stringprefix((j->first).c_str() + strlen(
"linked_section_"));
1288 stringsection_name = j->second;
1292m != values.
end(); ++m)
1314 output.append(
"[qclass_")
1318.append(k->second.ConfigSection(
true));
1336 output+=
"OK:[queue "+ k->first +
"]\n"+
1340j = k->second.first.linked_sections.begin();
1341j != k->second.first.linked_sections.end(); ++j) {
1342 stringprefix((j->first).c_str() + strlen(
"linked_section_"));
1343 stringsection_name = j->second;
1347m != values.
end(); ++m)
1348 output+=
"\nOK:"+ prefix +
"."+ m->first +
": "+ m->second;
1362 output+=
"[queue_"+ k->first +
"]\n"+
1363k->second.first.ConfigSection(
false);
1379 output+=
"["+ k->first +
"]\n";
1381j != k->second.end(); ++j) {
1382 output+= j->first +
"=\""+ j->second +
"\"\n";
1393map< string, map<string, string> >::const_iterator found =
1397 returnfound->second;
1409 intpause_state = k->second.second->GetPauseStatus();
1411pause_states[k->first] = pause_state;
1413 returnpause_states;
1420vector<string> refuse_submit_queues;
1425 if(k->second.second->GetRefuseSubmits())
1426refuse_submit_queues.push_back(k->first);
1428 returnrefuse_submit_queues;
1435 for(
unsigned intindex = 0; ; ++index) {
1439queue->NotifyListenersPeriodically(current_time);
1450k->second.second->PrintStatistics(aff_count);
1460k->second.second->PrintJobCounters();
1466 for(
unsigned intindex = 0; ; ++index) {
1470queue->CheckExecutionTimeout(logging);
1482list< pair< string, CRef< CQueue > > > candidates;
1488 if(k->second.first.delete_request)
1489candidates.push_back(make_pair(k->first, k->second.second));
1493list< pair< string, CRef< CQueue > > >::iterator
1494k = candidates.begin();
1495 while(k != candidates.end()) {
1496 if(k->second->IsEmpty() ==
false)
1497k = candidates.erase(k);
1502 if(candidates.empty())
1508 for(k = candidates.begin(); k != candidates.end(); ++k) {
1520vector< string > classes_to_delete;
1523 if(j->second.delete_request) {
1524 boolin_use =
false;
1527 if(m->second.first.qclass == j->first) {
1532 if(in_use ==
false)
1533classes_to_delete.push_back(j->first);
1538k != classes_to_delete.end(); ++k) {
1562 size_ttotal_scanned = 0;
1563 size_ttotal_mark_deleted = 0;
1565 boollimit_reached =
false;
1576 while(current_queue.
IsNull() ==
false) {
1581max_scanned, max_mark_deleted,
1583total_scanned, total_mark_deleted) ==
true)
1586 if(total_mark_deleted >= max_mark_deleted ||
1587total_scanned >= max_scanned) {
1588limit_reached =
true;
1596 if(limit_reached ==
false) {
1598 while(current_queue.
IsNull() ==
false) {
1599 if(current_queue->GetQueueName() == start_queue->GetQueueName())
1606max_scanned, max_mark_deleted,
1608total_scanned, total_mark_deleted) ==
true)
1611 if(total_mark_deleted >= max_mark_deleted ||
1612total_scanned >= max_scanned) {
1613limit_reached =
true;
1622 if(limit_reached ==
false) {
1623 if(start_queue.
IsNull() ==
false) {
1625 if(start_status_index > 0) {
16270, start_status_index - 1,
1629max_scanned, max_mark_deleted,
1631total_scanned, total_mark_deleted) ==
true)
1637 if(limit_reached ==
false) {
1638 if(start_queue.
IsNull() ==
false) {
1641start_status_index, start_status_index,
1643max_scanned, max_mark_deleted,
1645total_scanned, total_mark_deleted) ==
true)
1671 returnit->second.second;
1677 intqueue_num = ((rand() * 1.0) / RAND_MAX) *
m_Queues.
size();
1682 returnit->second.second;
1710 if(it->first == current_name) {
1714 returnit->second.second;
1728 size_tstatus_to_end,
1729 unsigned intstart_job_id,
1730 unsigned intend_job_id,
1732 size_tmax_mark_deleted,
1734 size_t& total_scanned,
1735 size_t& total_mark_deleted)
1739 for(; status <= status_to_end; ++status) {
1740purge_io.
scans= max_scanned - total_scanned;
1741purge_io.
deleted= max_mark_deleted - total_mark_deleted;
1742purge_io.
job_id= start_job_id;
1747total_scanned += purge_io.
scans;
1748total_mark_deleted += purge_io.
deleted;
1754 if(total_mark_deleted >= max_mark_deleted ||
1755total_scanned >= max_scanned) {
1771 if(!crash_file.
Exists()) {
1780 ERR_POST(
"Error creating crash detection file.");
1790 if(crash_file.
Exists())
1794 ERR_POST(
"Error removing crash detection file. When the server " 1795 "restarts it will re-initialize the database.");
1811 if(!crash_file.
Exists()) {
1820 ERR_POST(
"Error creating dump error detection file.");
1837 if(crash_file.
Exists())
1841 ERR_POST(
"Error removing dump error detection file. When the server " 1842 "restarts it will set an alert.");
1850 unsigned intdel_rec = 0;
1854 for(
unsigned intindex = 0; ; ++index) {
1858del_rec += queue->DeleteBatch(max_deleted - del_rec);
1859 if(del_rec >= max_deleted)
1870 static const intkMemFree_Delay = 15 * 60;
1871 static const unsignedkRecordThreshold = 1000000;
1878 for(
unsigned intindex = 0; ; ++index) {
1882queue->OptimizeMem();
1910 unsigned intsec_delay = purge_timeout;
1911 unsigned intnanosec_delay = (purge_timeout - sec_delay)*1000000000;
1914 m_Host, *
this, sec_delay, nanosec_delay,
1933 for(
unsigned intindex = 0; ; ++index) {
1937queue->PurgeAffinities();
1946 for(
unsigned intindex = 0; ; ++index) {
1950queue->PurgeGroups();
1964 if(current_time.
Sec() == last_purge.
Sec())
1966last_purge = current_time;
1968 for(
unsigned intindex = 0; ; ++index) {
1972queue->StaleNodes(current_time);
1986 if(current_time - last_time < period)
1989last_time = current_time;
1991 for(
unsigned intindex = 0; ; ++index) {
1995queue->PurgeBlacklistedJobs();
2009 if(current_time - last_time < period)
2012last_time = current_time;
2014 for(
unsigned intindex = 0; ; ++index) {
2018queue->PurgeClientRegistry(current_time);
2028 unsigned intcurrent_index = 0;
2033 if(current_index == index)
2034 returnk->second.second;
2045*
this, 0, 100000000,
2074 for(
unsigned intindex = 0; ; ++index) {
2078from_queue = queue->NotifyExactListeners();
2079 if(from_queue <
next)
2112run_delay.
Sec(), run_delay.
NSec(),
2134 if(!dump_dir.
Exists())
2142 booldump_error =
false;
2144 const stringlbsm_test_queue(
"LBSMDTestQueue");
2150dumped_queues.
insert(k->first);
2151}
catch(
constexception & ex) {
2153 ERR_POST(
"Error dumping queue "<< k->first <<
": "<<
2171 if(dumped_queues.
find(k->first) == dumped_queues.
end())
2174classes_to_dump.
insert(k->second.first.qclass);
2176j = k->second.first.linked_sections.begin();
2177j != k->second.first.linked_sections.end(); ++j)
2178linked_sections_to_dump.
insert(j->second);
2179dynamic_queues_to_dump.
insert(k->first);
2184 if(!classes_to_dump.
empty()) {
2185 stringqclasses_dump_file_name =
m_DumpPath+
2187 stringlinked_sections_dump_file_name =
m_DumpPath+
2189FILE * qclasses_dump_file =
NULL;
2190FILE * linked_sections_dump_file =
NULL;
2192qclasses_dump_file = fopen(qclasses_dump_file_name.c_str(),
"wb");
2193 if(qclasses_dump_file ==
NULL)
2194 throwruntime_error(
"Cannot open file "+
2195qclasses_dump_file_name);
2196setbuf(qclasses_dump_file,
NULL);
2198 if(classes_to_dump.
size() > 0 ||
2199dynamic_queues_to_dump.
size() > 0) {
2202header.
Write(qclasses_dump_file);
2207k != classes_to_dump.
end(); ++k) {
2211queue_class->second);
2216k = dynamic_queues_to_dump.
begin();
2217k != dynamic_queues_to_dump.
end(); ++k) {
2220q->second.first.qclass,
true,
2224fclose(qclasses_dump_file);
2225qclasses_dump_file =
NULL;
2228 if(!linked_sections_to_dump.
empty()) {
2229linked_sections_dump_file = fopen(
2230linked_sections_dump_file_name.c_str(),
"wb");
2231 if(linked_sections_dump_file ==
NULL)
2232 throwruntime_error(
"Cannot open file "+
2233linked_sections_dump_file_name);
2234setbuf(linked_sections_dump_file,
NULL);
2238header.
Write(linked_sections_dump_file);
2241k = linked_sections_to_dump.
begin();
2242k != linked_sections_to_dump.
end(); ++k) {
2243map<string, map<string, string> >::const_iterator
2248fclose(linked_sections_dump_file);
2249linked_sections_dump_file =
NULL;
2251}
catch(
constexception & ex) {
2253 ERR_POST(
"Error dumping dynamic queue classes and " 2254 "their linked sections. Dynamic queue dumps are lost.");
2255 if(qclasses_dump_file !=
NULL)
2256fclose(qclasses_dump_file);
2257 if(linked_sections_dump_file !=
NULL)
2258fclose(linked_sections_dump_file);
2261 if(access(qclasses_dump_file_name.c_str(), F_OK) != -1)
2262 remove(qclasses_dump_file_name.c_str());
2263 if(access(linked_sections_dump_file_name.c_str(), F_OK) != -1)
2264 remove(linked_sections_dump_file_name.c_str());
2268k = dynamic_queues_to_dump.
begin();
2269k != dynamic_queues_to_dump.
end(); ++k) {
2283 const string& qname,
2284 const string& qclass,
2292memcpy(descr_dump.
qname, qname.data(), qname.size());
2294memcpy(descr_dump.
qclass, qclass.data(), qclass.size());
2366 if(!prefixes.empty()) {
2370prefixes += k->first;
2382}
catch(
constexception & ex) {
2383 string msg=
"Writing error while dumping queue ";
2387 msg+=
"class "+ qclass;
2389 throwruntime_error(
msg);
2398k != values.
end(); ++k) {
2402memcpy(section_dump.
section, sname.data(), sname.size());
2404memcpy(section_dump.
value_name, k->first.data(), k->first.size());
2406memcpy(section_dump.
value, k->second.data(), k->second.size());
2409section_dump.
Write(
f);
2410}
catch(
constexception & ex) {
2411 throwruntime_error(
"Writing error while dumping linked section "+
2412sname +
" values: "+ ex.what());
2424}
catch(
constexception & ex) {
2425 ERR_POST(
"Error removing the dump directory: "<< ex.what());
2427 ERR_POST(
"Unknown error removing the dump directory");
2436 if(!data_dir.
Exists())
2441 for(CDir::TEntries::const_iterator k =
entries.begin();
2447 stringentryName = (*k)->GetName();
2470 ERR_POST(
"Reinitialization due to the server " 2471 "did not stop gracefully last time. " 2474 "reinitialized due to the server did not " 2475 "stop gracefully last time");
2480 LOG_POST(
Note<<
"Reinitialization due to a command line option. " 2483 "to a command line option");
2492 if(dump_dir_exists && !ver_file_exists) {
2497 "Error detected: Storage version file is not found " 2498 "while the dump directory exists.");
2501 if(dump_dir_exists && ver_file_exists) {
2504 size_tread_count = 0;
2511read_count =
f.Read(
buf,
sizeof(
buf) - 1);
2514 buf[read_count] =
'\0';
2515vector<string> lines;
2518 if(lines.size() > 2) {
2520 "Unexpected format of the storage version file: more " 2521 "than 2 lines found");
2522}
else if(lines.size() == 0) {
2524 "Unexpected format of the storage version file: no " 2530 stringserver_version = ver_info.
Print();
2532 if(lines.size() >= 2) {
2533 if(server_version != lines[1]) {
2535 "NetSchedule version has changed. " 2536 "Previous server version: "<< lines[1] <<
2537 " Current version: "<< server_version);
2539}
else if(lines.size() == 1) {
2541 "NetSchedule version has changed. " 2542 "Previous server version is unknown. " 2543 " Current version: "<< server_version);
2546 if(lines.size() >= 1) {
2549 "Storage version mismatch detected. " 2550 "Dumped version: "<< lines[0] <<
2556 if(!dump_dir_exists && ver_file_exists) {
2557 LOG_POST(
Note<<
"Non-empty data directory exists however the " 2559<<
" subdirectory is not found");
2561 "Non-empty data directory exists " 2563 " subdirectory is not found");
2568 string msg=
"The previous instance of the server had problems with " 2569 "dumping the information on the disk. Some queues may be " 2571 "See the previous instance log for details.";
2584 stringserver_version = ver_info.
Print();
2590 f.Write(
"\n", 1);
2591 f.Write(server_version.data(), server_version.size());
2598map<
string,
string,
2599 PNocase> & dump_dynamic_queues,
2603 if(!dump_dir.
Exists())
2608 CFilequeue_desrc_file(queue_desrc_file_name);
2609 if(queue_desrc_file.
Exists()) {
2610FILE *
f= fopen(queue_desrc_file_name.c_str(),
"rb");
2612 throwruntime_error(
"Cannot open the existing dump file " 2613 "for reading: "+ queue_desrc_file_name);
2624dump_dynamic_queues[qname] = qclass;
2671max_pending_read_wait_timeout);
2677client_registry_timeout_worker_node);
2682client_registry_timeout_admin);
2687client_registry_timeout_submitter);
2692client_registry_timeout_reader);
2697client_registry_timeout_unknown);
2702 stringdump_prefs(dump_struct.
2703linked_section_prefixes,
2705linked_section_prefixes_size);
2706 stringdump_names(dump_struct.
2707linked_section_names,
2709linked_section_names_size);
2710list<string> prefixes;
2711list<string>
names;
2718list<string>::const_iterator pref_it = prefixes.begin();
2719list<string>::const_iterator names_it =
names.begin();
2720 for( ; pref_it != prefixes.end() &&
2721names_it !=
names.end(); ++pref_it, ++names_it)
2725dump_queue_classes[qclass] = p;
2728}
catch(
constexception & ex) {
2738 for(CDir::TEntries::const_iterator k =
entries.begin();
2744 stringentry_name = (*k)->GetName();
2751 if(dump_dynamic_queues.find(qname) == dump_dynamic_queues.end())
2752dump_static_queues.
insert(qname);
2761list<string> sections;
2764 for(list<string>::const_iterator k = sections.begin();
2765k != sections.end(); ++k) {
2768 const string& section_name = *k;
2773 if(queue_name.empty())
2777queues.
insert(queue_name);
2789 if(!dump_dir.
Exists())
2793 CFilelinked_sections_file(linked_sections_file_name);
2795 if(linked_sections_file.
Exists()) {
2796FILE *
f= fopen(linked_sections_file_name.c_str(),
"rb");
2798 throwruntime_error(
"Cannot open the existing dump file " 2799 "for reading: "+ linked_sections_file_name);
2801map<string, map<string, string> > dump_sections;
2814dump_sections[sname][vname] =
val;
2816}
catch(
constexception & ex) {
2830 if(!dump_dir.
Exists())
2833 size_tbackup_number = 0;
2834 stringbackup_dir_name;
2837 "."+ to_string(backup_number);
2838 if(!
CDir(backup_dir_name).Exists())
2844dump_dir.
Rename(backup_dir_name);
2845}
catch(
constexception & ex) {
2846 ERR_POST(
"Error renaming the dump directory: "<< ex.what());
2848 ERR_POST(
"Unknown error renaming the dump directory");
2862 if(space_file.
Exists()) {
2865}
catch(
constexception & ex) {
2866 string msg=
"Error removing reserving dump space file: "+
2884 if(!dump_dir.
Exists()) {
2887}
catch(
constexception & ex) {
2888 string msg=
"Error creating dump directory: "+
string(ex.what());
2897 if(space_file ==
NULL) {
2898 string msg=
"Error opening reserving dump space file "+
2908 string msg=
"Error creating a memory buffer to write into the " 2909 "reserving dump space file";
2921 string msg=
"Error writing into the reserving dump space file: "+
2932 if(fwrite(
buffer, space, 1, space_file) != 1) {
2933 string msg=
"Error writing into the reserving dump space file: "+
2951k != paused_queues.
end(); ++k) {
2952 stringqueue_name = k->first;
2953 intpause_mode = k->second;
2958 ERR_POST(
"Cannot restore the pause state of the "<<
2959queue_name <<
" queue. The queue is not configured.");
2961existing->second.second->RestorePauseStatus(pause_mode);
2969 constvector<string> & refuse_submit_queues)
2973 for(vector<string>::const_iterator k = refuse_submit_queues.begin();
2974k != refuse_submit_queues.end(); ++k) {
2975 stringqueue_name = *k;
2984 ERR_POST(
"Cannot restore the refuse submit state of the "<<
2985queue_name <<
" queue. The queue is not configured.");
2987existing->second.second->SetRefuseSubmits(
true);
Class for support low level input/output for files.
size_t GetSize() const
For a container node (that is, either an array or an object), return the number of elements in the co...
static CJsonNode NewArrayNode()
Create a new JSON array node.
void AppendString(const string &value)
For an array node, add a string node at the end of the array.
void SetByKey(const string &key, CJsonNode::TInstance value)
For a JSON object node, insert a new element or update an existing element.
static CJsonNode NewStringNode(const string &value)
Create a new JSON string node.
static CJsonNode NewObjectNode()
Create a new JSON object node.
static CNSPreciseTime Current(void)
static CNcbiApplication * Instance(void)
Singleton method.
bool IsAllowed(unsigned int ha) const
CJsonNode SetHosts(const string &host_names)
NetSchedule internal exception.
NetScheduler threaded server.
unsigned int GetReserveDumpSpace(void) const
unsigned GetScanBatchSize(void) const
bool GetRefuseSubmits() const
void RegisterAlert(EAlertType alert_type, const string &message)
const bool & IsLogNotificationThread() const
const bool & IsLogExecutionWatcherThread() const
SNSRegistryParameters GetAffRegistrySettings(void) const
const unsigned int & GetJobCountersInterval() const
const unsigned int & GetStatInterval() const
const bool & IsLogCleaningThread() const
unsigned GetMarkdelBatchSize(void) const
unsigned GetDeleteBatchSize(void) const
void LoadJobsStartIDs(void)
bool WasDBDrained(void) const
bool IsDrainShutdown(void) const
double GetPurgeTimeout(void) const
void SetRefuseSubmits(bool val)
void InitNodeID(const string &db_path)
const bool & IsLogStatisticsThread() const
SNSRegistryParameters GetScopeRegistrySettings(void) const
void SerializeJobsStartIDs(void)
SNSRegistryParameters GetGroupRegistrySettings(void) const
All clients registered to connect.
bool IsMatchingClient(const CQueueClientInfo &cinfo) const
void AddClientInfo(const CQueueClientInfo &cinfo)
string PrintJobsStat(const CNSClientId &client)
CRef< CServiceThread > m_ServiceThread
set< string, PNocase > x_GetConfigQueues(void)
SQueueParameters QueueInfo(const string &qname) const
void RunServiceThread(void)
CQueueDataBase(CNetScheduleServer *server, const string &path, unsigned int max_queues, bool diskless, bool reinit)
string GetQueueConfig(void) const
time_t Configure(const IRegistry ®, CJsonNode &diff)
void RunExecutionWatcherThread(const CNSPreciseTime &run_delay)
void PrintStatistics(size_t &aff_count)
void RunNotifThread(void)
CRef< CQueue > x_GetNext(const string ¤t_name)
unsigned int x_CountQueuesToAdd(const TQueueParams &queues_from_ini) const
unsigned int m_FreeStatusMemCnt
void x_AppendDumpLinkedSections(void)
void WakeupNotifThread(void)
CFastMutex m_ConfigureLock
CRef< CQueue > x_GetQueueAt(unsigned int index)
size_t m_PurgeStatusIndex
bool x_RemoveSpaceReserveFile(void)
void CreateDynamicQueue(const CNSClientId &client, const string &qname, const string &qclass, const string &description="")
void x_CreateDumpErrorFlagFile(void)
TQueueParams x_ReadIniFileQueueClassDescriptions(const IRegistry ®)
bool x_CheckStopPurge(void)
string GetQueueInfo(void) const
void x_DeleteQueuesAndClasses(void)
void PurgeClientRegistry(void)
void x_ValidateConfiguration(const TQueueParams &queues_from_ini) const
bool x_DoesDumpErrorFlagFileExist(void) const
void StopNotifThread(void)
void DeleteDynamicQueue(const CNSClientId &client, const string &qname)
void StopPurgeThread(void)
void x_ReadLinkedSections(const IRegistry ®, CJsonNode &diff)
CJsonNode x_DetectChangesInLinkedSection(const map< string, string > &old_values, const map< string, string > &new_values)
map< string, int > GetPauseQueues(void) const
string GetQueueClassesConfig(void) const
void x_DumpQueueOrClass(FILE *f, const string &qname, const string &qclass, bool is_queue, const SQueueParameters ¶ms)
void StopServiceThread(void)
void RunPurgeThread(void)
void StopExecutionWatcherThread(void)
bool QueueExists(const string &qname) const
CNSPreciseTime SendExactNotifications(void)
bool x_CheckOpenPreconditions(bool reinit)
map< string, map< string, string > > m_LinkedSections
void PurgeAffinities(void)
vector< string > GetRefuseSubmitQueues(void) const
void x_RemoveDumpErrorFlagFile(void)
bool x_DoesCrashFlagFileExist(void) const
bool x_ConfigureQueues(const TQueueParams &queues_from_ini, CJsonNode &diff)
void x_OptimizeStatusMatrix(const CNSPreciseTime &current_time)
CRef< CJobQueueCleanerThread > m_PurgeThread
CNetScheduleServer * m_Server
TQueueParams x_ReadIniFileQueueDescriptions(const IRegistry &reg, const TQueueParams &classes)
void x_RemoveDataFiles(void)
CNSPreciseTime CalculateRuntimePrecision(void) const
void x_CreateSpaceReserveFile(void)
unsigned int m_PurgeJobScanned
SQueueParameters x_SingleQueueInfo(TQueueInfo::const_iterator found) const
void x_CreateCrashFlagFile(void)
string GetQueueNames(const string &sep) const
void x_RestoreRefuseSubmitState(const vector< string > &refuse_submit_queues)
string PrintTransitionCounters(void)
void NotifyListeners(void)
CFastMutex m_LinkedSectionsGuard
map< string, string > GetLinkedSection(const string &section_name) const
string x_GetDumpSpaceFileName(void) const
void x_CreateStorageVersionFile(void)
CRef< CQueue > x_GetLastPurged(void)
unsigned x_PurgeUnconditional(void)
void x_RestorePauseState(const map< string, int > &paused_queues)
string GetLinkedSectionConfig(void) const
unsigned int CountActiveJobs(void) const
unsigned int CountAllJobs(void) const
CRef< CQueue > x_GetFirst(void)
void PrintJobCounters(void)
bool x_PurgeQueue(CQueue &queue, size_t status_to_start, size_t status_to_end, unsigned int start_job_id, unsigned int end_job_id, size_t max_scanned, size_t max_mark_deleted, const CNSPreciseTime &current_time, size_t &total_scanned, size_t &total_mark_deleted)
void x_DumpLinkedSection(FILE *f, const string &sname, const map< string, string > &values)
CRef< CJobQueueExecutionWatcherThread > m_ExeWatchThread
CRef< CQueue > OpenQueue(const string &name)
void x_ReadDumpQueueDesrc(set< string, PNocase > &dump_static_queues, map< string, string, PNocase > &dump_dynamic_queues, TQueueParams &dump_queue_classes)
void PurgeBlacklistedJobs(void)
string GetQueueClassesInfo(void) const
void CheckExecutionTimeout(bool logging)
bool x_ConfigureQueueClasses(const TQueueParams &classes_from_ini, CJsonNode &diff)
CRef< CGetJobNotificationThread > m_NotifThread
TQueueParams m_QueueClasses
void x_CreateAndMountQueue(const string &qname, const SQueueParameters &params)
void x_RemoveCrashFlagFile(void)
SPurgeAttributes CheckJobsExpiry(const CNSPreciseTime &current_time, SPurgeAttributes attributes, unsigned int last_job, TJobStatus status)
static void PrintServerWide(size_t affinities)
void RequestStop()
Schedule thread Stop.
container_type::const_iterator const_iterator
container_type::iterator iterator
const_iterator begin() const
const_iterator end() const
iterator_bool insert(const value_type &val)
const_iterator find(const key_type &key) const
iterator_bool insert(const value_type &val)
const_iterator begin() const
const_iterator find(const key_type &key) const
const_iterator end() const
parent_type::const_iterator const_iterator
static const struct name_t names[]
static DLIST_TYPE *DLIST_NAME() next(DLIST_LIST_TYPE *list, DLIST_TYPE *item)
static void DLIST_NAME() remove(DLIST_LIST_TYPE *list, DLIST_TYPE *item)
static SQLCHAR output[256]
const CNcbiRegistry & GetConfig(void) const
Get the application's cached configuration parameters (read-only).
CVersionInfo GetVersion(void) const
Get the program version information.
#define ITERATE(Type, Var, Cont)
ITERATE macro to sequence through container elements.
CDiagContext_Extra & Print(const string &name, const string &value)
The method does not print the argument, but adds it to the string.
CDiagContext & GetDiagContext(void)
Get diag context instance.
CDiagContext_Extra Extra(void) const
Create a temporary CDiagContext_Extra object.
#define ERR_POST(message)
Error posting with file, line number information but without error codes.
#define LOG_POST(message)
This macro is deprecated and it's strongly recommended to move in all projects (except tests) to macro...
void Critical(CExceptionArgs_Base &args)
#define NCBI_THROW(exception_class, err_code, message)
Generic macro to throw an exception, given the exception class, error code and message string.
void Warning(CExceptionArgs_Base &args)
TEntries GetEntries(const string &mask=kEmptyStr, TGetEntriesFlags flags=0) const
Get directory entries based on the specified "mask".
static string DeleteTrailingPathSeparator(const string &path)
Delete trailing path separator, if any.
bool Rename(const string &new_path, TRenameFlags flags=fRF_Default)
Rename entry.
virtual bool Remove(TRemoveFlags flags=eRecursive) const
Remove a directory entry.
static string MakePath(const string &dir=kEmptyStr, const string &base=kEmptyStr, const string &ext=kEmptyStr)
Assemble a path from basic components.
static string AddTrailingPathSeparator(const string &path)
Add trailing path separator, if needed.
virtual bool Exists(void) const
Check if directory "dirname" exists.
bool Create(TCreateFlags flags=fCreate_Default) const
Create the directory using "dirname" passed in the constructor.
virtual bool Remove(TRemoveFlags flags=eRecursive) const
Delete existing directory.
virtual bool Exists(void) const
Check existence of file.
@ eRead
File can be read.
@ eReadWrite
File can be read and written.
@ eOpen
Open an existing file, or create a new one.
@ eCreate
Create a new file, or truncate an existing one.
@ fIgnoreRecursive
Suppress "self recursive" elements (the directories "." and "..").
EJobStatus
Job status codes.
@ eDone
Job is ready (computed successfully)
@ eConfirmed
Final state - read confirmed.
@ eCanceled
Explicitly canceled.
@ ePending
Waiting for execution.
@ eReadFailed
Final state - read failed.
@ eFailed
Failed to run (execution timeout)
void Reset(void)
Reset reference object.
bool IsNull(void) const THROWS_NONE
Check if pointer is null – same effect as Empty().
TObjectType & GetObject(void)
Get object.
bool Empty(void) const THROWS_NONE
Check if CRef is empty – not pointing to any object, which means having a null value.
virtual void EnumerateSections(list< string > *sections, TFlags flags=fAllLayers) const
Enumerate section names.
virtual void EnumerateEntries(const string &section, list< string > *entries, TFlags flags=fAllLayers) const
Enumerate parameter names for a specified section.
virtual string GetString(const string &section, const string &name, const string &default_value, TFlags flags=0) const
Get the parameter string value.
#define END_NCBI_SCOPE
End previously defined NCBI scope.
#define BEGIN_NCBI_SCOPE
Define ncbi namespace.
static int CompareNocase(const CTempString s1, SIZE_TYPE pos, SIZE_TYPE n, const char *s2)
Case-insensitive compare of a substring with another string.
static list< string > & Split(const CTempString str, const CTempString delim, list< string > &arr, TSplitFlags flags=0, vector< SIZE_TYPE > *token_pos=NULL)
Split a string using specified delimiters.
static bool StartsWith(const CTempString str, const CTempString start, ECase use_case=eCase)
Check if a string starts with a specified prefix value.
static bool SplitInTwo(const CTempString str, const CTempString delim, string &str1, string &str2, TSplitFlags flags=0)
Split a string into two pieces using the specified delimiters.
@ fSplit_MergeDelimiters
Merge adjacent delimiters.
@ eCase
Case sensitive compare.
bool Run(TRunMode flags=fRunDefault)
Run the thread.
void Join(void **exit_data=0)
Wait for the thread termination.
void ParseVersionString(const string &vstr, string *program_name, CVersionInfo *ver)
Parse string, extract version info and program name (case insensitive)
virtual string Print(void) const
Print version information.
where both of them are integers Note
for(len=0;yy_str[len];++len)
Process information in the NCBI Registry, including working with configuration files.
NetSchedule client specs.
#define NETSCHEDULED_STORAGE_VERSION
const CNSPreciseTime kTimeNever
vector< string > DeserializeRefuseSubmitState(const string &data_path, bool diskless)
void SerializePauseState(CNetScheduleServer *server)
map< string, int > DeserializePauseState(const string &data_path, bool diskless)
void SerializeRefuseSubmitState(CNetScheduleServer *server)
const string kPausedQueuesFilesName("PAUSED_QUEUES")
const string kDumpReservedSpaceFileName("space_keeper.dat")
const string kCrashFlagFileName("CRASH_FLAG")
const string kDBStorageVersionFileName("DB_STORAGE_VER")
const string kLinkedSectionsFileName("linked_sections.dump")
const size_t kDumpReservedSpaceFileBuffer
const string kDumpErrorFlagFileName("DUMP_ERROR_FLAG")
static string kNewLine("\n")
const string kDumpSubdirName("dump")
const string kNodeIDFileName("NODE_ID")
const string kRefuseSubmitFileName("REFUSE_SUBMIT")
const string kStartJobIDsFileName("STARTJOBIDS")
const string kQClassDescriptionFileName("qclass_descr.dump")
const unsigned kMaxQueueNameSize
const size_t kStatusesSize
static CNetScheduleAPI::EJobStatus statuses_to_delete_from[]
static SLJIT_INLINE sljit_ins msg(sljit_gpr r, sljit_s32 d, sljit_gpr x, sljit_gpr b)
static CNamedPipeClient * client
Netschedule queue client info.
CVersionInfo version_info
char section[kLinkedSectionValueNameSize]
char value_name[kLinkedSectionValueNameSize]
char value[kLinkedSectionValueSize]
int Read(FILE *f, size_t fixed_size_from_header)
Uint4 client_registry_min_worker_nodes
char reader_hosts[kMaxQueueLimitsSize]
int Read(FILE *f, size_t fixed_size_from_header)
double read_blacklist_time
char qclass[kMaxQueueNameSize]
double client_registry_timeout_admin
double client_registry_timeout_unknown
char subm_hosts[kMaxQueueLimitsSize]
Uint4 dump_aff_buffer_size
double client_registry_timeout_submitter
char linked_section_prefixes[kLinkedSectionsList]
Uint4 client_registry_min_unknowns
double client_registry_timeout_reader
char wnode_hosts[kMaxQueueLimitsSize]
char linked_section_names[kLinkedSectionsList]
Uint4 dump_client_buffer_size
Uint4 read_failed_retries
Uint4 client_registry_min_submitters
Uint4 client_registry_min_admins
Uint4 dump_group_buffer_size
double max_pending_read_wait_timeout
double client_registry_timeout_worker_node
Uint4 max_jobs_per_client
char qname[kMaxQueueNameSize]
char program_name[kMaxQueueLimitsSize]
double notif_hifreq_interval
double max_pending_wait_timeout
char description[kMaxDescriptionSize]
Uint4 linked_section_prefixes_size
Uint4 linked_section_names_size
double notif_hifreq_period
Uint4 client_registry_min_readers
unsigned int dump_buffer_size
unsigned int client_registry_min_submitters
CNSPreciseTime blacklist_time
map< string, string > linked_sections
CNSPreciseTime read_timeout
unsigned int client_registry_min_unknowns
CNSPreciseTime notif_hifreq_interval
unsigned int dump_aff_buffer_size
CNSPreciseTime notif_hifreq_period
unsigned int max_output_size
CNSPreciseTime read_blacklist_time
CNSPreciseTime pending_timeout
unsigned int max_input_size
CNSPreciseTime notif_handicap
unsigned int max_jobs_per_client
CNSPreciseTime client_registry_timeout_submitter
unsigned int read_failed_retries
CNSPreciseTime client_registry_timeout_unknown
unsigned int notif_lofreq_mult
unsigned int dump_client_buffer_size
CNSPreciseTime reader_timeout
unsigned int failed_retries
CNSPreciseTime max_pending_read_wait_timeout
unsigned int dump_group_buffer_size
unsigned int client_registry_min_worker_nodes
CNSPreciseTime max_pending_wait_timeout
CNSPreciseTime client_registry_timeout_admin
bool ReadQueueClass(const IRegistry &reg, const string &sname, vector< string > &warnings)
CNSPreciseTime client_registry_timeout_reader
bool ReadQueue(const IRegistry &reg, const string &sname, const map< string, SQueueParameters, PNocase > &queue_classes, vector< string > &warnings)
string GetPrintableParameters(bool include_class, bool url_encoded) const
CNSPreciseTime run_timeout
CNSPreciseTime client_registry_timeout_worker_node
CNSPreciseTime wnode_timeout
unsigned int client_registry_min_admins
unsigned int client_registry_min_readers
static wxAcceleratorEntry entries[3]
RetroSearch is an open source project built by @garambo | Open a GitHub Issue
Search and Browse the WWW like it's 1997 | Search results from DuckDuckGo
HTML:
3.2
| Encoding:
UTF-8
| Version:
0.7.4