tpch-spark

TPC-H queries implemented in Spark using the DataFrames API.

The TPC-H is a decision support benchmark. It consists of a suite of business oriented ad-hoc queries and concurrent data modifications. The queries and the data populating the database have been chosen to have broad industry-wide relevance. This benchmark illustrates decision support systems that examine large volumes of data, execute queries with a high degree of complexity, and give answers to critical business questions. (Source: https://www.tpc.org/tpch)


Getting started

Prerequisites: Apache Spark

tpch-spark requires that Spark is installed on your machine. You can download Spark from https://spark.apache.org/downloads.html. At a high level, to install Spark you have to:

# Step 1:
tar xvfz <the tgz file you downloaded>

# Step 2:
# [optionally move/rename the untarred directory wherever you want, say, $HOME/spark]

# Step 3:
export PATH=$PATH:$HOME/spark/bin
# or better yet, add the above to your bashrc (or equivalent) and source it.
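You can quickly verify the installation with the following command (a sanity check, not part of the original steps):

spark-submit --version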

A. Get the code

git clone https://github.com/ssavvides/tpch-spark
cd tpch-spark

B. Generate input data tables

Navigate to the data generator directory dbgen and build the data generator:

cd dbgen
make

This should generate an executable called dbgen. Use the -h flag to see the various options the tool offers.

./dbgen -h

The simplest case is running the dbgen executable with no flags.

./dbgen

The above generates tables with the .tbl extension at the default scale factor of 1, totaling roughly 1 GB across all tables.

$ ls -hl *.tbl
-rw-rw-r-- 1 savvas savvas  24M May 28 12:39 customer.tbl
-rw-rw-r-- 1 savvas savvas 725M May 28 12:39 lineitem.tbl
-rw-rw-r-- 1 savvas savvas 2.2K May 28 12:39 nation.tbl
-rw-rw-r-- 1 savvas savvas 164M May 28 12:39 orders.tbl
-rw-rw-r-- 1 savvas savvas 114M May 28 12:39 partsupp.tbl
-rw-rw-r-- 1 savvas savvas  24M May 28 12:39 part.tbl
-rw-rw-r-- 1 savvas savvas  389 May 28 12:39 region.tbl
-rw-rw-r-- 1 savvas savvas 1.4M May 28 12:39 supplier.tbl

To generate tables of a different size, use the -s (scale factor) option. For example,

./dbgen -s 10

will generate roughly 10GB of input data.

Note that by default, dbgen uses | as the column separator and also appends a | at the end of each row.

$ cat region.tbl 
0|AFRICA|lar deposits. blithely final packages cajole. regular waters are final requests. regular accounts are according to |
1|AMERICA|hs use ironic, even requests. s|
2|ASIA|ges. thinly even pinto beans ca|
3|EUROPE|ly final courts cajole furiously final excuse|
4|MIDDLE EAST|uickly special accounts cajole carefully blithely close requests. carefully final asymptotes haggle furiousl|

You can find the schemas of the generated tables in the TPC-H specification.
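As an illustration only (a minimal sketch, not the loader used by tpch-spark itself), a pipe-delimited .tbl file such as region.tbl can be turned into a DataFrame by splitting on dbgen's | separator; the column names below follow the TPC-H REGION schema:

import org.apache.spark.sql.SparkSession

// Illustrative sketch only; column names follow the TPC-H REGION schema.
case class Region(r_regionkey: Long, r_name: String, r_comment: String)

object TblReadSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.appName("tbl-read-sketch").getOrCreate()
    import spark.implicits._

    val region = spark.sparkContext
      .textFile("dbgen/region.tbl")
      .map(_.split('|'))   // '|' is dbgen's separator; the trailing '|' yields no extra field
      .map(p => Region(p(0).trim.toLong, p(1).trim, p(2).trim))
      .toDF()

    region.show(5, truncate = false)
    spark.stop()
  }
}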

C. Build tpch-spark

tpch-spark is written in Scala as a self-contained Spark application.

Use the provided sbt build file to build and package it:

cd tpch-spark
sbt package

The above command packages the application into a jar file, e.g., ./target/scala-2.12/spark-tpc-h-queries_2.12-1.0.jar, which you will need in the next step.

D. Run tpch-spark

You can run all TPC-H queries from Q01 to Q22 by running:

spark-submit --class "main.scala.TpchQuery" target/scala-2.12/spark-tpc-h-queries_2.12-1.0.jar

If you want to run a specific query, use:

spark-submit --class "main.scala.TpchQuery" target/scala-2.12/spark-tpc-h-queries_2.12-1.0.jar <query number>

where <query number> is the number of the query to run, i.e., 1, 2, ..., 22.
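For example, to run every query as its own Spark application, you can loop over the query numbers (a convenience sketch, not something shipped with the project):

for q in $(seq 1 22); do
  spark-submit --class "main.scala.TpchQuery" \
    target/scala-2.12/spark-tpc-h-queries_2.12-1.0.jar "$q"
done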

N.B.:

  • By default, tpch-spark will look for the input data files (the *.tbl files generated by dbgen) in "<current working directory>/dbgen". You can point to another location by setting the environment variable TPCH_INPUT_DATA_DIR.
  • By default, the query results will be stored in "${TPCH_INPUT_DATA_DIR}/output/{Q01, Q02, ...}", or in whatever location TPCH_QUERY_OUTPUT_DIR is set to.
  • The execution times for each query run will be stored in a file at "<current working directory>/tpch_execution_times.txt", or in whatever file path TPCH_EXECUTION_TIMES points to.

For example, to override the default locations you can use:

export TPCH_INPUT_DATA_DIR="$HOME/tpch-data"
export TPCH_QUERY_OUTPUT_DIR="$HOME/tpch-results"
export TPCH_EXECUTION_TIMES="$HOME/tpch-times.txt"
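Assuming the driver runs on the same machine where these variables are exported (client mode), a subsequent spark-submit picks them up; they can also be set inline for a single run, e.g. (query 6 chosen here only for illustration):

TPCH_INPUT_DATA_DIR="$HOME/tpch-data" \
TPCH_QUERY_OUTPUT_DIR="$HOME/tpch-results" \
TPCH_EXECUTION_TIMES="$HOME/tpch-times.txt" \
spark-submit --class "main.scala.TpchQuery" \
  target/scala-2.12/spark-tpc-h-queries_2.12-1.0.jar 6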

Other Implementations

  1. Data generator (http://www.tpc.org/tpch/)
  2. TPC-H for Hive (https://issues.apache.org/jira/browse/hive-600)
  3. TPC-H for PIG (https://github.com/ssavvides/tpch-pig)

tpch-spark's People

Contributors

jaehc, parsifal-47, ssavvides


tpch-spark's Issues

Exception in thread "main" org.apache.spark.sql.catalyst.errors.package$TreeNodeException: execute, tree:

Hi,

I am trying to run the following command:

spark-submit --class "main.scala.TpchQuery" --master spark://spark-master-host:7077 target/scala-2.10/spark-tpc-h-queries_2.10-1.0.jar 01

but it throws the following exception.
....
16/04/19 11:08:33 INFO storage.BlockManagerMasterEndpoint: Registering block manager spark-master-host:37912 with 511.1 MB RAM, BlockManagerId(0, spark-master-host, 37912)
Exception in thread "main" org.apache.spark.sql.catalyst.errors.package$TreeNodeException: execute, tree:
Exchange rangepartitioning(l_returnflag#16 ASC,l_linestatus#17 ASC,200), None
+- ConvertToSafe
+- TungstenAggregate(key=[l_returnflag#16,l_linestatus#17], functions=[(sum(l_quantity#12),mode=Final,isDistinct=false),(sum(l_extendedprice#13),mode=Final,isDistinct=false),(sum(UDF(l_extendedprice#13,l_discount#14)),mode=Final,isDistinct=false),(sum(if (isnull(UDF(l_extendedprice#13,l_discount#14))) null else UDF(UDF(l_extendedprice#13,l_discount#14),l_tax#15)),mode=Final,isDistinct=false),(avg(l_quantity#12),mode=Final,isDistinct=false),(avg(l_extendedprice#13),mode=Final,isDistinct=false),(avg(l_discount#14),mode=Final,isDistinct=false),(count(1),mode=Final,isDistinct=false)], output=[l_returnflag#16,l_linestatus#17,sum(l_quantity)#61,sum(l_extendedprice)#62,sum(UDF(l_extendedprice,l_discount))#63,sum(UDF(UDF(l_extendedprice,l_discount),l_tax))#64,avg(l_quantity)#65,avg(l_extendedprice)#66,avg(l_discount)#67,count(l_quantity)#68L])
+- TungstenExchange hashpartitioning(l_returnflag#16,l_linestatus#17,200), None
+- TungstenAggregate(key=[l_returnflag#16,l_linestatus#17], functions=[(sum(l_quantity#12),mode=Partial,isDistinct=false),(sum(l_extendedprice#13),mode=Partial,isDistinct=false),(sum(UDF(l_extendedprice#13,l_discount#14)),mode=Partial,isDistinct=false),(sum(if (isnull(UDF(l_extendedprice#13,l_discount#14))) null else UDF(UDF(l_extendedprice#13,l_discount#14),l_tax#15)),mode=Partial,isDistinct=false),(avg(l_quantity#12),mode=Partial,isDistinct=false),(avg(l_extendedprice#13),mode=Partial,isDistinct=false),(avg(l_discount#14),mode=Partial,isDistinct=false),(count(1),mode=Partial,isDistinct=false)], output=[l_returnflag#16,l_linestatus#17,sum#88,sum#89,sum#90,sum#91,sum#92,count#93L,sum#94,count#95L,sum#96,count#97L,count#98L])
+- Project [l_quantity#12,l_linestatus#17,l_extendedprice#13,l_tax#15,l_returnflag#16,l_discount#14]
+- Filter (l_shipdate#18 <= 1998-09-02)
+- Scan ExistingRDD[l_orderkey#8,l_partkey#9,l_suppkey#10,l_linenumber#11,l_quantity#12,l_extendedprice#13,l_discount#14,l_tax#15,l_returnflag#16,l_linestatus#17,l_shipdate#18,l_commitdate#19,l_receiptdate#20,l_shipinstruct#21,l_shipmode#22,l_comment#23]

    at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:49)
    at org.apache.spark.sql.execution.Exchange.doExecute(Exchange.scala:247)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:132)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:130)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:130)
    at org.apache.spark.sql.execution.ConvertToUnsafe.doExecute(rowFormatConverters.scala:38)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:132)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:130)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:130)
    at org.apache.spark.sql.execution.Sort.doExecute(Sort.scala:64)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:132)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:130)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:130)
    at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:55)
    at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:55)
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply$mcV$sp(InsertIntoHadoopFsRelation.scala:109)
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply(InsertIntoHadoopFsRelation.scala:108)
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply(InsertIntoHadoopFsRelation.scala:108)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:56)
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation.run(InsertIntoHadoopFsRelation.scala:108)
    at org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult$lzycompute(commands.scala:58)
    at org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult(commands.scala:56)
    at org.apache.spark.sql.execution.ExecutedCommand.doExecute(commands.scala:70)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:132)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:130)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:130)
    at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:55)
    at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:55)
    at org.apache.spark.sql.execution.datasources.ResolvedDataSource$.apply(ResolvedDataSource.scala:256)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:148)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:139)
    at org.apache.spark.sql.DataFrameWriter.json(DataFrameWriter.scala:323)
    at main.scala.TpchQuery.outputDF(TpchQuery.scala:136)
    at main.scala.Q01.execute(Q01.scala:30)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at main.scala.TpchQuery$.executeQuery(TpchQuery.scala:147)
    at main.scala.TpchQuery$.main(TpchQuery.scala:152)
    at main.scala.TpchQuery.main(TpchQuery.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:731)
    at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181)
    at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

Caused by: org.apache.spark.sql.catalyst.errors.package$TreeNodeException: execute, tree:
TungstenAggregate(key=[l_returnflag#16,l_linestatus#17], functions=[(sum(l_quantity#12),mode=Final,isDistinct=false),(sum(l_extendedprice#13),mode=Final,isDistinct=false),(sum(UDF(l_extendedprice#13,l_discount#14)),mode=Final,isDistinct=false),(sum(if (isnull(UDF(l_extendedprice#13,l_discount#14))) null else UDF(UDF(l_extendedprice#13,l_discount#14),l_tax#15)),mode=Final,isDistinct=false),(avg(l_quantity#12),mode=Final,isDistinct=false),(avg(l_extendedprice#13),mode=Final,isDistinct=false),(avg(l_discount#14),mode=Final,isDistinct=false),(count(1),mode=Final,isDistinct=false)], output=[l_returnflag#16,l_linestatus#17,sum(l_quantity)#61,sum(l_extendedprice)#62,sum(UDF(l_extendedprice,l_discount))#63,sum(UDF(UDF(l_extendedprice,l_discount),l_tax))#64,avg(l_quantity)#65,avg(l_extendedprice)#66,avg(l_discount)#67,count(l_quantity)#68L])
+- TungstenExchange hashpartitioning(l_returnflag#16,l_linestatus#17,200), None
+- TungstenAggregate(key=[l_returnflag#16,l_linestatus#17], functions=[(sum(l_quantity#12),mode=Partial,isDistinct=false),(sum(l_extendedprice#13),mode=Partial,isDistinct=false),(sum(UDF(l_extendedprice#13,l_discount#14)),mode=Partial,isDistinct=false),(sum(if (isnull(UDF(l_extendedprice#13,l_discount#14))) null else UDF(UDF(l_extendedprice#13,l_discount#14),l_tax#15)),mode=Partial,isDistinct=false),(avg(l_quantity#12),mode=Partial,isDistinct=false),(avg(l_extendedprice#13),mode=Partial,isDistinct=false),(avg(l_discount#14),mode=Partial,isDistinct=false),(count(1),mode=Partial,isDistinct=false)], output=[l_returnflag#16,l_linestatus#17,sum#88,sum#89,sum#90,sum#91,sum#92,count#93L,sum#94,count#95L,sum#96,count#97L,count#98L])
+- Project [l_quantity#12,l_linestatus#17,l_extendedprice#13,l_tax#15,l_returnflag#16,l_discount#14]
+- Filter (l_shipdate#18 <= 1998-09-02)
+- Scan ExistingRDD[l_orderkey#8,l_partkey#9,l_suppkey#10,l_linenumber#11,l_quantity#12,l_extendedprice#13,l_discount#14,l_tax#15,l_returnflag#16,l_linestatus#17,l_shipdate#18,l_commitdate#19,l_receiptdate#20,l_shipinstruct#21,l_shipmode#22,l_comment#23]

    at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:49)
    at org.apache.spark.sql.execution.aggregate.TungstenAggregate.doExecute(TungstenAggregate.scala:80)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:132)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:130)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:130)
    at org.apache.spark.sql.execution.ConvertToSafe.doExecute(rowFormatConverters.scala:56)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:132)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:130)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:130)
    at org.apache.spark.sql.execution.Exchange.prepareShuffleDependency(Exchange.scala:164)
    at org.apache.spark.sql.execution.Exchange$$anonfun$doExecute$1.apply(Exchange.scala:254)
    at org.apache.spark.sql.execution.Exchange$$anonfun$doExecute$1.apply(Exchange.scala:248)
    at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:48)
    ... 53 more

Caused by: org.apache.spark.sql.catalyst.errors.package$TreeNodeException: execute, tree:
TungstenExchange hashpartitioning(l_returnflag#16,l_linestatus#17,200), None
+- TungstenAggregate(key=[l_returnflag#16,l_linestatus#17], functions=[(sum(l_quantity#12),mode=Partial,isDistinct=false),(sum(l_extendedprice#13),mode=Partial,isDistinct=false),(sum(UDF(l_extendedprice#13,l_discount#14)),mode=Partial,isDistinct=false),(sum(if (isnull(UDF(l_extendedprice#13,l_discount#14))) null else UDF(UDF(l_extendedprice#13,l_discount#14),l_tax#15)),mode=Partial,isDistinct=false),(avg(l_quantity#12),mode=Partial,isDistinct=false),(avg(l_extendedprice#13),mode=Partial,isDistinct=false),(avg(l_discount#14),mode=Partial,isDistinct=false),(count(1),mode=Partial,isDistinct=false)], output=[l_returnflag#16,l_linestatus#17,sum#88,sum#89,sum#90,sum#91,sum#92,count#93L,sum#94,count#95L,sum#96,count#97L,count#98L])
+- Project [l_quantity#12,l_linestatus#17,l_extendedprice#13,l_tax#15,l_returnflag#16,l_discount#14]
+- Filter (l_shipdate#18 <= 1998-09-02)
+- Scan ExistingRDD[l_orderkey#8,l_partkey#9,l_suppkey#10,l_linenumber#11,l_quantity#12,l_extendedprice#13,l_discount#14,l_tax#15,l_returnflag#16,l_linestatus#17,l_shipdate#18,l_commitdate#19,l_receiptdate#20,l_shipinstruct#21,l_shipmode#22,l_comment#23]

    at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:49)
    at org.apache.spark.sql.execution.Exchange.doExecute(Exchange.scala:247)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:132)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:130)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:130)
    at org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1.apply(TungstenAggregate.scala:86)
    at org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1.apply(TungstenAggregate.scala:80)
    at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:48)
    ... 67 more

Caused by: org.apache.hadoop.mapred.InvalidInputException: Input path does not exist: hdfs://hdfs-namenode-host:9000/TPCHBench/Input/lineitem.tbl
at org.apache.hadoop.mapred.FileInputFormat.singleThreadedListStatus(FileInputFormat.java:285)
at org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:228)
at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:313)
at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:199)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
at org.apache.spark.ShuffleDependency.<init>(Dependency.scala:91)
at org.apache.spark.sql.execution.Exchange.prepareShuffleDependency(Exchange.scala:220)
at org.apache.spark.sql.execution.Exchange$$anonfun$doExecute$1.apply(Exchange.scala:254)
at org.apache.spark.sql.execution.Exchange$$anonfun$doExecute$1.apply(Exchange.scala:248)
at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:48)
... 75 more
16/04/19 11:08:35 INFO spark.SparkContext: Invoking stop() from shutdown hook
16/04/19 11:08:35 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/static/sql,null}
16/04/19 11:08:35 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/SQL/execution/json,null}
16/04/19 11:08:35 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/SQL/execution,null}
16/04/19 11:08:35 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/SQL/json,null}
16/04/19 11:08:35 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/SQL,null}
16/04/19 11:08:35 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/metrics/json,null}
16/04/19 11:08:35 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/stages/stage/kill,null}
16/04/19 11:08:35 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/api,null}
16/04/19 11:08:35 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/,null}
16/04/19 11:08:35 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/static,null}
16/04/19 11:08:35 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/executors/threadDump/json,null}
16/04/19 11:08:35 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/executors/threadDump,null}
16/04/19 11:08:35 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/executors/json,null}
16/04/19 11:08:35 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/executors,null}
16/04/19 11:08:35 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/environment/json,null}
16/04/19 11:08:35 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/environment,null}
16/04/19 11:08:35 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/storage/rdd/json,null}
16/04/19 11:08:35 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/storage/rdd,null}
16/04/19 11:08:35 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/storage/json,null}
16/04/19 11:08:35 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/storage,null}
16/04/19 11:08:35 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/stages/pool/json,null}
16/04/19 11:08:35 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/stages/pool,null}
16/04/19 11:08:35 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/stages/stage/json,null}
16/04/19 11:08:35 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/stages/stage,null}
16/04/19 11:08:35 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/stages/json,null}
16/04/19 11:08:35 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/stages,null}
16/04/19 11:08:35 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/jobs/job/json,null}
16/04/19 11:08:35 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/jobs/job,null}
16/04/19 11:08:35 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/jobs/json,null}
16/04/19 11:08:35 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/jobs,null}
16/04/19 11:08:35 INFO ui.SparkUI: Stopped Spark web UI at http://172.17.0.4:4040
16/04/19 11:08:35 INFO cluster.SparkDeploySchedulerBackend: Shutting down all executors
16/04/19 11:08:35 INFO cluster.SparkDeploySchedulerBackend: Asking each executor to shut down
16/04/19 11:08:35 INFO spark.MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
16/04/19 11:08:35 INFO storage.MemoryStore: MemoryStore cleared
16/04/19 11:08:35 INFO storage.BlockManager: BlockManager stopped
16/04/19 11:08:35 INFO storage.BlockManagerMaster: BlockManagerMaster stopped
16/04/19 11:08:35 INFO scheduler.OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
16/04/19 11:08:35 INFO remote.RemoteActorRefProvider$RemotingTerminator: Shutting down remote daemon.
16/04/19 11:08:35 INFO spark.SparkContext: Successfully stopped SparkContext
16/04/19 11:08:35 INFO util.ShutdownHookManager: Shutdown hook called
16/04/19 11:08:35 INFO remote.RemoteActorRefProvider$RemotingTerminator: Remote daemon shut down; proceeding with flushing remote transports.
16/04/19 11:08:35 INFO util.ShutdownHookManager: Deleting directory /tmp/spark-6cdf759e-d785-4fe2-b7ed-6559f9ddd7e6/httpd-ece2c02f-dfdc-46f7-8ce9-28ef946dad7a
16/04/19 11:08:35 INFO util.ShutdownHookManager: Deleting directory /tmp/spark-6cdf759e-d785-4fe2-b7ed-6559f9ddd7e6
16/04/19 11:08:35 INFO remote.RemoteActorRefProvider$RemotingTerminator: Remoting shut down.
root@spark-master-host:/home/tpch-spark#

I am using Spark 1.6.1.

Regards
Shuja

Running the code

Could you explain how to run this code after cloning it to my machine? I would like to generate some data flow graphs using this code. Thank you.

SparkSession not closed

The SparkSession is not closed after all jobs finish. On Kubernetes, this causes the executor and driver pods not to terminate.

Fix: add spark.close() at line 106 in /src/main/scala/TpchQuery.scala
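A minimal sketch of that fix (the surrounding code is illustrative, not the actual contents of TpchQuery.scala) closes the session in a finally block so the pods are released even if a query fails:

import org.apache.spark.sql.SparkSession

object TpchRunnerSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.appName("TPC-H").getOrCreate()
    try {
      // ... execute the selected TPC-H queries here ...
    } finally {
      spark.close() // equivalent to spark.stop(); lets Kubernetes tear down executor and driver pods
    }
  }
}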
