(!service_provided) {
73 key.host.push_back(
':');
88 if(job_provided && service_provided) {
94 "must be a host:port server address.");
101 autowarning_handler = [&](
const string& m,
CNetServers) {
117 if(!service_provided) {
119 "' must be explicitly specified.");
123 "' must be specified explicitly (not via $" 171printf(
"{\"status\": \"%s\"}\n", job_status.c_str());
199printf(
"{\"progress_message\": \"%s\"}\n",
289 #define ATTR_CHECK_SET(name, type) \ 290 if (attr_name.length() == sizeof(name) - 1 && \ 291 memcmp(attr_name.data(), name, sizeof(name) - 1) == 0) { \ 292 m_JobAttribute = type; \ 306 switch(attr_name[0]) {
318 #define ATTR_POS " at line "<< m_LineNumber << ", column " << attr_column
323 "unknown attribute "<< attr_name <<
ATTR_POS);
331 "attribute "<< attr_name <<
" requires a value" ATTR_POS);
342 const string& job_key,
343 const string& server_host)
346 intlast_event_index = -1;
348 const char*
format=
"%s \"%s\" from %s [invalid]\n";
351&job_status, &last_event_index))
352 format=
"%s \"%s\" from %s [valid, " 353 "job_status=%s, last_event_index=%d]\n";
366 if(job_input_ostream.fail()) {
372 size_tmax_embedded_input_size,
381numeric_limits<size_t>().
max() :
382max_embedded_input_size - max_embedded_input_size / 10);
386remote_app_stdin.peek();
387 if(!remote_app_stdin.eof())
390request.
Send(job_input_ostream);
423 "More than one \"input\" attribute is defined " 432 "More than one \"args\" attribute is defined " 451 "(and/or \"args\") attribute is required " 459 size_tmax_embedded_input_size,
CNcbiOstream&job_input_ostream)
472remote_app_stdin, job_input_ostream);
486 size_tmax_embedded_input_size =
m_GridClient->GetMaxServerInputSize();
494remote_app_stdin.write(job_input_record.
job_input.data(),
498remote_app_stdin, job_input_ostream);
500job_input_ostream.write(job_input_record.
job_input.data(),
506 if(!job_input_record.
affinity.empty())
524 if(remaining_batch_size == 0) {
526 constvector<CNetScheduleJob>& jobs =
528 ITERATE(vector<CNetScheduleJob>, it, jobs)
530 "%s\n", it->job_id.c_str());
531batch_submitter.
Reset();
540remote_app_stdin.write(job_input_record.
job_input.data(),
544remote_app_stdin, job_input_ostream);
546job_input_ostream.write(job_input_record.
job_input.data(),
551 if(!job_input_record.
affinity.empty())
557--remaining_batch_size;
561 constvector<CNetScheduleJob>& jobs =
563 ITERATE(vector<CNetScheduleJob>, it, jobs)
565batch_submitter.
Reset();
577 "' option is not supported in batch mode");
594 CWStreamjob_input_ostream(&job_input_writer, 0,
NULL);
606 size_tmax_embedded_input_size =
m_GridClient->GetMaxServerInputSize();
652job.
job_id, server_host);
679 intlast_event_index = -1;
681tie(job_status, last_event_index, ignore) =
686 "setting up a job event listener.\n");
697printf(
"%d\n%s\n", last_event_index,
701fprintf(stderr,
"Job event index (%d) has already " 702 "exceeded %d; won't wait.\n",
707fprintf(stderr,
"Job is already '%s'; won't wait.\n",
730std_stream.exceptions((ios::iostate) 0);
732 while(!std_stream.eof()) {
734 if(std_stream.fail() && !std_stream.eof())
736bytes_read = (size_t) std_stream.gcount();
740fwrite(
buffer, bytes_read, 1, output_stream) != 1)
748 const string& data_or_blob_id)
751unique_ptr<IReader> reader;
768 if(fwrite(data_or_blob_id.data(), data_or_blob_id.length(), 1,
779 CRStreaminput_stream(reader.release(), 0, 0, kStreamFlags);
785 ": Cannot deserialize remote_app job input.\n");
796 CRStreaminput_stream(reader.release(), 0, 0, kStreamFlags);
799remote_app_result.
Receive(input_stream);
833 "affinity=\"%s\" exclusive\n":
"affinity=\"%s\"\n",
837 "exclusive\n":
"\n");
873fprintf(stderr,
"Warning: job is in %s status.\n",
891 "' is required in reliable mode (same session must be used for both steps).\n");
895 if(!reliable_second) {
899reliable_first ?
"the first step of reliable mode":
"simple mode");
912rnj_result = job_reader.
ReadNextJob(&job, &job_status);
915rnj_result = job_reader.
ReadNextJob(&job, &job_status, &timeout);
918 switch(rnj_result) {
1024 "' are mutually exclusive.\n");
1049fprintf(stderr,
"%s\nA job has been returned; won't wait.\n",
1056 stringserver_address;
1060 const char*
format=
"%s \"%s\" from %s [invalid]\n";
1065 format=
"%s \"%s\" from %s [valid, server=%s]\n";
1071server_host.c_str(),
1072server_address.c_str());
1103 "Error while reading job output data");
1125fprintf(stderr,
GRID_APP_NAME ": error while sending job output.\n");
1182 ": either the '" QUEUE_ARG "' argument or the '--" 1235TQueueRegister queue_register;
1238 stringserver_address = it->server.GetServerAddress();
1240 ITERATE(std::list<std::string>, queue, it->queues) {
1241queue_register[*queue].insert(server_address);
1245 ITERATE(TQueueRegister, it, queue_register) {
1247 if(it->second.size() != queues.size()) {
1248 const char* sep =
" (limited to ";
1249 ITERATE(TServerSet, server, it->second) {
CJsonNode g_QueueClassInfoToJson(CNetScheduleAPI ns_api)
void g_ProcessJobInfo(CNetScheduleAPI ns_api, const string &job_key, IJobInfoProcessor *processor, bool verbose, CCompoundIDPool::TInstance id_pool)
void g_PrintJSON(FILE *output_stream, CJsonNode node, const char *indent)
CJsonNode g_QueueInfoToJson(CNetScheduleAPI ns_api, const string &queue_name)
ENextAttributeType NextAttribute(CTempString *attr_name, string *attr_value, size_t *attr_column)
void Reset(const char *position, const char *eol)
size_t GetLineNumber() const
CBatchSubmitAttrParser(istream *input_stream)
string m_JobAttributeValue
CAttrListParser m_AttrParser
EOption GetAttributeType() const
const string & GetAttributeValue() const
CCompoundID NewID(ECompoundIDClass new_id_class)
Create and return a new CCompoundID objects.
Base64-encoded ID string that contains extractable typed fields.
void AppendRandom(Uint4 random_number)
Append an eCIT_Random field at the end of this compound ID.
void AppendCurrentTime()
Get the current time and append it as an eCIT_Timestamp field at the end of this compound ID.
string ToString()
Pack the ID and return its string representation.
Grid Client (the submitter).
static bool OnWarning(bool worker_node_admin, const string &warn_msg, CNetServer server)
int DumpJobInputOutput(const string &data_or_blob_id)
CCompoundIDPool m_CompoundIDPool
bool IsOptionSet(int option) const
unique_ptr< CGridClient > m_GridClient
void SetUp_NetScheduleCmd(EAPIClass api_class, EAdminCmdSeverity cmd_severity=eReadOnlyAdminCmd, bool require_queue=true)
struct CGridCommandLineInterfaceApp::SOptions m_Opts
void PrepareRemoteAppJobInput(size_t max_embedded_input_size, const string &args, CNcbiIstream &remote_app_stdin, CNcbiOstream &job_input_ostream)
CNetScheduleSubmitter m_NetScheduleSubmitter
void JobInfo_PrintStatus(CNetScheduleAPI::EJobStatus status)
CNetScheduleAdmin m_NetScheduleAdmin
bool IsOptionAcceptedAndSetImplicitly(EOption option) const
CNetScheduleAPIExt m_NetScheduleAPI
void MarkOptionAsSet(int option)
@ eAdminCmdWithSideEffects
void PrintJobStatusNotification(CNetScheduleNotificationHandler &submit_job_handler, const string &job_key, const string &server_host)
enum CGridCommandLineInterfaceApp::EAPIClass m_APIClass
int PrintJobAttrsAndDumpInput(const CNetScheduleJob &job)
static void PrintLine(const string &line)
bool IsOptionExplicitlySet(int option) const
void x_LoadJobInput(size_t max_embedded_input_size, CNcbiOstream &job_input_ostream)
CNetScheduleExecutor m_NetScheduleExecutor
void CheckJobInputStream(CNcbiOstream &job_input_ostream)
CNetCacheAPI m_NetCacheAPI
Grid Job Batch Submitter.
CJsonNode GetRootNode() const
void DeleteQueue(const string &qname)
Delete queue Applicable only to queues, created through CreateQueue method.
void GetQueueList(TQueueList &result)
void CreateQueue(const string &qname, const string &qclass, const string &description=kEmptyStr)
Create an instance of the given queue class.
void DumpJob(CNcbiOstream &out, const string &job_key)
void CancelAllJobs(const string &job_statuses=kEmptyStr)
Cancel all jobs in the queue (optionally with particular statuses).
void DumpQueue(CNcbiOstream &output_stream, const string &start_after_job=kEmptyStr, size_t job_count=0, const string &job_statuses=kEmptyStr, const string &job_group=kEmptyStr)
void PrintQueueInfo(const string &queue_name, CNcbiOstream &output_stream)
list< SServerQueueList > TQueueList
Smart pointer to a part of the NetSchedule API that does job retrieval and processing on the worker n...
Smart pointer to a part of the NetSchedule API that allows to retrieve completed jobs.
string SaveJobInput(const string &target_dir, CNetCacheAPI &nc_api)
CNetScheduleAPI::EJobStatus WaitForJobEvent(const string &job_key, CDeadline &deadline, CNetScheduleAPI ns_api, TJobStatusMask status_mask, int last_event_index=kMax_Int, int *new_event_index=NULL)
bool CheckJobStatusNotification(const string &job_id, CNetScheduleAPI::EJobStatus *job_status, int *last_event_index=NULL)
bool WaitForNotification(const CDeadline &deadline, string *server_host=NULL)
void CmdAppendTimeoutGroupAndClientInfo(string &cmd, const CDeadline *deadline, const string &job_group)
bool CheckRequestJobNotification(CNetScheduleExecutor::TInstance executor, CNetServer *server)
bool RequestJob(CNetScheduleExecutor::TInstance executor, CNetScheduleJob &job, const string &cmd)
TJobInfo RequestJobWatching(CNetScheduleAPI::TInstance ns_api, const string &job_id, const CDeadline &deadline)
void SubmitJob(CNetScheduleSubmitter::TInstance submitter, CNetScheduleJob &job, unsigned wait_time, CNetServer *server=NULL)
static string MkBaseGETCmd(CNetScheduleExecutor::EJobAffinityPreference affinity_preference, const string &affinity_list)
const string & GetMessage() const
CNetScheduleAPI::EJobStatus WaitForJobCompletion(CNetScheduleJob &job, CDeadline &deadline, CNetScheduleAPI ns_api, time_t *job_exptime=NULL)
void StickToServer(SSocketAddress address)
string GetServerAddress() const
void PrintCmdOutput(const string &cmd, CNcbiOstream &output_stream, ECmdOutputStyle output_style, CNetService::EIterationMode=CNetService::eSortByLoad)
void SetWarningHandler(TEventHandler warning_handler)
CNetServerPool GetServerPool()
Note about the "buf_size" parameter for streams in this API.
@ fLeakExceptions
Exceptions leaked out.
@ fOwnReader
Own the underlying reader.
Remote Application Request (both client side and application executor side)
CNcbiIstream & GetStdInForRead()
Get the stdin stream of the remote application.
void SetMaxInlineSize(size_t max_inline_size)
void Send(CNcbiOstream &os)
Serialize a request to a given stream.
void Deserialize(CNcbiIstream &is)
CNcbiOstream & GetStdIn()
Get an output stream to write data to a remote application stdin.
void SetCmdLine(const string &cmdline)
Set the command line for the remote application.
Remote Application Result (both client side and application executor side)
CNcbiIstream & GetStdErr()
Get a remote application stderr.
CNcbiIstream & GetStdOut()
Get a remote application stdout.
void Receive(CNcbiIstream &is)
Deserialize a request from a given stream.
String or Blob Storage Reader.
String or Blob Storage Writer.
ERW_Result Write(const void *buf, size_t count, size_t *bytes_written=0) override
Write up to "count" bytes from the buffer pointed to by the "buf" argument onto the output device.
CTempString implements a light-weight string on top of a storage buffer whose lifetime management is ...
CTimeout â Timeout interval.
Writer-based output stream.
#define RAW_OUTPUT_FORMAT
#define QUEUEINFO_COMMAND
#define LOGIN_TOKEN_OPTION
#define ANY_AFFINITY_OPTION
#define CLIENT_SESSION_OPTION
#define NETSCHEDULE_OPTION
@ eUsePreferredAffinities
#define ALL_QUEUES_OPTION
#define WAIT_TIMEOUT_OPTION
#define CLAIM_NEW_AFFINITIES_OPTION
#define JOB_INPUT_DIR_OPTION
#define ITERATE(Type, Var, Cont)
ITERATE macro to sequence through container elements.
void Error(CExceptionArgs_Base &args)
TErrCode GetErrCode(void) const
Get error code.
#define NCBI_THROW(exception_class, err_code, message)
Generic macro to throw an exception, given the exception class, error code and message string.
#define NCBI_THROW_FMT(exception_class, err_code, message)
The same as NCBI_THROW but with message processed as output to ostream.
EJobStatus
Job status codes.
void ReadConfirm(const string &job_id, const string &auth_token)
Mark the specified job as successfully retrieved.
CNetScheduleSubmitter GetSubmitter()
Create an instance of CNetScheduleSubmitter.
void Submit(const string &job_group=kEmptyStr)
Submit a batch to the queue.
void SetJobMask(CNetScheduleAPI::TJobMask mask)
Set a job mask.
string output
Job result data.
void SetJobAffinity(const string &affinity)
Set a job affinity.
EJobStatus GetJobDetails(CNetScheduleJob &job, time_t *job_exptime=NULL, ENetScheduleQueuePauseMode *pause_mode=NULL)
Get job details.
CNetScheduleAPI::EJobStatus GetJobStatus(const CNetScheduleJob &job, time_t *job_exptime=NULL, ENetScheduleQueuePauseMode *pause_mode=NULL)
Get the current status of the specified job.
CNetScheduleJobReader GetJobReader(const string &group=kEmptyStr, const string &affinity=kEmptyStr)
Create an instance of CNetScheduleJobReader.
void ReturnJob(const CNetScheduleJob &job)
Switch the job back to the "Pending" status so that it can be run again on a different worker node.
void ClearNode()
Unregister client-listener.
void PutResult(const CNetScheduleJob &job)
Put job result (job should be received by GetJob() or WaitJob())
CNetScheduleAPI::TJobMask mask
int ret_code
Job return code.
static string StatusToString(EJobStatus status)
Printable status type.
EReadNextJobResult ReadNextJob(CNetScheduleJob *job, CNetScheduleAPI::EJobStatus *job_status, const CTimeout *timeout=NULL)
Wait and return the next completed job.
CNetScheduleAPI::EJobStatus GetJobDetails(CNetScheduleJob &job, time_t *job_exptime=NULL, ENetScheduleQueuePauseMode *pause_mode=NULL)
Get full information about the specified job.
void SetJobGroup(const string &job_group)
Retrieve jobs from the specified group only.
CNetScheduleAPI::EJobStatus GetJobStatus(const string &job_key, time_t *job_exptime=NULL, ENetScheduleQueuePauseMode *pause_mode=NULL)
Get the current status of the specified job.
void JobDelayExpiration(const CNetScheduleJob &job, unsigned runtime_inc)
Increment job execution timeout.
EJobAffinityPreference
Affinity matching modes.
void CancelJobGroup(const string &job_group, const string &job_statuses=kEmptyStr)
Cancel job group.
void GetProgressMsg(CNetScheduleJob &job)
Update the progress_message field of the job structure.
void SetClientType(EClientType client_type)
void ReadRollback(const string &job_id, const string &auth_token)
Refuse from processing the results of the specified job.
void SetAffinityPreference(EJobAffinityPreference aff_pref)
Set preferred method of requesting jobs with affinities.
const SServerParams & GetServerParams()
EReadNextJobResult
Possible outcomes of ReadNextJob() calls.
void PutProgressMsg(const CNetScheduleJob &job)
Put job interim (progress) message.
bool GetJob(CNetScheduleJob &job, const string &affinity_list=kEmptyStr, CDeadline *dealine=NULL)
Get a pending job.
CNetScheduleExecutor GetExecutor()
Create an instance of CNetScheduleExecutor.
void ReadFail(const string &job_id, const string &auth_token, const string &error_message=kEmptyStr)
Refuse from processing the results of the specified job and increase its counter of failed job result...
CNetScheduleAdmin GetAdmin()
void PutFailure(const CNetScheduleJob &job, bool no_retries=false)
Submit job failure diagnostics.
string job_id
Output job key.
void CancelJob(const string &job_key)
Cancel job.
const vector< CNetScheduleJob > & GetBatch() const
CNcbiOstream & GetOStream()
Get a stream where a client can write an input data for the remote job.
@ eDone
Job is ready (computed successfully)
@ eConfirmed
Final state - read confirmed.
@ eReading
Job has its output been reading.
@ eRunning
Running on a worker node.
@ eJobNotFound
No such job.
@ ePending
Waiting for execution.
@ eReadFailed
Final state - read failed.
@ eExplicitAffinitiesOnly
@ eExclusiveJob
Exclusive job - the node executes only this job, even if there are processor resources.
@ eRNJ_NotReady
No matching jobs are ready for reading.
@ eRNJ_JobReady
A job is returned.
@ eRNJ_NoMoreJobs
No matching jobs.
@ eRNJ_Interrupt
ReadNextJob() has been interrupted.
#define END_NCBI_SCOPE
End previously defined NCBI scope.
#define BEGIN_NCBI_SCOPE
Define ncbi namespace.
IO_PREFIX::ostream CNcbiOstream
Portable alias for ostream.
IO_PREFIX::istream CNcbiIstream
Portable alias for istream.
bool NcbiStreamCopy(CNcbiOstream &os, CNcbiIstream &is)
Copy the entire contents of stream "is" to stream "os".
@ eRW_Eof
End of data, should be considered permanent.
@ eRW_Success
Everything is okay, I/O completed.
static string PrintableString(const CTempString str, TPrintableMode mode=fNewLine_Quote|fNonAscii_Passthru)
Get a printable version of the specified string.
static enable_if< is_arithmetic< TNumeric >::value||is_convertible< TNumeric, Int8 >::value, string >::type NumericToString(TNumeric value, TNumToStringFlags flags=0, int base=10)
Convert numeric value to string.
CTime GetFastLocalTime(void)
Quick and dirty getter of local time.
The blob sat and sat key Both must be positive integers</td > n< td > Non empty string The interpretation of the blob id depends on a processor Cassandra n processor expects the following format
const struct ncbi::grid::netcache::search::fields::KEY key
static const string s_NotificationTimestampFormat("Y/M/D h:m:s.l")
#define ATTR_CHECK_SET(name, type)
static bool s_DumpStdStream(CNcbiIstream &std_stream, FILE *output_stream)
Reader-writer based streams.
time_t extend_lifetime_by
EOutputFormat output_format
void ReSetClientSession(const string &)
void ReSetClientNode(const string &)
static TInstance CreateWnCompat(const string &, const string &)
static TInstance CreateNoCfgLoad(const string &, const string &, const string &)
Meaningful information encoded in the NetSchedule key.
SBatchSubmitRecord(istream *input_stream)
CBatchSubmitAttrParser attr_parser
bool remote_app_args_defined
static SSocketAddress Parse(const string &address, SHost::EName name=SHost::EName::eResolved)
RetroSearch is an open source project built by @garambo | Open a GitHub Issue
Search and Browse the WWW like it's 1997 | Search results from DuckDuckGo
HTML:
3.2
| Encoding:
UTF-8
| Version:
0.7.4