
spark-clickhouse-connector's Introduction

Spark ClickHouse Connector


Built on the Apache Spark DataSourceV2 API.

Overview

Usage

See the documentation for how to use this connector.
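
As a minimal sketch of how the catalog is typically registered (the catalog class and option keys below are assumptions based on this project's stack traces and common documentation; verify them against the docs for your version):

import org.apache.spark.sql.SparkSession

// Hedged example: option names such as protocol/http_port/user/password/database
// are assumed here; adjust to whatever your connector version actually documents.
val spark = SparkSession.builder()
  .appName("spark-clickhouse-example")
  .config("spark.sql.catalog.clickhouse", "xenon.clickhouse.ClickHouseCatalog")
  .config("spark.sql.catalog.clickhouse.host", "127.0.0.1")
  .config("spark.sql.catalog.clickhouse.protocol", "http")    // HTTP is the default since 0.6.0
  .config("spark.sql.catalog.clickhouse.http_port", "8123")
  .config("spark.sql.catalog.clickhouse.user", "default")
  .config("spark.sql.catalog.clickhouse.password", "")
  .config("spark.sql.catalog.clickhouse.database", "default")
  .getOrCreate()

spark.sql("SELECT * FROM clickhouse.default.some_table").show()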

Requirements

  • Java 8 or 17
  • Scala 2.12 or 2.13
  • Apache Spark 3.3, 3.4, or 3.5

Notes:

  1. As of 0.5.0, this connector switched from the ClickHouse raw gRPC client to the ClickHouse Official Java Client, which brings HTTP protocol support and extends the range of supported ClickHouse Server versions.
  2. Since 0.6.0, HTTP is the default protocol.
  3. Since 0.7.0, gRPC is deprecated and not recommended; it may be removed in the future.
  4. Since 0.8.0, gRPC is removed.

Compatibility Matrix

Version  Compatible Spark Versions  ClickHouse JDBC Version
main     Spark 3.3, 3.4, 3.5        0.6.0
0.7.3    Spark 3.3, 3.4             0.4.6
0.6.0    Spark 3.3                  0.3.2-patch11
0.5.0    Spark 3.2, 3.3             0.3.2-patch11
0.4.0    Spark 3.2, 3.3             No dependency
0.3.0    Spark 3.2, 3.3             No dependency
0.2.1    Spark 3.2                  No dependency
0.1.2    Spark 3.2                  No dependency

Build

Build without tests

./gradlew clean build -x test

Test

The project leverages Testcontainers and Docker Compose for integration tests. Install Docker and Docker Compose before running tests, and see the Testcontainers documentation for details if you'd like to run tests against a remote Docker daemon.

Run all tests

./gradlew clean test

Run all tests with Spark 3.3 and Scala 2.13

./gradlew clean test -Dspark_binary_version=3.3 -Dscala_binary_version=2.13

Run a single test

./gradlew test --tests=ConvertDistToLocalWriteSuite

Test against a custom ClickHouse image

CLICKHOUSE_IMAGE=custom-org/clickhouse-server:custom-tag ./gradlew test

spark-clickhouse-connector's People

Contributors

1456173400, camper42, chaosmin, cookcocck, cxzl25, eye-gu, harryshi10, heartdance, martin-g, mylanpangzi, mzitnik, pan3793, sketchmind, tangyl, veiasai, yxang, zhouyifan279


spark-clickhouse-connector's Issues

build failed with error `package javax.annotation does not exist`

Welcome to Gradle 7.4.1!

Here are the highlights of this release:
 - Aggregated test and JaCoCo reports
 - Marking additional test source directories as tests in IntelliJ
 - Support for Adoptium JDKs in Java toolchains

For more details see https://docs.gradle.org/7.4.1/release-notes.html

Starting a Gradle Daemon (subsequent builds will be faster)

> Task :clickhouse-core:compileJava
/data/home/caofengyu/spark-clickhouse-connector/clickhouse-core/build/generated/source/proto/main/grpc/xenon/protocol/grpc/ClickHouseGrpc.java:7: error: package javax.annotation does not exist
@javax.annotation.Generated(
                 ^
1 error

> Task :clickhouse-core:compileJava FAILED

FAILURE: Build failed with an exception.

* What went wrong:
Execution failed for task ':clickhouse-core:compileJava'.
> Compilation failed; see the compiler error output for details.

* Try:
> Run with --stacktrace option to get the stack trace.
> Run with --info or --debug option to get more log output.
> Run with --scan to get full insights.

* Get more help at https://help.gradle.org

BUILD FAILED in 2m 12s
9 actionable tasks: 5 executed, 4 up-to-date

openjdk 11.0.13_p8

Spark query on ClickHouse does not support tables with a multi-column sort key

ClickHouse create table statement:

CREATE TABLE ty_test.ty_data_test1 ON CLUSTER cluster_brd
(
    day bigint,
    minute String,
    index_name1 UInt64,
    index_name2 UInt64,
    index_name3 UInt64,
    index_name4 UInt64,
    index_name5 UInt64,
    index_name6 UInt64,
    index_name7 UInt64,
    index_name8 UInt64,
    index_name9 UInt64,
    index_name10 UInt64
) ENGINE = ReplicatedMergeTree('/clickhouse/tables/{layer}-{shard}/ty_data_test1', '{replica}')
PARTITION BY (day)
ORDER BY (minute,index_name2,index_name3,index_name4,index_name5,index_name6,index_name7)
SETTINGS index_granularity = 8192,
use_minimalistic_part_header_in_zookeeper = 1;

Spark query:

show create table clickhouse.ty_test.ty_data_test1;

Error message:

Error: Error operating EXECUTE_STATEMENT: java.lang.IllegalArgumentException: Unsupported ColumnExpr: [ColumnExprTupleContext] (minute,index_name2,index_name3,index_name4,index_name5,index_name6,index_name7)
	at xenon.clickhouse.parse.AstVisitor.visitColumnExpr(AstVisitor.scala:116)
	at xenon.clickhouse.parse.AstVisitor.visitOrderExpr(AstVisitor.scala:155)
	at xenon.clickhouse.parse.AstVisitor.$anonfun$visitOrderByClause$1(AstVisitor.scala:150)
	at scala.collection.immutable.List.map(List.scala:293)
	at xenon.clickhouse.parse.AstVisitor.visitOrderByClause(AstVisitor.scala:150)
	at xenon.clickhouse.parse.AstVisitor.$anonfun$visitEngineClause$4(AstVisitor.scala:41)
	at scala.Option.map(Option.scala:230)
	at xenon.clickhouse.parse.AstVisitor.visitEngineClause(AstVisitor.scala:41)
	at xenon.clickhouse.parse.SQLParser.$anonfun$parseEngineClause$1(SQLParser.scala:30)
	at xenon.clickhouse.parse.SQLParser.parse(SQLParser.scala:48)
	at xenon.clickhouse.parse.SQLParser.parseEngineClause(SQLParser.scala:30)
	at xenon.clickhouse.spec.TableEngineUtils$.resolveTableEngine(TableEngineUtils.scala:23)
	at xenon.clickhouse.ClickHouseCatalog.loadTable(ClickHouseCatalog.scala:124)
	at xenon.clickhouse.ClickHouseCatalog.loadTable(ClickHouseCatalog.scala:35)
	at org.apache.spark.sql.connector.catalog.CatalogV2Util$.loadTable(CatalogV2Util.scala:281)
	at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveTables$$anonfun$apply$14.applyOrElse(Analyzer.scala:1081)
	at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveTables$$anonfun$apply$14.applyOrElse(Analyzer.scala:1064)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.$anonfun$resolveOperatorsUpWithPruning$3(AnalysisHelper.scala:138)
	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:82)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.$anonfun$resolveOperatorsUpWithPruning$1(AnalysisHelper.scala:138)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.allowInvokingTransformsInAnalyzer(AnalysisHelper.scala:323)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsUpWithPruning(AnalysisHelper.scala:134)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsUpWithPruning$(AnalysisHelper.scala:130)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperatorsUpWithPruning(LogicalPlan.scala:30)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.$anonfun$resolveOperatorsUpWithPruning$2(AnalysisHelper.scala:135)
	at org.apache.spark.sql.catalyst.trees.UnaryLike.mapChildren(TreeNode.scala:1128)
	at org.apache.spark.sql.catalyst.trees.UnaryLike.mapChildren$(TreeNode.scala:1127)
	at org.apache.spark.sql.catalyst.plans.logical.ShowCreateTable.mapChildren(v2Commands.scala:835)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.$anonfun$resolveOperatorsUpWithPruning$1(AnalysisHelper.scala:135)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.allowInvokingTransformsInAnalyzer(AnalysisHelper.scala:323)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsUpWithPruning(AnalysisHelper.scala:134)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsUpWithPruning$(AnalysisHelper.scala:130)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperatorsUpWithPruning(LogicalPlan.scala:30)
	at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveTables$.apply(Analyzer.scala:1064)
	at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveTables$.apply(Analyzer.scala:1062)
	at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$2(RuleExecutor.scala:211)
	at scala.collection.LinearSeqOptimized.foldLeft(LinearSeqOptimized.scala:126)
	at scala.collection.LinearSeqOptimized.foldLeft$(LinearSeqOptimized.scala:122)
	at scala.collection.immutable.List.foldLeft(List.scala:91)
	at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1(RuleExecutor.scala:208)
	at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1$adapted(RuleExecutor.scala:200)
	at scala.collection.immutable.List.foreach(List.scala:431)
	at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:200)
	at org.apache.spark.sql.catalyst.analysis.Analyzer.org$apache$spark$sql$catalyst$analysis$Analyzer$$executeSameContext(Analyzer.scala:215)
	at org.apache.spark.sql.catalyst.analysis.Analyzer.execute(Analyzer.scala:209)
	at org.apache.spark.sql.catalyst.analysis.Analyzer.execute(Analyzer.scala:172)
	at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$executeAndTrack$1(RuleExecutor.scala:179)
	at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:88)
	at org.apache.spark.sql.catalyst.rules.RuleExecutor.executeAndTrack(RuleExecutor.scala:179)
	at org.apache.spark.sql.catalyst.analysis.Analyzer.$anonfun$executeAndCheck$1(Analyzer.scala:193)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.markInAnalyzer(AnalysisHelper.scala:330)
	at org.apache.spark.sql.catalyst.analysis.Analyzer.executeAndCheck(Analyzer.scala:192)
	at org.apache.spark.sql.execution.QueryExecution.$anonfun$analyzed$1(QueryExecution.scala:88)
	at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:111)
	at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:196)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
	at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:196)
	at org.apache.spark.sql.execution.QueryExecution.analyzed$lzycompute(QueryExecution.scala:88)
	at org.apache.spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:86)
	at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:78)
	at org.apache.spark.sql.Dataset$.$anonfun$ofRows$2(Dataset.scala:98)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
	at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:96)
	at org.apache.spark.sql.SparkSession.$anonfun$sql$1(SparkSession.scala:618)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
	at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:613)
	at org.apache.kyuubi.engine.spark.operation.ExecuteStatement.$anonfun$executeStatement$1(ExecuteStatement.scala:100)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.kyuubi.engine.spark.operation.ExecuteStatement.withLocalProperties(ExecuteStatement.scala:159)
	at org.apache.kyuubi.engine.spark.operation.ExecuteStatement.org$apache$kyuubi$engine$spark$operation$ExecuteStatement$$executeStatement(ExecuteStatement.scala:94)
	at org.apache.kyuubi.engine.spark.operation.ExecuteStatement$$anon$1.run(ExecuteStatement.scala:127)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748) (state=,code=0)

Support ClickHouse distributed tables sharded by rand()

When the distributed table is sharded by the rand() function, Spark reads fail with a NullPointerException. If specific columns are used as the sharding key instead, data may be distributed unevenly.
Error message:

java.lang.NullPointerException
	at xenon.clickhouse.parse.AstVisitor.visitColumnExprFunction(AstVisitor.scala:135)
	at xenon.clickhouse.parse.AstVisitor.visitColumnExpr(AstVisitor.scala:114)
	at xenon.clickhouse.parse.AstVisitor.visitColumnsExpr(AstVisitor.scala:163)
	at xenon.clickhouse.parse.AstVisitor.$anonfun$visitEngineClause$3(AstVisitor.scala:39)
	at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
	at scala.collection.Iterator.foreach(Iterator.scala:943)
	at scala.collection.Iterator.foreach$(Iterator.scala:943)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
	at scala.collection.IterableLike.foreach(IterableLike.scala:74)
	at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
	at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
	at scala.collection.TraversableLike.map(TraversableLike.scala:286)
	at scala.collection.TraversableLike.map$(TraversableLike.scala:279)
	at scala.collection.AbstractTraversable.map(Traversable.scala:108)
	at xenon.clickhouse.parse.AstVisitor.visitEngineClause(AstVisitor.scala:39)
	at xenon.clickhouse.parse.SQLParser.$anonfun$parseEngineClause$1(SQLParser.scala:30)
	at xenon.clickhouse.parse.SQLParser.parse(SQLParser.scala:48)
	at xenon.clickhouse.parse.SQLParser.parseEngineClause(SQLParser.scala:30)
	at xenon.clickhouse.spec.TableEngineUtils$.resolveTableEngine(TableEngineUtils.scala:23)
	at xenon.clickhouse.ClickHouseCatalog.loadTable(ClickHouseCatalog.scala:124)
	at xenon.clickhouse.ClickHouseCatalog.loadTable(ClickHouseCatalog.scala:35)
	at org.apache.spark.sql.connector.catalog.CatalogV2Util$.loadTable(CatalogV2Util.scala:281)
	at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveTables$.org$apache$spark$sql$catalyst$analysis$Analyzer$ResolveTables$$lookupV2Relation(Analyzer.scala:1116)
	at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveTables$$anonfun$apply$14.applyOrElse(Analyzer.scala:1066)
	at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveTables$$anonfun$apply$14.applyOrElse(Analyzer.scala:1064)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.$anonfun$resolveOperatorsUpWithPruning$3(AnalysisHelper.scala:138)
	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:82)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.$anonfun$resolveOperatorsUpWithPruning$1(AnalysisHelper.scala:138)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.allowInvokingTransformsInAnalyzer(AnalysisHelper.scala:323)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsUpWithPruning(AnalysisHelper.scala:134)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsUpWithPruning$(AnalysisHelper.scala:130)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperatorsUpWithPruning(LogicalPlan.scala:30)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.$anonfun$resolveOperatorsUpWithPruning$2(AnalysisHelper.scala:135)
	at org.apache.spark.sql.catalyst.trees.UnaryLike.mapChildren(TreeNode.scala:1128)
	at org.apache.spark.sql.catalyst.trees.UnaryLike.mapChildren$(TreeNode.scala:1127)
	at org.apache.spark.sql.catalyst.plans.logical.OrderPreservingUnaryNode.mapChildren(LogicalPlan.scala:206)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.$anonfun$resolveOperatorsUpWithPruning$1(AnalysisHelper.scala:135)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.allowInvokingTransformsInAnalyzer(AnalysisHelper.scala:323)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsUpWithPruning(AnalysisHelper.scala:134)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsUpWithPruning$(AnalysisHelper.scala:130)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperatorsUpWithPruning(LogicalPlan.scala:30)
	at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveTables$.apply(Analyzer.scala:1064)
	at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveTables$.apply(Analyzer.scala:1062)
	at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$2(RuleExecutor.scala:211)
	at scala.collection.LinearSeqOptimized.foldLeft(LinearSeqOptimized.scala:126)
	at scala.collection.LinearSeqOptimized.foldLeft$(LinearSeqOptimized.scala:122)
	at scala.collection.immutable.List.foldLeft(List.scala:91)
	at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1(RuleExecutor.scala:208)
	at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1$adapted(RuleExecutor.scala:200)
	at scala.collection.immutable.List.foreach(List.scala:431)
	at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:200)
	at org.apache.spark.sql.catalyst.analysis.Analyzer.org$apache$spark$sql$catalyst$analysis$Analyzer$$executeSameContext(Analyzer.scala:215)
	at org.apache.spark.sql.catalyst.analysis.Analyzer.execute(Analyzer.scala:209)
	at org.apache.spark.sql.catalyst.analysis.Analyzer.execute(Analyzer.scala:172)
	at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$executeAndTrack$1(RuleExecutor.scala:179)
	at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:88)
	at org.apache.spark.sql.catalyst.rules.RuleExecutor.executeAndTrack(RuleExecutor.scala:179)
	at org.apache.spark.sql.catalyst.analysis.Analyzer.$anonfun$executeAndCheck$1(Analyzer.scala:193)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.markInAnalyzer(AnalysisHelper.scala:330)
	at org.apache.spark.sql.catalyst.analysis.Analyzer.executeAndCheck(Analyzer.scala:192)
	at org.apache.spark.sql.execution.QueryExecution.$anonfun$analyzed$1(QueryExecution.scala:88)
	at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:111)
	at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:196)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
	at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:196)
	at org.apache.spark.sql.execution.QueryExecution.analyzed$lzycompute(QueryExecution.scala:88)
	at org.apache.spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:86)
	at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:78)
	at org.apache.spark.sql.Dataset$.$anonfun$ofRows$2(Dataset.scala:98)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
	at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:96)
	at org.apache.spark.sql.SparkSession.$anonfun$sql$1(SparkSession.scala:618)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
	at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:613)
	at org.apache.spark.sql.SQLContext.sql(SQLContext.scala:651)
	at org.apache.spark.sql.hive.thriftserver.SparkSQLDriver.run(SparkSQLDriver.scala:67)
	at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.processCmd(SparkSQLCLIDriver.scala:384)
	at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.$anonfun$processLine$1(SparkSQLCLIDriver.scala:504)
	at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.$anonfun$processLine$1$adapted(SparkSQLCLIDriver.scala:498)
	at scala.collection.Iterator.foreach(Iterator.scala:943)
	at scala.collection.Iterator.foreach$(Iterator.scala:943)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
	at scala.collection.IterableLike.foreach(IterableLike.scala:74)
	at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
	at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
	at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.processLine(SparkSQLCLIDriver.scala:498)
	at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver$.main(SparkSQLCLIDriver.scala:287)
	at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.main(SparkSQLCLIDriver.scala)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
	at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:955)
	at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:180)
	at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:203)
	at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:90)
	at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1043)
	at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1052)
	at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

Support Spark FloatType/DoubleType

Solution: add two new pattern matches in xenon.clickhouse.SchemaUtils#toClickHouseType (line 75):
case FloatType => "Float32"
case DoubleType => "Float64"
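
For context, a hedged sketch of how such a type-mapping function generally looks; the surrounding cases are illustrative only and not the connector's exact source:

import org.apache.spark.sql.types._

// Illustrative sketch: the real xenon.clickhouse.SchemaUtils#toClickHouseType has
// more cases; this only shows where the two new matches would slot in.
def toClickHouseType(dataType: DataType): String = dataType match {
  case IntegerType => "Int32"
  case LongType    => "Int64"
  case StringType  => "String"
  case FloatType   => "Float32"   // proposed addition
  case DoubleType  => "Float64"   // proposed addition
  case _ => throw new UnsupportedOperationException(s"Unsupported type: $dataType")
}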

What hash functions are currently supported?

We encountered xenon.clickhouse.exception.ClickHouseClientException: [-1] Unsupported ClickHouse expression: FuncExpr[cityHash64(XXXX)] while writing to a table with a SAMPLE BY expression.

The same exception occurs with murmurHash2_64.

What hash functions are currently supported? https://clickhouse.com/docs/en/sql-reference/functions/hash-functions/

Create table statement:

CREATE TABLE dev.sample_test
(
    `did` String,
    `load_date` Date
)
ENGINE = MergeTree
PRIMARY KEY xxHash64(did)
PARTITION BY load_date
SAMPLE BY xxHash64(did)
SETTINGS index_granularity = 8192

Transfer project to @housepower organization

Hi @housepower members,

I'd like to propose transferring this project from my own repository to the @housepower organization.

The project is an Apache Spark DataSourceV2 API implementation for ClickHouse; it can be considered a data source plugin for Apache Spark.

The vision of this project is to provide a seamless user experience for Spark users to access ClickHouse.

The project is currently at a very early stage. Basically, it uses the gRPC protocol and JSON-based serialization to communicate with the ClickHouse server, and supports reading from and writing to both a single ClickHouse node and a ClickHouse cluster; see the integration tests under the module clickhouse-spark-31-integration for more details.

The Spark DataSourceV2 API is under rapid development; due to limited development resources, the project currently only supports Spark 3.1.x.

Looking forward to your feedback and thank you very much.

Error on `Date` type column

CREATE TABLE IF NOT EXISTS default.test_date (`date` Date) Engine Log();
INSERT INTO default.test_date (*) VALUES ('2022-04-12');

# spark sql
spark.sql("select * from `ck-datasupport-1`.dev.test_shard_uid").toPandas()`

Task failed with error:

java.lang.ClassCastException: class java.lang.Long cannot be cast to class java.lang.Integer (java.lang.Long and java.lang.Integer are in module java.base of loader 'bootstrap')
	at scala.runtime.BoxesRunTime.unboxToInt(BoxesRunTime.java:103)
	at org.apache.spark.sql.catalyst.expressions.BaseGenericInternalRow.getInt(rows.scala:41)
	at org.apache.spark.sql.catalyst.expressions.BaseGenericInternalRow.getInt$(rows.scala:41)
	at org.apache.spark.sql.catalyst.expressions.GenericInternalRow.getInt(rows.scala:195)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:759)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:349)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:898)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:898)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:131)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1462)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:829)
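
A note on this first error: Spark's DateType is stored internally as an Int holding days since 1970-01-01, so a reader that puts a Long into a DateType column triggers exactly this ClassCastException. A minimal sketch of the conversion a reader needs (illustrative, not the connector's actual code):

import java.time.LocalDate

// Spark DateType expects epoch days as an Int, not a Long.
def clickHouseDateToSpark(value: LocalDate): Int =
  value.toEpochDay.toInt   // days since 1970-01-01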

The error my colleague encountered was:

CREATE TABLE IF NOT EXISTS `default`.`test_uid` (`user_id` INT NOT NULL, `load_date` DATE NOT NULL) ENGINE MergeTree
PARTITION BY load_date
 ORDER BY user_id
SETTINGS index_granularity = '8192'

INSERT INTO `default`.`test_uid` (*) VALUES (1, '2022-04-12');

# spark sql
spark.sql("select * from `ck-datasupport-1`.`default`.`test_uid` ").show()

Error log:

xenon.clickhouse.exception.ClickHouseServerException: [43] DB::Exception: Illegal types of arguments (Date, Int64) of function equals: While processing load_date = ((2022 - 4) - 12)
	at xenon.clickhouse.grpc.GrpcNodeClient.$anonfun$syncStreamQueryAndCheck$3(GrpcNodeClient.scala:193)
	at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
	at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:486)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:492)
	at xenon.clickhouse.format.JSONCompactEachRowWithNamesAndTypesStreamOutput.hasNext(JSONOutputFormat.scala:106)
	at xenon.clickhouse.read.ClickHouseReader.next(ClickHouseReader.scala:71)
	at org.apache.spark.sql.execution.datasources.v2.PartitionIterator.hasNext(DataSourceRDD.scala:93)
	at org.apache.spark.sql.execution.datasources.v2.MetricsIterator.hasNext(DataSourceRDD.scala:130)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:759)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:349)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:898)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:898)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:131)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1462)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:829)

Add new conf WRITE_REPARTITION_NUM

val WRITE_REPARTITION_NUM: ConfigEntry[Int] =
    buildConf("write.repartitionNum")
      .doc("Repartitioning data to match the distribution of the ClickHouse table is required " +
        "before writing; use this conf to specify the repartition number. A value less than 1 " +
        "means no requirement.")
      .intConf
      .createWithDefault(0)
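
A hypothetical usage sketch, assuming the conf ends up exposed under the connector's usual "spark.clickhouse." prefix (the exact key is an assumption of this example):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().getOrCreate()
// Hypothetical key; the real prefix/key depends on how the conf is registered.
// 0 (the default) would mean no explicit repartition number is enforced.
spark.conf.set("spark.clickhouse.write.repartitionNum", "4")
spark.sql("INSERT INTO clickhouse.db.tbl SELECT * FROM src")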

Support ClickHouse tables with multiple partition columns

The ClickHouse table is created with the following script:

CREATE TABLE ty_test.tbl_sql (
  create_time TIMESTAMP NOT NULL,
  d           INT       NOT NULL COMMENT 'part key',
  m           INT       NOT NULL COMMENT 'part key',
  id          BIGINT    NOT NULL COMMENT 'sort key',
  value       STRING
) USING ClickHouse
PARTITIONED BY (d,m)
TBLPROPERTIES (
  engine = "ReplicatedMergeTree('/clickhouse/ty_test/tbl_sql/{shard}', '{replica}')",
  order_by = '(id)',
  settings.index_granularity = 8192
);

Spark query error message:

java.lang.IllegalArgumentException: Unsupported ColumnExpr: [ColumnExprTupleContext] (d,m)
	at xenon.clickhouse.parse.AstVisitor.visitColumnExpr(AstVisitor.scala:116)
	at xenon.clickhouse.parse.AstVisitor.$anonfun$visitEngineClause$8(AstVisitor.scala:43)
	at scala.Option.map(Option.scala:230)
	at xenon.clickhouse.parse.AstVisitor.visitEngineClause(AstVisitor.scala:43)
	at xenon.clickhouse.parse.SQLParser.$anonfun$parseEngineClause$1(SQLParser.scala:30)
	at xenon.clickhouse.parse.SQLParser.parse(SQLParser.scala:48)
	at xenon.clickhouse.parse.SQLParser.parseEngineClause(SQLParser.scala:30)
	at xenon.clickhouse.spec.TableEngineUtils$.resolveTableEngine(TableEngineUtils.scala:23)
	at xenon.clickhouse.ClickHouseCatalog.loadTable(ClickHouseCatalog.scala:124)
	at xenon.clickhouse.ClickHouseCatalog.createTable(ClickHouseCatalog.scala:231)
	at xenon.clickhouse.ClickHouseCatalog.createTable(ClickHouseCatalog.scala:35)
	at org.apache.spark.sql.execution.datasources.v2.CreateTableExec.run(CreateTableExec.scala:42)
	at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result$lzycompute(V2CommandExec.scala:43)
	at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result(V2CommandExec.scala:43)
	at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.executeCollect(V2CommandExec.scala:49)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:110)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:110)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:106)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:481)
	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:82)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:481)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:457)
	at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:106)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:93)

Support reading from or writing to specific nodes

Currently, when accessing Distributed/Replicated tables, Spark uses a random strategy to access all nodes of the table.

Assume a Distributed table has 4 shards and 2 replicas; we may want to:

  1. read and write the local table directly
    1.1. limit Spark reads and writes to the 1st replica, to avoid impacting the 2nd replica which may serve online services.
    1.2. always write to the 1st replica and read from the 2nd replica, to isolate the read and write nodes.

  2. read and write the distributed table
    2.1. limit Spark reads and writes to specific nodes, as other nodes may serve online services.
