seconds > 0.0 ?
static_cast<uint64_t>(seconds * milli::den) : 0;
117 const string&
GetValue(
const string& name)
const 124 template<EValue value>
struct SArg;
130 using TType= pair<SPSG_ArgsBase::EItemType, reference_wrapper<const string>>;
131 staticconstexpr
autoname =
"item_type";
138 using TType= pair<SPSG_ArgsBase::EChunkType, reference_wrapper<const string>>;
139 staticconstexpr
autoname =
"chunk_type";
146 using TType= reference_wrapper<const string>;
147 staticconstexpr
autoname =
"blob_id";
154 using TType= reference_wrapper<const string>;
155 staticconstexpr
autoname =
"id2_chunk";
161 usingSPSG_ArgsBase::SPSG_ArgsBase;
164 template<EValue value>
168 auto& cached = get<SValue<value>>(
m_Cached);
175 template<EValue value>
struct SValue: std::optional<typename SArg<value>::TType> {};
180 template<
typenameTValue>
183 template<
template<
typename>
classTCmp>
205 string GetCookie(
const string& name)
const;
249 string GetCookie(
function<
string()> get_auth_token);
269 template<
classTArg,
class...TRest>
272 template<
classTArg>
276 template<
class...TArgs>
279 if(
IsPerf())
return Event(std::forward<TArgs>(args)...);
283 Print(std::forward<TArgs>(args)...);
288 template<
class...TArgs>
297 auto ms= chrono::duration<double, milli>(chrono::steady_clock::now().time_since_epoch()).count();
298 autothread_id = this_thread::get_id();
314vector<tuple<double, EType, thread::id>>
m_Events;
317 template<
classTArg,
class...TRest>
321 SPack<TRest...>(std::move(base)),
325 template<
classTNextArg>
326 SPack<TNextArg, TArg, TRest...>
operator<<(
constTNextArg& next_arg)
328 return{ std::move(*
this), &next_arg };
334 template<
class...TArgs>
344 template<
classTArg>
348m_DebugPrintout(debug_printout),
352 template<
classTNextArg>
355 return{ std::move(*
this), &next_arg };
361 template<
class...TArgs>
364m_DebugPrintout->Process(*
m_Arg, std::forward<TArgs>(args)...);
382 template<
typenameT, enable_if_t<is_convertible_v<T,
int>,
int> = 0>
454 SPSG_Reply(
string id,
const SPSG_Params& params, shared_ptr<TPSG_Queue> q, weak_ptr<SPSG_Stats> s = weak_ptr<SPSG_Stats>(),
bool r=
false) :
457 queue(std::move(q)),
476 auto& values = refused_stream ? values_pair.second : values_pair.first;
477 returnvalues ? values-- : 0;
484 using TValues= pair<TValuesPair, TValuesPair>;
523shared_ptr<void>
Set();
611 return m_Request->reply->debug_printout.id;
617 template<
classTOnRetry,
classTOnFail>
642 static auto s_Pop(list<SPSG_TimedRequest>& queue)
644 autotimed_req = std::move(queue.front());
645 autop = timed_req.Get();
647 returntuple_cat(make_tuple(make_optional(std::move(timed_req))), std::move(p));
656 returnlocked->empty() ? decltype(
s_Pop(*locked)){} :
s_Pop(*locked);
659 template<
class... TArgs>
717pair<bitset<SPSG_ThrottleParams::SThreshold::kMaxDenominator>,
size_t>
threshold_reg;
740that->m_Active.store(new_value);
742 if(new_value ==
eOff) {
743 ERR_POST(
Warning<<
"Disabling throttling for server "<< that->m_Address <<
" after wait");
771 template<
classTSession>
776 usingunordered_map::begin;
777 usingunordered_map::end;
779 usingunordered_map::find;
781 template<
class... TArgs>
785 returnunordered_map::emplace(std::forward<TArgs>(args)...);
791 returnunordered_map::erase(it);
797 returnunordered_map::clear();
808 template<
class... TNgHttp2Cbs>
837 int OnHeader(nghttp2_session* session,
constnghttp2_frame* frame,
const uint8_t* name,
size_tnamelen,
841 enum EHeaders{
eMethod,
eScheme,
eAuthority,
ePath,
eUserAgent,
ePeerID,
eSessionID,
eSubHitID,
eCookie,
eClientIP,
eSize};
849 if(req->Retry(
error, refused_stream)) {
854 return Fail(processor_id, req,
error, refused_stream);
858 int OnFrameRecv(nghttp2_session *session,
constnghttp2_frame *frame);
860 static int s_OnFrameRecv(nghttp2_session* session,
constnghttp2_frame* frame,
void* user_data)
872 template<
classTImpl>
875 template<
class... TArgs>
877TImpl(std::forward<TArgs>(args)...),
900io->TImpl::OnShutdown(handle);
906io->TImpl::OnTimer(handle);
913io->TImpl::OnExecute(loop);
918start_barrier.
Wait();
922stop_barrier.
Wait();
924io->TImpl::AfterExecute();
942 usingTBase::operator[];
948 template<
class... TArgs>
951TBase::emplace_back(std::forward<TArgs>(args)...);
969++
m_Data[group][counter];
975 template<
class... TArgs>
976 void Report(TArgs&&... args);
979 using TData= vector<vector<atomic_uint>>;
981 template<EGroup group>
985 struct SReport{
template<EGroup group>
static void Func(
const TData&
data,
const char* prefix,
unsignedreport); };
987 template<
classTWhat,
class... TArgs>
1001++
m_Data[avg_time].second;
1007 void Report(
const char* prefix,
unsignedreport);
1012vector<pair<atomic_uint64_t, atomic_uint>>
m_Data;
1040 void Report(
const char* prefix,
unsignedreport);
1043 template<
classTDataId>
1059 void Report(
const char* prefix,
unsignedreport,
const char* name);
1085 autothat =
static_cast<SPSG_Stats*
>(handle->data);
1100 m_Random(piecewise_construct, {}, forward_as_tuple(random_device()()))
1105 void OnTimer(uv_timer_t* handle);
1118 void OnQueue(uv_async_t* handle);
1142pair<uniform_real_distribution<>, default_random_engine>
m_Random;
1158 void OnTimer(uv_timer_t* handle);
1196 bool AddRequest(shared_ptr<SPSG_Request> req,
constatomic_bool& stopped,
const CDeadline& deadline);
1204vector<unique_ptr<SPSG_Thread<SPSG_IoImpl>>>
m_Io;
Template class allowing to store a value or null (unassigned) state.
const string & GetId2Info() const
Get ID2 info.
static const char * expected[]
bool IsNull(void) const
Check if the object is unassigned.
#define ERR_POST(message)
Error posting with file, line number information but without error codes.
EDiagSev
Severity level for the posted diagnostics.
@ eDiag_Error
Error message.
void Warning(CExceptionArgs_Base &args)
#define END_NCBI_SCOPE
End previously defined NCBI scope.
#define BEGIN_NCBI_SCOPE
Define ncbi namespace.
CUrlArgs(TFlags flags=0)
Create an empty arguments set.
const string & GetValue(const string &name, bool *is_found=0) const
Get value for the given name.
unsigned int
A callback function used to compare two keys in a database.
const TYPE & Get(const CNamedParameterList *param)
const struct ncbi::grid::netcache::search::fields::SIZE size
const GenericPointer< typename T::ValueType > T2 value
double r(size_t dimension_, const Int4 *score_, const double *prob_, double theta_)
EPSG_Status
Retrieval result.
@ eSuccess
Successfully retrieved.
@ eError
An error was encountered while trying to send request or to read and to process the reply.
deque< SPSG_AsyncQueue > TPSG_AsyncQueues
#define PSG_IO_TRACE(message)
uint64_t SecondsToMs(double seconds)
static SLJIT_INLINE sljit_ins ms(sljit_gpr r, sljit_s32 d, sljit_gpr x, sljit_gpr b)
static SLJIT_INLINE sljit_ins l(sljit_gpr r, sljit_s32 d, sljit_gpr x, sljit_gpr b)
void Process(TArgs &&... args)
SDebugPrintout * m_DebugPrintout
void operator<<(ostream &(*)(ostream &))
SPack(SDebugPrintout *debug_printout, const TArg *arg)
void Process(TArgs &&... args)
void operator<<(ostream &(*)(ostream &))
SPack(SPack< TRest... > &&base, const TArg *arg)
SPack< TNextArg, TArg, TRest... > operator<<(const TNextArg &next_arg)
SPack< TArg > operator<<(const TArg &arg)
vector< tuple< double, EType, thread::id > > m_Events
void Event(SSocketAddress, TArgs &&...)
void Event(unsigned, const SUvNgHttp2_Error &)
void Event(const SPSG_Args &, const SPSG_Chunk &)
SDebugPrintout(string i, const SPSG_Params ¶ms)
void Print(SSocketAddress address, const string &path, const string &sid, const string &phid, const string &ip, SUv_Tcp::TPort port)
void Event(const SUvNgHttp2_Error &)
void Process(TArgs &&... args)
uint32_t GetMaxStreams() const
static TType Get(const string &value)
reference_wrapper< const string > TType
pair< SPSG_ArgsBase::EChunkType, reference_wrapper< const string > > TType
static TType Get(const string &value)
reference_wrapper< const string > TType
pair< SPSG_ArgsBase::EItemType, reference_wrapper< const string > > TType
const string & GetValue(const string &name) const
tuple< SValue< eItemType >, SValue< eChunkType >, SValue< eBlobId >, SValue< eId2Chunk > > m_Cached
static auto s_Pop(list< SPSG_TimedRequest > &queue)
void Emplace(TArgs &&... args)
SThreadSafe< list< SPSG_TimedRequest > > m_Queue
SPSG_AsyncQueuesRef(TPSG_AsyncQueues &queues)
TPSG_AsyncQueues & m_Queues
void NotifyOne() volatile
SPSG_Chunk & operator=(SPSG_Chunk &&)=default
SPSG_Chunk(SPSG_Chunk &&)=default
SPSG_Chunk & operator=(const SPSG_Chunk &)=delete
SPSG_Chunk(const SPSG_Chunk &)=delete
SNoServers(const SPSG_Params ¶ms, SPSG_Servers::TTS &servers)
atomic_bool & m_FailRequests
bool operator()(bool discovered, SUv_Timer *timer)
const uint64_t m_RetryDelay
void OnShutdown(uv_async_t *)
void OnExecute(uv_loop_t &loop)
SPSG_ThrottleParams m_ThrottleParams
void OnTimer(uv_timer_t *handle)
SPSG_Servers::TTS & m_Servers
SPSG_AsyncQueuesRef m_QueuesRef
shared_ptr< SPSG_Stats > m_Stats
CServiceDiscovery m_Service
SPSG_DiscoveryImpl(CServiceDiscovery service, shared_ptr< SPSG_Stats > stats, const SPSG_Params ¶ms, SPSG_Servers::TTS &servers, SPSG_AsyncQueuesRef queues_ref)
optional< CHttpCookies > m_Cookies
string GetCookie(const string &name) const
bool AddRequest(shared_ptr< SPSG_Request > req, const atomic_bool &stopped, const CDeadline &deadline)
SPSG_Servers::TTS m_Servers
bool RejectsRequests() const
SUv_Barrier m_StartBarrier
SPSG_IoCoordinator(CServiceDiscovery service)
atomic< size_t > m_RequestCounter
SUv_Barrier m_StopBarrier
TPSG_AsyncQueues m_Queues
SPSG_Thread< SPSG_DiscoveryImpl > m_Discovery
shared_ptr< SPSG_Stats > stats
atomic< size_t > m_RequestId
vector< unique_ptr< SPSG_Thread< SPSG_IoImpl > > > m_Io
SPSG_Server & operator*()
SPSG_Server * operator->()
deque< SUvNgHttp2_Session< SPSG_IoSession > > sessions
static void s_OnQueue(uv_async_t *handle)
void OnTimer(uv_timer_t *handle)
void CheckForNewServers(uv_async_t *handle)
void OnQueue(uv_async_t *handle)
void CheckRequestExpiration()
SPSG_IoImpl(const SPSG_Params ¶ms, SPSG_Servers::TTS &servers, SPSG_AsyncQueue &queue)
deque< SServerSessions > m_Sessions
void OnShutdown(uv_async_t *handle)
SPSG_IoImpl.
SPSG_AsyncQueue & m_Queue
void OnExecute(uv_loop_t &loop)
void AddNewServers(uv_async_t *handle)
pair< uniform_real_distribution<>, default_random_engine > m_Random
SPSG_Servers::TTS & m_Servers
int OnStreamClose(nghttp2_session *session, int32_t stream_id, uint32_t error_code)
bool Fail(SPSG_Processor::TId processor_id, shared_ptr< SPSG_Request > req, const SUvNgHttp2_Error &error, bool refused_stream=false)
bool RetryFail(SPSG_Processor::TId processor_id, shared_ptr< SPSG_Request > req, const SUvNgHttp2_Error &error, bool refused_stream=false)
SPSG_AsyncQueue & m_Queue
bool CanProcessRequest(shared_ptr< SPSG_Request > &req)
SPSG_Requests< SPSG_IoSession > m_Requests
static int s_OnFrameRecv(nghttp2_session *session, const nghttp2_frame *frame, void *user_data)
SPSG_Submitter::TId GetInternalId() const
SPSG_IoSession(SPSG_Server &s, const SPSG_Params ¶ms, SPSG_AsyncQueue &queue, uv_loop_t *loop, TNgHttp2Cbs &&... callbacks)
SPSG_IoSession.
void CheckRequestExpiration()
bool ProcessRequest(SPSG_TimedRequest timed_req, SPSG_Processor::TId processor_id, shared_ptr< SPSG_Request > req)
int OnHeader(nghttp2_session *session, const nghttp2_frame *frame, const uint8_t *name, size_t namelen, const uint8_t *value, size_t valuelen, uint8_t flags)
void OnReset(SUvNgHttp2_Error error) override
array< SNgHttp2_Header< NGHTTP2_NV_FLAG_NO_COPY_NAME >, eSize > m_Headers
int OnData(nghttp2_session *session, uint8_t flags, int32_t stream_id, const uint8_t *data, size_t len)
int OnFrameRecv(nghttp2_session *session, const nghttp2_frame *frame)
TPSG_DebugPrintout debug_printout
TPSG_RefusedStreamRetries refused_stream_retries
TPSG_MaxConcurrentRequestsPerServer max_concurrent_requests_per_server
TPSG_RequestsPerIo requests_per_io
TPSG_AdminAuthTokenName admin_auth_token_name
string GetCookie(function< string()> get_auth_token)
TPSG_AuthToken auth_token
TPSG_RequestRetries request_retries
static unsigned s_GetCompetitiveAfter(double io_timer_period, double timeout)
TPSG_PsgClientMode client_mode
const unsigned competitive_after
static unsigned s_GetRequestTimeout(double io_timer_period)
SPSG_Params(SPSG_Env env={})
TPSG_MaxConcurrentSubmits max_concurrent_submits
TPSG_AdminAuthToken admin_auth_token
TPSG_UserRequestIds user_request_ids
TPSG_AuthTokenName auth_token_name
TPSG_IoTimerPeriod io_timer_period
const unsigned request_timeout
static atomic< TId > sm_NextId
SPSG_Nullable< size_t > expected
vector< SPSG_Chunk > chunks
static int From(EPSG_Status status)
auto CanBeChangedTo(const SStatus &other) const
void SetComplete() volatile
void SetStatus(SStatus status, bool reset) volatile
const volatile atomic_bool & InProgress() const volatile
void AddMessage(string message, EDiagSev severity, optional< int > code)
atomic< SStatus > m_Status
void AddError(string message, SStatus status=EPSG_Status::eError, EDiagSev severity=eDiag_Error, optional< int > code=nullopt)
SStatus GetStatus() const volatile
deque< SPSG_Message > m_Messages
SPSG_Message GetMessage(EDiagSev min_severity)
shared_ptr< TPSG_Queue > queue
SDebugPrintout debug_printout
void SetFailed(string message, SState::SStatus status=EPSG_Status::eError)
SThreadSafe< list< SItem::TTS * > > new_items
SThreadSafe< list< SItem::TTS > > items
weak_ptr< SPSG_Stats > stats
SPSG_Reply(string id, const SPSG_Params ¶ms, shared_ptr< TPSG_Queue > q, weak_ptr< SPSG_Stats > s=weak_ptr< SPSG_Stats >(), bool r=false)
optional< SItem::TTS * > GetNextItem(CDeadline deadline)
CRef< CRequestContext > m_Context
weak_ptr< void > m_ExistingGuard
SContext(CRef< CRequestContext > context)
unsigned GetRetries(SPSG_Retries::EType type, bool refused_stream)
unordered_map< string, SPSG_Reply::SItem::TTS * > m_ItemsByID
SPSG_Processor processed_by
EStateResult(SPSG_Request::*)(const char *&data, size_t &len) TState
bool Retry(const SUvNgHttp2_Error &error, bool refused_stream=false)
EStateResult StateData(const char *&data, size_t &len)
shared_ptr< SPSG_Reply > reply
SPSG_Submitter submitted_by
SPSG_Request(string p, shared_ptr< SPSG_Reply > r, CRef< CRequestContext > c, const SPSG_Params ¶ms)
EStateResult OnReplyData(SPSG_Processor::TId processor_id, const char *data, size_t len)
bool Fail(SPSG_Processor::TId processor_id, const SUvNgHttp2_Error &error, bool refused_stream=false)
EStateResult StatePrefix(const char *&data, size_t &len)
auto & OnReplyDone(SPSG_Processor::TId processor_id)
EUpdateResult UpdateItem(SPSG_Args::EItemType item_type, SPSG_Reply::SItem &item, const SPSG_Args &args)
EStateResult StateArgs(const char *&data, size_t &len)
SPSG_Requests(TSession &session)
auto emplace(TArgs &&... args)
SPSG_Retries(const SPSG_Params ¶ms)
pair< TValuesPair, TValuesPair > TValues
pair< unsigned, unsigned > TValuesPair
SPSG_Retries(const TValuesPair &values_pair)
unsigned Get(EType type, bool refused_stream)
SPSG_Throttling throttling
atomic_int available_streams
SPSG_Server(SSocketAddress a, double r, int as, SPSG_ThrottleParams p, uv_loop_t *l)
const SSocketAddress address
atomic_bool fail_requests
void emplace_back(TArgs &&... args)
deque< SPSG_Server > TBase
size_t size() const volatile
vector< pair< atomic_uint64_t, atomic_uint > > m_Data
void Report(const char *prefix, unsigned report)
static const char * GetName(EAvgTime avg_time)
void AddTime(EAvgTime avg_time, double value)
static void Func(TData &data)
static void Func(const TData &data, const char *prefix, unsigned report)
void Apply(EGroup start_with, TArgs &&... args)
void IncCounter(EGroup group, unsigned counter)
void Report(TArgs &&... args)
vector< vector< atomic_uint > > TData
void AddId(const TDataId &data_id)
atomic_uint64_t m_Received
SThreadSafe< deque< TDataId > > m_Ids
void Report(const char *prefix, unsigned report, const char *name)
void AddData(EDataType type, size_t size)
void AddData(bool has_blob_id, EDataType type, size_t size)
SData< CPSG_BlobId > m_Blobs
void Report(const char *prefix, unsigned report)
SThreadSafe< unordered_set< string > > m_TSEs
SData< CPSG_ChunkId > m_Chunks
void AddId(const CPSG_BlobId &blob_id)
void AddId(const CPSG_ChunkId &chunk_id)
static void s_OnTimer(uv_timer_t *handle)
void Init(uv_loop_t *loop)
SPSG_Servers::TTS & m_Servers
SPSG_Stats(SPSG_Servers::TTS &servers)
static void s_Execute(SPSG_Thread *io, SUv_Barrier &start_barrier, SUv_Barrier &stop_barrier)
SPSG_Thread(SUv_Barrier &start_barrier, SUv_Barrier &stop_barrier, uint64_t timeout, uint64_t repeat, TArgs &&... args)
static void s_OnTimer(uv_timer_t *handle)
static void s_OnShutdown(uv_async_t *handle)
SThreshold(string error_rate)
SPSG_ThrottleParams.
constexpr static size_t kMaxDenominator
TPSG_ThrottleUntilDiscovery until_discovery
const volatile uint64_t period
TPSG_ThrottleMaxFailures max_failures
pair< bitset< SPSG_ThrottleParams::SThreshold::kMaxDenominator >, size_t > threshold_reg
bool Adjust(const SSocketAddress &address, bool result)
SStats(SPSG_ThrottleParams p)
SPSG_ThrottleParams params
SPSG_Throttling(const SSocketAddress &address, SPSG_ThrottleParams p, uv_loop_t *l)
SPSG_Throttling.
atomic< EThrottling > m_Active
bool AddResult(bool result)
SThreadSafe< SStats > m_Stats
uint64_t Configured() const
static void s_OnTimer(uv_timer_t *handle)
static void s_OnSignal(uv_async_t *handle)
const SSocketAddress & m_Address
SPSG_TimedRequest & operator=(SPSG_TimedRequest &&)=default
bool CheckExpiration(const SPSG_Params ¶ms, const SUvNgHttp2_Error &error, TOnRetry on_retry, TOnFail on_fail)
SPSG_TimedRequest(shared_ptr< SPSG_Request > r)
shared_ptr< SPSG_Request > m_Request
SPSG_TimedRequest(SPSG_TimedRequest &&)=default
static SSocketAddress Parse(const string &address, SHost::EName name=SHost::EName::eResolved)
SNgHttp2_Session m_Session
void Init(void *d, uv_loop_t *l, uv_async_cb cb)
void Run(uv_run_mode mode=UV_RUN_DEFAULT)
RetroSearch is an open source project built by @garambo | Open a GitHub Issue
Search and Browse the WWW like it's 1997 | Search results from DuckDuckGo
HTML:
3.2
| Encoding:
UTF-8
| Version:
0.7.4