: m_query_list(query_list)
48shared_ptr<CCassQueryList> ql = m_query_list.lock();
62shared_ptr<CCassQueryList> rv = make_shared<CCassQueryListAccess>();
64rv->m_cass_conn = cass_conn;
65rv->m_keyspace = cass_conn->Keyspace();
74 ERR_POST(
Error<<
"Destructing non-empty CCassQueryList -- has active queries");
77 ERR_POST(
Error<<
"Destructing non-empty CCassQueryList -- has busy slots");
80 ERR_POST(
Error<<
"Destructing non-empty CCassQueryList -- has pending tasks");
91shared_ptr<CCassQueryList>
self=
m_self_weak.lock();
154 const char*
kSQrySlotStateStr[] = {
"ssAvailable",
"ssAttached",
"ssReadingRow",
"ssReseting",
"ssReleasing"};
170thread::id expected_empty{};
171 if(!
m_owning_thread.compare_exchange_strong(expected_empty, this_thread::get_id())) {
196 if(post_async && !slot) {
197 m_pending_arr.push_back({std::move(consumer), retry_count});
203 AttachSlot(slot, {std::move(consumer), retry_count});
245slot->
m_qry->Close();
257 assert(pending_slot.m_retry_count > 0);
258 assert(pending_slot.m_retry_count < 1000);
261slot->
m_consumer= std::move(pending_slot.m_consumer);
262 if(!slot->
m_qry) {
266slot->
m_qry->Close();
273 ERR_POST(
Info<<
"Consumer refused to start, detaching...");
294 boolrelease{
true};
317 booldo_continue{
true};
320do_continue =
false;
347do_continue =
false;
375shared_ptr<CCassQuery> rv = slot.m_qry;
376slot.m_qry =
nullptr;
389 if(slot->m_state !=
ssAvailable&& slot->m_consumer.get() == consumer) {
391 if(slot->m_consumer) {
392slot->m_consumer->Failed(slot->m_qry, *
this, slot->m_index, e);
403 if(it.m_consumer.get() == consumer) {
421 if(slot->m_consumer) {
422slot->m_consumer->Failed(slot->m_qry, *
this, slot->m_index, e);
426slot->m_qry =
nullptr;
470 while(
m_query_arr[index].m_qry ==
nullptr&& index > 0) {
473 if(index == 0 &&
m_query_arr[index].m_qry ==
nullptr) {
483 for(
size_tindex = 0; index <
m_query_arr.size(); ++index) {
502 boolneed_restart{
false};
505 if(slot->
m_qry->IsActive()) {
506 switch(slot->
m_qry->WaitAsync(0)) {
530need_restart =
true;
550slot->
m_qry->Restart();
551 ERR_POST(
Warning<<
"CCassQueryList::CheckSlots: exception (IGNORING & RESTARTING) ["<< index <<
"]: "<< ex_msg <<
"\nquery: "<< slot->
m_qry->ToString().c_str());
559 ERR_POST(
Error<<
"CCassQueryList::CheckSlots: exception ["<< index <<
"]: "<< ex_msg <<
" -- unexpected state: "<< slot->
m_state<<
"\nquery: "<< slot->
m_qry->ToString().c_str());
572}
else if(slot->
m_qry&& slot->
m_qry->IsAsync()) {
573slot->
m_qry->Close();
#define BEGIN_IDBLOB_SCOPE
BEGIN_IDBLOB_SCOPE USING_NCBI_SCOPE
const char * kSQrySlotStateStr[]
function< void()> TCassQueryListTickCB
virtual void OnData() override
CQryNotification(shared_ptr< CCassQueryList > query_list, size_t index)
CCassQueryList::CQryNotification.
vector< shared_ptr< CQryNotification > > m_notification_arr
void DetachSlot(SQrySlot *slot)
CCassQueryList & SetKeyspace(const string &keyspace)
atomic< thread::id > m_owning_thread
SQrySlot * CheckSlot(size_t index, bool discard)
void AttachSlot(SQrySlot *slot, SPendingSlot &&pending_slot)
void Release(SQrySlot *slot)
atomic_bool m_yield_in_progress
vector< SPendingSlot > m_pending_arr
virtual ~CCassQueryList()
atomic_size_t m_attached_slots
static constexpr uint64_t kReadyPopWaitTimeout
weak_ptr< CCassQueryList > m_self_weak
void Execute(unique_ptr< ICassQueryListConsumer > consumer, int retry_count, bool post_async=false)
static shared_ptr< CCassQueryList > Create(shared_ptr< CCassConnection > cass_conn) noexcept
CCassQueryList.
size_t NumberOfActiveQueries() const
shared_ptr< CCassConnection > m_cass_conn
shared_ptr< CCassQuery > Extract(size_t slot_index)
static constexpr unsigned int kResetRelaxTime
size_t NumberOfBusySlots() const
vector< SQrySlot > m_query_arr
void CheckPending(SQrySlot *slot)
string GetKeyspace() const
size_t GetMaxQueries() const
void ReadRows(SQrySlot *slot)
TCassQueryListTickCB m_tick_cb
SQrySlot * CheckSlots(bool discard, bool wait=true)
size_t NumberOfPendingSlots() const
CCassQueryList & SetTickCB(TCassQueryListTickCB cb)
CCassQueryList & SetMaxQueries(size_t max_queries)
static constexpr uint64_t kReadyPushWaitTimeout
void Cancel(const exception *e=nullptr)
mpmc_bounded_queue_w< size_t, kNotifyQueueLen > m_ready
@ eQueryFailedRestartable
static bool s_CtrlCPressed(void)
bool pop_wait(T *data, int64_t timeoutmks)
static const char * expected[]
#define ERR_POST(message)
Error posting with file, line number information but without error codes.
void Error(CExceptionArgs_Base &args)
TErrCode GetErrCode(void) const
Get error code.
#define NCBI_THROW(exception_class, err_code, message)
Generic macro to throw an exception, given the exception class, error code and message string.
void Warning(CExceptionArgs_Base &args)
virtual const char * what(void) const noexcept
Standard report (includes full backlog).
void Info(CExceptionArgs_Base &args)
shared_ptr< CCassQuery > m_qry
unique_ptr< ICassQueryListConsumer > m_consumer
RetroSearch is an open source project built by @garambo | Open a GitHub Issue
Search and Browse the WWW like it's 1997 | Search results from DuckDuckGo
HTML:
3.2
| Encoding:
UTF-8
| Version:
0.7.4