Check out the Spark source at tag v3.2.1 and create a working branch:

git clone https://github.com/apache/spark.git
git branch spark-week9 v3.2.1
git checkout spark-week9
Locate SqlBase.g4 at sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 and add the SHOW VERSION command in four places.

In the statement rule, add a new alternative:

statement
    | SHOW VERSION                                                     #showVersion

Add VERSION to both the ansiNonReserved and nonReserved lists, so the new keyword can still be used as an identifier:

ansiNonReserved
    | VERSION

nonReserved
    | VERSION

Finally, define the token in the keyword list (after the //--SPARK-KEYWORD-LIST-START marker):

VERSION: 'VERSION';

With the grammar in place, implement the command itself in sql/core, in the org.apache.spark.sql.execution.command package:
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.execution.command

import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
import org.apache.spark.sql.types.StringType

// Runnable command backing the new SHOW VERSION statement: returns a single
// row describing the Spark and Java versions.
case class ShowVersionCommand() extends LeafRunnableCommand {

  // Result schema: one string column named "version".
  override def output: Seq[Attribute] = Seq(AttributeReference("version", StringType)())

  override def run(sparkSession: SparkSession): Seq[Row] = {
    val sparkVersion = sparkSession.version
    val javaVersion = System.getProperty("java.version")
    val outputString = "Spark Version: %s, Java Version: %s".format(sparkVersion, javaVersion)
    Seq(Row(outputString))
  }
}
(The Apache license block at the top of the file is required: Spark's coding conventions expect the header of every class file to carry a comment like this.)
Next, override the generated visitor callback in SparkSqlAstBuilder (sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala), so that the #showVersion alternative produces the new command:

/**
 * {@inheritDoc}
 *
 * <p>The default implementation returns the result of calling
 * {@link #visitChildren} on {@code ctx}.</p>
 */
override def visitShowVersion(ctx: ShowVersionContext): LogicalPlan = withOrigin(ctx) {
  ShowVersionCommand()
}
Rebuild Spark with the changes:

mvn clean package -DskipTests -Phive -Phive-thriftserver

Then start bin/spark-sql from the build and try the new statement:
22/05/07 20:09:03 WARN Utils: Your hostname, MacBook-Pro.local resolves to a loopback address: 127.0.0.1; using 192.168.1.18 instead (on interface en0)
22/05/07 20:09:03 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/05/07 20:09:03 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
22/05/07 20:09:07 WARN HiveConf: HiveConf of name hive.stats.jdbc.timeout does not exist
22/05/07 20:09:07 WARN HiveConf: HiveConf of name hive.stats.retries.wait does not exist
22/05/07 20:09:11 WARN ObjectStore: Version information not found in metastore. hive.metastore.schema.verification is not enabled so recording the schema version 2.3.0
22/05/07 20:09:11 WARN ObjectStore: setMetaStoreSchemaVersion called but recording version is disabled: version = 2.3.0, comment = Set by MetaStore [email protected]
Spark master: local[*], Application Id: local-1651925345838
spark-sql> show version;
Spark Version: 3.2.1, Java Version: 1.8.0_151
Time taken: 3.23 seconds, Fetched 1 row(s)
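The same smoke test can be scripted from spark-shell using only public API (a sketch; the printed versions will match the local build):

// Runs the new statement through the modified parser and executes
// ShowVersionCommand; expect a single "version" row such as
// "Spark Version: 3.2.1, Java Version: 1.8.0_151".
spark.sql("SHOW VERSION").show(truncate = false)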
The next exercise looks at which Catalyst optimizer rules a given query triggers. The query below exercises:
- CombineFilters (merges adjacent Filter nodes into one)
- CollapseProject (merges nested Project nodes)
- BooleanSimplification (simplifies boolean expressions, e.g. drops the always-true 1 = 1)
CREATE TABLE tmp(a INT, b INT) USING PARQUET;

SELECT a1, (b + 1) AS b1
FROM (
  SELECT (a + 1) AS a1, b FROM tmp WHERE a > 10
) WHERE a1 < 20 AND 1 = 1;
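One way to watch these rules fire is Spark's plan-change log plus an extended explain; a sketch from spark-shell (spark.sql.planChangeLog.level is available in Spark 3.1+):

// Log every optimizer rule that changes the plan, then compare the
// analyzed and optimized plans for the query above.
spark.sql("SET spark.sql.planChangeLog.level=WARN")
spark.sql("""
  SELECT a1, (b + 1) AS b1
  FROM (SELECT (a + 1) AS a1, b FROM tmp WHERE a > 10)
  WHERE a1 < 20 AND 1 = 1
""").explain(true)

In the optimized plan the two filters end up merged into a single Filter next to the scan, the nested projects collapse into one, and the always-true 1 = 1 conjunct disappears.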
For reference, here is PushDownPredicates from catalyst's Optimizer.scala; it composes three narrower rules, trying each in order on every matching node:

object PushDownPredicates extends Rule[LogicalPlan] with PredicateHelper {
  def apply(plan: LogicalPlan): LogicalPlan = plan.transformWithPruning(
    _.containsAnyPattern(FILTER, JOIN)) {
    CombineFilters.applyLocally
      .orElse(PushPredicateThroughNonJoin.applyLocally)
      .orElse(PushPredicateThroughJoin.applyLocally)
  }
}
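The orElse chaining above is plain Scala PartialFunction composition: each sub-rule exposes applyLocally as a PartialFunction over plan nodes, and the first one whose pattern matches a given node wins. A minimal standalone illustration (the values and strings here are made up):

// Each "rule" is a PartialFunction; orElse tries them in declaration order.
val combineFilters: PartialFunction[Int, String] = { case 1 => "combined" }
val pushThroughNonJoin: PartialFunction[Int, String] = { case 2 => "pushed" }
val pipeline = combineFilters.orElse(pushThroughNonJoin)

assert(pipeline(1) == "combined")
assert(pipeline(2) == "pushed")
assert(!pipeline.isDefinedAt(3)) // no rule matches: transform leaves the node as-is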
The second query triggers these rules:
- ConstantFolding (evaluates foldable expressions such as 1 = 1 at optimization time)
- PushDownPredicates (moves Filter conditions closer to the scan; see the source above)
- ReplaceDistinctWithAggregate (rewrites Distinct into an Aggregate over all output columns)
- ReplaceExceptWithAntiJoin (rewrites EXCEPT into a left-anti join)
- FoldablePropagation (propagates foldable aliases such as c = 1.0 into ORDER BY)
CREATE TABLE tmp(a INT, b INT) USING PARQUET;

SELECT a1, (b + 1) AS b1, 1.0 AS c
FROM (
  SELECT (a + 1) AS a1, b FROM tmp WHERE a > 10
) WHERE a1 < 20 AND 1 = 1
EXCEPT DISTINCT
SELECT (a + 1) AS A, b, 1.0 AS c FROM tmp WHERE b > 30 ORDER BY c;
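To confirm the rewrites, dump the optimized plan; a sketch (expect an Aggregate over a left-anti Join where the EXCEPT DISTINCT used to be):

// ReplaceExceptWithAntiJoin rewrites Except into a LeftAnti join, and
// ReplaceDistinctWithAggregate turns the surrounding Distinct into an Aggregate.
val df = spark.sql("""
  SELECT a1, (b + 1) AS b1, 1.0 AS c
  FROM (SELECT (a + 1) AS a1, b FROM tmp WHERE a > 10)
  WHERE a1 < 20 AND 1 = 1
  EXCEPT DISTINCT
  SELECT (a + 1) AS A, b, 1.0 AS c FROM tmp WHERE b > 30 ORDER BY c
""")
println(df.queryExecution.optimizedPlan.numberedTreeString)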
Finally, build the extension project and attach the resulting jar to spark-sql:

mvn clean scala:compile package -DskipTests

spark-sql --jars target/week9-1.0-SNAPSHOT.jar --conf spark.sql.extensions=com.ygs.MySparkSessionExtension
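The extension class itself is not shown in this writeup. A minimal sketch of what com.ygs.MySparkSessionExtension could look like, assuming it injects a custom optimizer rule (MyDemoRule below is a placeholder that only logs the Projects it visits; the real homework rule may differ):

package com.ygs

import org.apache.spark.sql.SparkSessionExtensions
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
import org.apache.spark.sql.catalyst.rules.Rule

// Placeholder rule for illustration: logs every Project node and returns
// the plan unchanged. Rule already mixes in Logging, so logWarning is available.
object MyDemoRule extends Rule[LogicalPlan] {
  override def apply(plan: LogicalPlan): LogicalPlan = plan.transform {
    case p: Project =>
      logWarning(s"MyDemoRule saw: ${p.projectList.mkString(", ")}")
      p
  }
}

// spark.sql.extensions expects a Function1[SparkSessionExtensions, Unit];
// this class registers the rule with the optimizer.
class MySparkSessionExtension extends (SparkSessionExtensions => Unit) {
  override def apply(extensions: SparkSessionExtensions): Unit = {
    extensions.injectOptimizerRule { _ => MyDemoRule }
  }
}

With the extension active, the queries below pass through the injected rule: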
CREATE TABLE tmp(a INT, b INT) USING PARQUET;
SELECT a, b + 1 FROM tmp;