GreptimeTeam/greptimedb-ingester-rust: a lightweight Rust ingester for GreptimeDB, compatible with the GreptimeDB protocol.

A high-performance Rust client for ingesting data into GreptimeDB, supporting both low-latency individual inserts and high-throughput bulk streaming operations.

The ingester provides two main APIs tailored for different use cases:

1. Low-Latency Insert API

Best for: Real-time applications, IoT sensors, interactive systems

use greptimedb_ingester::api::v1::*;
use greptimedb_ingester::client::Client;
use greptimedb_ingester::helpers::schema::*;
use greptimedb_ingester::helpers::values::*;
use greptimedb_ingester::{database::Database, Result, ColumnDataType};

#[tokio::main]
async fn main() -> Result<()> {
    // Connect to GreptimeDB
    let client = Client::with_urls(&["localhost:4001"]);
    let database = Database::new_with_dbname("public", client);
    
    // Define schema
    let schema = vec![
        tag("device_id", ColumnDataType::String),
        timestamp("ts", ColumnDataType::TimestampMillisecond),
        field("temperature", ColumnDataType::Float64),
    ];
    
    // Create data rows
    let rows = vec![Row {
        values: vec![
            string_value("device_001".to_string()),
            timestamp_millisecond_value(1234567890000),
            f64_value(23.5),
        ],
    }];
    
    // Insert data with minimal latency
    let insert_request = RowInsertRequests {
        inserts: vec![RowInsertRequest {
            table_name: "sensor_data".to_string(),
            rows: Some(Rows {
                schema,
                rows,
            }),
        }],
    };
    
    let affected_rows = database.insert(insert_request).await?;
    println!("Inserted {} rows", affected_rows);
    Ok(())
}
2. High-Throughput Bulk API

Best for: ETL operations, data migration, batch processing, log ingestion

use greptimedb_ingester::{BulkInserter, BulkWriteOptions, ColumnDataType, CompressionType, Row, TableSchema, Value};
use greptimedb_ingester::api::v1::*;
use greptimedb_ingester::helpers::schema::*;
use greptimedb_ingester::helpers::values::*;
use greptimedb_ingester::client::Client;
use greptimedb_ingester::database::Database;
use std::time::Duration;

#[tokio::main]
async fn main() -> greptimedb_ingester::Result<()> {
    let client = Client::with_urls(&["localhost:4001"]);
    let current_timestamp = || 1234567890000i64;
    struct SensorData { timestamp: i64, device_id: String, temperature: f64 }
    let sensor_data: Vec<SensorData> = vec![
        SensorData { timestamp: 1234567890000, device_id: "device_001".to_string(), temperature: 23.5 },
        SensorData { timestamp: 1234567890001, device_id: "device_002".to_string(), temperature: 24.0 },
    ];

    // Step 1: Create table manually (bulk API requires table to exist beforehand)
    // Option A: Use insert API to create table
    let database = Database::new_with_dbname("public", client.clone());
    let init_schema = vec![
        timestamp("ts", ColumnDataType::TimestampMillisecond),
        field("device_id", ColumnDataType::String),
        field("temperature", ColumnDataType::Float64),
    ];

    let init_request = RowInsertRequests {
        inserts: vec![RowInsertRequest {
            table_name: "sensor_readings".to_string(),
            rows: Some(Rows {
                schema: init_schema,
                rows: vec![greptimedb_ingester::api::v1::Row {
                    values: vec![
                        timestamp_millisecond_value(current_timestamp()),
                        string_value("init_device".to_string()),
                        f64_value(0.0),
                    ],
                }],
            }),
        }],
    };

    database.insert(init_request).await?; // Table is now created

    // Option B: Create table using SQL (if you have SQL access)
    // CREATE TABLE sensor_readings (
    //     ts TIMESTAMP TIME INDEX,
    //     device_id STRING,
    //     temperature DOUBLE
    // );

    // Step 2: Now use bulk API for high-throughput operations
    let bulk_inserter = BulkInserter::new(client, "public");

    // Define table schema (must match the insert API schema above)
    let table_template = TableSchema::builder()
        .name("sensor_readings")
        .build()
        .unwrap()
        .add_timestamp("ts", ColumnDataType::TimestampMillisecond)
        .add_field("device_id", ColumnDataType::String)
        .add_field("temperature", ColumnDataType::Float64);

    // Create high-performance stream writer
    let mut bulk_writer = bulk_inserter
        .create_bulk_stream_writer(
            &table_template,
            Some(BulkWriteOptions::default()
                .with_parallelism(8)            // 8 concurrent requests
                .with_compression(CompressionType::Zstd) // Enable Zstandard compression
                .with_timeout(Duration::from_secs(60))   // 60s timeout
            ),
        )
        .await?;

    // Method 1: Optimized API (recommended for production)
    let mut rows1 = bulk_writer.alloc_rows_buffer(10000, 1024)?;  // capacity: 10000, row_buffer_size: 1024
    for data in &sensor_data {
        let row = Row::new().add_values(vec![
            Value::TimestampMillisecond(data.timestamp),
            Value::String(data.device_id.clone()),
            Value::Float64(data.temperature),
        ]);
        rows1.add_row(row)?;
    }
    let request_id1 = bulk_writer.write_rows_async(rows1).await?;

    // Method 2: Schema-safe API
    let mut rows2 = bulk_writer.alloc_rows_buffer(10000, 1024)?;  // capacity: 10000, row_buffer_size: 1024
    for data in &sensor_data {
        let row = bulk_writer.new_row()
            .set("ts", Value::TimestampMillisecond(data.timestamp))?
            .set("device_id", Value::String(data.device_id.clone()))?
            .set("temperature", Value::Float64(data.temperature))?
            .build()?;
        rows2.add_row(row)?;
    }
    let request_id2 = bulk_writer.write_rows_async(rows2).await?;

    // Wait for all operations to complete
    let responses = bulk_writer.wait_for_all_pending().await?;
    bulk_writer.finish().await?;
    Ok(())
}

Important:

  1. Manual Table Creation Required: The bulk API does not create tables automatically. You must create the table beforehand using either:
    • the insert API (which supports automatic table creation), or
    • SQL DDL statements (CREATE TABLE).
  2. Schema Matching: The table template used with the bulk API must exactly match the existing table schema (see the sketch after this list).
  3. Column Types: For bulk operations, currently use add_field() instead of add_tag(). Tag columns are part of the primary key in GreptimeDB, but bulk operations don't yet support tables with tag columns. This limitation will be addressed in future versions.
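
To make points 1 and 2 concrete, here is a small sketch that restates the sensor_readings example from above (assuming the same column layout): the table is created first via the insert API or SQL, and the bulk template then mirrors it column for column.

use greptimedb_ingester::{ColumnDataType, TableSchema};

fn sensor_readings_template() -> TableSchema {
    // Assumes the table already exists, e.g. created via the insert API or:
    //   CREATE TABLE sensor_readings (
    //       ts TIMESTAMP TIME INDEX,
    //       device_id STRING,
    //       temperature DOUBLE
    //   );
    //
    // The bulk template mirrors it exactly: same column names, order, and types.
    // Tag-like columns are declared with add_field() for now (see point 3).
    TableSchema::builder()
        .name("sensor_readings")
        .build()
        .unwrap()
        .add_timestamp("ts", ColumnDataType::TimestampMillisecond)
        .add_field("device_id", ColumnDataType::String)
        .add_field("temperature", ColumnDataType::Float64)
}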

Scenario               | API Choice         | Why
IoT sensor data        | Low-Latency Insert | Real-time requirements, small batches
Interactive dashboards | Low-Latency Insert | User expects immediate feedback
ETL pipelines          | Bulk Streaming     | Process millions of records efficiently
Log ingestion          | Bulk Streaming     | High volume, can batch data
Data migration         | Bulk Streaming     | Transfer large datasets quickly

The repository includes comprehensive examples demonstrating both approaches:

Low-latency insert example. Run with: cargo run --example insert_example

Bulk stream writer example. Run with: cargo run --example bulk_stream_writer_example

API Design & Optimization

Each BulkStreamWriter is bound to a specific table schema, providing both safety and performance benefits:

use greptimedb_ingester::{BulkStreamWriter, Rows, Column};

async fn example(bulk_writer: &BulkStreamWriter, column_schemas: &[Column]) -> greptimedb_ingester::Result<()> {
    // Recommended: Use writer-bound buffer allocation
    let mut rows = bulk_writer.alloc_rows_buffer(10000, 1024)?;  // capacity: 10000, row_buffer_size: 1024
    // Shares Arc<Schema> with writer for optimal performance
    // Automatic schema compatibility
    
    // Alternative: Direct allocation
    let mut rows = Rows::new(column_schemas, 10000, 1024)?;  // capacity: 10000, row_buffer_size: 1024
    // Requires schema conversion and validation overhead
    Ok(())
}

Fast API (production recommended):

use greptimedb_ingester::{Row, Value};

fn create_row() -> Row {
    let ts = 1234567890i64;
    let device_id = "device001".to_string();
    let temperature = 25.0f64;
    
    Row::new().add_values(vec![
        Value::TimestampMillisecond(ts),
        Value::String(device_id),
        Value::Float64(temperature),
    ])
    // Fastest performance
    // Requires correct field order
}

Safe API (development recommended):

use greptimedb_ingester::{BulkStreamWriter, Value};

async fn example(bulk_writer: &BulkStreamWriter) -> greptimedb_ingester::Result<()> {
    let ts = 1234567890i64;
    let device_id = "device001".to_string();
    let temperature = 25.0f64;
    
    let row = bulk_writer.new_row()
        .set("timestamp", Value::TimestampMillisecond(ts))?
        .set("device_id", Value::String(device_id))?
        .set("temperature", Value::Float64(temperature))?
        .build()?;
    // O(1) field name lookup (HashMap-based)
    // Field name validation
    // Prevents field order mistakes
    // Compile-time safety
    Ok(())
}
Performance Characteristics

The bulk API supports true parallelism through async request submission:

use greptimedb_ingester::{BulkStreamWriter, Rows};

async fn example(bulk_writer: &mut BulkStreamWriter, batches: Vec<Rows>) -> greptimedb_ingester::Result<()> {
    // Submit multiple batches without waiting
    let mut request_ids = Vec::new();
    for batch in batches {
        let id = bulk_writer.write_rows_async(batch).await?;
        request_ids.push(id);
    }
    
    // Option 1: Wait for all pending requests
    let responses = bulk_writer.wait_for_all_pending().await?;
    
    // Option 2: Wait for specific requests
    for request_id in request_ids {
        let response = bulk_writer.wait_for_response(request_id).await?;
        println!("Request {} completed with {} rows", 
                 request_id, response.affected_rows());
    }
    Ok(())
}

Full support for GreptimeDB data types:

use greptimedb_ingester::{Value, ColumnDataType, Row};

fn create_data_row() -> Row {
    Row::new()
        .add_value(Value::TimestampMillisecond(1234567890123))
        .add_value(Value::String("device_001".to_string()))
        .add_value(Value::Float64(23.5))
        .add_value(Value::Int64(1))
        .add_value(Value::Boolean(true))
        .add_value(Value::Binary(vec![0xDE, 0xAD, 0xBE, 0xEF]))
        .add_value(Value::Json(r#"{"key": "value"}"#.to_string()))
}

Efficient data access patterns:

use greptimedb_ingester::Row;

fn process_row_data(row: &Row) {
    fn process_binary(_data: &[u8]) {
        // Process binary data
    }
    
    // Type-safe value access by column index
    // (index 1 is the string column in the row built by create_data_row above)
    if let Some(device_name) = row.get_string(1) {
        println!("Device: {}", device_name);
    }
    
    // Binary data access (index 5 is the binary column)
    if let Some(binary_data) = row.get_binary(5) {
        process_binary(&binary_data);
    }
}

Set up your GreptimeDB connection:

use greptimedb_ingester::{ChannelConfig, ChannelManager};
use greptimedb_ingester::client::Client;
use std::time::Duration;

fn setup_client() -> Client {
    let channel_config = ChannelConfig::default()
        .timeout(Duration::from_secs(30))
        .connect_timeout(Duration::from_secs(5));
    let channel_manager = ChannelManager::with_config(channel_config);
    Client::with_manager_and_urls(channel_manager, 
        &["localhost:4001"])
}
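
The client returned by setup_client() can back either API. As a small sketch (assuming setup_client() from above is in scope and reusing the "public" database name from the earlier examples), the same tuned client can be handed to both the low-latency Database handle and a BulkInserter:

use greptimedb_ingester::BulkInserter;
use greptimedb_ingester::database::Database;

fn setup_handles() -> (Database, BulkInserter) {
    // One tuned client, cloned for the two ingestion paths.
    let client = setup_client();
    let low_latency = Database::new_with_dbname("public", client.clone());
    let bulk = BulkInserter::new(client, "public");
    (low_latency, bulk)
}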

The library provides comprehensive error types:

use greptimedb_ingester::{Result, Error};
use greptimedb_ingester::api::v1::RowInsertRequests;
use greptimedb_ingester::database::Database;

async fn handle_insert(database: &Database, request: RowInsertRequests) {
    match database.insert(request).await {
        Ok(affected_rows) => println!("Inserted {} rows", affected_rows),
        Err(Error::RequestTimeout { .. }) => {
            // Handle timeout
        },
        Err(Error::SerializeMetadata { .. }) => {
            // Handle metadata serialization issues
        },
        Err(e) => {
            eprintln!("Unexpected error: {:?}", e);
        }
    }
}
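
Timeouts are often transient. Below is a minimal retry sketch built only on the calls shown above; the single-retry policy is this example's assumption, not something the library provides:

use greptimedb_ingester::Error;
use greptimedb_ingester::api::v1::RowInsertRequests;
use greptimedb_ingester::database::Database;

async fn insert_with_one_retry(database: &Database, request: RowInsertRequests) {
    // Retry exactly once if the first attempt times out; keep any other outcome as-is.
    let result = match database.insert(request.clone()).await {
        Err(Error::RequestTimeout { .. }) => database.insert(request).await,
        first_attempt => first_attempt,
    };
    match result {
        Ok(affected_rows) => println!("Inserted {} rows", affected_rows),
        Err(e) => eprintln!("Insert failed: {:?}", e),
    }
}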


This library uses the Apache 2.0 license to strike a balance between open contributions and allowing you to use the software however you want.

