(
query!=
nullptr) {
59 query->SetSQL(
"SELECT data FROM "+ keyspace +
"."+ chunk_table_name
60+
" WHERE sat_key = ? AND last_modified = ? AND chunk_no = ?", 3);
63 query->BindInt32(2, chunk_no);
68shared_ptr<CCassConnection>
conn,
69 const string& keyspace,
80std::move(data_error_cb)
86shared_ptr<CCassConnection>
conn,
87 const string& keyspace,
95, m_Modified(modified)
96, m_LoadChunks(load_chunks)
97, m_Mode(eBlobTaskModeDefault)
99 m_Blob->SetModified(modified);
103shared_ptr<CCassConnection>
conn,
104 const string& keyspace,
105unique_ptr<CBlobRecord> blob_record,
110, m_Blob(std::move(blob_record))
111, m_Modified(m_Blob->GetModified())
112, m_LoadChunks(load_chunks)
113, m_PropsFound(
true)
114, m_ExplicitBlob(
true)
115, m_Mode(eBlobTaskModeDefault)
137 "CCassBlobTaskLoadBlob: DataReadyCB can't be assigned after the loading process has started");
144 boolb_need_repeat{
false};
146b_need_repeat =
false;
156b_need_repeat =
true;
162 string sql=
"SELECT " 175 " FROM "+
GetKeySpace() +
".blob_prop WHERE sat_key = ?";
178qry->SetSQL(
sql, 1);
179qry->BindInt32(0,
GetKey());
181 sql+=
" and last_modified = ?";
182qry->SetSQL(
sql, 2);
183qry->BindInt32(0,
GetKey());
184qry->BindInt64(1,
m_Blob->GetModified());
197 if(!it.query->IsEOF()) {
199.SetModified(it.query->FieldGetInt64Value(0))
200.SetClass(it.query->FieldGetInt16Value(1))
201.SetDateAsn1(it.query->FieldGetInt64Value(2, 0))
202.SetDiv(it.query->FieldGetStrValueDef(3,
""))
203.SetFlags(it.query->FieldGetInt64Value(4))
204.SetHupDate(it.query->FieldGetInt64Value(5, 0))
205.SetId2Info(it.query->FieldGetStrValueDef(6,
""))
206.SetNChunks(it.query->FieldGetInt32Value(7))
207.SetOwner(it.query->FieldGetInt32Value(8))
208.SetSize(it.query->FieldGetInt64Value(9))
209.SetSizeUnpacked(it.query->FieldGetInt64Value(10))
210.SetUserName(it.query->FieldGetStrValueDef(11,
""));
218b_need_repeat =
true;
244 string msg=
"Blob failed check or it's " 247 ", modified="+ to_string(
m_Blob->GetModified()) +
253 else if(
m_Blob->GetNChunks() < 0) {
254 string msg=
"Inconsistent n_chunks value: "+ to_string(
m_Blob->GetNChunks()) +
266b_need_repeat =
true;
277[
this] (
CBlobRecord const&,
const unsigned char*
data,
unsigned int size,
intchunk_no) {
284b_need_repeat =
true;
292b_need_repeat =
false;
299 char msg[1024];
msg[0] =
'\0';
301snprintf(
msg,
sizeof(
msg),
302 "Failed to fetch blob (key=%s.%d) result is " 303 "incomplete remaining %ld bytes",
319snprintf(
msg,
sizeof(
msg),
320 "Failed to fetch blob (key=%s.%d) unexpected state (%d)",
325}
while( b_need_repeat);
340 auton_chunks =
m_Blob->GetNChunks();
341 for(
int32_tchunk_no = 0; chunk_no < n_chunks && m_ActiveQueries > 0; ++chunk_no) {
350 if(it.query->IsEOF()) {
353snprintf(
msg,
sizeof(
msg),
354 "Failed to fetch blob chunk (key=%s.%d, chunk=%d)", keyspace.c_str(),
GetKey(), chunk_no);
358 const unsigned char* rawdata =
nullptr;
359 int64_t len= it.query->FieldGetBlobRaw(0, &rawdata);
365snprintf(
msg,
sizeof(
msg),
366 "Failed to fetch blob chunk (key=%s.%d, chunk=%d) size %ld " 367 "is too large", keyspace.c_str(),
GetKey(), chunk_no,
len);
386 auton_chunks =
m_Blob->GetNChunks();
387 boolpassed_active_check =
true;
396 if(!passed_active_check) {
397 ERR_POST(
Trace<<
"Max active queries level reached while fetching blob chunk");
404it.restart_count = 0;
423snprintf(
msg,
sizeof(
msg),
424 "Failed to setup data ready callback (expired) for blob chunk (key=%s.%d, chunk=%d)",
425keyspace.c_str(),
GetKey(), chunk_no);
434 auto tmp= make_unique<CBlobRecord>(
GetKey());
435 tmp->SetModified(
m_Blob->GetModified());
436 tmp->SetNChunks(
m_Blob->GetNChunks());
#define BEGIN_IDBLOB_SCOPE
function< void(CRequestStatus::ECode status, int code, EDiagSev severity, const string &message)> TDataErrorCallback
vector< unsigned char > TBlobChunk
bool GetFlag(EBlobFlags flag_value) const
TTimestamp GetModified() const
void SetDataReadyCB(shared_ptr< CCassDataCallbackReceiver > callback)
unique_ptr< CBlobRecord > ConsumeBlobRecord()
static constexpr size_t kMaxChunksAhead
function< void(CBlobRecord const &blob, bool isFound)> TBlobPropsCallback
void SetPropsCallback(TBlobPropsCallback callback)
bool x_AreAllChunksProcessed() const
unique_ptr< CBlobRecord > m_Blob
void x_CheckChunksFinished(bool &need_repeat)
CCassBlobTaskLoadBlob(shared_ptr< CCassConnection > conn, const string &keyspace, CBlobRecord::TSatKey sat_key, bool load_chunks, TDataErrorCallback data_error_cb)
void SetChunkCallback(TBlobChunkCallbackEx callback)
CBlobRecord::TSize m_RemainingSize
bool IsBlobPropsFound() const
TBlobChunkCallbackEx m_ChunkCallback
function< void(CBlobRecord const &blob, const unsigned char *data, unsigned int size, int chunk_no)> TBlobChunkCallbackEx
vector< bool > m_ProcessedChunks
static void InitBlobChunkDataQuery(CCassQuery *query, string const &keyspace, CBlobRecord const &blob, int32_t chunk_no)
void x_RequestChunksAhead()
static constexpr CBlobRecord::TTimestamp kAnyModified
void x_RequestChunk(CCassQuery &qry, int32_t chunk_no)
TBlobPropsCallback m_PropsCallback
bool CheckMaxActive()
CCassBlobWaiter.
atomic< int32_t > m_State
string GetKeySpace() const
void SetDataReadyCB3(shared_ptr< CCassDataCallbackReceiver > datareadycb3)
TCassConsistency GetReadConsistency() const
weak_ptr< CCassDataCallbackReceiver > m_DataReadyCb3
void SetupQueryCB3(shared_ptr< CCassQuery > &query)
bool CheckReady(shared_ptr< CCassQuery > qry, unsigned int restart_counter, bool &need_repeat)
bool IsDataReadyCallbackExpired() const
vector< SQueryRec > m_QueryArr
shared_ptr< CCassQuery > ProduceQuery() const
void Error(CRequestStatus::ECode status, int code, EDiagSev severity, const string &message)
void Query(TCassConsistency c=CCassConsistency::kLocalQuorum, bool run_async=false, bool allow_prepare=true, unsigned int page_size=DEFAULT_PAGE_SIZE)
void SetOnData3(shared_ptr< CCassDataCallbackReceiver > cb)
static CS_CONNECTION * conn
void swap(NCBI_NS_NCBI::pair_base_member< T1, T2 > &pair1, NCBI_NS_NCBI::pair_base_member< T1, T2 > &pair2)
#define ERR_POST(message)
Error posting with file, line number information but without error codes.
@ eDiag_Error
Error message.
@ e500_InternalServerError
#define NCBI_THROW(exception_class, err_code, message)
Generic macro to throw an exception, given the exception class, error code and message string.
void Trace(CExceptionArgs_Base &args)
static enable_if< is_arithmetic< TNumeric >::value||is_convertible< TNumeric, Int8 >::value, string >::type NumericToString(TNumeric value, TNumToStringFlags flags=0, int base=10)
Convert numeric value to string.
BEGIN_IDBLOB_SCOPE USING_NCBI_SCOPE
const struct ncbi::grid::netcache::search::fields::SIZE size
static SLJIT_INLINE sljit_ins msg(sljit_gpr r, sljit_s32 d, sljit_gpr x, sljit_gpr b)
static const char *const kChunkTableDefault
static const char *const kChunkTableBig
RetroSearch is an open source project built by @garambo | Open a GitHub Issue
Search and Browse the WWW like it's 1997 | Search results from DuckDuckGo
HTML:
3.2
| Encoding:
UTF-8
| Version:
0.7.4