본문으로 바로가기
본문으로 바로가기

ClickHouse Rust 클라이언트

ClickHouse에 연결하기 위한 공식 Rust 클라이언트로, 원래는 Paul Loyd가 개발했습니다. 클라이언트 소스 코드는 GitHub 저장소에서 확인할 수 있습니다.

개요

  • 행을 인코딩/디코딩하기 위해 serde를 사용합니다.
  • skip_serializing, skip_deserializing, rename와 같은 serde 속성을 지원합니다.
  • HTTP 전송에서 RowBinary 포맷을 사용합니다.
    • TCP를 통해 Native 포맷으로 전환할 계획입니다.
  • TLS(native-tlsrustls-tls 기능을 통해)를 지원합니다.
  • 압축 및 압축 해제(LZ4)를 지원합니다.
  • 데이터 조회 및 삽입, DDL 실행, 클라이언트 측 배치 수행을 위한 API를 제공합니다.
  • 단위 테스트를 위한 편리한 목(mock)을 제공합니다.

설치

이 crate를 사용하려면 Cargo.toml에 다음을 추가하십시오:

[dependencies]
clickhouse = "0.12.2"

[dev-dependencies]
clickhouse = { version = "0.12.2", features = ["test-util"] }

또한 다음을 참고하십시오: crates.io 페이지.

Cargo features

  • lz4(기본값으로 활성화됨) — Compression::Lz4Compression::Lz4Hc(_) 변형을 활성화합니다. 이 기능이 활성화되면 WATCH를 제외한 모든 쿼리에 기본값으로 Compression::Lz4가 사용됩니다.
  • native-tls — OpenSSL에 링크되는 hyper-tls를 통해 HTTPS 스키마를 사용하는 URL을 지원합니다.
  • rustls-tls — OpenSSL에 링크되지 않는 hyper-rustls를 통해 HTTPS 스키마를 사용하는 URL을 지원합니다.
  • inserterclient.inserter()를 활성화합니다.
  • test-util — mock을 추가합니다. 예제를 참조하십시오. dev-dependencies에서만 사용하십시오.
  • watchclient.watch 기능을 활성화합니다. 자세한 내용은 해당 섹션을 참조하십시오.
  • uuiduuid 크레이트와 함께 동작하기 위해 serde::uuid를 추가합니다.
  • timetime 크레이트와 함께 동작하기 위해 serde::time을 추가합니다.
참조

HTTPS URL을 통해 ClickHouse에 연결하는 경우 native-tls 또는 rustls-tls 기능 중 하나를 활성화해야 합니다. 둘 다 활성화되어 있는 경우 rustls-tls 기능이 우선적으로 사용됩니다.

ClickHouse 버전 호환성

클라이언트는 LTS 또는 그 이후 버전의 ClickHouse 및 ClickHouse Cloud와 호환됩니다.

v22.6보다 이전 버전의 ClickHouse 서버는 RowBinary를 드물게 잘못 처리합니다. 이 문제를 해결하려면 v0.11+ 클라이언트를 사용하고 wa-37420 기능을 활성화하면 됩니다. 참고: 이 기능은 더 새로운 ClickHouse 버전에서는 사용하지 않아야 합니다.

예시

클라이언트 저장소의 예시를 통해 다양한 클라이언트 사용 시나리오를 다루고자 합니다. 개요는 examples README에서 확인할 수 있습니다.

예시나 다음 문서에서 불명확하거나 누락된 부분이 있다면 언제든지 문의해 주십시오.

사용 방법

참고

ch2rs 크레이트는 ClickHouse의 행 타입을 생성하는 데 유용합니다.

클라이언트 인스턴스 생성

생성한 클라이언트를 재사용하거나 클론하여 내부 hyper 커넥션 풀을 재사용하십시오.

use clickhouse::Client;

let client = Client::default()
    // should include both protocol and port
    .with_url("http://localhost:8123")
    .with_user("name")
    .with_password("123")
    .with_database("test");

HTTPS 또는 ClickHouse Cloud 연결

HTTPS는 rustls-tls 또는 native-tls Cargo feature와 함께 동작합니다.

그런 다음 평소와 같이 클라이언트를 생성합니다. 다음 예시에서는 연결 정보를 저장하기 위해 환경 변수를 사용합니다.

참조

URL에는 프로토콜과 포트가 모두 포함되어야 합니다. 예: https://instance.clickhouse.cloud:8443.

fn read_env_var(key: &str) -> String {
    env::var(key).unwrap_or_else(|_| panic!("{key} env variable should be set"))
}

let client = Client::default()
    .with_url(read_env_var("CLICKHOUSE_URL"))
    .with_user(read_env_var("CLICKHOUSE_USER"))
    .with_password(read_env_var("CLICKHOUSE_PASSWORD"));

또한 다음을 참조하십시오:

행 선택

use serde::Deserialize;
use clickhouse::Row;
use clickhouse::sql::Identifier;

#[derive(Row, Deserialize)]
struct MyRow<'a> {
    no: u32,
    name: &'a str,
}

let table_name = "some";
let mut cursor = client
    .query("SELECT ?fields FROM ? WHERE no BETWEEN ? AND ?")
    .bind(Identifier(table_name))
    .bind(500)
    .bind(504)
    .fetch::<MyRow<'_>>()?;

while let Some(row) = cursor.next().await? { .. }
  • 플레이스홀더 ?fieldsRow 의 필드인 no, name 으로 대체됩니다.
  • 플레이스홀더 ? 는 이후의 bind() 호출에 있는 값들로 대체됩니다.
  • 편리한 fetch_one::<Row>()fetch_all::<Row>() 메서드를 사용하면 각각 첫 번째 행 또는 모든 행을 가져올 수 있습니다.
  • sql::Identifier 를 사용하여 테이블 이름을 바인딩할 수 있습니다.

NB: 전체 응답이 스트리밍되므로, 커서는 일부 행을 이미 생성한 이후에도 오류를 반환할 수 있습니다. 이와 같은 상황이 발생하면, 서버 측에서 응답 버퍼링을 활성화하기 위해 query(...).with_option("wait_end_of_query", "1") 를 사용하는 방법을 시도할 수 있습니다. 자세한 내용. buffer_size 옵션도 유용할 수 있습니다.

참고

행을 선택할 때 wait_end_of_query 를 사용할 경우, 서버 측 메모리 사용량이 증가하고 전체 성능이 저하될 가능성이 높으므로 주의해서 사용하십시오.

행 삽입

use serde::Serialize;
use clickhouse::Row;

#[derive(Row, Serialize)]
struct MyRow {
    no: u32,
    name: String,
}

let mut insert = client.insert("some")?;
insert.write(&MyRow { no: 0, name: "foo".into() }).await?;
insert.write(&MyRow { no: 1, name: "bar".into() }).await?;
insert.end().await?;
  • end()가 호출되지 않으면 INSERT는 취소됩니다.
  • 행은 네트워크 부하를 분산하기 위해 스트리밍 방식으로 순차적으로 전송됩니다.
  • ClickHouse는 모든 행이 동일한 파티션에 속하고 행 개수가 max_insert_block_size보다 작을 때에만 배치를 원자적으로 삽입합니다.

비동기 insert (서버 측 배치)

ClickHouse asynchronous inserts를 사용하여 수신 데이터에 대한 클라이언트 측 배치 처리를 피할 수 있습니다. 이를 위해 insert 메서드에 async_insert 옵션을 지정하기만 하면 됩니다(또는 Client 인스턴스 자체에 설정하여 모든 insert 호출에 적용되도록 할 수도 있습니다).

let client = Client::default()
    .with_url("http://localhost:8123")
    .with_option("async_insert", "1")
    .with_option("wait_for_async_insert", "0");

다음도 함께 참조하십시오:

Inserter 기능(클라이언트 측 배치 처리)

inserter Cargo 기능이 필요합니다.

let mut inserter = client.inserter("some")?
    .with_timeouts(Some(Duration::from_secs(5)), Some(Duration::from_secs(20)))
    .with_max_bytes(50_000_000)
    .with_max_rows(750_000)
    .with_period(Some(Duration::from_secs(15)));

inserter.write(&MyRow { no: 0, name: "foo".into() })?;
inserter.write(&MyRow { no: 1, name: "bar".into() })?;
let stats = inserter.commit().await?;
if stats.rows > 0 {
    println!(
        "{} bytes, {} rows, {} transactions have been inserted",
        stats.bytes, stats.rows, stats.transactions,
    );
}

// don't forget to finalize the inserter during the application shutdown
// and commit the remaining rows. `.end()` will provide stats as well.
inserter.end().await?;
  • Inserter는 활성 insert 작업 중 하나의 임계값(max_bytes, max_rows, period)에 도달하면 commit()에서 해당 활성 insert를 종료합니다.
  • 병렬 inserter로 인한 부하 급증을 피하기 위해 with_period_bias를 사용하여 활성 INSERT 종료 사이의 간격을 조정할 수 있습니다.
  • 현재 period가 끝나는 시점을 감지하기 위해 Inserter::time_left()를 사용할 수 있습니다. 스트림에서 항목이 드물게 출력되는 경우에는, 한 번 더 Inserter::commit()을 호출해 임계값을 다시 확인하십시오.
  • 시간 임계값은 quanta 크레이트를 사용해 구현되어 inserter의 동작을 빠르게 합니다. test-util이 활성화된 경우에는 사용되지 않으며, 이 경우 커스텀 테스트에서 tokio::time::advance()로 시간을 제어할 수 있습니다.
  • commit() 호출 사이의 모든 행은 동일한 INSERT 문으로 삽입됩니다.
참고

삽입을 종료/완료하려면 flush를 수행해야 하는 것을 잊지 마십시오:

inserter.end().await?;

DDL 실행

단일 노드 배포에서는 다음과 같이 DDL을 실행하면 됩니다:

client.query("DROP TABLE IF EXISTS some").execute().await?;

그러나 로드 밸런서가 있는 클러스터 배포 환경이나 ClickHouse Cloud 환경에서는 wait_end_of_query 옵션을 사용하여 모든 레플리카에 DDL이 적용될 때까지 기다리는 것이 좋습니다. 다음과 같이 수행할 수 있습니다:

client
    .query("DROP TABLE IF EXISTS some")
    .with_option("wait_end_of_query", "1")
    .execute()
    .await?;

ClickHouse 설정

with_option 메서드를 사용하여 다양한 ClickHouse settings을(를) 적용할 수 있습니다. 예를 들면 다음과 같습니다.

let numbers = client
    .query("SELECT number FROM system.numbers")
    // This setting will be applied to this particular query only;
    // it will override the global client setting.
    .with_option("limit", "3")
    .fetch_all::<u64>()
    .await?;

query 외에도 insertinserter 메서드에서 동일하게 동작하며, 추가로 Client 인스턴스에서 동일한 메서드를 호출하여 모든 쿼리에 적용되는 전역 설정을 지정할 수 있습니다.

쿼리 ID

.with_option을 사용하면 query_id 옵션을 설정하여 ClickHouse 쿼리 로그에서 쿼리를 식별할 수 있습니다.

let numbers = client
    .query("SELECT number FROM system.numbers LIMIT 1")
    .with_option("query_id", "some-query-id")
    .fetch_all::<u64>()
    .await?;

query 외에도 insertinserter 메서드에서도 비슷하게 동작합니다.

위험

query_id를 수동으로 설정하는 경우, 반드시 해당 값이 고유하도록 하십시오. 이를 위해 UUID를 사용하는 것이 좋습니다.

또한 클라이언트 리포지토리의 query_id 예제를 참고하십시오.

Session ID

query_id와 마찬가지로 동일한 세션에서 SQL 문을 실행하도록 session_id를 설정할 수 있습니다. session_id는 클라이언트 수준에서 전역으로 설정하거나, query, insert, inserter 호출마다 개별적으로 설정할 수 있습니다.

let client = Client::default()
    .with_url("http://localhost:8123")
    .with_option("session_id", "my-session");
위험

클러스터 배포 환경에서는 「sticky session」이 지원되지 않으므로, 이 기능을 제대로 사용하려면 특정 클러스터 노드에 계속 연결되어 있어야 합니다. 예를 들어 라운드 로빈 방식 로드 밸런서는 연달아 전송되는 요청이 동일한 ClickHouse 노드에서 처리된다는 것을 보장하지 않습니다.

다음 항목도 참고하십시오: 클라이언트 리포지토리의 session_id 예제.

사용자 정의 HTTP 헤더

프록시 인증을 사용하거나 사용자 정의 헤더를 전달해야 하는 경우에는 다음과 같이 설정합니다:

let client = Client::default()
    .with_url("http://localhost:8123")
    .with_header("X-My-Header", "hello");

클라이언트 저장소의 커스텀 HTTP 헤더 예제도 함께 참고하십시오.

커스텀 HTTP 클라이언트

이는 내부 HTTP 연결 풀 설정을 미세 조정할 때 유용합니다.

use hyper_util::client::legacy::connect::HttpConnector;
use hyper_util::client::legacy::Client as HyperClient;
use hyper_util::rt::TokioExecutor;

let connector = HttpConnector::new(); // or HttpsConnectorBuilder
let hyper_client = HyperClient::builder(TokioExecutor::new())
    // For how long keep a particular idle socket alive on the client side (in milliseconds).
    // It is supposed to be a fair bit less that the ClickHouse server KeepAlive timeout,
    // which was by default 3 seconds for pre-23.11 versions, and 10 seconds after that.
    .pool_idle_timeout(Duration::from_millis(2_500))
    // Sets the maximum idle Keep-Alive connections allowed in the pool.
    .pool_max_idle_per_host(4)
    .build(connector);

let client = Client::with_http_client(hyper_client).with_url("http://localhost:8123");
참고

이 예제는 레거시 Hyper API에 의존하며, 향후 변경될 수 있습니다.

또한 클라이언트 저장소의 custom HTTP client example도 참고하십시오.

데이터 타입

  • (U)Int(8|16|32|64|128)은 해당하는 (u|i)(8|16|32|64|128) 타입이나 이를 감싸는 newtype과 상호 변환됩니다.

  • (U)Int256은 직접 지원되지 않지만, 이를 위한 우회 방법이 있습니다.

  • Float(32|64)은 해당하는 f(32|64) 또는 이를 감싸는 newtype과 상호 변환됩니다.

  • Decimal(32|64|128)은 해당하는 i(32|64|128) 또는 이를 감싸는 newtype과 상호 변환됩니다. 부호가 있는 고정 소수점 수를 위해서는 fixnum 또는 다른 구현을 사용하는 것이 더 편리합니다.

  • Booleanbool 또는 이를 감싸는 newtype과 상호 변환됩니다.

  • String&str, &[u8], String, Vec<u8> 또는 SmartString과 같이 임의의 문자열 또는 바이트 타입과 상호 변환됩니다. 새로운 타입도 지원됩니다. 바이트를 저장할 때는 더 효율적이므로 serde_bytes 사용을 고려하는 것이 좋습니다.

#[derive(Row, Debug, Serialize, Deserialize)]
struct MyRow<'a> {
    str: &'a str,
    string: String,
    #[serde(with = "serde_bytes")]
    bytes: Vec<u8>,
    #[serde(with = "serde_bytes")]
    byte_slice: &'a [u8],
}
  • FixedString(N)은(는) 바이트 배열(예: [u8; N])로 지원됩니다.
#[derive(Row, Debug, Serialize, Deserialize)]
struct MyRow {
    fixed_str: [u8; 16], // FixedString(16)
}
  • Enum(8|16) 타입은 serde_repr를 사용해 지원합니다.
use serde_repr::{Deserialize_repr, Serialize_repr};

#[derive(Row, Serialize, Deserialize)]
struct MyRow {
    level: Level,
}

#[derive(Debug, Serialize_repr, Deserialize_repr)]
#[repr(u8)]
enum Level {
    Debug = 1,
    Info = 2,
    Warn = 3,
    Error = 4,
}
  • UUIDserde::uuid를 사용하여 uuid::Uuid 타입과 상호 변환합니다. uuid feature가 필요합니다.
#[derive(Row, Serialize, Deserialize)]
struct MyRow {
    #[serde(with = "clickhouse::serde::uuid")]
    uuid: uuid::Uuid,
}
#[derive(Row, Serialize, Deserialize)]
struct MyRow {
    #[serde(with = "clickhouse::serde::ipv4")]
    ipv4: std::net::Ipv4Addr,
}
  • Dateu16 또는 이를 감싼 newtype과 매핑되며, 1970-01-01 이후 경과한 일 수를 나타냅니다. 또한 time::Dateserde::time::date를 사용하여 지원되며, 이를 사용하려면 time feature가 필요합니다.
#[derive(Row, Serialize, Deserialize)]
struct MyRow {
    days: u16,
    #[serde(with = "clickhouse::serde::time::date")]
    date: Date,
}
  • Date32i32 또는 이를 감싼 newtype과 상호 변환되며, 1970-01-01 이후 경과한 일 수를 나타냅니다. 또한 time::Dateserde::time::date32를 사용하여 지원되며, 이를 위해서는 time feature가 필요합니다.
#[derive(Row, Serialize, Deserialize)]
struct MyRow {
    days: i32,
    #[serde(with = "clickhouse::serde::time::date32")]
    date: Date,
}
  • DateTimeu32 또는 이를 감싼 newtype과 상호 매핑되며, UNIX epoch 이후 경과한 초(초 단위)의 값을 나타냅니다. 또한 time::OffsetDateTimetime feature가 필요한 serde::time::datetime을 사용하여 지원됩니다.
#[derive(Row, Serialize, Deserialize)]
struct MyRow {
    ts: u32,
    #[serde(with = "clickhouse::serde::time::datetime")]
    dt: OffsetDateTime,
}
  • DateTime64(_)i32 또는 이를 감싼 뉴타입과 상호 변환되며 UNIX epoch 이후 경과한 시간을 나타냅니다. 또한 time::OffsetDateTimeserde::time::datetime64::*를 사용할 때 지원되며, 이를 위해 time feature가 필요합니다.
#[derive(Row, Serialize, Deserialize)]
struct MyRow {
    ts: i64, // elapsed s/us/ms/ns depending on `DateTime64(X)`
    #[serde(with = "clickhouse::serde::time::datetime64::secs")]
    dt64s: OffsetDateTime,  // `DateTime64(0)`
    #[serde(with = "clickhouse::serde::time::datetime64::millis")]
    dt64ms: OffsetDateTime, // `DateTime64(3)`
    #[serde(with = "clickhouse::serde::time::datetime64::micros")]
    dt64us: OffsetDateTime, // `DateTime64(6)`
    #[serde(with = "clickhouse::serde::time::datetime64::nanos")]
    dt64ns: OffsetDateTime, // `DateTime64(9)`
}
  • Tuple(A, B, ...)(A, B, ...) 또는 이를 감싼 newtype과 상호 변환됩니다.
  • Array(_)는 예를 들어 Vec<_>, &[_]와 같은 임의의 슬라이스와 상호 변환됩니다. 새로운 타입도 지원됩니다.
  • Map(K, V)Array((K, V))처럼 동작합니다.
  • LowCardinality(_)는 별도 처리가 필요 없이 원활하게 지원됩니다.
  • Nullable(_)Option<_>와 상호 변환됩니다. clickhouse::serde::* 헬퍼의 경우 ::option을 추가하십시오.
#[derive(Row, Serialize, Deserialize)]
struct MyRow {
    #[serde(with = "clickhouse::serde::ipv4::option")]
    ipv4_opt: Option<Ipv4Addr>,
}
  • Nested는 여러 배열을 제공하고 이름을 변경하는 방식으로 지원합니다.
// CREATE TABLE test(items Nested(name String, count UInt32))
#[derive(Row, Serialize, Deserialize)]
struct MyRow {
    #[serde(rename = "items.name")]
    items_name: Vec<String>,
    #[serde(rename = "items.count")]
    items_count: Vec<u32>,
}
  • Geo 타입이 지원됩니다. Point는 튜플 (f64, f64)처럼 동작하며, 나머지 타입들은 모두 Point의 슬라이스입니다.
type Point = (f64, f64);
type Ring = Vec<Point>;
type Polygon = Vec<Ring>;
type MultiPolygon = Vec<Polygon>;
type LineString = Vec<Point>;
type MultiLineString = Vec<LineString>;

#[derive(Row, Serialize, Deserialize)]
struct MyRow {
    point: Point,
    ring: Ring,
    polygon: Polygon,
    multi_polygon: MultiPolygon,
    line_string: LineString,
    multi_line_string: MultiLineString,
}
  • Variant, Dynamic, (new) JSON 데이터 타입은 아직 지원되지 않습니다.

모킹

이 크레이트는 ClickHouse 서버를 모킹하고 DDL, SELECT, INSERT, WATCH 쿼리를 테스트하기 위한 유틸리티를 제공합니다. 이 기능은 test-util 기능 플래그로 활성화할 수 있습니다. 반드시 개발 의존성(dev-dependency)으로만 사용하십시오.

예제를 참고하십시오.

문제 해결

CANNOT_READ_ALL_DATA

CANNOT_READ_ALL_DATA 오류의 가장 일반적인 원인은 애플리케이션 측의 행(row) 정의가 ClickHouse의 행 정의와 일치하지 않는 것입니다.

다음 테이블을 살펴보십시오:

CREATE OR REPLACE TABLE event_log (id UInt32)
ENGINE = MergeTree
ORDER BY timestamp

그런 다음, 예를 들어 애플리케이션 측에서 EventLog가 타입이 일치하지 않게 정의되어 있는 경우:

#[derive(Debug, Serialize, Deserialize, Row)]
struct EventLog {
    id: String, // <- should be u32 instead!
}

데이터를 삽입할 때 다음 오류가 발생할 수 있습니다:

Error: BadResponse("Code: 33. DB::Exception: Cannot read all data. Bytes read: 5. Bytes expected: 23.: (at row 1)\n: While executing BinaryRowInputFormat. (CANNOT_READ_ALL_DATA)")

이 예제에서는 EventLog struct를 올바르게 정의하면 문제가 해결됩니다.

#[derive(Debug, Serialize, Deserialize, Row)]
struct EventLog {
    id: u32
}

알려진 제한 사항

  • Variant, Dynamic, (new) JSON 데이터 타입은 아직 지원되지 않습니다.
  • 서버 측 매개변수 바인딩은 아직 지원되지 않습니다. 진행 상황은 해당 이슈에서 확인하십시오.

문의

질문이 있거나 도움이 필요하시면 Community Slack 또는 GitHub issues를 통해 언제든지 문의해 주십시오.