- 데이터 수집
- Apache Spark
- Spark JDBC
Spark JDBC
ClickHouse Supported
JDBC는 Spark에서 가장 흔히 사용되는 데이터 소스 중 하나입니다. 이 섹션에서는 Spark에서 ClickHouse 공식 JDBC 커넥터를 사용하는 방법을 자세히 설명합니다.
데이터 읽기
- Java
- Scala
- Python
- Spark SQL
public static void main(String[] args) {
// Spark 세션을 초기화합니다
SparkSession spark = SparkSession.builder().appName("example").master("local").getOrCreate();
String jdbcURL = "jdbc:ch://localhost:8123/default";
String query = "select * from example_table where id > 2";
//---------------------------------------------------------------------------------------------------
// jdbc 메서드를 사용하여 ClickHouse에서 테이블을 로드합니다
//---------------------------------------------------------------------------------------------------
Properties jdbcProperties = new Properties();
jdbcProperties.put("user", "default");
jdbcProperties.put("password", "123456");
Dataset<Row> df1 = spark.read().jdbc(jdbcURL, String.format("(%s)", query), jdbcProperties);
df1.show();
//---------------------------------------------------------------------------------------------------
// load 메서드를 사용하여 ClickHouse에서 테이블을 로드합니다
//---------------------------------------------------------------------------------------------------
Dataset<Row> df2 = spark.read()
.format("jdbc")
.option("url", jdbcURL)
.option("user", "default")
.option("password", "123456")
.option("query", query)
.load();
df2.show();
// Spark 세션을 중지합니다
spark.stop();
}
object ReadData extends App {
// Spark 세션을 초기화합니다
val spark: SparkSession = SparkSession.builder.appName("example").master("local").getOrCreate
val jdbcURL = "jdbc:ch://localhost:8123/default"
val query: String = "select * from example_table where id > 2"
//---------------------------------------------------------------------------------------------------
// jdbc 메서드를 사용하여 ClickHouse에서 테이블을 로드합니다
//---------------------------------------------------------------------------------------------------
val connectionProperties = new Properties()
connectionProperties.put("user", "default")
connectionProperties.put("password", "123456")
val df1: Dataset[Row] = spark.read.
jdbc(jdbcURL, s"($query)", connectionProperties)
df1.show()
//---------------------------------------------------------------------------------------------------
// load 메서드를 사용하여 ClickHouse에서 테이블을 로드합니다
//---------------------------------------------------------------------------------------------------
val df2: Dataset[Row] = spark.read
.format("jdbc")
.option("url", jdbcURL)
.option("user", "default")
.option("password", "123456")
.option("query", query)
.load()
df2.show()
// Spark 세션을 중지합니다// Spark 세션을 중지합니다
spark.stop()
}
from pyspark.sql import SparkSession
jar_files = [
"jars/clickhouse-jdbc-X.X.X-SNAPSHOT-all.jar"
]
# JAR 파일을 사용하여 Spark 세션을 초기화합니다
spark = SparkSession.builder \
.appName("example") \
.master("local") \
.config("spark.jars", ",".join(jar_files)) \
.getOrCreate()
url = "jdbc:ch://localhost:8123/default"
user = "your_user"
password = "your_password"
query = "select * from example_table where id > 2"
driver = "com.clickhouse.jdbc.ClickHouseDriver"
df = (spark.read
.format('jdbc')
.option('driver', driver)
.option('url', url)
.option('user', user)
.option('password', password).option(
'query', query).load())
df.show()
CREATE TEMPORARY VIEW jdbcTable
USING org.apache.spark.sql.jdbc
OPTIONS (
url "jdbc:ch://localhost:8123/default",
dbtable "schema.tablename",
user "username",
password "password",
driver "com.clickhouse.jdbc.ClickHouseDriver"
);
SELECT * FROM jdbcTable;
데이터 쓰기
- Java
- Scala
- Python
- Spark SQL
public static void main(String[] args) {
// Initialize Spark session
SparkSession spark = SparkSession.builder().appName("example").master("local").getOrCreate();
// JDBC connection details
String jdbcUrl = "jdbc:ch://localhost:8123/default";
Properties jdbcProperties = new Properties();
jdbcProperties.put("user", "default");
jdbcProperties.put("password", "123456");
// Create a sample DataFrame
StructType schema = new StructType(new StructField[]{
DataTypes.createStructField("id", DataTypes.IntegerType, false),
DataTypes.createStructField("name", DataTypes.StringType, false)
});
List<Row> rows = new ArrayList<Row>();
rows.add(RowFactory.create(1, "John"));
rows.add(RowFactory.create(2, "Doe"));
Dataset<Row> df = spark.createDataFrame(rows, schema);
//---------------------------------------------------------------------------------------------------
// Write the df to ClickHouse using the jdbc method
//---------------------------------------------------------------------------------------------------
df.write()
.mode(SaveMode.Append)
.jdbc(jdbcUrl, "example_table", jdbcProperties);
//---------------------------------------------------------------------------------------------------
// Write the df to ClickHouse using the save method
//---------------------------------------------------------------------------------------------------
df.write()
.format("jdbc")
.mode("append")
.option("url", jdbcUrl)
.option("dbtable", "example_table")
.option("user", "default")
.option("password", "123456")
.save();
// Stop the Spark session
spark.stop();
}
object WriteData extends App {
val spark: SparkSession = SparkSession.builder.appName("example").master("local").getOrCreate
// JDBC connection details
val jdbcUrl: String = "jdbc:ch://localhost:8123/default"
val jdbcProperties: Properties = new Properties
jdbcProperties.put("user", "default")
jdbcProperties.put("password", "123456")
// Create a sample DataFrame
val rows = Seq(Row(1, "John"), Row(2, "Doe"))
val schema = List(
StructField("id", DataTypes.IntegerType, nullable = false),
StructField("name", StringType, nullable = true)
)
val df: DataFrame = spark.createDataFrame(
spark.sparkContext.parallelize(rows),
StructType(schema)
)
//---------------------------------------------------------------------------------------------------//---------------------------------------------------------------------------------------------------
// Write the df to ClickHouse using the jdbc method
//---------------------------------------------------------------------------------------------------//---------------------------------------------------------------------------------------------------
df.write
.mode(SaveMode.Append)
.jdbc(jdbcUrl, "example_table", jdbcProperties)
//---------------------------------------------------------------------------------------------------//---------------------------------------------------------------------------------------------------
// Write the df to ClickHouse using the save method
//---------------------------------------------------------------------------------------------------//---------------------------------------------------------------------------------------------------
df.write
.format("jdbc")
.mode("append")
.option("url", jdbcUrl)
.option("dbtable", "example_table")
.option("user", "default")
.option("password", "123456")
.save()
// Stop the Spark session// Stop the Spark session
spark.stop()
}
from pyspark.sql import SparkSession
from pyspark.sql import Row
jar_files = [
"jars/clickhouse-jdbc-X.X.X-SNAPSHOT-all.jar"
]
# Initialize Spark session with JARs
spark = SparkSession.builder \
.appName("example") \
.master("local") \
.config("spark.jars", ",".join(jar_files)) \
.getOrCreate()
# Create DataFrame
data = [Row(id=11, name="John"), Row(id=12, name="Doe")]
df = spark.createDataFrame(data)
url = "jdbc:ch://localhost:8123/default"
user = "your_user"
password = "your_password"
driver = "com.clickhouse.jdbc.ClickHouseDriver"
# Write DataFrame to ClickHouse
df.write \
.format("jdbc") \
.option("driver", driver) \
.option("url", url) \
.option("user", user) \
.option("password", password) \
.option("dbtable", "example_table") \
.mode("append") \
.save()
CREATE TEMPORARY VIEW jdbcTable
USING org.apache.spark.sql.jdbc
OPTIONS (
url "jdbc:ch://localhost:8123/default",
dbtable "schema.tablename",
user "username",
password "password",
driver "com.clickhouse.jdbc.ClickHouseDriver"
);
-- resultTable could be created with df.createTempView or with Spark SQL
INSERT INTO TABLE jdbcTable
SELECT * FROM resultTable;
병렬성
Spark JDBC를 사용할 때 Spark는 단일 파티션을 사용하여 데이터를 읽습니다. 더 높은 동시성을 확보하려면
partitionColumn, lowerBound, upperBound, numPartitions를 지정해야 하며, 이는 여러 워커가 병렬로
데이터를 읽을 때 테이블을 어떻게 파티션할지 정의합니다.
자세한 내용은 Apache Spark의 공식 문서
JDBC configurations를 참조하십시오.
JDBC limitations
- Spark JDBC는 ClickHouse 전용 dialect가 없어 복합 타입(MAP, ARRAY, STRUCT)을 지원하지 않습니다. 복합 타입을 완전하게 지원하려면 네이티브 Spark-ClickHouse 커넥터를 사용하십시오.
- 현재는 JDBC를 사용하여 기존 테이블에만 데이터를 삽입할 수 있습니다. (Spark의 다른 커넥터처럼 DataFrame(DF)을 삽입할 때 테이블을 자동으로 생성하는 기능은 아직 제공되지 않습니다.)