본문으로 바로가기
본문으로 바로가기

RabbitMQ table engine

이 엔진은 ClickHouse를 RabbitMQ와 통합합니다.

RabbitMQ로 다음 작업을 수행할 수 있습니다:

  • 데이터 스트림을 발행하거나 구독할 수 있습니다.
  • 스트림이 도착하는 대로 처리할 수 있습니다.

테이블 생성

CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster]
(
    name1 [type1],
    name2 [type2],
    ...
) ENGINE = RabbitMQ SETTINGS
    rabbitmq_host_port = 'host:port' [or rabbitmq_address = 'amqp(s)://guest:guest@localhost/vhost'],
    rabbitmq_exchange_name = 'exchange_name',
    rabbitmq_format = 'data_format'[,]
    [rabbitmq_exchange_type = 'exchange_type',]
    [rabbitmq_routing_key_list = 'key1,key2,...',]
    [rabbitmq_secure = 0,]
    [rabbitmq_schema = '',]
    [rabbitmq_num_consumers = N,]
    [rabbitmq_num_queues = N,]
    [rabbitmq_queue_base = 'queue',]
    [rabbitmq_deadletter_exchange = 'dl-exchange',]
    [rabbitmq_persistent = 0,]
    [rabbitmq_skip_broken_messages = N,]
    [rabbitmq_max_block_size = N,]
    [rabbitmq_flush_interval_ms = N,]
    [rabbitmq_queue_settings_list = 'x-dead-letter-exchange=my-dlx,x-max-length=10,x-overflow=reject-publish',]
    [rabbitmq_queue_consume = false,]
    [rabbitmq_address = '',]
    [rabbitmq_vhost = '/',]
    [rabbitmq_username = '',]
    [rabbitmq_password = '',]
    [rabbitmq_commit_on_select = false,]
    [rabbitmq_max_rows_per_message = 1,]
    [rabbitmq_handle_error_mode = 'default']

필수 매개변수:

  • rabbitmq_host_port – 호스트:포트 (예: localhost:5672).
  • rabbitmq_exchange_name – RabbitMQ exchange 이름.
  • rabbitmq_format – 메시지 형식입니다. SQL FORMAT FUNCTION과 동일한 표기법을 사용하며, 예를 들어 JSONEachRow와 같은 형식을 지정합니다. 자세한 내용은 Formats 섹션을 참조하십시오.

선택적 매개변수:

  • rabbitmq_exchange_type – RabbitMQ exchange의 유형입니다: direct, fanout, topic, headers, consistent_hash. 기본값: fanout.
  • rabbitmq_routing_key_list – 라우팅 키의 콤마로 구분된 목록입니다.
  • rabbitmq_schema – 포맷에서 스키마 정의가 필요한 경우 반드시 사용해야 하는 파라미터입니다. 예를 들어 Cap'n Proto는 스키마 파일의 경로와 루트 schema.capnp:Message 객체의 이름을 필요로 합니다.
  • rabbitmq_num_consumers – 테이블당 consumer 수입니다. 하나의 consumer 처리량이 충분하지 않은 경우 더 많은 consumer를 지정하십시오. 기본값: 1
  • rabbitmq_num_queues – 전체 queue 개수입니다. 이 수를 늘리면 성능이 크게 향상될 수 있습니다. 기본값: 1.
  • rabbitmq_queue_base - queue 이름에 대한 힌트를 지정합니다. 이 설정의 사용 예시는 아래에 설명합니다.
  • rabbitmq_persistent - 1(true)로 설정하면 insert 쿼리의 delivery mode가 2로 설정되어(메시지를 'persistent'로 표시) 전송됩니다. 기본값: 0.
  • rabbitmq_skip_broken_messages – 블록마다 스키마와 호환되지 않는 메시지에 대한 RabbitMQ 메시지 파서 허용치입니다. rabbitmq_skip_broken_messages = N이면 파싱할 수 없는 RabbitMQ 메시지 N개(메시지 1개는 데이터 1행에 해당)를 엔진이 건너뜁니다. 기본값: 0.
  • rabbitmq_max_block_size - RabbitMQ에서 데이터를 플러시하기 전에 수집되는 행 개수입니다. 기본값: max_insert_block_size.
  • rabbitmq_flush_interval_ms - RabbitMQ에서 데이터를 플러시하기 위한 타임아웃입니다. 기본값: stream_flush_interval_ms.
  • rabbitmq_queue_settings_list - queue를 생성할 때 RabbitMQ 설정을 지정할 수 있습니다. 사용 가능한 설정: x-max-length, x-max-length-bytes, x-message-ttl, x-expires, x-priority, x-max-priority, x-overflow, x-dead-letter-exchange, x-queue-type. durable 설정은 queue에 대해 자동으로 활성화됩니다.
  • rabbitmq_address - 연결에 사용할 주소입니다. 이 설정 또는 rabbitmq_host_port 중 하나를 사용하십시오.
  • rabbitmq_vhost - RabbitMQ vhost입니다. 기본값: '\'.
  • rabbitmq_queue_consume - 사용자 정의 queue를 사용하고, exchange, queue, binding 선언과 같은 RabbitMQ 설정을 수행하지 않습니다. 기본값: false.
  • rabbitmq_username - RabbitMQ 사용자 이름입니다.
  • rabbitmq_password - RabbitMQ 비밀번호입니다.
  • reject_unhandled_messages - 오류가 발생한 경우 메시지를 거부하고(RabbitMQ negative acknowledgement 전송) 처리하지 않습니다. rabbitmq_queue_settings_listx-dead-letter-exchange가 정의되어 있으면 이 설정이 자동으로 활성화됩니다.
  • rabbitmq_commit_on_select - select 쿼리가 수행될 때 메시지를 커밋합니다. 기본값: false.
  • rabbitmq_max_rows_per_message — 행 기반 포맷에서 하나의 RabbitMQ 메시지에 기록되는 최대 행 수입니다. 기본값: 1.
  • rabbitmq_empty_queue_backoff_start_ms — RabbitMQ queue가 비어 있을 때 읽기 작업을 다시 예약하기 위한 backoff 시작 지점입니다.
  • rabbitmq_empty_queue_backoff_end_ms — RabbitMQ queue가 비어 있을 때 읽기 작업을 다시 예약하기 위한 backoff 종료 지점입니다.
  • rabbitmq_empty_queue_backoff_step_ms — RabbitMQ queue가 비어 있을 때 읽기 작업을 다시 예약하기 위한 backoff 단계입니다.
  • rabbitmq_handle_error_mode — RabbitMQ 엔진에서 오류를 처리하는 방법입니다. 가능한 값: default(메시지 파싱에 실패하면 예외가 발생), stream(예외 메시지와 원본 메시지가 가상 컬럼 _error_raw_message에 저장됨), dead_letter_queue(오류 관련 데이터가 system.dead_letter_queue에 저장됨).

SSL 연결

연결 주소에서 rabbitmq_secure = 1 또는 amqps 중 하나를 사용할 수 있습니다: rabbitmq_address = 'amqps://guest:guest@localhost/vhost'. 사용 중인 라이브러리의 기본 동작은 생성된 TLS 연결이 충분히 안전한지 여부를 검사하지 않는 것입니다. 인증서가 만료되었거나, 자체 서명되었거나, 누락되었거나, 유효하지 않더라도 연결은 그대로 허용됩니다. 인증서에 대한 보다 엄격한 검사는 향후에 구현될 수 있습니다.

또한 RabbitMQ 관련 설정과 함께 format 설정을 추가할 수 있습니다.

예:

  CREATE TABLE queue (
    key UInt64,
    value UInt64,
    date DateTime
  ) ENGINE = RabbitMQ SETTINGS rabbitmq_host_port = 'localhost:5672',
                            rabbitmq_exchange_name = 'exchange1',
                            rabbitmq_format = 'JSONEachRow',
                            rabbitmq_num_consumers = 5,
                            date_time_input_format = 'best_effort';

RabbitMQ 서버 설정은 ClickHouse 설정 파일에 추가해야 합니다.

필수 설정:

 <rabbitmq>
    <username>root</username>
    <password>clickhouse</password>
 </rabbitmq>

추가 설정:

 <rabbitmq>
    <vhost>clickhouse</vhost>
 </rabbitmq>

설명

각 메시지는 한 번만 읽을 수 있기 때문에 SELECT는 (디버깅을 제외하면) 메시지를 읽는 용도로는 그다지 유용하지 않습니다. 대신 materialized view를 사용하여 실시간 스레드를 만드는 것이 더 실용적입니다. 이를 위해서는 다음 단계를 수행합니다:

  1. 엔진을 사용해 RabbitMQ consumer를 생성하고 이를 데이터 스트림으로 간주합니다.
  2. 원하는 구조의 테이블을 생성합니다.
  3. 엔진에서 전달된 데이터를 변환해 앞에서 생성한 테이블에 넣는 materialized view를 생성합니다.

MATERIALIZED VIEW가 엔진에 연결되면 백그라운드에서 데이터를 수집하기 시작합니다. 이를 통해 RabbitMQ로부터 지속적으로 메시지를 수신하고 SELECT를 사용해 필요한 형식으로 변환할 수 있습니다. 하나의 RabbitMQ 테이블에는 원하는 만큼 materialized view를 연결할 수 있습니다.

데이터는 rabbitmq_exchange_type과 지정된 rabbitmq_routing_key_list에 따라 분기할 수 있습니다. 테이블당 exchange는 하나만 사용할 수 있습니다. 하나의 exchange를 여러 테이블이 공유할 수 있으며, 이를 통해 여러 테이블로 동시에 라우팅할 수 있습니다.

Exchange 유형 옵션:

  • direct - 키의 정확한 일치에 기반해 라우팅합니다. 예시 테이블 키 목록: key1,key2,key3,key4,key5, 메시지 키는 이 중 아무 값이나 될 수 있습니다.
  • fanout - 키와 상관없이 (exchange 이름이 같은) 모든 테이블로 라우팅합니다.
  • topic - 점(.)으로 구분된 키 패턴에 기반해 라우팅합니다. 예: *.logs, records.*.*.2020, *.2018,*.2019,*.2020.
  • headers - x-match=all 또는 x-match=any 설정과 함께 key=value 일치에 기반해 라우팅합니다. 예시 테이블 키 목록: x-match=all,format=logs,type=report,year=2020.
  • consistent_hash - 모든 바인딩된 테이블(같은 exchange 이름을 가진 테이블) 사이에 데이터를 균등하게 분배합니다. 이 exchange 유형은 RabbitMQ 플러그인을 통해 활성화해야 합니다: rabbitmq-plugins enable rabbitmq_consistent_hash_exchange.

rabbitmq_queue_base 설정은 다음과 같은 경우에 사용할 수 있습니다:

  • 서로 다른 테이블이 큐를 공유하게 하여, 동일한 큐에 대해 여러 consumer를 등록함으로써 성능을 향상시킬 수 있습니다. rabbitmq_num_consumers 및/또는 rabbitmq_num_queues 설정을 사용하는 경우, 이 파라미터들이 동일하면 큐 이름이 정확히 일치하게 됩니다.
  • 일부 메시지가 성공적으로 소비되지 못한 경우, 특정 durable 큐에 대한 읽기를 복원할 수 있습니다. 하나의 특정 큐에서 소비를 재개하려면 해당 큐 이름을 rabbitmq_queue_base 설정에 지정하고, rabbitmq_num_consumersrabbitmq_num_queues는 지정하지 않습니다(기본값 1). 특정 테이블에 대해 선언되었던 모든 큐에서 소비를 재개하려면 rabbitmq_queue_base, rabbitmq_num_consumers, rabbitmq_num_queues에 동일한 설정을 지정하면 됩니다. 기본적으로 큐 이름은 테이블마다 고유하게 설정됩니다.
  • 큐가 durable로 선언되어 있고 자동 삭제되지 않으므로 이 큐들을 재사용할 수 있게 합니다. (RabbitMQ CLI 도구를 통해 삭제할 수 있습니다.)

성능을 향상시키기 위해, 수신된 메시지는 max_insert_block_size 크기의 블록으로 그룹화됩니다. stream_flush_interval_ms 밀리초 내에 블록이 구성되지 않으면, 블록의 완전성과 관계없이 데이터가 테이블로 플러시됩니다.

rabbitmq_exchange_type과 함께 rabbitmq_num_consumers 및/또는 rabbitmq_num_queues 설정이 지정된 경우:

  • rabbitmq-consistent-hash-exchange 플러그인이 활성화되어 있어야 합니다.
  • 발행된 메시지의 message_id 프로퍼티가 지정되어 있어야 합니다(각 메시지/배치마다 고유).

INSERT 쿼리에서는 발행된 각 메시지에 대해 messageIDrepublished 플래그(한 번 이상 발행되었으면 true) 같은 메시지 메타데이터가 추가되며, 이는 메시지 헤더를 통해 접근할 수 있습니다.

동일한 테이블을 INSERT와 materialized view 둘 다에 사용하지 마십시오.

예시:

  CREATE TABLE queue (
    key UInt64,
    value UInt64
  ) ENGINE = RabbitMQ SETTINGS rabbitmq_host_port = 'localhost:5672',
                            rabbitmq_exchange_name = 'exchange1',
                            rabbitmq_exchange_type = 'headers',
                            rabbitmq_routing_key_list = 'format=logs,type=report,year=2020',
                            rabbitmq_format = 'JSONEachRow',
                            rabbitmq_num_consumers = 5;

  CREATE TABLE daily (key UInt64, value UInt64)
    ENGINE = MergeTree() ORDER BY key;

  CREATE MATERIALIZED VIEW consumer TO daily
    AS SELECT key, value FROM queue;

  SELECT key, value FROM daily ORDER BY key;

가상 컬럼

  • _exchange_name - RabbitMQ 익스체인지 이름. 데이터 타입: String.
  • _channel_id - 메시지를 수신한 컨슈머가 선언된 ChannelID. 데이터 타입: String.
  • _delivery_tag - 수신한 메시지의 DeliveryTag. 채널 단위로 범위가 지정됩니다. 데이터 타입: UInt64.
  • _redelivered - 메시지의 redelivered 플래그. 데이터 타입: UInt8.
  • _message_id - 수신한 메시지의 messageID. 메시지가 발행될 때 설정되었다면 비어 있지 않습니다. 데이터 타입: String.
  • _timestamp - 수신한 메시지의 타임스탬프. 메시지가 발행될 때 설정되었다면 비어 있지 않습니다. 데이터 타입: UInt64.

rabbitmq_handle_error_mode='stream'인 경우 추가로 제공되는 가상 컬럼:

  • _raw_message - 성공적으로 파싱할 수 없었던 원본 메시지. 데이터 타입: Nullable(String).
  • _error - 파싱 실패 시 발생한 예외 메시지. 데이터 타입: Nullable(String).

참고: _raw_message_error 가상 컬럼은 파싱 중 예외가 발생한 경우에만 채워지며, 메시지가 성공적으로 파싱된 경우에는 항상 NULL입니다.

주의사항

테이블 정의에서 DEFAULT, MATERIALIZED, ALIAS와 같은 기본 컬럼 표현식을 지정하더라도 이러한 표현식은 무시됩니다. 대신 컬럼은 각 타입에 대해 정의된 기본값으로 채워집니다.

데이터 포맷 지원

RabbitMQ 엔진은 ClickHouse에서 지원하는 모든 포맷을 지원합니다. 하나의 RabbitMQ 메시지에 포함되는 행 수는 포맷이 행 기반인지 블록 기반인지에 따라 달라집니다.

  • 행 기반 포맷의 경우 하나의 RabbitMQ 메시지에 포함되는 행 수는 rabbitmq_max_rows_per_message 설정으로 제어할 수 있습니다.
  • 블록 기반 포맷의 경우 블록을 더 작은 파트로 나눌 수는 없지만, 하나의 블록에 포함되는 행 수는 일반 설정 항목인 max_block_size로 제어할 수 있습니다.