void GetPeerIpAndPort(uv_tcp_t * tcp, string & client_ip, in_port_t & client_port)
{
    struct sockaddr     sock_addr;
    int                 sock_addr_size = sizeof(sock_addr);

    if (uv_tcp_getpeername(tcp, &sock_addr, &sock_addr_size) == 0) {
        client_ip = GetIPAddress(&sock_addr);
        client_port = GetPort(&sock_addr);
    }
}

// ...
in_port_t   client_port = 0;
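For reference, here is a minimal, standalone sketch of the same peer-lookup pattern using only libuv. QueryPeer is a hypothetical helper; it decodes the sockaddr inline rather than delegating to GetIPAddress()/GetPort() as the code above does.

// Hypothetical standalone helper; not the real GetPeerIpAndPort().
#include <uv.h>
#include <string>

static bool QueryPeer(uv_tcp_t * tcp, std::string & ip, unsigned short & port)
{
    sockaddr_storage    storage;
    int                 len = sizeof(storage);

    if (uv_tcp_getpeername(tcp, reinterpret_cast<sockaddr *>(&storage), &len) != 0)
        return false;   // handle not connected or already closed

    char    buf[64];
    if (storage.ss_family == AF_INET) {
        auto *  sa4 = reinterpret_cast<sockaddr_in *>(&storage);
        uv_ip4_name(sa4, buf, sizeof(buf));
        port = ntohs(sa4->sin_port);        // port arrives in network byte order
    } else {
        auto *  sa6 = reinterpret_cast<sockaddr_in6 *>(&storage);
        uv_ip6_name(sa6, buf, sizeof(buf));
        port = ntohs(sa6->sin6_port);
    }
    ip = buf;
    return true;
}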
CTcpWorkersList::~CTcpWorkersList()
{
    PSG_INFO("CTcpWorkersList::~()>>");
    // ...
    PSG_INFO("CTcpWorkersList::~()<<");
    m_Daemon->m_WorkersList = nullptr;
}
void CTcpWorkersList::Start(struct uv_export_t * exp, unsigned short nworkers,
                            CHttpDaemon & http_daemon,
                            std::function<void(CTcpDaemon & daemon)> OnWatchDog)
{
    for (unsigned int i = 0; i < nworkers; ++i) {
        m_Workers.emplace_back(new CTcpWorker(i + 1, exp,
                                              m_Daemon, http_daemon));
    }

    for (auto & it : m_Workers) {
        // ...
        err_code = uv_thread_create(&worker->m_thread, s_WorkerExecute,
                                    static_cast<void *>(worker));
        // ... "uv_thread_create failed", err_code);
    }

    m_on_watch_dog = OnWatchDog;
    m_Daemon->m_WorkersList = this;
}
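The worker startup above follows the standard libuv threading pattern: each worker owns a uv_thread_t, and the opaque argument of uv_thread_create() carries the worker object into the static entry point. A minimal sketch, assuming only libuv (SWorker and its fields are hypothetical stand-ins for CTcpWorker):

#include <uv.h>
#include <cstdio>

struct SWorker
{
    unsigned int    m_id;
    uv_thread_t     m_thread;
};

// Matches uv_thread_cb: the opaque argument is the worker object itself.
static void s_WorkerExecute(void * _worker)
{
    auto *  worker = static_cast<SWorker *>(_worker);
    std::printf("worker %u running\n", worker->m_id);
}

int main()
{
    SWorker     workers[4];

    for (unsigned int i = 0; i < 4; ++i) {
        workers[i].m_id = i + 1;
        if (uv_thread_create(&workers[i].m_thread, s_WorkerExecute,
                             static_cast<void *>(&workers[i])) != 0)
            return 1;   // the real code throws "uv_thread_create failed" here
    }
    for (auto & w : workers)
        uv_thread_join(&w.m_thread);    // mirrors the join loop later in the file
    return 0;
}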
for (auto & it : m_Workers)
    // ...

for (auto & it : m_Workers)
    // ...
if (proc_groups == 0) {
    self->m_Daemon->StopDaemonLoop();
    // ...
    PSG_MESSAGE("Shutdown timeout is over when there are "
                "unfinished requests. Exiting immediately.");
// ...
if (!self->AnyWorkerIsRunning()) {
    uv_stop(handle->loop);
// ...
if (self->m_on_watch_dog) {
    self->m_on_watch_dog(*self->m_Daemon);
for (auto & it : m_Workers) {
    // ...
    err_code = uv_thread_join(&worker->m_thread);
    // ...
    } else if (-err_code != EAGAIN) {
        PSG_ERROR("uv_thread_join failed: " << err_code);
bool    need_comma = false;
json.append(1, '[');
// ...
for (auto & it : m_Workers) {
    std::vector<SConnectionRunTimeProperties>   conn_props = it->GetConnProps();
    // ...
    for (auto & props : conn_props) {
        if (props.m_Id == self_connection_id) {
            // ...
        const auto  diff_ms = duration_cast<milliseconds>(now - props.m_OpenTimestamp);
        if (diff_ms.count() < 10) {
            // ...
        json.append(1, ',');
        // ...
json.append(1, ']');
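GetConnectionsStatus() builds its JSON by hand, using a need_comma flag to avoid a trailing comma. A minimal sketch of that pattern (BuildJsonArray and its input are hypothetical):

#include <string>
#include <vector>

std::string BuildJsonArray(const std::vector<std::string> & items)
{
    std::string     json;
    bool            need_comma = false;

    json.append(1, '[');
    for (const auto & item : items) {
        if (need_comma)
            json.append(1, ',');
        json.append(item);      // each item is assumed to already be serialized JSON
        need_comma = true;
    }
    json.append(1, ']');
    return json;
}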
throttling_data.Clear();
// ...
for (auto & it : m_Workers) {
    it->PopulateThrottlingData(throttling_data, now, m_ConnThrottleIdleTimeoutMs);
}
// ...
if (it.second > m_ConnThrottleByHost) {
    // ...
if (it.second > m_ConnThrottleBySite) {
    // ...
if (it.second > m_ConnThrottleByProcess) {
    // ...
if (it.second > m_ConnThrottleByUserAgent) {
for (auto & it : m_Workers) {
    if (it->m_id == worker_id) {
        return it->CloseThrottledConnection(conn_id);
    }
}
if (m_started && !m_shutdown && !m_shuttingdown) {
    vector<CHttpConnection *>   all_connections;

    m_ConnListLock.lock();
    for (auto it = m_ConnectedList.begin(); it != m_ConnectedList.end(); ++it) {
        // ...
        all_connections.push_back(http_connection);
    }
    for (auto it = m_FreeList.begin(); it != m_FreeList.end(); ++it) {
        // ...
        all_connections.push_back(http_connection);
    }
    m_ConnListLock.unlock();

    for (auto conn : all_connections) {
        conn->CleanupTimers();
    }
    // ...
    uv_async_send(&m_internal->m_async_stop);
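The final uv_async_send() is what makes this stop request safe to issue from outside the worker thread: it is the one libuv call documented as callable from any thread, and it wakes the loop so the async callback can run on the loop thread. A self-contained sketch of that wakeup pattern, assuming only libuv:

#include <uv.h>
#include <cstdio>

static void on_async(uv_async_t * handle)
{
    std::printf("stop requested\n");
    uv_stop(handle->loop);          // mirrors s_OnAsyncStop above
}

int main()
{
    uv_loop_t   loop;
    uv_async_t  async_stop;

    uv_loop_init(&loop);
    uv_async_init(&loop, &async_stop, on_async);

    uv_thread_t     requester;
    uv_thread_create(&requester,
                     [](void * arg) { uv_async_send(static_cast<uv_async_t *>(arg)); },
                     &async_stop);

    uv_run(&loop, UV_RUN_DEFAULT);  // returns after on_async() calls uv_stop()
    uv_thread_join(&requester);

    uv_close(reinterpret_cast<uv_handle_t *>(&async_stop), nullptr);
    uv_run(&loop, UV_RUN_DEFAULT);  // let the close callback run
    uv_loop_close(&loop);
    return 0;
}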
// ... "Worker has already been started");
// ...
m_protocol.BeforeStart();

err_code = uv_import(m_internal->m_loop.Handle(),
                     reinterpret_cast<uv_stream_t *>(&m_internal->m_listener),
                     /* ... */);
// ... "uv_import failed", err_code);
m_internal->m_listener.data = this;

err_code = uv_listen(reinterpret_cast<uv_stream_t *>(&m_internal->m_listener),
                     m_Daemon->m_Backlog, s_OnTcpConnection);
// ... "uv_listen failed", err_code);
m_internal->m_listener.data = this;

err_code = uv_async_init(m_internal->m_loop.Handle(),
                         &m_internal->m_async_stop, s_OnAsyncStop);
// ... "uv_async_init failed", err_code);
m_internal->m_async_stop.data = this;

err_code = uv_async_init(m_internal->m_loop.Handle(),
                         &m_internal->m_async_work, s_OnAsyncWork);
// ... "uv_async_init failed", err_code);
m_internal->m_async_work.data = this;

err_code = uv_timer_init(m_internal->m_loop.Handle(), &m_internal->m_timer);
// ... "uv_timer_init failed", err_code);
m_internal->m_timer.data = this;
uv_timer_start(&m_internal->m_timer, s_OnTimer, 1000, 1000);

m_protocol.ThreadStart(m_internal->m_loop.Handle(), this);
// ...
err_code = uv_run(m_internal->m_loop.Handle(), UV_RUN_DEFAULT);
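Every handle initialized above stores this in its data pointer so the static C callbacks (s_OnTcpConnection, s_OnAsyncStop, s_OnAsyncWork, s_OnTimer) can recover the owning C++ worker. A minimal sketch of that back-pointer pattern (CWorker is a hypothetical reduction of CTcpWorker):

#include <uv.h>

class CWorker
{
public:
    void Start(uv_loop_t * loop)
    {
        uv_timer_init(loop, &m_Timer);
        m_Timer.data = this;                        // stash the owner
        uv_timer_start(&m_Timer, s_OnTimer, 1000, 1000);
    }

private:
    static void s_OnTimer(uv_timer_t * handle)
    {
        // recover the owner and forward to a member function
        auto *  self = static_cast<CWorker *>(handle->data);
        self->OnTimer();
    }

    void OnTimer(void) { /* periodic housekeeping */ }

    uv_timer_t  m_Timer;
};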
PSG_INFO("uv_run (1) worker " << m_id << " returned " << err_code);
// ...
PSG_ERROR("Libuv exception while preparing/running worker " << m_id <<
          " Error: " << exc.GetMsg());
// ...
PSG_ERROR("NCBI exception while preparing/running worker " << m_id <<
          " Error: " << exc.GetMsg());
} catch (const exception & exc) {
    PSG_ERROR("Standard exception while preparing/running worker " << m_id <<
              " Error: " << exc.what());
// ...
PSG_ERROR("Unknown exception while preparing/running worker " << m_id);
// ...
m_shuttingdown = true;
PSG_INFO("worker " << m_id << " is closing");
if (m_internal->m_listener.type != 0) {
    uv_close(reinterpret_cast<uv_handle_t *>(&m_internal->m_listener),
             /* ... */);
// ...
while (!m_ConnectedList.empty()) {
    uv_run(m_internal->m_loop.Handle(), UV_RUN_NOWAIT);
}
// ...
if (m_internal->m_async_stop.type != 0) {
    uv_close(reinterpret_cast<uv_handle_t *>(&m_internal->m_async_stop),
             /* ... */);
if (m_internal->m_async_work.type != 0) {
    uv_close(reinterpret_cast<uv_handle_t *>(&m_internal->m_async_work),
             /* ... */);
if (m_internal->m_timer.type != 0) {
    uv_close(reinterpret_cast<uv_handle_t *>(&m_internal->m_timer),
             /* ... */);
// ...
m_protocol.ThreadStop();

err_code = uv_run(m_internal->m_loop.Handle(), UV_RUN_DEFAULT);
// ... ", uv_run (2) returned " << err_code << ", st: " << m_started.load());

err_code = m_internal->m_loop.Close();
// ... ", uv_loop_close returned " << err_code << ", st: " << m_started.load());
uv_walk(m_internal->m_loop.Handle(), s_LoopWalk, this);
// ...
m_internal.reset(nullptr);
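The shutdown sequence above has to close every handle and let the close callbacks run before the loop can be closed without leaking; uv_walk() is the catch-all for anything still open. A minimal sketch of that drain-and-close idiom, assuming only libuv:

#include <uv.h>

static void s_CloseWalk(uv_handle_t * handle, void * /*arg*/)
{
    if (!uv_is_closing(handle))
        uv_close(handle, nullptr);
}

void DrainLoop(uv_loop_t * loop)
{
    uv_walk(loop, s_CloseWalk, nullptr);    // request close of all remaining handles
    uv_run(loop, UV_RUN_DEFAULT);           // run until the close callbacks fire
    uv_loop_close(loop);                    // now returns 0 instead of UV_EBUSY
}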
} catch (const exception & exc) {
    PSG_ERROR("Exception while shutting down worker " << m_id <<
              /* ... */);
// ...
    PSG_ERROR("Unexpected exception while shutting down worker " <<
              /* ... */);
if (!m_close_all_issued) {
    m_close_all_issued = true;
    // ...
    vector<CHttpConnection *>   http_conns;

    m_ConnListLock.lock();
    for (auto it = m_ConnectedList.begin(); it != m_ConnectedList.end(); ++it) {
        // ...
        http_conns.push_back(http_conn);
    }
    m_ConnListLock.unlock();
    // ...
    for (auto http_conn : http_conns) {
uv_tcp_t *  tcp = reinterpret_cast<uv_tcp_t *>(handle);
std::lock_guard<std::mutex>     lock(m_ConnListLock);
// ...
for (auto it = m_ConnectedList.begin(); it != m_ConnectedList.end(); ++it) {
    if (tcp == &std::get<0>(*it)) {
        // ...
        m_Daemon->DecrementAboveSoftLimitConnCount();
        // ...
        m_Daemon->DecrementBelowSoftLimitConnCount();
        // ...
        m_protocol.OnClientClosedConnection(reinterpret_cast<uv_stream_t *>(handle),
                                            /* ... */);
        // ...
        m_FreeList.splice(m_FreeList.begin(), m_ConnectedList, it);
// ...
uv_async_send(&m_internal->m_async_work);
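Moving the closed connection back with std::list::splice() is what makes the free-list recycling cheap: splice relinks the node in O(1) and does not move the element, so the uv_tcp_t embedded in the tuple keeps its address even while libuv still refers to it. A small sketch (SConn is hypothetical):

#include <list>
#include <uv.h>

struct SConn { uv_tcp_t tcp; int state; };

std::list<SConn>    connected, free_pool;

void Recycle(std::list<SConn>::iterator it)
{
    // After this call the node belongs to free_pool; &it->tcp is unchanged,
    // so any libuv handle pointer taken earlier remains valid.
    free_pool.splice(free_pool.begin(), connected, it);
}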
std::list<std::tuple<uv_tcp_t, CHttpConnection>> &
CTcpWorker::GetConnList(void)
{
    return m_ConnectedList;
}

std::vector<SConnectionRunTimeProperties>
CTcpWorker::GetConnProps(void)
{
    std::vector<SConnectionRunTimeProperties>   conn_props;
    conn_props.reserve(m_ConnectedList.size());

    std::lock_guard<std::mutex>     lock(m_ConnListLock);
    for (auto & it : m_ConnectedList) {
        conn_props.emplace_back(std::get<1>(it).GetProperties());
    }
std::lock_guard<std::mutex>     lock(m_ConnListLock);
for (auto & it : m_ConnectedList) {
    if (std::get<1>(it).GetConnectionId() == conn_id) {
        std::get<1>(it).CloseThrottledConnection(
            /* ... */);
std::lock_guard<std::mutex>     lock(m_ConnListLock);
for (auto & it : m_ConnectedList) {
    // ...
    int64_t     timespan = chrono::duration_cast<chrono::milliseconds>
                                (/* ... */).count();
    if (timespan <= static_cast<int64_t>(idle_timeout_ms)) {
        // ...
    int64_t     timespan = chrono::duration_cast<chrono::milliseconds>
                                (/* ... */).count();
    if (timespan <= static_cast<int64_t>(idle_timeout_ms)) {
        // ...
    // ... m_id, props.m_Id));
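The throttling scan above classifies a connection as idle by comparing the elapsed time since its last activity (or last request) against idle_timeout_ms. A minimal sketch of that test (IsIdle is a hypothetical helper; the timestamp corresponds to m_LastActivityTimestamp in the index below):

#include <chrono>
#include <cstdint>

bool IsIdle(const std::chrono::system_clock::time_point & now,
            const std::chrono::system_clock::time_point & last_activity,
            uint64_t idle_timeout_ms)
{
    using namespace std::chrono;
    int64_t     timespan = duration_cast<milliseconds>(now - last_activity).count();
    return timespan > static_cast<int64_t>(idle_timeout_ms);
}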
m_protocol.OnAsyncWork(m_shuttingdown || m_shutdown);
// ...
PSG_INFO("Worker async work requested");
// ...
m_protocol.OnTimer();
// ...
PSG_INFO("Worker async stop requested");
uv_stop(handle->loop);
if (listener && status == 0) {
    // ...

// ...
    // ... " (" << handle->type << ") @ worker "
    //     << (worker ? worker->m_id : -1) << " (" << worker << ")");
if (m_FreeList.empty()) {
    // ...
    m_FreeList.emplace_back(
        tuple<uv_tcp_t, CHttpConnection>(uv_tcp_t{0},
                                         // ...
                                         conn_force_close_wait_ms)));
    auto    new_item = m_FreeList.rbegin();
    // ...
    http_connection->SetupTimers(m_internal->m_loop.Handle());
}

auto        it = m_FreeList.begin();
uv_tcp_t *  tcp = &get<0>(*it);
int         err_code = uv_tcp_init(m_internal->m_loop.Handle(), tcp);
// ...
    PSG_ERROR("TCP connection accept failed; uv_tcp_init() error code: " << err_code);
// ...
uv_tcp_nodelay(tcp, 1);
uv_tcp_keepalive(tcp, 1, 120);

m_ConnListLock.lock();
m_ConnectedList.splice(m_ConnectedList.begin(), m_FreeList, it);
m_ConnListLock.unlock();

int64_t     num_connections = m_Daemon->NumOfConnections();

err_code = uv_accept(listener, reinterpret_cast<uv_stream_t *>(tcp));
// ...
    m_Daemon->IncrementAboveSoftLimitConnCount();
    // ...
    PSG_ERROR("TCP connection accept failed; uv_accept() error code: " +
              to_string(err_code));
    // ...
    uv_close(reinterpret_cast<uv_handle_t *>(tcp), s_OnClientClosed);
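The accept path above is the usual libuv sequence: take a uv_tcp_t, uv_tcp_init() it on the worker loop, uv_accept() it off the listener, tune the socket, and on any failure hand it to uv_close() with a callback. A standalone sketch of that sequence, assuming only libuv (the pooling and limit checks are omitted):

#include <uv.h>

static void s_OnClosed(uv_handle_t * handle)
{
    delete reinterpret_cast<uv_tcp_t *>(handle);
}

static void s_OnConnection(uv_stream_t * listener, int status)
{
    if (status != 0)
        return;                             // the pending connection is already gone

    auto *  tcp = new uv_tcp_t;             // the real code draws from a free list instead
    if (uv_tcp_init(listener->loop, tcp) != 0) {
        delete tcp;
        return;
    }

    if (uv_accept(listener, reinterpret_cast<uv_stream_t *>(tcp)) != 0) {
        uv_close(reinterpret_cast<uv_handle_t *>(tcp), s_OnClosed);
        return;
    }

    uv_tcp_nodelay(tcp, 1);                 // disable Nagle, as above
    uv_tcp_keepalive(tcp, 1, 120);          // keepalive probing after 120 s idle
    // ... hand the stream over to the HTTP protocol layer here
}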
in_port_t   peer_port = 0;
// ...
if (m_shuttingdown) {
    // ...
    m_Daemon->IncrementAboveSoftLimitConnCount();
    http_conn->PrepareForUsage(nullptr, num_connections, peer_ip, true);
    // ...
    uv_close(reinterpret_cast<uv_handle_t *>(tcp), s_OnClientClosed);
int64_t     conn_hard_limit = m_Daemon->GetMaxConnectionsHardLimit();

if (num_connections >= conn_hard_limit) {
    // ...
    m_Daemon->IncrementAboveSoftLimitConnCount();
    http_conn->PrepareForUsage(nullptr, num_connections, peer_ip, true);
    // ...
    string      err_msg = "TCP Connection accept failed; "
                          "too many connections (maximum: " +
                          to_string(conn_hard_limit) + ")";
    // ...
    uv_close(reinterpret_cast<uv_handle_t *>(tcp), s_OnClientClosed);
int64_t     conn_alert_limit = m_Daemon->GetMaxConnectionsAlertLimit();
int64_t     conn_soft_limit = m_Daemon->GetMaxConnectionsSoftLimit();
int64_t     num_below_soft_limit_connections = m_Daemon->GetBelowSoftLimitConnCount();

bool        exceed_soft_limit = (num_below_soft_limit_connections >= conn_soft_limit);

if (exceed_soft_limit) {
    // ...
    m_Daemon->IncrementAboveSoftLimitConnCount();
} else {
    // ...
    m_Daemon->IncrementBelowSoftLimitConnCount();
}

bool        exceed_alert_limit = (num_connections >= conn_alert_limit);
if (exceed_alert_limit && !exceed_soft_limit) {
    // ...
    string      warn_msg = "Number of client connections (" +
                           to_string(num_connections) +
                           ") is getting too high, "
                           "exceeds the alert threshold (" +
                           to_string(conn_alert_limit) + ")";
    // ...
}

m_protocol.OnNewConnection(reinterpret_cast<uv_stream_t *>(tcp),
                           http_conn, s_OnClientClosed);
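Taken together, the accept path enforces three thresholds: the hard limit rejects the connection outright, the soft limit admits it but counts it as over-limit (making it a throttling candidate), and the alert limit only raises a warning. A hypothetical condensation of that decision:

#include <cstdint>

enum class EAdmission { eReject, eAcceptOverLimit, eAcceptWithAlert, eAccept };

EAdmission ClassifyConnection(int64_t num_connections,
                              int64_t num_below_soft_limit,
                              int64_t hard_limit, int64_t soft_limit,
                              int64_t alert_limit)
{
    if (num_connections >= hard_limit)
        return EAdmission::eReject;             // closed immediately
    if (num_below_soft_limit >= soft_limit)
        return EAdmission::eAcceptOverLimit;    // counted above the soft limit
    if (num_connections >= alert_limit)
        return EAdmission::eAcceptWithAlert;    // accepted, warning logged
    return EAdmission::eAccept;
}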
PSG_MESSAGE("SIGINT received. Immediate shutdown performed.");
// ...
auto    now = psg_clock_t::now();
auto    expiration = now + chrono::hours(24);
// ...
    PSG_MESSAGE("SIGTERM received. The previous shutdown "
                "expiration is shorter than this one. Ignored.");
// ...
    PSG_MESSAGE("SIGTERM received. Graceful shutdown is initiated");
*http_proto = nullptr;
// ...

if (m_Address.empty())
    // ... "Failed to start daemon: address is empty");
// ...
    // ... "Failed to start daemon: port is not specified");

signal(SIGPIPE, SIG_IGN);
// ...
    // ... "uv_key_create failed", rc);
sigint.Start(SIGINT, s_OnMainSigInt);
// ...
sigterm.Start(SIGTERM, s_OnMainSigTerm);
// ...
sighup.Start(SIGHUP, s_OnMainSigHup);
// ...
sigusr1.Start(SIGUSR1, s_OnMainSigUsr1);
// ...
sigusr2.Start(SIGUSR2, s_OnMainSigUsr2);
// ...
sigwinch.Start(SIGWINCH, s_OnMainSigWinch);
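Each of the sigint/sigterm/... objects above wraps a libuv signal handle; their Start(signum, cb) method (see the index below) maps directly onto uv_signal_start(). A minimal sketch of such a wrapper, assuming it is built on uv_signal_init()/uv_signal_start() (CUvSignal here is hypothetical):

#include <uv.h>

class CUvSignal
{
public:
    explicit CUvSignal(uv_loop_t * loop)
    {
        uv_signal_init(loop, &m_Signal);
    }

    void Start(int signum, uv_signal_cb cb)
    {
        // cb runs on the loop thread each time signum is delivered,
        // instead of in the restricted async-signal context
        uv_signal_start(&m_Signal, cb, signum);
    }

private:
    uv_signal_t     m_Signal;   // must be uv_close()'d before the loop is closed
};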
rc = uv_export_start(/* ... */,
                     reinterpret_cast<uv_stream_t *>(listener.Handle()),
                     IPC_PIPE_NAME, m_NumWorkers, &exp);
// ... "uv_export_start failed", rc);
// ...
workers.Start(exp, m_NumWorkers, http_daemon, OnWatchDog);
} catch (const exception & exc) {
    // ...
}
// ... "uv_export_wait failed", rc);
// ...
listener.Close(nullptr);
uv_timer_t      watch_dog;
uv_timer_init(loop.Handle(), &watch_dog);
watch_dog.data = &workers;
uv_timer_start(&watch_dog, workers.s_OnWatchDog, 1000, 1000);
// ...
uv_run(loop.Handle(), UV_RUN_DEFAULT);
// ...
uv_close(reinterpret_cast<uv_handle_t *>(&watch_dog), NULL);
size_t GetActiveProcGroupCounter(void)
@ ePSGS_TcpConnHardLimitExceeded
@ ePSGS_TcpConnAlertLimitExceeded
void PrepareForUsage(uv_tcp_t *uv_tcp_stream, int64_t conn_cnt_at_open, const string &peer_ip, bool exceed_soft_limit_flag)
void SetupTimers(uv_loop_t *tcp_worker_loop)
int64_t GetConnectionId(void) const
bool GetExceedSoftLimitFlag(void) const
static void DaemonStopped()
static void DaemonStarted()
void Register(EPSGS_AlertType alert_type, const string &message)
void IncrementRequestStopCounter(int status)
@ ePSGS_NumConnHardLimitExceeded
@ ePSGS_NumConnAlertLimitExceeded
@ ePSGS_NumConnSoftLimitExceeded
@ ePSGS_IncomingConnectionsCounter
void Increment(IPSGS_Processor *processor, EPSGS_CounterType counter)
void UnregisterUVLoop(uv_thread_t uv_thread)
void RegisterDaemonUVLoop(uv_thread_t uv_thread, uv_loop_t *uv_loop)
CPSGSCounters & GetCounters(void)
void CancelAllProcessors(void)
void RegisterUVLoop(uv_thread_t uv_thread, uv_loop_t *uv_loop)
CExcludeBlobCache * GetExcludeBlobCache(void)
static CPubseqGatewayApp * GetInstance(void)
CPSGAlerts & GetAlerts(void)
const SPubseqGatewaySettings & Settings(void) const
int GetUVLibraryErrorCode(void) const
static void s_OnMainSigTerm(uv_signal_t *, int)
bool OnRequest(CHttpProto **http_proto)
static void s_OnMainSigInt(uv_signal_t *, int)
static constexpr const char IPC_PIPE_NAME[]
void StopDaemonLoop(void)
void Run(CHttpDaemon &http_daemon, std::function< void(CTcpDaemon &daemon)> OnWatchDog=nullptr)
void Start(struct uv_export_t *exp, unsigned short nworkers, CHttpDaemon &http_daemon, std::function< void(CTcpDaemon &daemon)> OnWatchDog=nullptr)
static void s_WorkerExecute(void *_worker)
bool CloseThrottledConnection(unsigned int worker_id, int64_t conn_id)
void PopulateThrottlingData(SThrottlingData &throttling_data)
bool AnyWorkerIsRunning(void)
static uv_key_t s_thread_worker_key
string GetConnectionsStatus(int64_t self_connection_id)
static void s_OnWatchDog(uv_timer_t *handle)
void Start(int signum, uv_signal_cb cb)
void Bind(const char *addr, unsigned int port)
void Close(void(*close_cb)(uv_handle_t *handle))
const_iterator end() const
const_iterator find(const key_type &key) const
@ e503_ServiceUnavailable
TErrCode GetErrCode(void) const
Get error code.
#define NCBI_THROW(exception_class, err_code, message)
Generic macro to throw an exception, given the exception class, error code and message string.
const string & GetMsg(void) const
Get message string.
#define NCBI_THROW2(exception_class, err_code, message, extra)
Throw exception with extra parameter.
virtual const char * what(void) const noexcept
Standard report (includes full backlog).
unsigned short GetPort() const
Get the listening port number back.
unsigned short m_Port
TCP port to listen on.
string GetIPAddress(struct sockaddr *sock_addr)
chrono::system_clock::time_point time_point
string ToJsonString(const CBioseqInfoRecord &bioseq_info, SPSGS_ResolveRequest::TPSGS_BioseqIncludeData include_data_flags, const string &custom_blob_id)
#define PSG_ERROR(message)
#define PSG_INFO(message)
#define PSG_MESSAGE(message)
#define PSG_WARNING(message)
#define PSG_TRACE(message)
string GetSiteFromIP(const string &ip_address)
void DismissErrorRequestContext(CRef< CRequestContext > &context, int status, size_t bytes_sent)
CRef< CRequestContext > CreateErrorRequestContext(const string &client_ip, in_port_t client_port, int64_t connection_id)
std::atomic_bool m_shutdown
static void s_OnTcpConnection(uv_stream_t *listener, const int status)
std::vector< SConnectionRunTimeProperties > GetConnProps(void)
void OnClientClosed(uv_handle_t *handle)
std::list< std::tuple< uv_tcp_t, CHttpConnection > > & GetConnList(void)
static void s_OnAsyncStop(uv_async_t *handle)
static void s_LoopWalk(uv_handle_t *handle, void *arg)
static void s_OnClientClosed(uv_handle_t *handle)
static void s_OnTimer(uv_timer_t *handle)
void OnTcpConnection(uv_stream_t *listener)
static void s_OnAsyncWork(uv_async_t *handle)
bool CloseThrottledConnection(int64_t conn_id)
void PopulateThrottlingData(SThrottlingData &throttling_data, const system_clock::time_point &now, uint64_t idle_timeout_ms)
optional< string > m_PeerUserAgent
optional< string > m_PeerId
system_clock::time_point m_OpenTimestamp
optional< system_clock::time_point > m_LastRequestTimestamp
chrono::system_clock::time_point m_LastActivityTimestamp
size_t m_ConnThrottleByProcess
size_t m_ConnThrottleByUserAgent
double m_ConnForceCloseWaitSec
size_t m_ConnThrottleByHost
double m_ConnThrottleCloseIdleSec
size_t m_ConnThrottleBySite
atomic_bool m_ShutdownRequested
psg_time_point_t m_Expired
list< string > m_PeerIPOverLimit
list< string > m_UserAgentOverLimit
map< string, size_t > m_PeerIPCounts
map< string, size_t > m_PeerSiteCounts
list< string > m_PeerIDOverLimit
map< string, size_t > m_PeerIDCounts
list< SPSGS_IdleConnectionProps > m_IdleConnProps
map< string, size_t > m_UserAgentCounts
list< string > m_PeerSiteOverLimit
bool IdleTimestampPredicate(const SPSGS_IdleConnectionProps &lhs, const SPSGS_IdleConnectionProps &rhs)
void GetPeerIpAndPort(uv_tcp_t *tcp, string &client_ip, in_port_t &client_port)
SShutdownData g_ShutdownData
void UnregisterUVLoop(uv_thread_t uv_thread)
CRef< CRequestContext > CreateErrorRequestContextHelper(uv_tcp_t *tcp, int64_t connection_id)
void CancelAllProcessors(void)
void RegisterDaemonUVLoop(uv_thread_t uv_thread, uv_loop_t *uv_loop)
void RegisterUVLoop(uv_thread_t uv_thread, uv_loop_t *uv_loop)
void CollectGarbage(void)