본문으로 바로가기
본문으로 바로가기

AvroConfluent

입력출력별칭

설명

Apache Avro는 이진 인코딩을 사용하여 효율적인 데이터 처리를 지원하는 행 지향 직렬화 포맷입니다. AvroConfluent 포맷은 Confluent Schema Registry (또는 API 호환 서비스)를 사용해 직렬화된, 단일 객체 Avro로 인코딩된 Kafka 메시지의 디코딩을 지원합니다.

각 Avro 메시지에는 ClickHouse가 설정된 스키마 레지스트리를 쿼리하여 자동으로 조회하는 스키마 ID가 포함됩니다. 한 번 조회된 스키마는 최적의 성능을 위해 캐시됩니다.

데이터 타입 매핑

아래 표는 Apache Avro 형식에서 지원되는 모든 데이터 타입과 INSERTSELECT 쿼리에서 이에 대응하는 ClickHouse 데이터 타입을 보여줍니다.

Avro 데이터 타입 INSERTClickHouse 데이터 타입Avro 데이터 타입 SELECT
boolean, int, long, float, doubleInt(8\16\32), UInt(8\16\32)int
boolean, int, long, float, doubleInt64, UInt64long
boolean, int, long, float, doubleFloat32float
boolean, int, long, float, doubleFloat64double
bytes, string, fixed, enumStringbytes 또는 string *
bytes, string, fixedFixedString(N)fixed(N)
enumEnum(8\16)enum
array(T)Array(T)array(T)
map(V, K)Map(V, K)map(string, K)
union(null, T), union(T, null)Nullable(T)union(null, T)
union(T1, T2, …) **Variant(T1, T2, …)union(T1, T2, …) **
nullNullable(Nothing)null
int (date) ***Date, Date32int (date) ***
long (timestamp-millis) ***DateTime64(3)long (timestamp-millis) ***
long (timestamp-micros) ***DateTime64(6)long (timestamp-micros) ***
bytes (decimal) ***DateTime64(N)bytes (decimal) ***
intIPv4int
fixed(16)IPv6fixed(16)
bytes (decimal) ***Decimal(P, S)bytes (decimal) ***
string (uuid) ***UUIDstring (uuid) ***
fixed(16)Int128/UInt128fixed(16)
fixed(32)Int256/UInt256fixed(32)
recordTuplerecord

* 기본값은 bytes이며, output_format_avro_string_column_pattern 설정으로 제어됩니다.

** Variant type은 필드 값으로 null을 암묵적으로 허용하므로, 예를 들어 Avro union(T1, T2, null)Variant(T1, T2)로 변환됩니다. 따라서 ClickHouse에서 Avro를 생성할 때에는 스키마 추론 과정에서 어떤 값이 실제로 null인지 알 수 없으므로 Avro union 타입 집합에 항상 null 타입을 포함해야 합니다.

*** Avro logical types

지원되지 않는 Avro logical 데이터 타입:

  • time-millis
  • time-micros
  • duration

포맷 설정

Setting설명기본값
input_format_avro_allow_missing_fields스키마에서 필드를 찾을 수 없는 경우 오류를 발생시키는 대신 기본값을 사용할지 여부입니다.0
input_format_avro_null_as_default널을 허용하지 않는 컬럼에 null 값을 삽입할 때 오류를 발생시키는 대신 기본값을 사용할지 여부입니다.0
format_avro_schema_registry_urlConfluent Schema Registry URL입니다. Basic authentication을 사용하는 경우, URL 인코딩된 자격 증명을 URL 경로에 직접 포함할 수 있습니다.

예제

스키마 레지스트리 사용

Avro로 인코딩된 Kafka 토픽을 Kafka 테이블 엔진으로 읽으려면, 스키마 레지스트리의 URL을 지정하기 위해 format_avro_schema_registry_url 설정을 사용하십시오.

CREATE TABLE topic1_stream
(
    field1 String,
    field2 String
)
ENGINE = Kafka()
SETTINGS
kafka_broker_list = 'kafka-broker',
kafka_topic_list = 'topic1',
kafka_group_name = 'group1',
kafka_format = 'AvroConfluent',
format_avro_schema_registry_url = 'http://schema-registry-url';

SELECT * FROM topic1_stream;

기본 인증 사용

스키마 레지스트리에 기본 인증이 필요한 경우(예: Confluent Cloud를 사용하는 경우), format_avro_schema_registry_url 설정에 URL 인코딩된 자격 증명을 제공할 수 있습니다.

CREATE TABLE topic1_stream
(
    field1 String,
    field2 String
)
ENGINE = Kafka()
SETTINGS
kafka_broker_list = 'kafka-broker',
kafka_topic_list = 'topic1',
kafka_group_name = 'group1',
kafka_format = 'AvroConfluent',
format_avro_schema_registry_url = 'https://<username>:<password>@schema-registry-url';

문제 해결

Kafka consumer의 오류를 디버깅하고 수집 진행 상황을 모니터링하려면 system.kafka_consumers system table에서 쿼리를 실행합니다. 배포에 레플리카가 여러 개 있는 경우(예: ClickHouse Cloud) clusterAllReplicas 테이블 함수를 사용해야 합니다.

SELECT * FROM clusterAllReplicas('default',system.kafka_consumers)
ORDER BY assignments.partition_id ASC;

스키마 해석 관련 문제가 발생하면 kafkacatclickhouse-local을 사용하여 문제를 진단할 수 있습니다.

$ kafkacat -b kafka-broker  -C -t topic1 -o beginning -f '%s' -c 3 | clickhouse-local   --input-format AvroConfluent --format_avro_schema_registry_url 'http://schema-registry' -S "field1 Int64, field2 String"  -q 'select *  from table'
1 a
2 b
3 c