RabbitMQ table engine
이 엔진은 ClickHouse를 RabbitMQ와 통합합니다.
RabbitMQ로 다음 작업을 수행할 수 있습니다:
- 데이터 스트림을 발행하거나 구독할 수 있습니다.
- 스트림이 도착하는 대로 처리할 수 있습니다.
테이블 생성
필수 매개변수:
rabbitmq_host_port– 호스트:포트 (예:localhost:5672).rabbitmq_exchange_name– RabbitMQ exchange 이름.rabbitmq_format– 메시지 형식입니다. SQLFORMATFUNCTION과 동일한 표기법을 사용하며, 예를 들어JSONEachRow와 같은 형식을 지정합니다. 자세한 내용은 Formats 섹션을 참조하십시오.
선택적 매개변수:
rabbitmq_exchange_type– RabbitMQ exchange의 유형입니다:direct,fanout,topic,headers,consistent_hash. 기본값:fanout.rabbitmq_routing_key_list– 라우팅 키의 콤마로 구분된 목록입니다.rabbitmq_schema– 포맷에서 스키마 정의가 필요한 경우 반드시 사용해야 하는 파라미터입니다. 예를 들어 Cap'n Proto는 스키마 파일의 경로와 루트schema.capnp:Message객체의 이름을 필요로 합니다.rabbitmq_num_consumers– 테이블당 consumer 수입니다. 하나의 consumer 처리량이 충분하지 않은 경우 더 많은 consumer를 지정하십시오. 기본값:1rabbitmq_num_queues– 전체 queue 개수입니다. 이 수를 늘리면 성능이 크게 향상될 수 있습니다. 기본값:1.rabbitmq_queue_base- queue 이름에 대한 힌트를 지정합니다. 이 설정의 사용 예시는 아래에 설명합니다.rabbitmq_persistent- 1(true)로 설정하면 insert 쿼리의 delivery mode가 2로 설정되어(메시지를 'persistent'로 표시) 전송됩니다. 기본값:0.rabbitmq_skip_broken_messages– 블록마다 스키마와 호환되지 않는 메시지에 대한 RabbitMQ 메시지 파서 허용치입니다.rabbitmq_skip_broken_messages = N이면 파싱할 수 없는 RabbitMQ 메시지 N개(메시지 1개는 데이터 1행에 해당)를 엔진이 건너뜁니다. 기본값:0.rabbitmq_max_block_size- RabbitMQ에서 데이터를 플러시하기 전에 수집되는 행 개수입니다. 기본값: max_insert_block_size.rabbitmq_flush_interval_ms- RabbitMQ에서 데이터를 플러시하기 위한 타임아웃입니다. 기본값: stream_flush_interval_ms.rabbitmq_queue_settings_list- queue를 생성할 때 RabbitMQ 설정을 지정할 수 있습니다. 사용 가능한 설정:x-max-length,x-max-length-bytes,x-message-ttl,x-expires,x-priority,x-max-priority,x-overflow,x-dead-letter-exchange,x-queue-type.durable설정은 queue에 대해 자동으로 활성화됩니다.rabbitmq_address- 연결에 사용할 주소입니다. 이 설정 또는rabbitmq_host_port중 하나를 사용하십시오.rabbitmq_vhost- RabbitMQ vhost입니다. 기본값:'\'.rabbitmq_queue_consume- 사용자 정의 queue를 사용하고, exchange, queue, binding 선언과 같은 RabbitMQ 설정을 수행하지 않습니다. 기본값:false.rabbitmq_username- RabbitMQ 사용자 이름입니다.rabbitmq_password- RabbitMQ 비밀번호입니다.reject_unhandled_messages- 오류가 발생한 경우 메시지를 거부하고(RabbitMQ negative acknowledgement 전송) 처리하지 않습니다.rabbitmq_queue_settings_list에x-dead-letter-exchange가 정의되어 있으면 이 설정이 자동으로 활성화됩니다.rabbitmq_commit_on_select- select 쿼리가 수행될 때 메시지를 커밋합니다. 기본값:false.rabbitmq_max_rows_per_message— 행 기반 포맷에서 하나의 RabbitMQ 메시지에 기록되는 최대 행 수입니다. 기본값:1.rabbitmq_empty_queue_backoff_start_ms— RabbitMQ queue가 비어 있을 때 읽기 작업을 다시 예약하기 위한 backoff 시작 지점입니다.rabbitmq_empty_queue_backoff_end_ms— RabbitMQ queue가 비어 있을 때 읽기 작업을 다시 예약하기 위한 backoff 종료 지점입니다.rabbitmq_empty_queue_backoff_step_ms— RabbitMQ queue가 비어 있을 때 읽기 작업을 다시 예약하기 위한 backoff 단계입니다.rabbitmq_handle_error_mode— RabbitMQ 엔진에서 오류를 처리하는 방법입니다. 가능한 값: default(메시지 파싱에 실패하면 예외가 발생), stream(예외 메시지와 원본 메시지가 가상 컬럼_error및_raw_message에 저장됨), dead_letter_queue(오류 관련 데이터가 system.dead_letter_queue에 저장됨).
SSL 연결
연결 주소에서 rabbitmq_secure = 1 또는 amqps 중 하나를 사용할 수 있습니다: rabbitmq_address = 'amqps://guest:guest@localhost/vhost'.
사용 중인 라이브러리의 기본 동작은 생성된 TLS 연결이 충분히 안전한지 여부를 검사하지 않는 것입니다. 인증서가 만료되었거나, 자체 서명되었거나, 누락되었거나, 유효하지 않더라도 연결은 그대로 허용됩니다. 인증서에 대한 보다 엄격한 검사는 향후에 구현될 수 있습니다.
또한 RabbitMQ 관련 설정과 함께 format 설정을 추가할 수 있습니다.
예:
RabbitMQ 서버 설정은 ClickHouse 설정 파일에 추가해야 합니다.
필수 설정:
추가 설정:
설명
각 메시지는 한 번만 읽을 수 있기 때문에 SELECT는 (디버깅을 제외하면) 메시지를 읽는 용도로는 그다지 유용하지 않습니다. 대신 materialized view를 사용하여 실시간 스레드를 만드는 것이 더 실용적입니다. 이를 위해서는 다음 단계를 수행합니다:
- 엔진을 사용해 RabbitMQ consumer를 생성하고 이를 데이터 스트림으로 간주합니다.
- 원하는 구조의 테이블을 생성합니다.
- 엔진에서 전달된 데이터를 변환해 앞에서 생성한 테이블에 넣는 materialized view를 생성합니다.
MATERIALIZED VIEW가 엔진에 연결되면 백그라운드에서 데이터를 수집하기 시작합니다. 이를 통해 RabbitMQ로부터 지속적으로 메시지를 수신하고 SELECT를 사용해 필요한 형식으로 변환할 수 있습니다.
하나의 RabbitMQ 테이블에는 원하는 만큼 materialized view를 연결할 수 있습니다.
데이터는 rabbitmq_exchange_type과 지정된 rabbitmq_routing_key_list에 따라 분기할 수 있습니다.
테이블당 exchange는 하나만 사용할 수 있습니다. 하나의 exchange를 여러 테이블이 공유할 수 있으며, 이를 통해 여러 테이블로 동시에 라우팅할 수 있습니다.
Exchange 유형 옵션:
direct- 키의 정확한 일치에 기반해 라우팅합니다. 예시 테이블 키 목록:key1,key2,key3,key4,key5, 메시지 키는 이 중 아무 값이나 될 수 있습니다.fanout- 키와 상관없이 (exchange 이름이 같은) 모든 테이블로 라우팅합니다.topic- 점(.)으로 구분된 키 패턴에 기반해 라우팅합니다. 예:*.logs,records.*.*.2020,*.2018,*.2019,*.2020.headers-x-match=all또는x-match=any설정과 함께key=value일치에 기반해 라우팅합니다. 예시 테이블 키 목록:x-match=all,format=logs,type=report,year=2020.consistent_hash- 모든 바인딩된 테이블(같은 exchange 이름을 가진 테이블) 사이에 데이터를 균등하게 분배합니다. 이 exchange 유형은 RabbitMQ 플러그인을 통해 활성화해야 합니다:rabbitmq-plugins enable rabbitmq_consistent_hash_exchange.
rabbitmq_queue_base 설정은 다음과 같은 경우에 사용할 수 있습니다:
- 서로 다른 테이블이 큐를 공유하게 하여, 동일한 큐에 대해 여러 consumer를 등록함으로써 성능을 향상시킬 수 있습니다.
rabbitmq_num_consumers및/또는rabbitmq_num_queues설정을 사용하는 경우, 이 파라미터들이 동일하면 큐 이름이 정확히 일치하게 됩니다. - 일부 메시지가 성공적으로 소비되지 못한 경우, 특정 durable 큐에 대한 읽기를 복원할 수 있습니다. 하나의 특정 큐에서 소비를 재개하려면 해당 큐 이름을
rabbitmq_queue_base설정에 지정하고,rabbitmq_num_consumers및rabbitmq_num_queues는 지정하지 않습니다(기본값 1). 특정 테이블에 대해 선언되었던 모든 큐에서 소비를 재개하려면rabbitmq_queue_base,rabbitmq_num_consumers,rabbitmq_num_queues에 동일한 설정을 지정하면 됩니다. 기본적으로 큐 이름은 테이블마다 고유하게 설정됩니다. - 큐가 durable로 선언되어 있고 자동 삭제되지 않으므로 이 큐들을 재사용할 수 있게 합니다. (RabbitMQ CLI 도구를 통해 삭제할 수 있습니다.)
성능을 향상시키기 위해, 수신된 메시지는 max_insert_block_size 크기의 블록으로 그룹화됩니다. stream_flush_interval_ms 밀리초 내에 블록이 구성되지 않으면, 블록의 완전성과 관계없이 데이터가 테이블로 플러시됩니다.
rabbitmq_exchange_type과 함께 rabbitmq_num_consumers 및/또는 rabbitmq_num_queues 설정이 지정된 경우:
rabbitmq-consistent-hash-exchange플러그인이 활성화되어 있어야 합니다.- 발행된 메시지의
message_id프로퍼티가 지정되어 있어야 합니다(각 메시지/배치마다 고유).
INSERT 쿼리에서는 발행된 각 메시지에 대해 messageID와 republished 플래그(한 번 이상 발행되었으면 true) 같은 메시지 메타데이터가 추가되며, 이는 메시지 헤더를 통해 접근할 수 있습니다.
동일한 테이블을 INSERT와 materialized view 둘 다에 사용하지 마십시오.
예시:
가상 컬럼
_exchange_name- RabbitMQ 익스체인지 이름. 데이터 타입:String._channel_id- 메시지를 수신한 컨슈머가 선언된 ChannelID. 데이터 타입:String._delivery_tag- 수신한 메시지의 DeliveryTag. 채널 단위로 범위가 지정됩니다. 데이터 타입:UInt64._redelivered- 메시지의redelivered플래그. 데이터 타입:UInt8._message_id- 수신한 메시지의 messageID. 메시지가 발행될 때 설정되었다면 비어 있지 않습니다. 데이터 타입:String._timestamp- 수신한 메시지의 타임스탬프. 메시지가 발행될 때 설정되었다면 비어 있지 않습니다. 데이터 타입:UInt64.
rabbitmq_handle_error_mode='stream'인 경우 추가로 제공되는 가상 컬럼:
_raw_message- 성공적으로 파싱할 수 없었던 원본 메시지. 데이터 타입:Nullable(String)._error- 파싱 실패 시 발생한 예외 메시지. 데이터 타입:Nullable(String).
참고: _raw_message와 _error 가상 컬럼은 파싱 중 예외가 발생한 경우에만 채워지며, 메시지가 성공적으로 파싱된 경우에는 항상 NULL입니다.
주의사항
테이블 정의에서 DEFAULT, MATERIALIZED, ALIAS와 같은 기본 컬럼 표현식을 지정하더라도 이러한 표현식은 무시됩니다. 대신 컬럼은 각 타입에 대해 정의된 기본값으로 채워집니다.
데이터 포맷 지원
RabbitMQ 엔진은 ClickHouse에서 지원하는 모든 포맷을 지원합니다. 하나의 RabbitMQ 메시지에 포함되는 행 수는 포맷이 행 기반인지 블록 기반인지에 따라 달라집니다.
- 행 기반 포맷의 경우 하나의 RabbitMQ 메시지에 포함되는 행 수는
rabbitmq_max_rows_per_message설정으로 제어할 수 있습니다. - 블록 기반 포맷의 경우 블록을 더 작은 파트로 나눌 수는 없지만, 하나의 블록에 포함되는 행 수는 일반 설정 항목인 max_block_size로 제어할 수 있습니다.