본문으로 바로가기
본문으로 바로가기

Spark JDBC

ClickHouse Supported

JDBC는 Spark에서 가장 흔히 사용되는 데이터 소스 중 하나입니다. 이 섹션에서는 Spark에서 ClickHouse 공식 JDBC 커넥터를 사용하는 방법을 자세히 설명합니다.

데이터 읽기

public static void main(String[] args) {
        // Spark 세션을 초기화합니다
        SparkSession spark = SparkSession.builder().appName("example").master("local").getOrCreate();

        String jdbcURL = "jdbc:ch://localhost:8123/default";
        String query = "select * from example_table where id > 2";

        //---------------------------------------------------------------------------------------------------
        // jdbc 메서드를 사용하여 ClickHouse에서 테이블을 로드합니다
        //---------------------------------------------------------------------------------------------------
        Properties jdbcProperties = new Properties();
        jdbcProperties.put("user", "default");
        jdbcProperties.put("password", "123456");

        Dataset<Row> df1 = spark.read().jdbc(jdbcURL, String.format("(%s)", query), jdbcProperties);

        df1.show();

        //---------------------------------------------------------------------------------------------------
        // load 메서드를 사용하여 ClickHouse에서 테이블을 로드합니다
        //---------------------------------------------------------------------------------------------------
        Dataset<Row> df2 = spark.read()
                .format("jdbc")
                .option("url", jdbcURL)
                .option("user", "default")
                .option("password", "123456")
                .option("query", query)
                .load();

        df2.show();

        // Spark 세션을 중지합니다
        spark.stop();
    }

데이터 쓰기

 public static void main(String[] args) {
        // Initialize Spark session
        SparkSession spark = SparkSession.builder().appName("example").master("local").getOrCreate();

        // JDBC connection details
        String jdbcUrl = "jdbc:ch://localhost:8123/default";
        Properties jdbcProperties = new Properties();
        jdbcProperties.put("user", "default");
        jdbcProperties.put("password", "123456");

        // Create a sample DataFrame
        StructType schema = new StructType(new StructField[]{
                DataTypes.createStructField("id", DataTypes.IntegerType, false),
                DataTypes.createStructField("name", DataTypes.StringType, false)
        });

        List<Row> rows = new ArrayList<Row>();
        rows.add(RowFactory.create(1, "John"));
        rows.add(RowFactory.create(2, "Doe"));

        Dataset<Row> df = spark.createDataFrame(rows, schema);

        //---------------------------------------------------------------------------------------------------
        // Write the df to ClickHouse using the jdbc method
        //---------------------------------------------------------------------------------------------------

        df.write()
                .mode(SaveMode.Append)
                .jdbc(jdbcUrl, "example_table", jdbcProperties);

        //---------------------------------------------------------------------------------------------------
        // Write the df to ClickHouse using the save method
        //---------------------------------------------------------------------------------------------------

        df.write()
                .format("jdbc")
                .mode("append")
                .option("url", jdbcUrl)
                .option("dbtable", "example_table")
                .option("user", "default")
                .option("password", "123456")
                .save();

        // Stop the Spark session
        spark.stop();
    }

병렬성

Spark JDBC를 사용할 때 Spark는 단일 파티션을 사용하여 데이터를 읽습니다. 더 높은 동시성을 확보하려면 partitionColumn, lowerBound, upperBound, numPartitions를 지정해야 하며, 이는 여러 워커가 병렬로 데이터를 읽을 때 테이블을 어떻게 파티션할지 정의합니다. 자세한 내용은 Apache Spark의 공식 문서 JDBC configurations를 참조하십시오.

JDBC limitations

  • Spark JDBC는 ClickHouse 전용 dialect가 없어 복합 타입(MAP, ARRAY, STRUCT)을 지원하지 않습니다. 복합 타입을 완전하게 지원하려면 네이티브 Spark-ClickHouse 커넥터를 사용하십시오.
  • 현재는 JDBC를 사용하여 기존 테이블에만 데이터를 삽입할 수 있습니다. (Spark의 다른 커넥터처럼 DataFrame(DF)을 삽입할 때 테이블을 자동으로 생성하는 기능은 아직 제공되지 않습니다.)