<
typenameType>
void 69shuffle(lst.begin(), lst.end(), mt);
75TSyncSlotsList::const_iterator sl =
s_SlotsList.begin();
78s_ShuffleList<SSyncSlotSrv*>((*sl)->srvs);
95slot_data = it_slot->second;
161slot_srv->
hint= hint;
258 #if __NC_TASKS_MONITOR 259m_TaskName =
"CNCLogCleaner";
273clean_required(
false)
278sync_started(
false),
280is_by_blobs(
false),
281was_blobs_sync(
false),
282made_initial_sync(
false),
286last_active_time(
CSrvTime::Current().Sec()),
287last_success_time(0),
302 for(
Uint2 i= 0;
i< slots.size(); ++
i) {
309 Uint4cnt_to_sync = 0;
314 ITERATE(vector<Uint2>, it_slot, commonSlots) {
317slot_data->
srvs.push_back(slot_srv);
320 if(!commonSlots.empty()) {
329 for(
Uint1 i= 0;
i< cnt_syncs; ++
i) {
340 if(cnt_to_sync == 0)
356peer_slots[it_peer->first].
insert(commonSlots.begin(), commonSlots.end());
361 ITERATE(vector<Uint2>, it_slot, self_slots) {
367 Uint8srv_id = (*it_srv)->peer->GetSrvId();
368all_peers.
insert((*it_srv)->peer);
369processed.
insert(srv_id);
371 if(p != peer_slots.
end() && p->second.
find( *it_slot) != p->second.
end()) {
380p != peer_slots.
end(); ++p) {
381 if(processed.
find(p->first) != processed.
end()) {
384processed.
insert(p->first);
388 if(p->second.find( *it_slot) != p->second.end()) {
390slot_data->
srvs.push_back( slot_srv);
396 Uint4cnt_to_sync = 0;
399 if(p != peer_slots.
end()) {
400(*it_peer)->ReconfSlotsForInitSync(
Uint2(p->second.
size()));
401 if(p->second.
size() != 0) {
414 if(!(*c)->IsStuck()) {
422(*srv)->made_initial_sync =
false;
423(*srv)->peer->ResetSlotsForInitSync();
438 Uint8* local_start_rec_no,
439 Uint8* remote_start_rec_no,
449 if(slot_srv ==
NULL) {
471 if(records_available
493 if(slot_srv ==
NULL)
578 Uint8local_synced_rec_no,
579 Uint8remote_synced_rec_no)
583remote_synced_rec_no);
605 #if __NC_TASKS_MONITOR 606m_TaskName =
"CNCActiveSyncControl";
623CNCActiveSyncControl::State
656CNCActiveSyncControl::State
701CNCActiveSyncControl::State
742CNCActiveSyncControl::State
756 if(wait_time > sync_interval)
757wait_time = sync_interval;
774CNCActiveSyncControl::State
824CNCActiveSyncControl::State
850CNCActiveSyncControl::State
857CNCActiveSyncControl::State
861 SRV_FATAL(
"Command finalized already");
877CNCActiveSyncControl::State
883 boolis_locked =
false;
907 if(!is_locked) {
m_Lock.
Lock(); is_locked =
true;}
923(*h)->CheckCommandTimeout();
931CNCActiveSyncControl::State
1003CNCActiveSyncControl::State
1036 conn->SyncBlobsList(
this);
1054CNCActiveSyncControl::State
1070CNCActiveSyncControl::State
1107 deleteit->second.wr_or_rm_event;
1108 deleteit->second.prolong_event;
1244 case eSynEventSend: blob_key = (*m_CurSendEvent)->key.PackedKey();
break;
1245 case eSynEventGet: blob_key = (*m_CurGetEvent)->key.PackedKey();
break;
1251 if(!blob_key.empty()) {
1252 Uint2slot=0, bucket;
1267!
conn->GetPeer()->AcceptsBlobKey(event->key)) {
1272 switch(event->event_type) {
1276 conn->SyncSend(
this, event);
1279 conn->SyncProlongPeer(
this, event);
1289 switch(event->event_type) {
1293 conn->SyncRead(
this, event);
1296 conn->SyncProlongOur(
this, event);
1309 conn->SyncProlongOur(
this,
key, *remote_blob);
1319 if(!
conn->GetPeer()->AcceptsBlobKey(
key)) {
1327 conn->SyncProlongPeer(
this,
key, *local_blob);
1329 conn->SyncSend(
this,
key);
1337 if(!
conn->GetPeer()->AcceptsBlobKey(
key)) {
1342 conn->SyncSend(
this,
key);
1351 conn->SyncRead(
this,
key, create_time);
1363 conn->SyncCancel(
this);
1388 SRV_FATAL(
"Invalid state: no m_StartedCmds");
1480}
else if(res !=
eSynOK) {
1500 boolone_slot =
false, is_audit =
false;
1501 if(!
mask.empty()) {
1502is_audit =
mask==
"audit";
1513 stringis(
"\": "), iss(
"\": \""), eol(
",\n\"");
1515task.
WriteText(
",\n\"SyncControls\": [");
1585 boolis_first =
true;
1589 if((*sl)->slot != slot) {
1593 if((*sl)->cnt_sync_started == 0) {
1609 if(srv != (*sl)->srvs.begin()) {
ncbi::TMaskedQueryRegions mask
Mutex created to have minimum possible size (its size is 4 bytes) and to sleep using kernel capabilit...
void Unlock(void)
Unlock the mutex.
void Lock(void)
Lock the mutex.
State x_WaitSyncStarted(void)
void x_DoEventSend(const SSyncTaskInfo &task_info, CNCActiveHandler *conn)
void x_DoBlobSend(const SSyncTaskInfo &task_info, CNCActiveHandler *conn)
TSyncSlotsList::const_iterator m_NextSlotIt
void x_CleanSyncObjects(void)
void x_DoBlobGet(const SSyncTaskInfo &task_info, CNCActiveHandler *conn)
State x_ExecuteSyncCommands(void)
TSyncEventsIt m_CurGetEvent
static void PrintState(TNCBufferType &sendBuff, const CTempString &mask)
State x_WaitForExecutingTasks(void)
State x_PrepareSyncByEvents(void)
void x_DoFinalize(CNCActiveHandler *conn)
void x_DoEventGet(const SSyncTaskInfo &task_info, CNCActiveHandler *conn)
State x_CheckSlotOurSync(void)
Uint8 m_RemoteSyncedRecNo
bool GetNextTask(SSyncTaskInfo &task_info, bool *is_valid=nullptr)
TReducedSyncEvents m_RemoteEvents
CNCActiveSyncControl(void)
TSyncEvents m_Events2Send
State x_PrepareSyncByBlobs(void)
virtual ~CNCActiveSyncControl(void)
void CmdFinished(ESyncResult res, ESynActionType action, CNCActiveHandler *conn, int hint)
void ExecuteSyncTask(const SSyncTaskInfo &task_info, CNCActiveHandler *conn)
State x_CheckSlotTheirSync(void)
State x_ExecuteFinalize(void)
void x_DoBlobUpdatePeer(const SSyncTaskInfo &task_info, CNCActiveHandler *conn)
State x_FinishScanSlots(void)
State x_StartScanSlots(void)
State x_DoPeriodicSync(void)
TBlobsListIt m_CurLocalBlob
SSyncSlotData * m_SlotData
void x_CalcNextTask(void)
TNCBlobSumList m_LocalBlobs
set< SSyncSlotSrv * > m_VisitedSrv
set< CNCActiveHandler * > m_SyncHandlers
void x_DoBlobUpdateOur(const SSyncTaskInfo &task_info, CNCActiveHandler *conn)
TBlobsListIt m_CurRemoteBlob
TNCBlobSumList m_RemoteBlobs
State x_WaitForBlobList(void)
TSyncEventsIt m_CurSendEvent
static void GetFullBlobsList(Uint2 slot, TNCBlobSumList &blobs_lst, const CNCPeerControl *peer)
static bool NeedStopWrite(void)
static void SaveMaxSyncLogRecNo(void)
static bool IsDraining(void)
static const TNCPeerList & GetPeers(void)
static Uint8 GetPeriodicSyncInterval(void)
static const vector< Uint2 > & GetSelfSlots(void)
static string GetPeerName(Uint8 srv_id)
static Uint8 GetNetworkErrorTimeout(void)
static Uint8 GetPeriodicSyncTimeout(void)
static Uint8 GetSelfTrustLevel(void)
static bool GetSlotByKey(const string &key, Uint2 &slot, Uint2 &time_bucket)
static Uint8 GetFailedSyncRetryDelay(void)
static const string & GetPeriodicLogFile(void)
static Uint8 GetMaxBlobSizeSync(void)
static Uint1 GetCntActiveSyncs(void)
static const vector< Uint2 > & GetCommonSlots(Uint8 server)
static Uint8 GetSelfID(void)
static Uint4 GetCleanAttemptInterval(void)
static Uint8 GetMinForcedCleanPeriod(void)
map< Uint2, Uint8 > m_LastForceTime
virtual ~CNCLogCleaner(void)
virtual void ExecuteSlice(TSrvThreadNum thr_idx)
This is the main method to do all work this task should do.
TSyncSlotsList::const_iterator m_NextSlotIt
Uint8 GetNextSyncTime(void)
static void SetServersForInitSync(Uint4 cnt_servers)
Uint8 GetSrvId(void) const
void RegisterConnSuccess(void)
void AddInitiallySyncedSlot(void)
bool FinishSync(CNCActiveSyncControl *sync_ctrl)
Uint8 GetTrustLevel(void) const
void RegisterSyncStat(bool is_passive, bool is_by_blobs, int result, int hint)
bool StartActiveSync(void)
static CAtomicCounter sm_TotalCopyRequests
CNCActiveHandler * GetBGConn(bool silent=false)
static void ReconfServersForInitSync(Uint4 cnt_servers)
void SetSlotsForInitSync(Uint2 cnt_slots)
void RegisterSyncStop(bool is_passive, Uint8 &next_sync_time, Uint8 next_sync_delay)
static bool HasServersForInitSync(void)
static CNCPeerControl * Peer(Uint8 srv_id)
static void ResetServersForInitSync(void)
static CAtomicCounter sm_CopyReqsRejected
static void ReInitialize(void)
static ESyncInitiateResult Initiate(Uint8 server_id, Uint2 slot, Uint8 *local_start_rec_no, Uint8 *remote_start_rec_no, TReducedSyncEvents *events, Uint8 *sync_id)
static void MarkCurSyncByBlobs(Uint8 server_id, Uint2 slot, Uint8 sync_id)
static void Finalize(void)
static void Commit(Uint8 server_id, Uint2 slot, Uint8 sync_id, Uint8 local_synced_rec_no, Uint8 remote_synced_rec_no)
static bool Initialize(void)
static void Cancel(Uint8 server_id, Uint2 slot, Uint8 sync_id)
static void ReConfig(void)
static ESyncInitiateResult CanStartSyncCommand(Uint8 server_id, Uint2 slot, bool can_abort, Uint8 &sync_id)
static void SyncCommandFinished(Uint8 server_id, Uint2 slot, Uint8 sync_id)
static void InitialSyncRequired(void)
static void InitialSyncComplete(void)
static void AddSyncServer(Uint8 srv_id)
static void PeerSyncFinished(Uint8 srv_id, Uint2 slot, Uint8 cnt_ops, bool success)
static Uint8 Clean(Uint2 slot)
static void GetLastSyncedRecNo(Uint8 server, Uint2 slot, Uint8 *local_synced_rec_no, Uint8 *remote_synced_rec_no)
static bool GetEventsList(Uint8 server, Uint2 slot, Uint8 *local_start_rec_no, Uint8 *remote_start_rec_no, TReducedSyncEvents *events)
static Uint8 GetCurrentRecNo(Uint2 slot)
static bool GetSyncOperations(Uint8 server, Uint2 slot, Uint8 local_start_rec_no, Uint8 remote_start_rec_no, const TReducedSyncEvents &remote_events, TSyncEvents *events_to_get, TSyncEvents *events_to_send, Uint8 *local_synced_rec_no, Uint8 *remote_synced_rec_no)
static Uint8 GetLogSize(void)
static bool IsOverLimit(Uint2 slot)
static void SetLastSyncRecNo(Uint8 server, Uint2 slot, Uint8 local_synced_rec_no, Uint8 remote_synced_rec_no)
Class used in all diagnostic logging.
CSrvDiagMsg & PrintExtra(void)
Starts "extra" message.
CSrvDiagMsg & StartRequest(void)
Starts "request-start" message.
CSrvDiagMsg & PrintParam(CTempString name, CTempString value)
Adds parameter to "request-start" or "extra" message.
void StopRequest(void)
Prints "request-stop" message.
void SetState(State state)
Sets current state of the machine.
CRequestContext * GetDiagCtx(void)
Get current diagnostic context for the task.
void RunAfter(Uint4 delay_sec)
This call is basically equivalent to SetRunnable() but with guarantee that task will be scheduled for...
void SetRunnable(bool boost=false)
Set this task "runnable", i.e.
void ReleaseDiagCtx(void)
Releases current diagnostic context of the task.
void CreateNewDiagCtx(void)
Create new diagnostic context for this task to work in.
Class incorporating convenient methods to work with struct timespec.
static int CurSecs(void)
Current time in seconds since epoch (time_t).
static CSrvTime Current(void)
Exact current time with precision up to nanoseconds.
Uint8 AsUSec(void) const
Converts object's value to microseconds since epoch.
Uint1 Print(char *buf, EFormatType fmt) const
Formats time value in the object and writes it in buf.
@ eFmtJson
For JSON output.
static bool IsInShutdown(void)
Checks if TaskServer received request to shutdown.
CTempString implements a light-weight string on top of a storage buffer whose lifetime management is ...
TNCBufferType & WriteNumber(NumType num)
TNCBufferType & WriteText(const char *buf)
TNCBufferType & WriteBool(bool b)
container_type::const_iterator const_iterator
const_iterator begin() const
const_iterator end() const
iterator_bool insert(const value_type &val)
const_iterator find(const key_type &key) const
iterator_bool insert(const value_type &val)
const_iterator find(const key_type &key) const
const_iterator end() const
static bool is_valid(const char *num, int type, CONV_RESULT *cr)
static CS_CONNECTION * conn
static DLIST_TYPE *DLIST_NAME() first(DLIST_LIST_TYPE *list)
#define ITERATE(Type, Var, Cont)
ITERATE macro to sequence through container elements.
#define ERASE_ITERATE(Type, Var, Cont)
Non-constant version with ability to erase current element, if container permits.
#define VECTOR_ERASE(Var, Cont)
Use this macro inside body of ERASE_ITERATE cycle to erase from vector-like container.
void SetBytesWr(Int8 bytes)
void SetRequestStatus(int status)
void Warning(CExceptionArgs_Base &args)
uint8_t Uint1
1-byte (8-bit) unsigned integer
uint32_t Uint4
4-byte (32-bit) unsigned integer
uint16_t Uint2
2-byte (16-bit) unsigned integer
int64_t Int8
8-byte (64-bit) signed integer
uint64_t Uint8
8-byte (64-bit) unsigned integer
#define NCBI_UINT8_FORMAT_SPEC
Uint4 TValue
Type of the generated integer value and/or the seed value.
TValue GetRand(void)
Get the next random number in the interval [0..GetMax()] (inclusive)
#define END_NCBI_SCOPE
End previously defined NCBI scope.
#define BEGIN_NCBI_SCOPE
Define ncbi namespace.
static unsigned int StringToUInt(const CTempString str, TStringToNumFlags flags=0, int base=10)
Convert string to unsigned int.
const TYPE & Get(const CNamedParameterList *param)
constexpr bool empty(list< Ts... >) noexcept
const struct ncbi::grid::netcache::search::fields::KEY key
@ eStatus_PeerError
Command should be proxied to peers but it's impossible to connect to any.
@ eStatus_SyncBusy
Synchronization cannot start because server is busy doing cleaning or some other synchronization on t...
@ eStatus_SyncAborted
Synchronization is aborted because something went wrong.
@ eStatus_CrossSync
Synchronization is rejected because both servers tried to start it simultaneously.
Common macro to detect used sanitizers and suppress memory leaks if run under LeakSanitizer.
#define NCBI_CLANG_ANALYZER_SUPPRESS
Suppress clang analyzer report.
vector< CNCActiveSyncControl * > TSyncControls
static CNCLogCleaner * s_LogCleaner
static void s_DoCleanLog(CNCLogCleaner *cleaner, Uint2 slot)
static void s_StopSync(SSyncSlotData *slot_data, SSyncSlotSrv *slot_srv, Uint8 next_delay, ESyncResult result, int hint)
static TSyncSlotsMap s_SlotsMap
static void s_CancelSync(SSyncSlotData *slot_data, SSyncSlotSrv *slot_srv, Uint8 next_delay, ESyncResult result, int hint)
static void s_CommitSync(SSyncSlotData *slot_data, SSyncSlotSrv *slot_srv, int hint)
static CMiniMutex s_RndLock
static TSyncSlotsList s_SlotsList
static TSyncControls s_SyncControls
static void s_ShuffleSrvsLists(void)
static ESyncInitiateResult s_StartSync(SSyncSlotData *slot_data, SSyncSlotSrv *slot_srv, bool is_passive)
static CRandom s_Rnd(CRandom::TValue(time(NULL)))
static void s_FindServerSlot(Uint8 server_id, Uint2 slot, SSyncSlotData *&slot_data, SSyncSlotSrv *&slot_srv)
void s_ShuffleList(vector< Type > &lst)
vector< SSyncSlotSrv * > TSlotSrvsList
vector< SSyncSlotData * > TSyncSlotsList
Defines CRequestContext class for NCBI C++ diagnostic API.
#define SRV_LOG(sev, msg)
Macro to be used for printing log messages.
constexpr Uint8 kUSecsPerSecond
bool isSameData(const SNCBlobSummary &other) const
SSyncSlotData(Uint2 slot)
SSyncSlotSrv(CNCPeerControl *peer)
Uint2 TSrvThreadNum
Type for thread number in TaskServer.
RetroSearch is an open source project built by @garambo | Open a GitHub Issue
Search and Browse the WWW like it's 1997 | Search results from DuckDuckGo
HTML:
3.2
| Encoding:
UTF-8
| Version:
0.7.4