&CSrvSocketTask::m_SockListHook> TSockListHookOpt;
91intr::constant_time_size<false> >
TSockList;
92 #elif NC_SOCKLIST_USE_TYPE == NC_SOCKLIST_USE_BASE_HOOK 94intr::base_hook<TSrvSockListHook>,
96 #elif NC_SOCKLIST_USE_TYPE == NC_SOCKLIST_USE_STD_LIST 97 typedefstd::list<CSrvSocketTask*>
TSockList;
198 stringis(
"\": "), eol(
",\n\"");
220 const char* err_msg,
222 const char* errno_str,
229<< err_msg <<
", errno="<< x_errno <<
" ("<< errno_str <<
")";
236 const char* err_msg,
245 #define LOG_WITH_ERRNO(sev, msg, x_errno) \ 246 s_LogWithErrno(CSrvDiagMsg::sev, msg, x_errno, \ 247 __FILE__, __LINE__, NCBI_CURRENT_FUNCTION) \ 254 const char* err_msg,
263 #define LOG_WITH_AIERR(sev, msg, x_aierr) \ 264 s_LogWithAIErr(CSrvDiagMsg::sev, msg, x_aierr, \ 265 __FILE__, __LINE__, NCBI_CURRENT_FUNCTION) \ 274 intres = fcntl(sock, F_SETFL, O_NONBLOCK);
288 intres = setsockopt(sock, SOL_SOCKET, SO_KEEPALIVE, &
value,
sizeof(
value));
293res = setsockopt(sock, IPPROTO_TCP, TCP_NODELAY, &
value,
sizeof(
value));
307 intres = setsockopt(sock, IPPROTO_TCP, TCP_QUICKACK, &
value,
sizeof(
value));
318 intsock = socket(AF_INET, SOCK_STREAM, 0);
329 intres = setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, &
value,
sizeof(
value));
333 structsockaddr_in
addr;
334memset(&
addr, 0,
sizeof(
addr));
335 addr.sin_family = AF_INET;
337 addr.sin_port = htons(sock_info.
port);
338res = bind(sock, (
structsockaddr*)&
addr,
sizeof(
addr));
340 stringerr_msg(
"Cannot bind socket to port ");
346res = listen(sock, 128);
353 structepoll_event evt;
354evt.events =
EPOLLIN| EPOLLET;
355evt.data.ptr = (
void*)&sock_info;
356res = epoll_ctl(
s_EpollFD, EPOLL_CTL_ADD, sock, &evt);
363sock_info.
fd= sock;
414 intres = getsockopt(fd, SOL_SOCKET, SO_ERROR, &x_errno, &
x_size);
422 #define LOG_SOCK_ERROR(sev, fd, prefix) \ 423 s_LogSocketError(CSrvDiagMsg::sev, fd, prefix, __FILE__, __LINE__, NCBI_CURRENT_FUNCTION) 435res = setsockopt(fd, SOL_SOCKET, SO_LINGER, (
void*)&
lgr,
sizeof(
lgr));
441res = setsockopt(fd, IPPROTO_TCP, TCP_LINGER2, (
void*)&
val,
sizeof(
val));
449 while(res && (x_errno = errno) == EINTR);
468--
thr->socks->sock_cnt;
477snprintf(
buf, 20,
"%u.%u.%u.%u", hb[0], hb[1], hb[2], hb[3]);
487 structsockaddr_in
addr;
488memset(&
addr, 0,
sizeof(
addr));
489 addr.sin_family = AF_INET;
490 addr.sin_addr.s_addr =
ip;
492 buf,
sizeof(
buf),
NULL, 0, NI_NAMEREQD | NI_NOFQDN);
494LOG_WITH_AIERR(
Critical,
"Error from getnameinfo", x_errno);
507 if(gethostname(
buf,
sizeof(
buf)))
520 ip= inet_addr(host.c_str());
523memset(&in_addr, 0,
sizeof(in_addr));
524in_addr.ai_family = AF_INET;
528LOG_WITH_AIERR(
Critical,
"Error from getaddrinfo", x_errno);
532 ip= ((
structsockaddr_in*)out_addr->ai_addr)->sin_addr.s_addr;
577 structsockaddr_in
addr;
579 intnew_sock = accept(sock_info.
fd, (
structsockaddr*)&
addr, &
len);
581cmd_len -= cmd_start;
585 SRV_LOG(
Warning,
"socket accept takes: "<< len_usec <<
"us");
587 if(new_sock == -1) {
589 if(x_errno != EAGAIN && x_errno != EWOULDBLOCK) {
599<<
". Rejecting new connection.");
609task->
m_Fd= new_sock;
627 Uint4wait_msec = wait_time.
NSec() / 1000000;
634 if(x_errno != EINTR)
637 for(
int i= 0;
i< res; ++
i) {
638 structepoll_event& evt = events[
i];
691 #if NC_SOCKLIST_USE_TYPE == NC_SOCKLIST_USE_STD_LIST 704 #if defined(NCBI_COMPILER_GCC) || defined(NCBI_COMPILER_ANY_CLANG) 712memset(old_socks, 0,
sizeof(old_socks));
713memset(old_active, 0,
sizeof(old_active));
720 #if NC_SOCKLIST_USE_TYPE == NC_SOCKLIST_USE_STD_LIST 726 if(active >= limit_time)
731 Uint1low = 0, high = cnt_old;
733 Uint1mid = (high + low) / 2;
734 if(old_active[mid] > active)
744 memmove(&old_socks[low + 1], &old_socks[low],
745(cnt_old - low) *
sizeof(old_socks[0]));
746 memmove(&old_active[low + 1], &old_active[low],
747(cnt_old - low) *
sizeof(old_active[0]));
749old_socks[low] = task;
750old_active[low] = active;
754 for(
Uint1 i= 0;
i< cnt_old; ++
i) {
788 #if NC_SOCKLIST_USE_TYPE == NC_SOCKLIST_USE_STD_LIST 810 #if NC_SOCKLIST_USE_TYPE == NC_SOCKLIST_USE_STD_LIST 831 #if NC_SOCKLIST_USE_TYPE == NC_SOCKLIST_USE_STD_LIST 832(*it)->SetRunnable();
859 if(x_errno == EINTR)
864 if(x_errno == EWOULDBLOCK) {
867 if(x_errno == EAGAIN) {
881 returnsize_t(n_read);
897 if(n_written == -1) {
899 if(x_errno == EINTR)
901 if(x_errno == EAGAIN || x_errno == EWOULDBLOCK) {
915 returnsize_t(n_written);
937 if(c ==
'\n'|| c ==
'\0') {
999 gotofinish_with_error;
1001 while(
size!= 0) {
1005 if(to_write >
size)
1014 gotofinish_with_error;
1015 if(n_done < to_write) {
1029 gotofinish_with_error;
1037 if(to_read >
size)
1048 gotofinish_with_error;
1051 if(n_done > to_read)
1063 gotofinish_with_error;
1078 gotofinish_with_error;
1104 gotounlock_and_exit;
1109sock_info->
index= idx;
1110sock_info->
port= port;
1111sock_info->
factory= factory;
1125 #if __NC_TASKS_MONITOR 1126m_TaskName =
"CSrvListener";
1138 for(
Uint1 i= 0;
i< cnt_listen; ++
i) {
1146 if(sock_info.
fd!= -1)
1155: m_ProxySrc(
NULL),
1165m_ProxyHadError(
false),
1166m_SockHasRead(
false),
1167m_SockCanWrite(
false),
1168m_SockCanReadMore(
true),
1169m_NeedToClose(
false),
1170m_NeedToFlush(
false),
1175m_RegReadHup(
false),
1176m_RegError(
false),
1177m_ErrorPrinted(
false)
1182 #if __NC_TASKS_MONITOR 1183m_TaskName =
"CSrvSocketTask";
1227 if(c ==
'\n'|| c ==
'\r'|| c ==
'\0')
1240 if(
m_RdBuf[crlf_pos] ==
'\r') {
1256 buf= (
char*)
buf+ copied;
1276 if(has_size == 0) {
1304 buf= (
const char*)
buf+ to_copy;
1307 returnto_copy + n_written;
1433 #ifdef NCBI_OS_LINUX 1434 intsock = socket(AF_INET, SOCK_STREAM, 0);
1440 gotoclose_and_error;
1442 structsockaddr_in
addr;
1443memset(&
addr, 0,
sizeof(
addr));
1444 addr.sin_family = AF_INET;
1445 addr.sin_addr.s_addr = host;
1446 addr.sin_port = htons(port);
1449res = connect(sock, (
structsockaddr*)&
addr,
sizeof(
addr));
1451 intx_errno = errno;
1452 if(x_errno == EINTR)
1454 if(x_errno != EINPROGRESS) {
1456 gotoclose_and_error;
1480 #ifdef NCBI_OS_LINUX 1481 structepoll_event evt;
1484 intres = epoll_ctl(
s_EpollFD, EPOLL_CTL_ADD,
m_Fd, &evt);
1498 #ifdef NCBI_OS_LINUX 1499 structsockaddr_in
addr;
1502 if(getsockname(
m_Fd, (
structsockaddr*)&
addr, &
len) == 0)
1503 returnntohs(
addr.sin_port);
void MarkTaskTerminated(CSrvTask *task, bool immediate)
Mutex created to have minimum possible size (its size is 4 bytes) and to sleep using kernel capabilit...
void Unlock(void)
Unlock the mutex.
void Lock(void)
Lock the mutex.
Class used in all diagnostic logging.
const CSrvDiagMsg & StartSrvLog(ESeverity sev, const char *file, int line, const char *func) const
Starts log message which will include severity, filename, line number and function name.
CSrvDiagMsg & StartRequest(void)
Starts "request-start" message.
CSrvDiagMsg & PrintParam(CTempString name, CTempString value)
Adds parameter to "request-start" or "extra" message.
void StopRequest(void)
Prints "request-stop" message.
static bool IsSeverityVisible(ESeverity sev)
Checks if given severity level is visible, i.e.
ESeverity
Severity levels for logging.
virtual ~CSrvListener(void)
Uint4 m_SeenErrors[kMaxCntListeningSocks]
Per-listening-socket numbers copied from s_ListenErrors when errors are processed.
virtual void ExecuteSlice(TSrvThreadNum thread_idx)
This is the main method to do all work this task should do.
Uint4 m_SeenEvents[kMaxCntListeningSocks]
Per-listening-socket numbers copied from s_ListenEvents when events are processed.
Factory that creates CSrvSocketTask-derived object for each connection coming to listening port which...
virtual CSrvSocketTask * CreateSocketTask(void)=0
virtual ~CSrvSocketFactory(void)
Task controlling a socket.
Uint2 m_WrPos
Position of current writing pointer in the write buffer.
Uint8 m_ConnStartJfy
Jiffy number when Connect() method was called.
bool Connect(Uint4 host, Uint2 port)
Create new socket and connect it to given IP and port.
void x_CloseSocket(bool do_abort)
Close or abort the socket — they have little difference, thus they are joined in one method.
bool m_RegReadHup
Flag showing if epoll returned RDHUP on this socket.
CSrvSocketTask * m_ProxySrc
Source task for proxying.
Uint1 m_SeenWriteEvts
Number of last write event seen by Write() when it wrote to socket.
bool HasError(void)
Checks if socket has some error in it.
bool m_CRMet
Flag showing if '\r' symbol was seen at the end of last line but '\n' wasn't seen yet.
Uint8 m_ReadBytes
Total number of bytes read from socket.
int m_Fd
File descriptor for the socket.
size_t Write(const void *buf, size_t size)
Write into the socket as much as immediately possible (including writing into internal write buffers ...
Uint2 m_PeerPort
Remembered peer port.
CSrvSocketTask & WriteText(CTempString message)
Write text into socket.
CSrvSocketTask & WriteNumber(NumType num)
Write number into socket as string, i.e.
void GetPeerAddress(string &host, Uint2 &port)
Get peer IP and port for this socket.
void CloseSocket(void)
Close the socket gracefully, i.e.
Uint2 m_WrMemSize
Size of memory allocated for write buffer.
Uint8 m_ProxySize
Amount left to proxy if proxying operation is in progress.
bool m_NeedToFlush
Flag showing that task needs to flush all write buffers.
virtual void InternalRunSlice(TSrvThreadNum thr_num)
Internal function to execute time slice work.
virtual void Terminate(void)
Terminate the task.
bool m_NeedToClose
Flag showing that socket needs to be closed because of long inactivity.
char * m_RdBuf
Read buffer.
Uint4 m_PeerAddr
Remembered peer IP address.
bool m_ErrorPrinted
Flag showing if pending error in socket was printed in logs.
bool m_ProxyHadError
Flag showing that last proxying operation finished with error.
bool m_SockCanReadMore
Flag showing that socket can have more reads, i.e. there was no EOF yet.
bool m_FlushIsDone
Flag showing that write buffers were flushed.
bool m_RegError
Flag showing if there's error pending on the socket.
CSrvSocketTask * m_ProxyDst
Destination task for proxying.
bool ReadToBuf(void)
Read from socket into internal buffer.
Uint2 m_RdPos
Position of current reading in the read buffer, i.e.
void Flush(void)
Flush all data saved in internal write buffers to socket.
size_t Read(void *buf, size_t size)
Read from socket into memory.
char * m_WrBuf
Write buffer.
virtual ~CSrvSocketTask(void)
Uint2 GetLocalPort(void)
Get local port this socket was created on.
void WriteData(const void *buf, size_t size)
Write the exact amount of data into the socket.
Uint1 m_SeenReadEvts
Number of last read event seen by Read() when it read from socket.
bool IsWriteDataPending(void)
Checks if there's some data pending in write buffers and waiting to be sent to kernel.
Uint2 m_WrSize
Size of data in the write buffer waiting for writing.
Uint8 m_WrittenBytes
Total number of bytes written to socket.
Uint2 m_RdSize
Size of data available for reading in the read buffer.
bool m_SockCanWrite
Flag showing that socket is writable.
bool StartProcessing(TSrvThreadNum thread_num=0, bool boost=false)
Start processing of the socket and include it into TaskServer's central epoll.
bool ReadLine(CTempString *line)
Read from socket one line which ends with '\n', '\r\n' or '\0'.
bool NeedEarlyClose(void)
Checks if socket should be closed because of internal reasons (long inactivity or "hard" shutdown as ...
void x_PrintError(void)
Prints socket's error if there's any error pending on the socket.
void StartProxyTo(CSrvSocketTask *dst_task, Uint8 proxy_size)
Start proxying of raw data from this socket to the one in dst_task.
Uint1 m_RegWriteEvts
Counter of "writable" events received from epoll.
bool m_SockHasRead
Flag showing that socket is readable.
Uint1 m_RegReadEvts
Counter of "readable" events received from epoll.
void SockOpenActive(void)
void SockClose(int status, Uint8 open_time)
void SockOpenPassive(void)
Main working entity in TaskServer.
TSrvTaskFlags m_TaskFlags
Bit-OR of flags for this task.
int m_LastActive
Time (in seconds) when the task was active last time, i.e.
CRequestContext * GetDiagCtx(void)
Get current diagnostic context for the task.
TSrvThreadNum m_LastThread
Thread number where this task was executed last time.
virtual void ExecuteSlice(TSrvThreadNum thr_num)=0
This is the main method to do all work this task should do.
void SetRunnable(bool boost=false)
Set this task "runnable", i.e.
void ReleaseDiagCtx(void)
Releases current diagnostic context of the task.
void CreateNewDiagCtx(void)
Create new diagnostic context for this task to work in.
Class incorporating convenient methods to work with struct timespec.
static int CurSecs(void)
Current time in seconds since epoch (time_t).
static CSrvTime Current(void)
Exact current time with precision up to nanoseconds.
Uint8 AsUSec(void) const
Converts object's value to microseconds since epoch.
long & NSec(void)
Read/set number of nanoseconds stored in the object.
static const string & GetHostName(void)
Returns name of server this application is executing on.
static string GetHostByIP(Uint4 ip)
Converts 4-byte encoded IP address into server name.
static bool AddListeningPort(Uint2 port, CSrvSocketFactory *factory)
Adds port for TaskServer to listen to.
static bool IsRunning(void)
Checks if TaskServer is running now, i.e.
static string IPToString(Uint4 ip)
Converts 4-byte encoded IP address into its string representation.
static bool IsInShutdown(void)
Checks if TaskServer received request to shutdown.
static Uint4 GetIPByHost(const string &host)
Converts server name (or IP address written as string) to encoded 4-byte IP address.
CTempString implements a light-weight string on top of a storage buffer whose lifetime management is ...
#define getnameinfo(a, b, c, d, e, f, g)
#define getaddrinfo(n, s, h, r)
#define ERASE_ITERATE(Type, Var, Cont)
Non-constant version with ability to erase current element, if container permits.
#define NON_CONST_ITERATE(Type, Var, Cont)
Non constant version of ITERATE macro.
void SetClientIP(const string &client)
TCount GetRequestID(void) const
Get request ID (or zero if not set).
void Critical(CExceptionArgs_Base &args)
void Error(CExceptionArgs_Base &args)
void Warning(CExceptionArgs_Base &args)
uint8_t Uint1
1-byte (8-bit) unsigned integer
int16_t Int2
2-byte (16-bit) signed integer
uint32_t Uint4
4-byte (32-bit) unsigned integer
uint16_t Uint2
2-byte (16-bit) unsigned integer
uint64_t Uint8
8-byte (64-bit) unsigned integer
virtual int GetInt(const string &section, const string &name, int default_value, TFlags flags=0, EErrAction err_action=eThrow) const
Get integer value of specified parameter name.
#define END_NCBI_SCOPE
End previously defined NCBI scope.
#define BEGIN_NCBI_SCOPE
Define ncbi namespace.
CTempString & assign(const char *src_str, size_type len)
Assign new values to the content of the a string.
static enable_if< is_arithmetic< TNumeric >::value||is_convertible< TNumeric, Int8 >::value, string >::type NumericToString(TNumeric value, TNumToStringFlags flags=0, int base=10)
Convert numeric value to string.
static string UInt8ToString(Uint8 value, TNumToStringFlags flags=0, int base=10)
Convert UInt8 to string.
if(yy_accept[yy_current_state])
constexpr bool empty(list< Ts... >) noexcept
const struct ncbi::grid::netcache::search::fields::SIZE size
const GenericPointer< typename T::ValueType > T2 value
static size_t x_size(const char *dst, size_t len, const char *ptr)
Process information in the NCBI Registry, including working with configuration files.
Defines CRequestContext class for NCBI C++ diagnostic API.
static SLJIT_INLINE sljit_ins lgr(sljit_gpr dst, sljit_gpr src)
static void s_FlushData(CSrvSocketTask *task)
void CheckConnectsTimeout(SSocketsData *socks)
intr::list< CSrvSocketTask, intr::base_hook< TSrvSockListHook >, intr::constant_time_size< false > > TSockList
static void s_CloseSocket(int fd, bool do_abort)
bool ReConfig_Sockets(const CTempString &section, const CNcbiRegistry &new_reg, string &)
void ReleaseThreadSocks(SSrvThread *thr)
void RequestStopListening(void)
bool InitSocketsMan(void)
static void s_CopyData(CSrvSocketTask *task, const void *buf, Uint2 size)
static void s_ProcessListenError(Uint1 sock_idx)
static size_t s_ReadFromSocket(CSrvSocketTask *task, void *buf, size_t size)
bool StartSocketsMan(void)
static bool s_StartListening(void)
static void s_LogSocketError(CSrvDiagMsg::ESeverity severity, int fd, const char *prefix, const char *file, int line, const char *func)
static void s_CompactWrBuffer(CSrvSocketTask *task)
static const Uint1 kEpollEventsArraySize
static void s_CreateDiagRequest(CSrvSocketTask *task, Uint2 port, Uint4 phost, Uint2 pport)
void s_DeleteOldestSockets(TSockList &lst)
static void s_DoDataProxy(CSrvSocketTask *src)
#define LOG_WITH_ERRNO(sev, msg, x_errno)
static size_t s_WriteNoPending(CSrvSocketTask *task, const void *buf, size_t size)
static void s_RegisterClientEvent(CSrvSocketTask *task, Uint4 event)
void MoveAllSockets(SSocketsData *dst_socks, SSocketsData *src_socks)
void ConfigureSockets(const CNcbiRegistry *reg, CTempString section)
void AssignThreadSocks(SSrvThread *thr)
static const Uint2 kSockMinWriteSize
#define LOG_SOCK_ERROR(sev, fd, prefix)
static Uint4 s_ListenEvents[kMaxCntListeningSocks]
static Uint1 s_CntListeningSocks
static const Uint2 kSockReadBufSize
1000 below is chosen to be a little bit less than maximum packet size in Ethernet (~1500 bytes).
static const Uint2 kSockWriteBufSize
In calculations in the file it's assumed that kSockWriteBufSize is at least twice as large as kSockMi...
void CleanSocketList(SSocketsData *socks)
static Uint1 s_OldSocksDelBatch
static CSrvListener s_Listener
static SListenSockInfo s_ListenSocks[kMaxCntListeningSocks]
static Uint8 s_ConnTimeout
static Uint8 s_AcceptDelay
static Uint2 s_ReadFromBuffer(CSrvSocketTask *task, void *dest, size_t size)
static void s_LogWithErrStr(CSrvDiagMsg::ESeverity severity, const char *err_msg, int x_errno, const char *errno_str, const char *file, int line, const char *func)
static void s_LogWithErrno(CSrvDiagMsg::ESeverity severity, const char *err_msg, int x_errno, const char *file, int line, const char *func)
static void s_SaveSocket(CSrvSocketTask *task)
void WriteSetup_Sockets(CSrvSocketTask &task)
static bool s_SetSocketOptions(int sock)
static void s_SetSocketQuickAck(int sock)
static void s_CleanSockResources(CSrvSocketTask *task)
static void s_CompactBuffer(char *buf, Uint2 &size, Uint2 &pos)
static const Uint1 kMaxCntListeningSocks
16 Uint4s on x86_64 is the size of CPU's cacheline.
static CMiniMutex s_ListenSocksLock
static void s_ReadLF(CSrvSocketTask *task)
void SetAllSocksRunnable(SSocketsData *socks)
void PromoteSockAmount(SSocketsData *socks)
static bool s_CreateListeningSocket(Uint1 idx)
static void s_RegisterListenEvent(SListenSockInfo *sock_info, Uint4 event)
void FinalizeSocketsMan(void)
static void s_ProcessListenEvent(Uint1 sock_idx, TSrvThreadNum thread_num)
static Uint4 s_ListenErrors[kMaxCntListeningSocks]
static size_t s_WriteToSocket(CSrvSocketTask *task, const void *buf, size_t size)
static int s_SocketTimeout
static bool s_SetSocketNonBlock(int sock)
#define SRV_LOG(sev, msg)
Macro to be used for printing log messages.
intr::list_base_hook< intr::tag< SSrvSockList_tag > > TSrvSockListHook
T AtomicAdd(T volatile &var, T add_value)
#define ACCESS_ONCE(x)
Purpose of this macro is to force compiler to access variable exactly at the place it's written (no m...
T AtomicSub(T volatile &var, T sub_value)
constexpr Uint8 kUSecsPerSecond
constexpr Uint8 kUSecsPerMSec
constexpr Uint8 kNSecsPerMSec
CSrvSocketFactory * factory
Factory that will create CSrvSocketTask for each incoming socket.
Uint2 port
Port to listen to.
int fd
File descriptor for the listening socket.
Uint1 index
Index in the s_ListenSocks array.
Per-thread structure containing information about sockets.
Int2 sock_cnt
"Number of sockets" that this thread created/deleted.
TSockList sock_list
List of all open and not yet deleted sockets which were opened in this thread.
For TaskServer's internal use only.
Uint2 TSrvThreadNum
Type for thread number in TaskServer.
CRef< CTestThread > thr[k_NumThreadsMax]
SSrvThread * GetCurThread(void)
RetroSearch is an open source project built by @garambo | Open a GitHub Issue
Search and Browse the WWW like it's 1997 | Search results from DuckDuckGo
HTML:
3.2
| Encoding:
UTF-8
| Version:
0.7.4