본문으로 바로가기
본문으로 바로가기

Amazon Glue를 ClickHouse 및 Spark와 통합하기

ClickHouse Supported

Amazon Glue는 Amazon Web Services(AWS)에서 제공하는 완전 관리형 서버리스 데이터 통합 서비스입니다. 분석, 머신 러닝, 애플리케이션 개발을 위해 데이터를 탐색, 준비 및 변환하는 작업을 단순화합니다.

설치

Glue 코드를 ClickHouse와 통합하기 위해, 다음 방법 중 하나를 사용하여 Glue에서 공식 Spark 커넥터를 사용할 수 있습니다.

  • AWS Marketplace에서 ClickHouse Glue 커넥터를 설치합니다(권장).
  • Spark Connector의 JAR을 수동으로 Glue 잡에 추가합니다.
  1. 커넥터 구독

    계정에서 커넥터를 사용하려면 AWS Marketplace에서 ClickHouse AWS Glue Connector를 구독합니다.

  2. 필수 권한 부여

    최소 권한 가이드에 설명된 대로, Glue 잡의 IAM 역할에 필요한 권한이 있는지 확인합니다.

  3. 커넥터 활성화 및 커넥션 생성

    이 링크를 클릭하여 커넥터를 활성화하고 바로 커넥션을 생성할 수 있습니다. 이 링크는 주요 필드가 미리 채워진 Glue 커넥션 생성 페이지를 엽니다. 커넥션 이름을 지정한 뒤 생성 버튼을 누릅니다(이 단계에서는 ClickHouse 커넥션 세부 정보를 제공할 필요가 없습니다).

  4. Glue 잡에서 사용

    Glue 잡에서 Job details 탭을 선택한 다음 Advanced properties 창을 펼칩니다. Connections 섹션에서 방금 생성한 커넥션을 선택합니다. 커넥터는 필요한 JAR을 잡 런타임에 자동으로 주입합니다.

Glue Notebook 커넥션 구성
참고

Glue 커넥터에서 사용하는 JAR은 Spark 3.3, Scala 2, Python 3용으로 빌드되어 있습니다. Glue 잡을 구성할 때 이 버전들을 선택해야 합니다.

예제

import com.amazonaws.services.glue.GlueContext
import com.amazonaws.services.glue.util.GlueArgParser
import com.amazonaws.services.glue.util.Job
import com.clickhouseScala.Native.NativeSparkRead.spark
import org.apache.spark.sql.SparkSession

import scala.collection.JavaConverters._
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._

object ClickHouseGlueExample {
  def main(sysArgs: Array[String]) {
    val args = GlueArgParser.getResolvedOptions(sysArgs, Seq("JOB_NAME").toArray)

    val sparkSession: SparkSession = SparkSession.builder
      .config("spark.sql.catalog.clickhouse", "com.clickhouse.spark.ClickHouseCatalog")
      .config("spark.sql.catalog.clickhouse.host", "<your-clickhouse-host>")
      .config("spark.sql.catalog.clickhouse.protocol", "https")
      .config("spark.sql.catalog.clickhouse.http_port", "<your-clickhouse-port>")
      .config("spark.sql.catalog.clickhouse.user", "default")
      .config("spark.sql.catalog.clickhouse.password", "<your-password>")
      .config("spark.sql.catalog.clickhouse.database", "default")
      // for ClickHouse cloud
      .config("spark.sql.catalog.clickhouse.option.ssl", "true")
      .config("spark.sql.catalog.clickhouse.option.ssl_mode", "NONE")
      .getOrCreate

    val glueContext = new GlueContext(sparkSession.sparkContext)
    Job.init(args("JOB_NAME"), glueContext, args.asJava)
    import sparkSession.implicits._

    val url = "s3://{path_to_cell_tower_data}/cell_towers.csv.gz"

    val schema = StructType(Seq(
      StructField("radio", StringType, nullable = false),
      StructField("mcc", IntegerType, nullable = false),
      StructField("net", IntegerType, nullable = false),
      StructField("area", IntegerType, nullable = false),
      StructField("cell", LongType, nullable = false),
      StructField("unit", IntegerType, nullable = false),
      StructField("lon", DoubleType, nullable = false),
      StructField("lat", DoubleType, nullable = false),
      StructField("range", IntegerType, nullable = false),
      StructField("samples", IntegerType, nullable = false),
      StructField("changeable", IntegerType, nullable = false),
      StructField("created", TimestampType, nullable = false),
      StructField("updated", TimestampType, nullable = false),
      StructField("averageSignal", IntegerType, nullable = false)
    ))

    val df = sparkSession.read
      .option("header", "true")
      .schema(schema)
      .csv(url)

    // Write to ClickHouse
    df.writeTo("clickhouse.default.cell_towers").append()


    // Read from ClickHouse
    val dfRead = spark.sql("select * from clickhouse.default.cell_towers")
    Job.commit()
  }
}

자세한 내용은 Spark 문서를 참조하십시오.