m_NeedToProxy(
false)
57 #if __NC_TASKS_MONITOR 58m_TaskName =
"CNCActiveHandler_Proxy";
158m_BlobAccess(
NULL),
159m_ReservedForBG(
false),
160m_ProcessingStarted(
false),
161m_CmdStarted(
false),
162m_GotAnyAnswer(
false),
163m_CmdFromClient(
false),
166 #if __NC_TASKS_MONITOR 167m_TaskName =
"CNCActiveHandler";
197CNCActiveHandler::State
200 SRV_FATAL(
"CNCActiveHandler started in invalid state");
229CNCActiveHandler::State
268CNCActiveHandler::State
279 if(
ctx== proxy_ctx)
430 const string& password,
453 if(!password.empty()) {
465 const string& password,
485 if(!password.empty()) {
497 const string& password,
526 if(!password.empty()) {
538 const string& password,
558 if(!password.empty()) {
570 const string& password,
606 if(!password.empty()) {
622 const string& password,
655 if(!password.empty()) {
705 const string& password,
736 if(!password.empty()) {
748 const string& password,
749 unsigned intadd_time,
777 if(!password.empty()) {
794 m_CmdToSend+= filters ?
"PROXY_BLIST2 \"":
"PROXY_BLIST \"";
851 if(filters->
cr_srv!= 0) {
1043 const string&
key,
1080 Uint8remote_rec_no)
1100CNCActiveHandler::State
1122CNCActiveHandler::State
1129 m_ErrMsg=
"ERR:Connection closed by peer";
1136CNCActiveHandler::State
1151CNCActiveHandler::State
1195 m_ErrMsg=
"ERR:Error in TaskServer";
1225CNCActiveHandler::State
1276CNCActiveHandler::State
1286CNCActiveHandler::State
1297CNCActiveHandler::State
1302<<
"Protocol error. Got response: '" 1311 Uint4start_word = 0x01020304;
1316CNCActiveHandler::State
1322 Uint4finish_word = 0xFFFFFFFF;
1329CNCActiveHandler::State
1336CNCActiveHandler::State
1342list<CTempString> params;
1344 if(params.size() < 5)
1347list<CTempString>::const_iterator param_it = params.begin();
1369CNCActiveHandler::State
1432CNCActiveHandler::State
1438pos +=
sizeof(
"NEED_ABORT") - 1;
1447pos +=
sizeof(
"HAVE_NEWER") - 1;
1463CNCActiveHandler::State
1542CNCActiveHandler::State
1564CNCActiveHandler::State
1572CNCActiveHandler::State
1576 if(pos == string::npos) {
1579<<
"SIZE is not found in peer response");
1583pos +=
sizeof(
"SIZE=") - 1;
1592<<
"Cannot parse data size: "<< ex);
1611CNCActiveHandler::State
1634CNCActiveHandler::State
1649 const char* keywd =
"Content-Length:";
1658pos += strlen(keywd);
1674CNCActiveHandler::State
1689 m_ErrMsg=
"ERR:Error writing to peer";
1691 m_ErrMsg=
"ERR:Error writing blob data to client";
1697CNCActiveHandler::State
1712CNCActiveHandler::State
1732CNCActiveHandler::State
1735list<CTempString> tokens;
1737 if(tokens.size() != 2 && tokens.size() != 3)
1740list<CTempString>::const_iterator it_tok = tokens.begin();
1741 Uint8local_rec_no = 0, remote_rec_no = 0;
1746 if(it_tok != tokens.end())
1763CNCActiveHandler::State
1774 m_Purge= (line ==
"PURGE:");
1777 if(line.
empty() || line ==
";") {
1791CNCActiveHandler::State
1812CNCActiveHandler::State
1855CNCActiveHandler::State
1876CNCActiveHandler::State
1888+
sizeof(blob_sum->
expire)
1919CNCActiveHandler::State
1957CNCActiveHandler::State
1977CNCActiveHandler::State
1980list<CTempString> tokens;
1982 if(tokens.size() != 11) {
1985list<CTempString>::const_iterator it_tok = tokens.begin();
2018CNCActiveHandler::State
2027 m_ErrMsg=
"ERR:Error writing blob to database";
2064CNCActiveHandler::State
2081list<CTempString> tokens;
2083 if(tokens.size() != 7)
2086list<CTempString>::const_iterator it_tok = tokens.begin();
2113CNCActiveHandler::State
2118list<CTempString> tokens;
2120 ITERATE( list<CTempString>,
t, tokens) {
2124list<CTempString> v;
2125ncbi_NStr_Split(two,
".", v);
2126 if(v.size() >= 3) {
2127 for(
int i=0;
i<3; ++
i) {
2151CNCActiveHandler::State
2167 boolneed_event =
false;
2197CNCActiveHandler::State
2265CNCActiveHandler::State
2289CNCActiveHandler::State
2296 Uint8(0xFFFFFFFE)));
2305 m_ErrMsg=
"ERR:Blob data is corrupted";
2324CNCActiveHandler::State
2332CNCActiveHandler::State
2346CNCActiveHandler::State
static string s_PeerAuthString
Uint4 GetDefaultTaskPriority(void)
CNCActiveHandler * m_Handler
CNCMessageHandler * m_Client
CNCMessageHandler * GetClient(void)
CNCActiveClientHub(CNCMessageHandler *client)
virtual void ExecuteRCU(void)
Method implementing RCU job that was scheduled earlier by CallRCU().
void SetStatus(ENCClientHubStatus status)
static CNCActiveClientHub * Create(Uint8 srv_id, CNCMessageHandler *client)
ENCClientHubStatus m_Status
void SetErrMsg(const string &msg)
string GetFullPeerName(void)
void NeedToProxySocket(void)
CNCActiveHandler_Proxy(CNCActiveHandler *handler)
bool SocketProxyDone(void)
virtual void ExecuteSlice(TSrvThreadNum thr_num)
This is the main method to do all work this task should do.
virtual ~CNCActiveHandler_Proxy(void)
CNCActiveHandler * m_Handler
void SetHandler(CNCActiveHandler *handler)
State x_ProcessProtocolError(void)
State x_ReadSyncStartAnswer(void)
void x_CleanCmdResources(void)
State x_ReadCopyProlong(void)
bool IsReservedForBG(void)
State x_ReadBlobsListBody(void)
void x_StartWritingBlob(void)
void ProxyHasBlob(CRequestContext *cmd_ctx, const CNCBlobKey &key, const string &password, Uint1 quorum)
State x_SendCopyPutCmd(void)
void ProxySetValid(CRequestContext *cmd_ctx, const CNCBlobKey &key, const string &password, int version)
CNCBlobAccessor * m_BlobAccess
State x_ReplaceServerConn(void)
void SyncProlongPeer(CNCActiveSyncControl *ctrl, SNCSyncEvent *event)
void SyncRead(CNCActiveSyncControl *ctrl, SNCSyncEvent *event)
State x_ReadSyncProInfoAnswer(void)
State x_ProcessPeerError(void)
State x_ReadBlobData(void)
void SyncSend(CNCActiveSyncControl *ctrl, SNCSyncEvent *event)
State x_SendSyncGetCmd(void)
void CopyPurge(CRequestContext *cmd_ctx, const CNCBlobKeyLight &key, Uint8 when)
void ProxyWrite(CRequestContext *cmd_ctx, const CNCBlobKey &key, const string &password, int version, Uint4 ttl, Uint1 quorum, TNCUserFlags flags)
bool x_StartProcessing(void)
State x_ReadSyncStartExtra(void)
State x_FakeWritingBlob(void)
CNCActiveClientHub * m_Client
CNCActiveHandler_Proxy * m_Proxy
State x_ReadEventsListBody(void)
State x_SendCmdToExecute(void)
void CopyUpdate(const CNCBlobKeyLight &key, Uint8 create_time)
void ProxyBList(CRequestContext *cmd_ctx, const CNCBlobKey &key, bool force_local, SNCBlobFilter *filters)
void ProxyRead(CRequestContext *cmd_ctx, const CNCBlobKey &key, const string &password, int version, Uint8 start_pos, Uint8 size, Uint1 quorum, bool search, bool force_local, Uint8 age)
State x_ReadConfirm(void)
State x_ReadBlobsListKeySize(void)
State x_FinishWritingBlob(void)
void SetReservedForBG(bool value)
State x_ReadCopyPut(void)
void SetClientHub(CNCActiveClientHub *hub)
void x_SendCopyProlongCmd(const SNCBlobSummary &blob_sum)
CNCActiveHandler(Uint8 srv_id, CNCPeerControl *peer)
void SyncCancel(CNCActiveSyncControl *ctrl)
void CopyRemove(const CNCBlobKeyLight &key, Uint8 create_time)
State x_ReadSyncGetAnswer(void)
State x_FinishCommand(void)
State x_WaitForMetaInfo(void)
void SyncStart(CNCActiveSyncControl *ctrl, Uint8 local_rec_no, Uint8 remote_rec_no)
void ProxyRemove(CRequestContext *cmd_ctx, const CNCBlobKey &key, const string &password, int version, Uint1 quorum)
State x_PrepareSyncProlongCmd(void)
State x_ReadEventsListKeySize(void)
ESynActionType m_SyncAction
State x_ReadHttpDataPrefix(void)
State x_WriteBlobData(void)
void SyncCommit(CNCActiveSyncControl *ctrl, Uint8 local_rec_no, Uint8 remote_rec_no)
State x_ReadFoundMeta(void)
CNCActiveSyncControl * m_SyncCtrl
void ClientReleased(void)
State x_ConnClosedReplaceable(void)
void AskPeerVersion(void)
void ProxyGetMeta(CRequestContext *cmd_ctx, const CNCBlobKey &key, Uint1 quorum, bool force_local, int http)
static void Initialize(void)
void ProxyProlong(CRequestContext *cmd_ctx, const CNCBlobKey &key, const string &password, unsigned int add_time, Uint1 quorum, bool search, bool force_local)
void SetProxy(CNCActiveHandler_Proxy *proxy)
void x_SetStateAndStartProcessing(State state)
State x_ExecuteProInfoCmd(void)
void x_FinishSyncCmd(ESyncResult result, int hint)
void ProxyReadLast(CRequestContext *cmd_ctx, const CNCBlobKey &key, const string &password, Uint8 start_pos, Uint8 size, Uint1 quorum, bool search, bool force_local, Uint8 age)
State x_InvalidState(void)
void SyncBlobsList(CNCActiveSyncControl *ctrl)
void CloseForShutdown(void)
State x_CloseCmdAndConn(void)
State x_FinishBlobFromClient(void)
void SyncProlongOur(CNCActiveSyncControl *ctrl, SNCSyncEvent *event)
void ProxyGetSize(CRequestContext *cmd_ctx, const CNCBlobKey &key, const string &password, int version, Uint1 quorum, bool search, bool force_local)
State x_ReadDataPrefix(void)
State x_WaitOneLineAnswer(void)
void CheckCommandTimeout(void)
State x_ReadWritePrefix(void)
CNCBlobKeyLight m_BlobKey
State x_ReadPeerVersion(void)
void CopyProlong(const CNCBlobKeyLight &key, Uint2 slot, Uint8 orig_rec_no, Uint8 orig_time, const SNCBlobSummary &blob_sum)
Uint8 GetSrvId(void) const
State x_WaitClientRelease(void)
void CopyPut(CRequestContext *cmd_ctx, const CNCBlobKeyLight &key, Uint2 slot, Uint8 orig_rec_no)
State x_ReadSyncGetHeader(void)
void x_DoProlongOur(void)
State x_MayDeleteThis(void)
State x_ReadSizeToRead(void)
virtual ~CNCActiveHandler(void)
void x_SetSlotAndBucketAndVerifySlot(Uint2 slot)
void SearchMeta(CRequestContext *cmd_ctx, const CNCBlobKey &key)
State x_ReadDataForClient(void)
State x_ReadSyncStartHeader(void)
State x_PutSelfToPool(void)
bool AddStartEvent(SNCSyncEvent *evt)
bool AddStartBlob(const string &key, SNCBlobSummary *blob_sum)
void CmdFinished(ESyncResult res, ESynActionType action, CNCActiveHandler *conn, int hint)
void StartResponse(Uint8 local_rec_no, Uint8 remote_rec_no, bool by_blobs)
static void Register(EAlertType alert_type, const string &message)
void * GetWriteMemPtr(void)
Uint8 GetCurBlobSize(void) const
Get size of the blob.
void RequestMetaInfo(CSrvTask *owner)
void SetNewVerExpire(int dead_time)
void MoveReadPos(Uint4 move_size)
bool IsMetaInfoReady(void)
size_t GetWriteMemSize(void)
void SetCurBlobExpire(int expire, int dead_time=0)
unsigned int GetCurBlobTTL(void) const
void SetCurVerExpire(int dead_time)
unsigned int GetCurVersionTTL(void) const
void SetPassword(CTempString password)
bool IsCurBlobDead(void) const
void SetNewBlobExpire(int expire, int dead_time=0)
int GetCurBlobDeadTime(void) const
void SetVersionTTL(int ttl)
static bool UpdatePurgeData(const string &data, char separator='\n')
Uint4 GetCurCreateId(void) const
void SetBlobTTL(unsigned int ttl)
Set blob's timeout after last access before it will be deleted.
Uint8 GetCurBlobCreateTime(void) const
Uint8 GetCurCreateServer(void) const
void SetCreateServer(Uint8 create_server, Uint4 create_id)
void MoveWritePos(Uint4 move_size)
int GetCurBlobVersion(void) const
Uint4 GetReadMemSize(void)
int GetCurBlobExpire(void) const
string GetCurPassword(void) const
void SetPosition(Uint8 pos)
Initially set current position in the blob to start reading from.
int GetCurVerExpire(void) const
bool HasError(void) const
void SetBlobVersion(int ver)
void SetBlobCreateTime(Uint8 create_time)
const void * GetReadMemPtr(void)
bool IsBlobExists(void) const
Check if blob exists.
void Release(void)
Release blob lock.
const string & PackedKey(void) const
const CTempString & RawKey(void) const
const CTempString & SubKey(void) const
const CTempString & Cache(void) const
static void SavePurgeData(void)
static CNCBlobAccessor * GetBlobAccess(ENCAccessType access, const string &key, const string &password, Uint2 time_bucket)
Acquire access to the blob identified by key, subkey and version.
static Uint4 GetSyncPriority(void)
static string GetFullPeerName(Uint8 srv_id)
static Uint8 GetNetworkErrorTimeout(void)
static Uint1 GetBlobListTimeout(void)
static Uint8 GetSelfID(void)
static Uint1 GetPeerTimeout(void)
Handler of all NetCache incoming requests.
bool IsBlobWritingFinished(void)
void AbortInitialSync(void)
void RegisterConnError(void)
void RegisterConnSuccess(void)
void PutConnToPool(CNCActiveHandler *conn)
CNCActiveHandler * GetPooledConn(void)
bool AcceptsUserFlags(void) const
void ReleaseConn(CNCActiveHandler *conn)
bool CreateNewSocket(CNCActiveHandler *conn)
void SetHostProtocol(Uint8 ver)
void AssignClientConn(CNCActiveClientHub *hub)
static CNCPeerControl * Peer(Uint8 srv_id)
static void ClientDataRead(size_t data_size)
static void PeerDataWrite(size_t data_size)
static void PeerDataRead(size_t data_size)
static Uint8 AddEvent(Uint2 slot, SNCSyncEvent *event)
const value_type * data() const
void resize_mem(size_type new_size)
Resize the buffer. No data preservation.
void CallRCU(void)
Method to be called to schedule call of ExecuteRCU() at appropriate time.
Special variant of CRef that doesn't check for NULL when dereferencing.
bool IsProxyInProgress(void)
Check whether proxying started earlier is still in progress.
bool NeedToClose(void)
Checks if socket should be closed because of long inactivity or because server is in "hard" shutdown ...
size_t Write(const void *buf, size_t size)
Write into the socket as much as immediately possible (including writing into internal write buffers ...
void AbortSocket(void)
Abort the socket, i.e.
CSrvSocketTask & WriteText(CTempString message)
Write text into socket.
bool ProxyHadError(void)
Check whether proxying started earlier finished successfully or any of sockets had some error in it.
virtual void Terminate(void)
Terminate the task.
bool m_NeedToClose
Flag showing that socket needs to be closed because of long inactivity.
bool ReadData(void *buf, Uint2 size)
Read from socket exactly the given data size.
void RequestFlush(void)
Request flushing of all data saved in internal write buffers to socket.
size_t Read(void *buf, size_t size)
Read from socket into memory.
bool ReadNumber(NumType *num)
Read from socket a number in native machine representation.
void WriteData(const void *buf, size_t size)
Write the exact amount of data into the socket.
bool StartProcessing(TSrvThreadNum thread_num=0, bool boost=false)
Start processing of the socket and include it into TaskServer's central epoll.
bool ReadLine(CTempString *line)
Read from socket one line which ends with ' ', '\r ' or '\0'.
bool NeedEarlyClose(void)
Checks if socket should be closed because of internal reasons (long inactivity or "hard" shutdown as ...
void StartProxyTo(CSrvSocketTask *dst_task, Uint8 proxy_size)
Start proxying of raw data from this socket to the one in dst_task.
bool FlushIsDone(void)
Check if data flushing requested earlier is complete.
void SetState(State state)
Sets current state of the machine.
void SetDiagCtx(CRequestContext *ctx)
Set diagnostic context for this task to work in.
int m_LastActive
Time (in seconds) when the task was active last time, i.e.
CRequestContext * GetDiagCtx(void)
Get current diagnostic context for the task.
void SetPriority(Uint4 prty)
Set and retrieve task's priority.
void SetRunnable(bool boost=false)
Set this task "runnable", i.e.
virtual void Terminate(void)
Stops task's execution and deletes it.
void ReleaseDiagCtx(void)
Releases current diagnostic context of the task.
static int CurSecs(void)
Current time in seconds since epoch (time_t).
CTempString implements a light-weight string on top of a storage buffer whose lifetime management is ...
static void StartSyncBlob(Uint8 create_time)
void(*)(CSeq_entry_Handle seh, IWorkbench *wb, const CSerialObject &obj) handler
#define ITERATE(Type, Var, Cont)
ITERATE macro to sequence through container elements.
string GetSessionID(void) const
Session ID.
string GetClientIP(void) const
Client IP/hostname.
void SetRequestStatus(int status)
void Critical(CExceptionArgs_Base &args)
void Warning(CExceptionArgs_Base &args)
uint8_t Uint1
1-byte (8-bit) unsigned integer
uint32_t Uint4
4-byte (32-bit) unsigned integer
uint16_t Uint2
2-byte (16-bit) unsigned integer
int64_t Int8
8-byte (64-bit) signed integer
uint64_t Uint8
8-byte (64-bit) unsigned integer
#define END_NCBI_SCOPE
End previously defined NCBI scope.
#define BEGIN_NCBI_SCOPE
Define ncbi namespace.
NCBI_NS_STD::string::size_type SIZE_TYPE
static string Int8ToString(Int8 value, TNumToStringFlags flags=0, int base=10)
Convert Int8 to string.
static int StringToInt(const CTempString str, TStringToNumFlags flags=0, int base=10)
Convert string to int.
static string IntToString(int value, TNumToStringFlags flags=0, int base=10)
Convert int to string.
bool empty(void) const
Return true if the represented string is empty (i.e., the length is zero)
static SIZE_TYPE FindCase(const CTempString str, const CTempString pattern, SIZE_TYPE start, SIZE_TYPE end, EOccurrence which=eFirst)
Find the pattern in the specified range of a string using a case sensitive search.
static string UIntToString(unsigned int value, TNumToStringFlags flags=0, int base=10)
Convert UInt to string.
static bool StartsWith(const CTempString str, const CTempString start, ECase use_case=eCase)
Check if a string starts with a specified prefix value.
void clear(void)
Clears the string.
static Uint8 StringToUInt8(const CTempString str, TStringToNumFlags flags=0, int base=10)
Convert string to Uint8.
static bool SplitInTwo(const CTempString str, const CTempString delim, string &str1, string &str2, TSplitFlags flags=0)
Split a string into two pieces using the specified delimiters.
static unsigned int StringToUInt(const CTempString str, TStringToNumFlags flags=0, int base=10)
Convert string to unsigned int.
CTempString substr(size_type pos) const
Obtain a substring from this string, beginning at a given offset.
static enable_if< is_arithmetic< TNumeric >::value||is_convertible< TNumeric, Int8 >::value, string >::type NumericToString(TNumeric value, TNumToStringFlags flags=0, int base=10)
Convert numeric value to string.
size_type find(const CTempString match, size_type pos=0) const
Find the first instance of the entire matching string within the current string, beginning at an opti...
size_type size(void) const
Return the length of the represented array.
static string UInt8ToString(Uint8 value, TNumToStringFlags flags=0, int base=10)
Convert UInt8 to string.
static const size_type npos
@ fAllowTrailingSymbols
Ignore trailing non-numerics characters.
@ fAllowLeadingSpaces
Ignore leading whitespace characters in converted string.
const string version
version string
const struct ncbi::grid::netcache::search::fields::SIZE size
const struct ncbi::grid::netcache::search::fields::KEY key
const GenericPointer< typename T::ValueType > T2 value
const string & GetMessageByStatus(EHTTPStatus sts)
static const char *const kNCPeerClientName
@ eStatus_OK
Command is ok and execution is good.
@ eStatus_BadPeer
Peer returned something wrong.
@ eNCReadData
Read blob data.
@ eNCCopyCreate
(Re-)write blob from another NetCache (as opposed to writing from client)
@ eNCRead
Read meta information only.
Defines CRequestContext class for NCBI C++ diagnostic API.
static CNamedPipeClient * client
#define SRV_LOG(sev, msg)
Macro to be used for printing log messages.
#define ACCESS_ONCE(x)
Purpose of this macro is to force compiler to access variable exactly at the place it's written (no m...
constexpr Uint8 kUSecsPerSecond
ENCSyncEvent
Event types to log.
Uint2 TSrvThreadNum
Type for thread number in TaskServer.
RetroSearch is an open source project built by @garambo | Open a GitHub Issue
Search and Browse the WWW like it's 1997 | Search results from DuckDuckGo
HTML:
3.2
| Encoding:
UTF-8
| Version:
0.7.4