(next_time <
value)
114it->second->SetRunnable();
156 conn->AskPeerVersion();
185m_HostIP(
Uint4(m_SrvId >> 32)),
190m_SlotsToInitSync(0),
191m_OrigSlotsToInitSync(0),
195m_InThrottle(
false),
196m_MaybeThrottle(
false),
197m_HasBGTasks(
false),
198m_InitiallySynced(
false)
200 #if __NC_TASKS_MONITOR 201m_TaskName =
"CNCPeerControl";
208 if(!hostport.empty()) {
209list<CTempString> srv_fields;
210ncbi_NStr_Split(hostport,
":", srv_fields);
211 if(srv_fields.size() == 2) {
274 if(host != 0 &&
m_HostIP!= host) {
301 conn->SetProxy(proxy);
406 if(!
conn||
conn->IsReservedForBG()) {
409 conn->SetReservedForBG(
false);
433 conn->SetReservedForBG(for_bg);
461:
"ERR:Cannot connect to peer");
467 conn->SetClientHub(hub);
496 conn->SetReservedForBG(
true);
512<<
") connections");
529 boolis_locked =
true;
554 conn->SetReservedForBG(
true);
579 conn->SetReservedForBG(
true);
707cur_time, queue_size);
725 Uint8srv_id = *it_srv;
744 Uint8srv_id = *it_srv;
765 Uint8srv_id = *it_srv;
766 if(
done.find(srv_id) ==
done.end()) {
786 Uint8srv_id = *it_srv;
791orig_rec_no, orig_time, accessor);
814 intactive = 0, bg = 0;
823 state.peer_active_conns = active;
824 state.peer_bg_conns = bg;
866it_ctrl->second->m_MaybeThrottle) {
881 stringis(
"\": "), iss(
"\": \""), eol(
",\n\""), qt(
"\"");
885 if(it != ctrl.
begin()) {
955 INFO(
"Initial sync: for " 967 boolsucceeded =
true;
968 if(cnt_slots != 1) {
970 INFO(
"Initial sync: Server " 979 SRV_LOG(
Error,
"Initial sync: unable to synchronize with any server");
981 SRV_LOG(
Critical,
"Initial sync: unable to synchronize with any server");
1000 Uint8& next_sync_time,
1001 Uint8next_sync_delay)
1005 Uint8next_time = now + next_sync_delay;
1023 size_t key= (is_passive ? 2 : 0) | (is_by_blobs ? 1 : 0);
1027 key=
key| (hint & 0xFFFF);
1038 stringis(
"\": "), iss(
"\": \""), eol(
",\n\""), qt(
"\"");
1042 if(it != ctrl.
begin()) {
1064 size_t key=
i->first;
1065 size_thint =
key& 0xFFFF;
1069 boolby_blobs = (
key& 1) != 0;
1070 boolpassive = (
key& 2) != 0;
1100 boolhas_more =
true;
1101 booltask_added =
false;
1136 if(sync_ctrl == ctrl) {
1179it->CheckCommandTimeout();
1203it->CheckCommandTimeout();
1211 boolhas_more = sync_ctrl->
GetNextTask(task_info);
1231 conn->CloseForShutdown();
Mutex created to have minimum possible size (its size is 4 bytes) and to sleep using kernel capabilit...
void Unlock(void)
Unlock the mutex.
void Lock(void)
Lock the mutex.
void SetStatus(ENCClientHubStatus status)
void SetHandler(CNCActiveHandler *handler)
void SetErrMsg(const string &msg)
bool GetNextTask(SSyncTaskInfo &task_info, bool *is_valid=nullptr)
void CmdFinished(ESyncResult res, ESynActionType action, CNCActiveHandler *conn, int hint)
void ExecuteSyncTask(const SSyncTaskInfo &task_info, CNCActiveHandler *conn)
static void Register(EAlertType alert_type, const string &message)
@ eSyncFailed
Synchronization failed.
@ ePeerIpChanged
Peer IP address changed.
Uint8 GetCurBlobSize(void) const
Get size of the blob.
int GetCurBlobDeadTime(void) const
Uint4 GetCurCreateId(void) const
Uint8 GetCurBlobCreateTime(void) const
Uint8 GetCurCreateServer(void) const
int GetCurBlobExpire(void) const
int GetCurVerExpire(void) const
static Uint2 GetMaxPeerBGConns(void)
static Uint2 GetMaxMirrorQueueSize(void)
static Uint1 GetMaxSyncsOneServer(void)
static Uint8 GetSmallBlobBoundary(void)
static string GetPeerNameOrEmpty(Uint8 srv_id)
static Uint2 GetMaxPeerTotalConns(void)
static string GetPeerName(Uint8 srv_id)
static string GetFullPeerName(Uint8 srv_id)
static Uint8 GetNetworkErrorTimeout(void)
static Uint2 GetCntErrorsToThrottle(void)
static Uint8 GetSelfTrustLevel(void)
static Uint8 GetPeerThrottlePeriod(void)
static const TServersList & GetRawServersForSlot(Uint2 slot)
static Uint4 CreateHostAlias(Uint4 ip, Uint4 port)
static bool HasCommonSlots(Uint8 server)
static Uint2 GetCntThrottlesToIpchange(void)
static const string & GetMirroringSizeFile(void)
static void Finalize(void)
static void MirrorRemove(const CNCBlobKeyLight &key, Uint2 slot, Uint8 update_time)
static bool HasPeerInThrottle(void)
Uint2 m_OrigSlotsToInitSync
bool x_DoReleaseConn(CNCActiveHandler *conn)
CNCActiveHandler * x_GetBGConnImpl(void)
void AbortInitialSync(void)
TNCPeerConnsList m_PooledConns
TNCActiveSyncList m_SyncList
static void SetServersForInitSync(Uint4 cnt_servers)
TNCPeerConnsList m_BusyConns
void x_DeleteMirrorEvent(SNCMirrorEvent *event)
TNCMirrorQueue m_SmallMirror
void x_ProcessMirrorEvent(CNCActiveHandler *conn, SNCMirrorEvent *event)
void RegisterConnError(void)
Uint8 GetSrvId(void) const
void x_DecActiveConns(void)
CNCPeerControl(Uint8 srv_id)
void RegisterConnSuccess(void)
void PutConnToPool(CNCActiveHandler *conn)
void AddInitiallySyncedSlot(void)
bool x_ReserveBGConnNow(void)
TNCClientHubsList m_Clients
TNCMirrorQueue m_BigMirror
static Uint4 FindIPbyName(const string &alias)
CNCActiveHandler * GetPooledConn(void)
bool FinishSync(CNCActiveSyncControl *sync_ctrl)
Uint8 GetTrustLevel(void) const
static void MirrorWrite(const CNCBlobKeyLight &key, Uint2 slot, Uint8 orig_rec_no, Uint8 size, const TServersList &mirrors_done)
void RegisterSyncStat(bool is_passive, bool is_by_blobs, int result, int hint)
bool AddSyncControl(CNCActiveSyncControl *sync_ctrl)
void x_UnreserveBGConn(void)
bool GetReadyForShutdown(void)
bool StartActiveSync(void)
bool AcceptsSyncUpdate(void) const
void ReleaseConn(CNCActiveHandler *conn)
static Uint8 GetMirrorQueueSize(void)
static CAtomicCounter sm_TotalCopyRequests
void x_ProcessUpdateEvent(SNCMirrorEvent *event)
static void MirrorProlong(const CNCBlobKeyLight &key, Uint2 slot, Uint8 orig_rec_no, Uint8 orig_time, const CNCBlobAccessor *accessor)
bool CreateNewSocket(CNCActiveHandler *conn)
CNCActiveHandler * GetBGConn(bool silent=false)
virtual void ExecuteSlice(TSrvThreadNum thr_num)
This is the main method to do all work this task should do.
void SetHostProtocol(Uint8 ver)
static void ReconfServersForInitSync(Uint4 cnt_servers)
void AssignClientConn(CNCActiveClientHub *hub)
void RemoveSyncControl(CNCActiveSyncControl *sync_ctrl)
static void PrintState(CSrvSocketTask &task)
void x_AddMirrorEvent(SNCMirrorEvent *event, Uint8 size)
void x_UpdateHasTasks(void)
void RegisterSyncStop(bool is_passive, Uint8 &next_sync_time, Uint8 next_sync_delay)
static bool Initialize(void)
static void ReadCurState(SNCStateStat &state)
bool x_AssignClientConn(CNCActiveClientHub *hub, CNCActiveHandler *conn)
static string GetPeerNameOrEmpty(Uint8 srv_id)
static bool HasServersForInitSync(void)
map< size_t, size_t > m_SyncStat
CNCActiveHandler * x_CreateNewConn(bool for_bg)
CNCActiveHandler * x_GetPooledConnImpl(void)
static CNCPeerControl * Peer(Uint8 srv_id)
static void PrintSyncStat(CSrvSocketTask &task)
TNCActiveSyncListIt m_NextTaskSync
static Uint4 FindIPbyAlias(Uint4 alias)
static void ResetServersForInitSync(void)
void x_SlotsInitiallySynced(Uint2 cnt_slots, bool aborted=false)
static CAtomicCounter sm_CopyReqsRejected
bool AcceptsBlobKey(const CNCBlobKeyLight &key) const
bool AcceptsSyncRemove(void) const
static void MirrorUpdate(const CNCBlobKeyLight &key, Uint2 slot, Uint8 update_time)
void x_SrvInitiallySynced(bool succeeded)
bool x_ReserveBGConn(void)
virtual ~CNCPeerShutdown(void)
virtual bool ReadyForShutdown(void)
Method called if server is ready to shutdown.
static void InitialSyncComplete(void)
static Uint4 GetCntRunningCmds(void)
static void InitialSyncDone(Uint8 srv_id, bool succeeded)
Task controlling a socket.
bool Connect(Uint4 host, Uint2 port)
Create new socket and connect it to given IP and port.
CSrvSocketTask & WriteText(CTempString message)
Write text into socket.
CSrvSocketTask & WriteNumber(NumType num)
Write number into socket as string, i.e.
CSrvSocketTask & WriteBool(bool b)
void RunAfter(Uint4 delay_sec)
This call is basically equivalent to SetRunnable() but with guarantee that task will be scheduled for...
void SetRunnable(bool boost=false)
Set this task "runnable", i.e.
static int CurSecs(void)
Current time in seconds since epoch (time_t).
static CSrvTime Current(void)
Exact current time with precision up to nanoseconds.
Uint8 AsUSec(void) const
Converts object's value to microseconds since epoch.
static void RequestShutdown(ESrvShutdownType shutdown_type)
Asks server to start shutdown procedures.
static void AddShutdownCallback(CSrvShutdownCallback *callback)
Adds new object wishing to receive callbacks when server is going to shutdown and wishing to influenc...
static string IPToString(Uint4 ip)
Converts 4-byte encoded IP address into its string representation.
static bool IsInShutdown(void)
Checks if TaskServer received request to shutdown.
static Uint4 GetIPByHost(const string &host)
Converts server name (or IP address written as string) to encoded 4-byte IP address.
static bool IsInHardShutdown(void)
Checks if TaskServer currently is in "hard" shutdown phase when no operation is allowed to proceed un...
static void ResetStatCounters(void)
container_type::const_iterator const_iterator
const_iterator begin() const
const_iterator end() const
const_iterator find(const key_type &key) const
static bool is_valid(const char *num, int type, CONV_RESULT *cr)
static CS_CONNECTION * conn
static DLIST_TYPE *DLIST_NAME() first(DLIST_LIST_TYPE *list)
#define ITERATE(Type, Var, Cont)
ITERATE macro to sequence through container elements.
#define ERASE_ITERATE(Type, Var, Cont)
Non-constant version with ability to erase current element, if container permits.
#define NON_CONST_ITERATE(Type, Var, Cont)
Non constant version of ITERATE macro.
void Set(TValue new_value) THROWS_NONE
Set atomic counter value.
TValue Add(int delta) THROWS_NONE
Atomically add value (=delta), and return new counter value.
TValue Get(void) const THROWS_NONE
Get atomic counter value.
void Critical(CExceptionArgs_Base &args)
void Error(CExceptionArgs_Base &args)
void Warning(CExceptionArgs_Base &args)
uint32_t Uint4
4-byte (32-bit) unsigned integer
uint16_t Uint2
2-byte (16-bit) unsigned integer
uint64_t Uint8
8-byte (64-bit) unsigned integer
#define NCBI_UINT8_FORMAT_SPEC
Uint4 TValue
Type of the generated integer value and/or the seed value.
TValue GetRand(void)
Get the next random number in the interval [0..GetMax()] (inclusive)
#define END_NCBI_SCOPE
End previously defined NCBI scope.
#define BEGIN_NCBI_SCOPE
Define ncbi namespace.
static enable_if< is_arithmetic< TNumeric >::value||is_convertible< TNumeric, Int8 >::value, string >::type NumericToString(TNumeric value, TNumToStringFlags flags=0, int base=10)
Convert numeric value to string.
constexpr bool empty(list< Ts... >) noexcept
const struct ncbi::grid::netcache::search::fields::SIZE size
const struct ncbi::grid::netcache::search::fields::KEY key
const GenericPointer< typename T::ValueType > T2 value
const string & GetMessageByStatus(EHTTPStatus sts)
@ eStatus_ShuttingDown
operation canceled because server needs to shutdown.
vector< Uint8 > TServersList
static Uint4 s_ServersToSync
static CAtomicCounter s_SyncOnInit
map< Uint8, CNCPeerControl * > TControlMap
static CAtomicCounter s_WaitToOpenToClients
static CNCPeerShutdown * s_ShutdownListener
static CAtomicCounter s_AbortedSyncClients
static CMiniMutex s_RndLock
static TControlMap s_Controls
static CAtomicCounter s_MirrorQueueSize
static void s_SetNextTime(Uint8 &next_time, Uint8 value, bool add_random)
static CRandom s_Rnd(CRandom::TValue(time(NULL)))
static CMiniMutex s_MapLock
static FILE * s_MirrorLogFile
intr::list< CNCActiveHandler, intr::base_hook< TActiveListHook >, intr::constant_time_size< false > > TNCPeerConnsList
list< SNCMirrorEvent * > TNCMirrorQueue
TNCActiveSyncList::iterator TNCActiveSyncListIt
list< CNCActiveSyncControl * > TNCActiveSyncList
#define INFO(msg)
Macro to be used for printing informational messages.
#define SRV_LOG(sev, msg)
Macro to be used for printing log messages.
bool AtomicCAS(T volatile &var, T old_value, T new_value)
constexpr Uint8 kUSecsPerSecond
SNCMirrorProlong(ENCSyncEvent typ, Uint2 slot_, const CNCBlobKeyLight &key_, Uint8 rec_no, Uint8 tm, const CNCBlobAccessor *accessor)
ENCSyncEvent
Event types to log.
Uint2 TSrvThreadNum
Type for thread number in TaskServer.
RetroSearch is an open source project built by @garambo | Open a GitHub Issue
Search and Browse the WWW like it's 1997 | Search results from DuckDuckGo
HTML:
3.2
| Encoding:
UTF-8
| Version:
0.7.4