NCBI C++ ToolKit: src/app/pubseq_gateway/server/tcp_daemon.cpp Source File

83                       string &  client_ip, in_port_t &  client_port)
85      struct sockaddr     sock_addr;
86      int                 sock_addr_size = sizeof(sock_addr);
87      if (uv_tcp_getpeername(tcp, &sock_addr, &sock_addr_size) == 0) {
89          client_port = GetPort(&sock_addr);
104     in_port_t   client_port = 0;
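The fragment above extracts the peer's address with uv_tcp_getpeername(). Below is a rough, self-contained sketch of the same idea, not the toolkit's implementation: the listing's GetIPAddress()/GetPort() helpers are replaced with a plain getnameinfo() call.

// Sketch only: peer IP/port from a libuv TCP handle.
#include <uv.h>
#include <netdb.h>
#include <netinet/in.h>
#include <string>

static bool GetPeerIpAndPortSketch(uv_tcp_t *  tcp,
                                   std::string &  client_ip,
                                   in_port_t &  client_port)
{
    sockaddr_storage    sock_addr{};
    int                 sock_addr_size = sizeof(sock_addr);

    if (uv_tcp_getpeername(tcp,
                           reinterpret_cast<sockaddr *>(&sock_addr),
                           &sock_addr_size) != 0)
        return false;       // Peer is unknown (e.g. not connected yet)

    char    host[NI_MAXHOST];
    char    serv[NI_MAXSERV];
    if (getnameinfo(reinterpret_cast<sockaddr *>(&sock_addr), sock_addr_size,
                    host, sizeof(host), serv, sizeof(serv),
                    NI_NUMERICHOST | NI_NUMERICSERV) != 0)
        return false;

    client_ip = host;
    client_port = static_cast<in_port_t>(std::stoi(serv));
    return true;
}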

121     PSG_INFO("CTcpWorkersList::~()>>");
123     PSG_INFO("CTcpWorkersList::~()<<");
124     m_Daemon->m_WorkersList = nullptr;

129                                 unsigned short  nworkers,
135     for (unsigned int  i = 0; i < nworkers; ++i) {
136         m_Workers.emplace_back(new CTcpWorker(i + 1, exp,
137                                               m_Daemon, http_daemon));
140     for (auto &  it : m_Workers) {
142         err_code = uv_thread_create(&worker->m_thread, s_WorkerExecute,
143                                     static_cast<void *>(worker));
146                        "uv_thread_create failed", err_code);
148     m_on_watch_dog = OnWatchDog;
149     m_Daemon->m_WorkersList = this;
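CTcpWorkersList::Start() spawns one thread per worker with uv_thread_create() and the workers are later joined. A minimal stand-alone sketch of that pattern, with the CTcpWorker machinery reduced to a placeholder struct:

// Sketch: N worker threads started and joined via libuv's thread API.
#include <uv.h>
#include <cstdio>
#include <vector>

struct WorkerStub {
    unsigned int  m_id;
    uv_thread_t   m_thread;
};

static void s_WorkerExecuteStub(void *  arg)
{
    auto *  worker = static_cast<WorkerStub *>(arg);
    std::printf("worker %u running\n", worker->m_id);
    // ... the real worker creates its own uv loop and calls uv_run() here
}

int main()
{
    const unsigned short        nworkers = 4;
    std::vector<WorkerStub>     workers(nworkers);

    for (unsigned int  i = 0; i < nworkers; ++i) {
        workers[i].m_id = i + 1;
        int  err_code = uv_thread_create(&workers[i].m_thread,
                                         s_WorkerExecuteStub, &workers[i]);
        if (err_code != 0)
            std::printf("uv_thread_create failed: %d\n", err_code);
    }
    for (auto &  worker : workers)
        uv_thread_join(&worker.m_thread);
    return 0;
}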

155     for (auto &  it : m_Workers)
164     for (auto &  it : m_Workers)

176     if (proc_groups == 0) {
177         self->m_Daemon->StopDaemonLoop();
181         PSG_MESSAGE("Shutdown timeout is over when there are "
182                     "unfinished requests. Exiting immediately.");
199     if (!self->AnyWorkerIsRunning()) {
200         uv_stop(handle->loop);
202     if (self->m_on_watch_dog) {
203         self->m_on_watch_dog(*self->m_Daemon);
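s_OnWatchDog() runs on a periodic timer and stops the main loop once no worker is running any longer. A simplified sketch of such a watchdog callback; WorkersStub is a placeholder, not the real CTcpWorkersList:

// Sketch: periodic watchdog that stops the loop when all workers are done.
#include <uv.h>
#include <atomic>

struct WorkersStub {
    std::atomic<int>  running_workers{0};
    bool AnyWorkerIsRunning() const { return running_workers.load() > 0; }
};

static void s_OnWatchDogStub(uv_timer_t *  handle)
{
    auto *  self = static_cast<WorkersStub *>(handle->data);
    if (!self->AnyWorkerIsRunning())
        uv_stop(handle->loop);      // All workers finished: leave uv_run()
}

// Usage (main loop): a repeating 1-second timer whose 'data' points at the
// workers container, mirroring the watchdog set up near the end of Run().
//   uv_timer_t  watch_dog;
//   uv_timer_init(loop, &watch_dog);
//   watch_dog.data = &workers;
//   uv_timer_start(&watch_dog, s_OnWatchDogStub, 1000, 1000);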

213     for (auto &  it : m_Workers) {
218         err_code = uv_thread_join(&worker->m_thread);
222         } else if (-err_code != EAGAIN) {
223             PSG_ERROR("uv_thread_join failed: " << err_code);

235     bool    need_comma = false;
236     json.append(1, '[');
238     for (auto &  it : m_Workers) {
239         std::vector<SConnectionRunTimeProperties>   conn_props = it->GetConnProps();
241         for (auto &  props : conn_props) {
242             if (props.m_Id == self_connection_id) {
247             const auto  diff_ms = duration_cast<milliseconds>(now - props.m_OpenTimestamp);
248             if (diff_ms.count() < 10) {
256             json.append(1, ',');
264     json.append(1, ']');

279     throttling_data.Clear();
282     for (auto &  it : m_Workers) {
283         it->PopulateThrottlingData(throttling_data, now, m_ConnThrottleIdleTimeoutMs);
291     if (it.second > m_ConnThrottleByHost) {
296     if (it.second > m_ConnThrottleBySite) {
301     if (it.second > m_ConnThrottleByProcess) {
306     if (it.second > m_ConnThrottleByUserAgent) {
316     for (auto &  it : m_Workers) {
317         if (it->m_id == worker_id) {
318             return it->CloseThrottledConnection(conn_id);

327     if (m_started && !m_shutdown && !m_shuttingdown) {
328         vector<CHttpConnection *>   all_connections;
330         m_ConnListLock.lock();
331         for (auto  it = m_ConnectedList.begin();
332              it != m_ConnectedList.end(); ++it) {
334             all_connections.push_back(http_connection);
336         for (auto  it = m_FreeList.begin();
337              it != m_FreeList.end(); ++it) {
339             all_connections.push_back(http_connection);
341         m_ConnListLock.unlock();
343         for (auto  conn : all_connections) {
344             conn->CleanupTimers();
347         uv_async_send(&m_internal->m_async_stop);

357  "Worker has already been started"

);

364     m_protocol.BeforeStart();
365     err_code = uv_import(m_internal->m_loop.Handle(),
366                          reinterpret_cast<uv_stream_t*>(&m_internal->m_listener),
371                    "uv_import failed", err_code);
373     m_internal->m_listener.data = this;
374     err_code = uv_listen(reinterpret_cast<uv_stream_t*>(&m_internal->m_listener),
375                          m_Daemon->m_Backlog, s_OnTcpConnection);
378                    "uv_listen failed", err_code);
379     m_internal->m_listener.data = this;
381     err_code = uv_async_init(m_internal->m_loop.Handle(),
382                              &m_internal->m_async_stop, s_OnAsyncStop);
385                    "uv_async_init failed", err_code);
386     m_internal->m_async_stop.data = this;
388     err_code = uv_async_init(m_internal->m_loop.Handle(),
389                              &m_internal->m_async_work, s_OnAsyncWork);
392                    "uv_async_init failed", err_code);
393     m_internal->m_async_work.data = this;
395     err_code = uv_timer_init(m_internal->m_loop.Handle(),
396                              &m_internal->m_timer);
399                    "uv_timer_init failed", err_code);
400     m_internal->m_timer.data = this;
402     uv_timer_start(&m_internal->m_timer, s_OnTimer, 1000, 1000);
405     m_protocol.ThreadStart(m_internal->m_loop.Handle(), this);
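The start-up sequence above initializes the worker's async and timer handles on its private loop and points each handle's data at the worker, so the static callbacks can recover their owner. A condensed sketch of the plain libuv part of this sequence; uv_import, uv_listen and the NCBI_THROW2 error handling are omitted, and all names here are placeholders:

// Sketch: per-worker loop with async + timer handles whose data points back.
#include <uv.h>

struct WorkerLoopStub {
    uv_loop_t   m_loop;
    uv_async_t  m_async_stop;
    uv_async_t  m_async_work;
    uv_timer_t  m_timer;
};

static void s_OnAsyncStopStub(uv_async_t *  handle)  { uv_stop(handle->loop); }
static void s_OnAsyncWorkStub(uv_async_t *)          { /* drain pending work */ }
static void s_OnTimerStub(uv_timer_t *)              { /* periodic upkeep */ }

static int StartWorkerLoop(WorkerLoopStub &  w)
{
    int  err_code = uv_loop_init(&w.m_loop);
    if (err_code != 0)
        return err_code;

    uv_async_init(&w.m_loop, &w.m_async_stop, s_OnAsyncStopStub);
    w.m_async_stop.data = &w;

    uv_async_init(&w.m_loop, &w.m_async_work, s_OnAsyncWorkStub);
    w.m_async_work.data = &w;

    uv_timer_init(&w.m_loop, &w.m_timer);
    w.m_timer.data = &w;
    uv_timer_start(&w.m_timer, s_OnTimerStub, 1000, 1000);     // 1 s period

    return uv_run(&w.m_loop, UV_RUN_DEFAULT);   // Blocks until uv_stop()
}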

410     err_code = uv_run(m_internal->m_loop.Handle(), UV_RUN_DEFAULT);
414     PSG_INFO("uv_run (1) worker " << m_id <<
415              " returned " << err_code);
417     PSG_ERROR("Libuv exception while preparing/running worker " << m_id <<
419               " Error: " << exc.GetMsg());
421     PSG_ERROR("NCBI exception while preparing/running worker " << m_id <<
423               " Error: " << exc.GetMsg());
424     } catch (const exception &  exc) {
425         PSG_ERROR("Standard exception while preparing/running worker " << m_id <<
426                   " Error: " << exc.what());
428         PSG_ERROR("Unknown exception while preparing/running worker " << m_id);
431     m_shuttingdown = true;
432     PSG_INFO("worker " << m_id << " is closing");

437     if (m_internal->m_listener.type != 0) {
438         uv_close(reinterpret_cast<uv_handle_t*>(&m_internal->m_listener),
444     while (!m_ConnectedList.empty()) {
445         uv_run(m_internal->m_loop.Handle(), UV_RUN_NOWAIT);
448     if (m_internal->m_async_stop.type != 0) {
449         uv_close(reinterpret_cast<uv_handle_t*>(&m_internal->m_async_stop),
452     if (m_internal->m_async_work.type != 0) {
453         uv_close(reinterpret_cast<uv_handle_t*>(&m_internal->m_async_work),
456     if (m_internal->m_timer.type != 0) {
457         uv_close(reinterpret_cast<uv_handle_t*>(&m_internal->m_timer),
461     m_protocol.ThreadStop();
463     err_code = uv_run(m_internal->m_loop.Handle(), UV_RUN_DEFAULT);
466              ", uv_run (2) returned " << err_code <<
467              ", st: " << m_started.load());
470     err_code = m_internal->m_loop.Close();
473              ", uv_loop_close returned " << err_code <<
474              ", st: " << m_started.load());
477     uv_walk(m_internal->m_loop.Handle(), s_LoopWalk, this);
479     m_internal.reset(nullptr);
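Shutdown mirrors start-up: every initialized handle is closed, the loop is run again so the close callbacks can fire, and only then is the loop itself closed; uv_walk() reports anything left behind. A sketch of that order, reusing the WorkerLoopStub type from the sketch above:

// Sketch: orderly teardown of the worker's libuv handles and loop.
#include <uv.h>

static void ShutdownWorkerLoop(WorkerLoopStub &  w)
{
    // uv_close() is only legal on handles that were initialized and are
    // not already closing.
    if (!uv_is_closing(reinterpret_cast<uv_handle_t *>(&w.m_async_stop)))
        uv_close(reinterpret_cast<uv_handle_t *>(&w.m_async_stop), nullptr);
    if (!uv_is_closing(reinterpret_cast<uv_handle_t *>(&w.m_async_work)))
        uv_close(reinterpret_cast<uv_handle_t *>(&w.m_async_work), nullptr);
    if (!uv_is_closing(reinterpret_cast<uv_handle_t *>(&w.m_timer)))
        uv_close(reinterpret_cast<uv_handle_t *>(&w.m_timer), nullptr);

    // Run the loop so the pending close callbacks are delivered.
    uv_run(&w.m_loop, UV_RUN_DEFAULT);

    // uv_loop_close() fails with UV_EBUSY while handles are still open;
    // uv_walk() can be used to inspect whatever is left.
    int  err_code = uv_loop_close(&w.m_loop);
    if (err_code != 0)
        uv_walk(&w.m_loop,
                [](uv_handle_t *  handle, void *) {
                    (void) handle;      // e.g. log handle->type here
                }, nullptr);
}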

480     } catch (const exception &  exc) {
481         PSG_ERROR("Exception while shutting down worker " << m_id <<
484         PSG_ERROR("Unexpected exception while shutting down worker " <<

494     if (!m_close_all_issued) {
495         m_close_all_issued = true;
498         vector<CHttpConnection *>   http_conns;
500         m_ConnListLock.lock();
501         for (auto  it = m_ConnectedList.begin(); it != m_ConnectedList.end(); ++it) {
506             http_conns.push_back(http_conn);
508         m_ConnListLock.unlock();
528         for (auto  http_conn : http_conns) {

537     uv_tcp_t *  tcp = reinterpret_cast<uv_tcp_t*>(handle);
538     std::lock_guard<std::mutex>     lock(m_ConnListLock);
540     for (auto  it = m_ConnectedList.begin(); it != m_ConnectedList.end(); ++it) {
541         if (tcp == &std::get<0>(*it)) {
547             m_Daemon->DecrementAboveSoftLimitConnCount();
549             m_Daemon->DecrementBelowSoftLimitConnCount();
552             m_protocol.OnClientClosedConnection(reinterpret_cast<uv_stream_t*>(handle),
569             m_FreeList.splice(m_FreeList.begin(), m_ConnectedList, it);
580     uv_async_send(&m_internal->m_async_work);

584 std::list<std::tuple<uv_tcp_t, CHttpConnection>> &
587     return m_ConnectedList;
591 std::vector<SConnectionRunTimeProperties>
594     std::vector<SConnectionRunTimeProperties>   conn_props;
595     conn_props.reserve(m_ConnectedList.size());
597     std::lock_guard<std::mutex>     lock(m_ConnListLock);
598     for (auto &  it : m_ConnectedList) {
599         conn_props.emplace_back(std::get<1>(it).GetProperties());

607     std::lock_guard<std::mutex>     lock(m_ConnListLock);
608     for (auto &  it : m_ConnectedList) {
609         if (std::get<1>(it).GetConnectionId() == conn_id) {
610             std::get<1>(it).CloseThrottledConnection(
624     std::lock_guard<std::mutex>     lock(m_ConnListLock);
625     for (auto &  it : m_ConnectedList) {

661     int64_t     timespan = chrono::duration_cast<chrono::milliseconds>
663     if (timespan <= static_cast<int64_t>(idle_timeout_ms)) {
667     int64_t     timespan = chrono::duration_cast<chrono::milliseconds>
669     if (timespan <= static_cast<int64_t>(idle_timeout_ms)) {
684                                             m_id, props.m_Id));
694     m_protocol.OnAsyncWork(m_shuttingdown || m_shutdown);
700     PSG_INFO("Worker async work requested");

710     m_protocol.OnTimer();
725     PSG_INFO("Worker async stop requested");
726     uv_stop(handle->loop);
732     if (listener && status == 0) {
753              " (" << handle->type <<
754              ") @ worker " << (worker ? worker->m_id : -1) <<
755              " (" << worker << ")");

763     if (m_FreeList.empty()) {
767         m_FreeList.emplace_back(
768             tuple<uv_tcp_t, CHttpConnection>(uv_tcp_t{0},
771                                              conn_force_close_wait_ms)));
772         auto  new_item = m_FreeList.rbegin();
774         http_connection->SetupTimers(m_internal->m_loop.Handle());
777     auto  it = m_FreeList.begin();
778     uv_tcp_t *  tcp = & get<0>(*it);
779     int  err_code = uv_tcp_init(m_internal->m_loop.Handle(), tcp);
784         PSG_ERROR("TCP connection accept failed; uv_tcp_init() error code: " << err_code);
793     uv_tcp_nodelay(tcp, 1);
794     uv_tcp_keepalive(tcp, 1, 120);
797     m_ConnListLock.lock();
798     m_ConnectedList.splice(m_ConnectedList.begin(), m_FreeList, it);
799     m_ConnListLock.unlock();

802     int64_t     num_connections = m_Daemon->NumOfConnections();
804     err_code = uv_accept(listener, reinterpret_cast<uv_stream_t*>(tcp));
812         m_Daemon->IncrementAboveSoftLimitConnCount();
818         PSG_ERROR("TCP connection accept failed; uv_accept() error code: " +
819                   to_string(err_code));
826         uv_close(reinterpret_cast<uv_handle_t*>(tcp), s_OnClientClosed);
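The accept path takes a recycled uv_tcp_t from the free list, initializes it on the worker loop, accepts the pending connection and tunes the socket. A stripped-down sketch of that libuv sequence; free-list management, connection counters and the protocol hand-off are left out:

// Sketch: accept a pending TCP connection on the worker's loop.
#include <uv.h>
#include <cstdio>

static void s_OnClientClosedStub(uv_handle_t *)  { /* recycle the handle */ }

static void OnTcpConnectionSketch(uv_stream_t *  listener, uv_tcp_t *  tcp)
{
    int  err_code = uv_tcp_init(listener->loop, tcp);
    if (err_code != 0) {
        std::printf("uv_tcp_init() error code: %d\n", err_code);
        return;
    }

    err_code = uv_accept(listener, reinterpret_cast<uv_stream_t *>(tcp));
    if (err_code != 0) {
        std::printf("uv_accept() error code: %d\n", err_code);
        uv_close(reinterpret_cast<uv_handle_t *>(tcp), s_OnClientClosedStub);
        return;
    }

    uv_tcp_nodelay(tcp, 1);             // Disable Nagle for low latency
    uv_tcp_keepalive(tcp, 1, 120);      // TCP keep-alive, 120 s initial delay

    // ... hand the stream over to the HTTP protocol layer here
}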

831     in_port_t   peer_port = 0;
834     if (m_shuttingdown) {
843         m_Daemon->IncrementAboveSoftLimitConnCount();
844         http_conn->PrepareForUsage(nullptr, num_connections, peer_ip, true);
846         uv_close(reinterpret_cast<uv_handle_t*>(tcp), s_OnClientClosed);
854     int64_t     conn_hard_limit = m_Daemon->GetMaxConnectionsHardLimit();
856     if (num_connections >= conn_hard_limit) {
864         m_Daemon->IncrementAboveSoftLimitConnCount();
865         http_conn->PrepareForUsage(nullptr, num_connections, peer_ip, true);
870         string  err_msg = "TCP Connection accept failed; "
871                           "too many connections (maximum: " +
872                           to_string(conn_hard_limit) + ")";
881         uv_close(reinterpret_cast<uv_handle_t*>(tcp), s_OnClientClosed);

885     int64_t     conn_alert_limit = m_Daemon->GetMaxConnectionsAlertLimit();
886     int64_t     conn_soft_limit = m_Daemon->GetMaxConnectionsSoftLimit();
887     int64_t     num_below_soft_limit_connections = m_Daemon->GetBelowSoftLimitConnCount();
889     bool        exceed_soft_limit = (num_below_soft_limit_connections >= conn_soft_limit);
891     if (exceed_soft_limit) {
895         m_Daemon->IncrementAboveSoftLimitConnCount();
899         m_Daemon->IncrementBelowSoftLimitConnCount();
902     bool        exceed_alert_limit = (num_connections >= conn_alert_limit);
903     if (exceed_alert_limit && ! exceed_soft_limit) {
908         string  warn_msg = "Number of client connections (" +
909                            to_string(num_connections) +
910                            ") is getting too high, "
911                            "exceeds the alert threshold (" +
912                            to_string(conn_alert_limit) + ")";
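Three thresholds govern new connections here: a hard limit (the connection is closed), an alert limit (accepted, but an alert is raised) and a soft limit (accepted, but counted as being above the soft limit). An illustrative sketch of that classification, with placeholder names rather than the server's actual types:

// Sketch: tiered connection-limit check (placeholder enum and parameters).
#include <cstdint>

enum class EConnVerdict { eAccept, eAcceptAboveSoftLimit, eReject };

static EConnVerdict CheckConnLimits(int64_t  num_connections,
                                    int64_t  num_below_soft_limit,
                                    int64_t  soft_limit,
                                    int64_t  alert_limit,
                                    int64_t  hard_limit,
                                    bool &   raise_alert)
{
    raise_alert = false;
    if (num_connections >= hard_limit)
        return EConnVerdict::eReject;                   // Close immediately

    bool  exceed_soft_limit = (num_below_soft_limit >= soft_limit);
    bool  exceed_alert_limit = (num_connections >= alert_limit);
    if (exceed_alert_limit && !exceed_soft_limit)
        raise_alert = true;                             // Warn, still accept

    return exceed_soft_limit ? EConnVerdict::eAcceptAboveSoftLimit
                             : EConnVerdict::eAccept;
}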

922     m_protocol.OnNewConnection(reinterpret_cast<uv_stream_t*>(tcp),
923                                http_conn, s_OnClientClosed);
931     PSG_MESSAGE("SIGINT received. Immediate shutdown performed.");

944     auto    now = psg_clock_t::now();
945     auto    expiration = now + chrono::hours(24);
949     PSG_MESSAGE("SIGTERM received. The previous shutdown "
950                 "expiration is shorter than this one. Ignored.");
955     PSG_MESSAGE("SIGTERM received. Graceful shutdown is initiated");
973     *http_proto = nullptr;

987     if (m_Address.empty())
989                    "Failed to start daemon: address is empty");
992                    "Failed to start daemon: port is not specified");
994     signal(SIGPIPE, SIG_IGN);
999                    "uv_key_create failed", rc);

1013    sigint.Start(SIGINT, s_OnMainSigInt);
1016    sigterm.Start(SIGTERM, s_OnMainSigTerm);
1019    sighup.Start(SIGHUP, s_OnMainSigHup);
1022    sigusr1.Start(SIGUSR1, s_OnMainSigUsr1);
1025    sigusr2.Start(SIGUSR2, s_OnMainSigUsr2);
1028    sigwinch.Start(SIGWINCH, s_OnMainSigWinch);
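The daemon's Run() sequence installs handlers for SIGINT, SIGTERM, SIGHUP, SIGUSR1, SIGUSR2 and SIGWINCH through a small signal-wrapper object (sigint.Start(...), etc.). A sketch of the equivalent raw libuv wiring for the two shutdown signals; the handler bodies are placeholders:

// Sketch: SIGINT/SIGTERM handling on the main libuv loop.
#include <uv.h>
#include <csignal>
#include <cstdio>

static void s_OnSigIntStub(uv_signal_t *  handle, int)
{
    std::printf("SIGINT received. Immediate shutdown performed.\n");
    uv_stop(handle->loop);
}

static void s_OnSigTermStub(uv_signal_t *  handle, int)
{
    std::printf("SIGTERM received. Graceful shutdown is initiated\n");
    // The real server records a shutdown deadline instead of stopping at once.
    (void) handle;
}

int main()
{
    std::signal(SIGPIPE, SIG_IGN);          // Broken pipes are handled per write

    uv_loop_t *  loop = uv_default_loop();
    uv_signal_t  sigint, sigterm;

    uv_signal_init(loop, &sigint);
    uv_signal_start(&sigint, s_OnSigIntStub, SIGINT);

    uv_signal_init(loop, &sigterm);
    uv_signal_start(&sigterm, s_OnSigTermStub, SIGTERM);

    return uv_run(loop, UV_RUN_DEFAULT);
}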

1036                         reinterpret_cast<uv_stream_t*>(listener.Handle()),
1037                         IPC_PIPE_NAME, m_NumWorkers, &exp);
1040                   "uv_export_start failed", rc);
1043    workers.Start(exp, m_NumWorkers, http_daemon, OnWatchDog);
1044    } catch (const exception &  exc) {
1052                   "uv_export_wait failed", rc);
1054    listener.Close(nullptr);

1058    uv_timer_t  watch_dog;
1059    uv_timer_init(loop.Handle(), &watch_dog);
1060    watch_dog.data = &workers;
1061    uv_timer_start(&watch_dog, workers.s_OnWatchDog, 1000, 1000);
1065    uv_run(loop.Handle(), UV_RUN_DEFAULT);
1067    uv_close(reinterpret_cast<uv_handle_t*>(&watch_dog), NULL);

size_t GetActiveProcGroupCounter(void)

@ ePSGS_TcpConnHardLimitExceeded

@ ePSGS_TcpConnAlertLimitExceeded

void PrepareForUsage(uv_tcp_t *uv_tcp_stream, int64_t conn_cnt_at_open, const string &peer_ip, bool exceed_soft_limit_flag)

void SetupTimers(uv_loop_t *tcp_worker_loop)

int64_t GetConnectionId(void) const

bool GetExceedSoftLimitFlag(void) const

static void DaemonStopped()

static void DaemonStarted()

void Register(EPSGS_AlertType alert_type, const string &message)

void IncrementRequestStopCounter(int status)

@ ePSGS_NumConnHardLimitExceeded

@ ePSGS_NumConnAlertLimitExceeded

@ ePSGS_NumConnSoftLimitExceeded

@ ePSGS_IncomingConnectionsCounter

void Increment(IPSGS_Processor *processor, EPSGS_CounterType counter)

void UnregisterUVLoop(uv_thread_t uv_thread)

void RegisterDaemonUVLoop(uv_thread_t uv_thread, uv_loop_t *uv_loop)

CPSGSCounters & GetCounters(void)

void CancelAllProcessors(void)

void RegisterUVLoop(uv_thread_t uv_thread, uv_loop_t *uv_loop)

CExcludeBlobCache * GetExcludeBlobCache(void)

static CPubseqGatewayApp * GetInstance(void)

CPSGAlerts & GetAlerts(void)

const SPubseqGatewaySettings & Settings(void) const

int GetUVLibraryErrorCode(void) const

static void s_OnMainSigTerm(uv_signal_t *, int)

bool OnRequest(CHttpProto **http_proto)

static void s_OnMainSigInt(uv_signal_t *, int)

static constexpr const char IPC_PIPE_NAME[]

void StopDaemonLoop(void)

void Run(CHttpDaemon &http_daemon, std::function< void(CTcpDaemon &daemon)> OnWatchDog=nullptr)

void Start(struct uv_export_t *exp, unsigned short nworkers, CHttpDaemon &http_daemon, std::function< void(CTcpDaemon &daemon)> OnWatchDog=nullptr)

static void s_WorkerExecute(void *_worker)

bool CloseThrottledConnection(unsigned int worker_id, int64_t conn_id)

void PopulateThrottlingData(SThrottlingData &throttling_data)

bool AnyWorkerIsRunning(void)

static uv_key_t s_thread_worker_key

string GetConnectionsStatus(int64_t self_connection_id)

static void s_OnWatchDog(uv_timer_t *handle)

void Start(int signum, uv_signal_cb cb)

void Bind(const char *addr, unsigned int port)

void Close(void(*close_cb)(uv_handle_t *handle))

const_iterator end() const

const_iterator find(const key_type &key) const

static CRef< CSerialObject > CloseAll(ParserPtr pp)

static CS_CONNECTION * conn

@ e503_ServiceUnavailable

TErrCode GetErrCode(void) const

Get error code.

#define NCBI_THROW(exception_class, err_code, message)

Generic macro to throw an exception, given the exception class, error code and message string.

const string & GetMsg(void) const

Get message string.

#define NCBI_THROW2(exception_class, err_code, message, extra)

Throw exception with extra parameter.

virtual const char * what(void) const noexcept

Standard report (includes full backlog).

unsigned short GetPort() const

Get the listening port number back.

unsigned short m_Port

TCP port to listen on.

string GetIPAddress(struct sockaddr *sock_addr)

chrono::system_clock::time_point time_point

string ToJsonString(const CBioseqInfoRecord &bioseq_info, SPSGS_ResolveRequest::TPSGS_BioseqIncludeData include_data_flags, const string &custom_blob_id)

#define PSG_ERROR(message)

#define PSG_INFO(message)

#define PSG_MESSAGE(message)

#define PSG_WARNING(message)

#define PSG_TRACE(message)

string GetSiteFromIP(const string &ip_address)

void DismissErrorRequestContext(CRef< CRequestContext > &context, int status, size_t bytes_sent)

CRef< CRequestContext > CreateErrorRequestContext(const string &client_ip, in_port_t client_port, int64_t connection_id)

std::atomic_bool m_shutdown

static void s_OnTcpConnection(uv_stream_t *listener, const int status)

std::vector< SConnectionRunTimeProperties > GetConnProps(void)

void OnClientClosed(uv_handle_t *handle)

std::list< std::tuple< uv_tcp_t, CHttpConnection > > & GetConnList(void)

static void s_OnAsyncStop(uv_async_t *handle)

static void s_LoopWalk(uv_handle_t *handle, void *arg)

static void s_OnClientClosed(uv_handle_t *handle)

static void s_OnTimer(uv_timer_t *handle)

void OnTcpConnection(uv_stream_t *listener)

static void s_OnAsyncWork(uv_async_t *handle)

bool CloseThrottledConnection(int64_t conn_id)

void PopulateThrottlingData(SThrottlingData &throttling_data, const system_clock::time_point &now, uint64_t idle_timeout_ms)

optional< string > m_PeerUserAgent

optional< string > m_PeerId

system_clock::time_point m_OpenTimestamp

optional< system_clock::time_point > m_LastRequestTimestamp

chrono::system_clock::time_point m_LastActivityTimestamp

size_t m_ConnThrottleByProcess

size_t m_ConnThrottleByUserAgent

double m_ConnForceCloseWaitSec

size_t m_ConnThrottleByHost

double m_ConnThrottleCloseIdleSec

size_t m_ConnThrottleBySite

atomic_bool m_ShutdownRequested

psg_time_point_t m_Expired

list< string > m_PeerIPOverLimit

list< string > m_UserAgentOverLimit

map< string, size_t > m_PeerIPCounts

map< string, size_t > m_PeerSiteCounts

list< string > m_PeerIDOverLimit

map< string, size_t > m_PeerIDCounts

list< SPSGS_IdleConnectionProps > m_IdleConnProps

map< string, size_t > m_UserAgentCounts

list< string > m_PeerSiteOverLimit

bool IdleTimestampPredicate(const SPSGS_IdleConnectionProps &lhs, const SPSGS_IdleConnectionProps &rhs)

void GetPeerIpAndPort(uv_tcp_t *tcp, string &client_ip, in_port_t &client_port)

SShutdownData g_ShutdownData

void UnregisterUVLoop(uv_thread_t uv_thread)

CRef< CRequestContext > CreateErrorRequestContextHelper(uv_tcp_t *tcp, int64_t connection_id)

void CancelAllProcessors(void)

void RegisterDaemonUVLoop(uv_thread_t uv_thread, uv_loop_t *uv_loop)

void RegisterUVLoop(uv_thread_t uv_thread, uv_loop_t *uv_loop)

void CollectGarbage(void)

static CS_CONTEXT * context

