Confluent HTTP Sink 커넥터
HTTP Sink 커넥터는 데이터 유형에 구애받지 않으므로 Kafka 스키마가 필요하지 않고, 맵(Map) 및 배열(Array)과 같은 ClickHouse 특화 데이터 타입도 지원합니다. 이와 같은 추가적인 유연성으로 인해 설정이 약간 더 복잡해집니다.
아래에서는 단일 Kafka 토픽에서 메시지를 가져와 ClickHouse 테이블에 행을 삽입하는 간단한 설치 방법을 설명합니다.
HTTP 커넥터는 Confluent Enterprise License 하에 배포됩니다.
빠른 시작 단계
1. 연결 정보 준비
HTTP(S)로 ClickHouse에 연결하려면 다음 정보가 필요합니다:
| Parameter(s) | Description |
|---|---|
HOST and PORT | 일반적으로 TLS를 사용할 때는 포트가 8443이고, TLS를 사용하지 않을 때는 8123입니다. |
DATABASE NAME | 기본적으로 default라는 데이터베이스가 있으며, 연결하려는 데이터베이스의 이름을 사용합니다. |
USERNAME and PASSWORD | 기본값으로 사용자 이름은 default입니다. 사용하려는 용도에 적합한 사용자 이름을 사용합니다. |
ClickHouse Cloud 서비스에 대한 세부 정보는 ClickHouse Cloud 콘솔에서 확인할 수 있습니다. 서비스를 선택한 다음 Connect를 클릭하십시오:

HTTPS를 선택하십시오. 연결 정보는 예제 curl 명령에 표시됩니다.

자가 관리형 ClickHouse를 사용하는 경우, 연결 정보는 ClickHouse 관리자가 설정합니다.
2. Kafka Connect와 HTTP sink 커넥터 실행
다음 두 가지 옵션이 있습니다.
-
Self-managed(자가 관리형): Confluent 패키지를 다운로드하여 로컬에 설치합니다. 커넥터 설치 방법은 여기에 문서화된 내용을 따르십시오. Confluent Hub 설치 방법을 사용하는 경우 로컬 설정 파일이 업데이트됩니다.
-
Confluent Cloud: Kafka 호스팅에 Confluent Cloud를 사용하는 경우 HTTP Sink의 완전 관리형 버전을 사용할 수 있습니다. 이를 위해서는 Confluent Cloud에서 ClickHouse 환경에 접근할 수 있어야 합니다.
다음 예제에서는 Confluent Cloud를 사용합니다.
3. ClickHouse에서 대상 테이블 생성
연결 테스트에 앞서 먼저 ClickHouse Cloud에 테스트 테이블을 생성합니다. 이 테이블은 Kafka에서 전달되는 데이터를 저장합니다.
4. Configure HTTP Sink
Kafka 토픽과 HTTP Sink Connector 인스턴스를 생성합니다:

HTTP Sink Connector를 구성합니다:
- 앞에서 생성한 토픽 이름을 입력합니다.
- Authentication
HTTP Url-INSERT쿼리가 지정된 ClickHouse Cloud URL:<protocol>://<clickhouse_host>:<clickhouse_port>?query=INSERT%20INTO%20<database>.<table>%20FORMAT%20JSONEachRow. Note: 쿼리는 반드시 인코딩되어야 합니다.Endpoint Authentication type- BASICAuth username- ClickHouse 사용자 이름Auth password- ClickHouse 비밀번호
이 HTTP Url은 오류가 발생하기 쉽습니다. 문제가 발생하지 않도록 이스케이프 처리를 정확하게 하십시오.

- Configuration
Input Kafka record value format- 소스 데이터에 따라 다르지만 대부분의 경우 JSON 또는 Avro입니다. 이후 설정에서는JSON을 사용하는 것으로 가정합니다.advanced configurations섹션에서:HTTP Request Method- POST로 설정합니다.Request Body Format- jsonBatch batch size- ClickHouse 권장 사항에 따라 최소 1000 이상으로 설정합니다.Batch json as array- trueRetry on HTTP codes- 400-500으로 설정하되, 필요에 따라 조정합니다. 예를 들어 ClickHouse 앞에 HTTP 프록시가 있는 경우 값이 달라질 수 있습니다.Maximum Reties- 기본값(10)으로 적절하지만, 보다 견고한 재시도를 위해 조정해도 됩니다.

5. 연결 테스트
HTTP Sink로 구성한 토픽에 메시지를 생성합니다.

생성된 메시지가 ClickHouse 인스턴스에 정상적으로 기록되었는지 확인합니다.
문제 해결
HTTP Sink가 메시지를 배치 처리하지 않음
Sink 문서에 따르면:
HTTP Sink 커넥터는 서로 다른 Kafka 헤더 값을 포함하는 메시지에 대해서는 요청을 배치 처리하지 않습니다.
- Kafka 레코드에 동일한 키가 있는지 확인합니다.
- HTTP API URL에 매개변수를 추가하면 각 레코드가 고유한 URL을 생성할 수 있습니다. 이 때문에 추가 URL 매개변수를 사용하는 경우 배치가 비활성화됩니다.
400 잘못된 요청
CANNOT_PARSE_QUOTED_STRING
HTTP Sink가 String 컬럼에 JSON 객체를 삽입하려 할 때 다음 메시지와 함께 실패하는 경우:
URL에 input_format_json_read_objects_as_strings=1 설정을 URL 인코딩된 문자열 SETTINGS%20input_format_json_read_objects_as_strings%3D1로 지정합니다
GitHub 데이터셋 로드(선택 사항)
이 예제에서는 GitHub 데이터셋의 Array 필드를 그대로 유지합니다. 예제에서는 비어 있는 GitHub 토픽이 있다고 가정하고, Kafka로 메시지를 전송하기 위해 kcat을 사용합니다.
1. 구성 준비
단독 및 분산 클러스터 간 차이점에 유의하면서 설치 유형에 맞게 Connect를 설정하기 위해 다음 지침을 따르십시오. Confluent Cloud를 사용하는 경우 분산 구성이 해당합니다.
가장 중요한 매개변수는 http.api.url입니다. ClickHouse의 HTTP 인터페이스는 URL의 매개변수로 INSERT 문을 인코딩할 것을 요구합니다. 여기에는 형식(이 경우 JSONEachRow)과 대상 데이터베이스가 포함되어야 합니다. 형식은 HTTP 페이로드에서 문자열로 변환될 Kafka 데이터와 일치해야 합니다. 이러한 매개변수는 URL 인코딩되어야 합니다. GitHub 데이터셋에 대한 이 형식의 예시는(로컬에서 ClickHouse를 실행한다고 가정할 때) 아래와 같습니다.
다음 추가 매개변수들은 ClickHouse와 함께 HTTP Sink를 사용할 때 고려해야 합니다. 전체 매개변수 목록은 여기에서 확인할 수 있습니다:
request.method- POST로 설정합니다.retry.on.status.codes- 모든 오류 코드에서 재시도하도록 400-500으로 설정합니다. 데이터에서 예상되는 오류에 따라 값을 조정하십시오.request.body.format- 대부분의 경우 JSON을 사용합니다.auth.type- ClickHouse에 보안을 설정한 경우 BASIC으로 설정합니다. 현재 다른 ClickHouse 호환 인증 메커니즘은 지원되지 않습니다.ssl.enabled- SSL을 사용하는 경우 true로 설정합니다.connection.user- ClickHouse 사용자 이름입니다.connection.password- ClickHouse 비밀번호입니다.batch.max.size- 단일 배치로 전송할 행의 수입니다. 이 값이 충분히 큰 값으로 설정되어 있는지 확인하십시오. ClickHouse 권장 사항에 따르면 최소값으로 1000을 고려해야 합니다.tasks.max- HTTP Sink 커넥터는 하나 이상의 태스크 실행을 지원합니다. 이는 성능을 높이는 데 사용할 수 있습니다. 배치 크기와 함께 성능을 개선하는 주요 수단이 됩니다.key.converter- 키의 유형에 맞게 설정합니다.value.converter- 토픽의 데이터 유형에 따라 설정합니다. 이 데이터에는 스키마가 필요하지 않습니다. 여기에서 사용하는 포맷은http.api.url매개변수에 지정된 FORMAT과 일치해야 합니다. 가장 간단한 방법은 JSON과 org.apache.kafka.connect.json.JsonConverter 컨버터를 사용하는 것입니다. 값을 org.apache.kafka.connect.storage.StringConverter 컨버터를 통해 문자열로 처리하는 것도 가능합니다. 다만 이 경우 INSERT 문에서 함수를 사용해 값을 추출해야 합니다. 또한 io.confluent.connect.avro.AvroConverter 컨버터를 사용하는 경우 ClickHouse에서 Avro format도 지원됩니다.
프록시, 재시도, 고급 SSL 설정 방법을 포함한 전체 설정 목록은 여기에서 확인할 수 있습니다.
GitHub 샘플 데이터용 예제 설정 파일은 Connect가 standalone 모드로 실행되고 Kafka가 Confluent Cloud에서 호스팅된다고 가정할 때 여기에서 찾을 수 있습니다.
2. ClickHouse 테이블 생성
테이블이 생성되었는지 확인합니다. 아래는 표준 MergeTree를 사용한 최소 GitHub 데이터셋 예시입니다.
3. Kafka에 데이터 추가
Kafka로 메시지를 전송합니다. 아래에서는 kcat을 사용하여 10,000개의 메시지를 전송합니다.
대상 테이블 「Github」을(를) 단순 조회하면 데이터가 삽입되었는지 확인할 수 있습니다.