
amazon-s3-tagging-spark-util's Introduction

Apache Spark-based Amazon S3 object tagging

Architecture

This is a library built on top of Apache Spark for tagging Amazon S3 objects. It helps you tag objects at the table level or the partition level. The library supports the following file format options (see the short sketch after this list):

  • CSV - s3.csv
  • JSON - s3.json
  • Parquet - s3.parquet
  • ORC - s3.orc
  • text - s3.text
  • Avro - s3.avro
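
Each of these names is used as the format of a regular Spark DataFrameWriter together with a tags option; full sample jobs appear later in this README. As a minimal PySpark sketch (here df stands for any existing DataFrame, and the bucket path is a placeholder):

# df is any existing DataFrame; "s3.csv" is one of the format options listed above.
df.write \
    .format("s3.csv") \
    .option("tags", "{\"ProjectTeam\": \"Team-A\"}") \
    .save("s3://my-bucket/my-table")  # placeholder path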

Requirements

  • Java 8
  • SBT 1.x.x+ (for building)
  • Scala 2.11.x, Scala 2.12.x (for building)

How to build the library?

The project is compiled using SBT. The library requires Java 8 and provides builds for Apache Spark 2.4, 3.1, 3.3, and 3.4.

Spark 2.4:

  • To compile the project, run sbt spark_24/compile
  • To generate the connector JAR, run sbt spark_24/assembly
  • The above commands will generate the following JAR:
spark_24/target/scala-2.11/amazon-s3-tagging-spark-util-spark_24-scala-2.11-lib-2.0.jar

Spark 3.1:

  • To compile the project, run sbt spark_31/compile
  • To generate the connector JAR, run sbt spark_31/assembly
  • The above commands will generate the following JAR:
spark_31/target/scala-2.12/amazon-s3-tagging-spark-util-spark_31-scala-2.12-lib-2.0.jar

Spark 3.3:

  • To compile the project, run sbt spark_33/compile
  • To generate the connector JAR, run sbt spark_33/assembly
  • The above commands will generate the following JAR:
spark_33/target/scala-2.12/amazon-s3-tagging-spark-util-spark_33-scala-2.12-lib-2.0.jar

Spark 3.4:

  • To compile the project, run sbt spark_34/compile
  • To generate the connector JAR, run sbt spark_34/assembly
  • The above commands will generate the following JAR:
spark_34/target/scala-2.12/amazon-s3-tagging-spark-util-spark_34-scala-2.12-lib-2.0.jar

This JAR bundles spark-avro, commons-lang3, and their dependencies; it needs to be placed on Spark's extra classpath.

Note: The released JARs are available on the releases page.

Configure AWS Glue ETL Job

Copy the JAR for the corresponding Scala and Spark version to an Amazon S3 bucket:

aws s3 cp spark_24/target/scala-2.11/amazon-s3-tagging-spark-util-spark_24-scala-2.11-lib-2.0.jar s3://$BUCKET/$PREFIX

Create a Glue ETL job with the following special parameters. For more details, see the AWS Glue documentation on job special parameters.

Glue 2.0 Configuration:

"--extra-jars" : "s3://$BUCKET/$PREFIX/amazon-s3-tagging-spark-util-spark_24-scala-2.11-lib-2.0.jar" // change the jar for spark version

Glue 3.0 Configuration:

"--extra-jars" : "s3://$BUCKET/$PREFIX/amazon-s3-tagging-spark-util-spark_31-scala-2.12-lib-2.0.jar" // change the jar for spark version
"-enable-s3-parquet-optimized-committer": "false"

Glue 4.0 Configuration:

"--extra-jars" : "s3://$BUCKET/$PREFIX/amazon-s3-tagging-spark-util-spark_33-scala-2.12-lib-2.0.jar" // change the jar for spark version
"-enable-s3-parquet-optimized-committer": "false"

Configure AWS EMR ETL Job

Copy the JAR for the corresponding Scala and Spark version to an Amazon S3 bucket:

aws s3 cp spark_24/target/scala-2.11/amazon-s3-tagging-spark-util-spark_24-scala-2.11-lib-2.0.jar s3://$BUCKET/$PREFIX

EMR Job Spark Configuration:

"spark.jars": "s3://$BUCKET/$PREFIX/amazon-s3-tagging-spark-util-spark_33-scala-2.12-lib-2.0.jar"
"spark.sql.parquet.fs.optimized.committer.optimization-enabled": "false"
"spark.sql.sources.commitProtocolClass": "org.apache.spark.sql.execution.datasources.SQLHadoopMapReduceCommitProtocol"
"spark.hadoop.mapreduce.output.fs.optimized.committer.enabled": "false"

Update the JAR file based on the EMR version, as shown in the table below.

Supported Glue and EMR Versions and Library

Glue Supported Version | Supported Library
Glue 2.0               | amazon-s3-tagging-spark-util-spark_24-scala-2.11-lib-2.0.jar
Glue 3.0               | amazon-s3-tagging-spark-util-spark_31-scala-2.12-lib-2.0.jar
Glue 4.0               | amazon-s3-tagging-spark-util-spark_33-scala-2.12-lib-2.0.jar

EMR Supported Version  | Supported Library
EMR 6.8.0              | amazon-s3-tagging-spark-util-spark_33-scala-2.12-lib-2.0.jar
EMR 6.8.1              | amazon-s3-tagging-spark-util-spark_33-scala-2.12-lib-2.0.jar
EMR 6.9.0              | amazon-s3-tagging-spark-util-spark_33-scala-2.12-lib-2.0.jar
EMR 6.9.1              | amazon-s3-tagging-spark-util-spark_33-scala-2.12-lib-2.0.jar
EMR 6.10.0             | amazon-s3-tagging-spark-util-spark_33-scala-2.12-lib-2.0.jar
EMR 6.10.1             | amazon-s3-tagging-spark-util-spark_33-scala-2.12-lib-2.0.jar
EMR 6.11.0             | amazon-s3-tagging-spark-util-spark_33-scala-2.12-lib-2.0.jar
EMR 6.11.1             | amazon-s3-tagging-spark-util-spark_33-scala-2.12-lib-2.0.jar
EMR 6.12.0             | amazon-s3-tagging-spark-util-spark_34-scala-2.12-lib-2.0.jar
EMR 6.13.0             | amazon-s3-tagging-spark-util-spark_34-scala-2.12-lib-2.0.jar
EMR 6.14.0             | amazon-s3-tagging-spark-util-spark_34-scala-2.12-lib-2.0.jar

Sample Scala Spark Job

For this example, we assume we are working with customer data, where each record has a customer id, name, street, city, and country.

case class Customer(id: Long, name: String, street: String, city: String, country: String)

Our library is built on Apache Spark and is designed to work with very large datasets that typically live in a distributed filesystem. For the sake of simplicity, this example generates just a few records.

val rdd = spark.sparkContext.parallelize(Seq(
  Customer(1, "James Butt", "627 Walford Ave", "Dallas", "Dallas"),
  Customer(2, "Gearldine Gellinger", "4 Bloomfield Ave", "Irving", "Dallas"),
  Customer(3, "Ozell Shealy", "8 Industry Ln", "New York", "New York"),
  Customer(4, "Haydee Denooyer", "25346 New Rd", "New York", "New York"),
  Customer(5, "Mirta Mallett", "7 S San Marcos Rd", "New York", "New York")))

val customerDataFrame = spark.createDataFrame(rdd)

Write the data to S3 and tag the objects using the additional write options.

// Option 1: Static Tagging
customerDataFrame
  .write
  .format("s3.parquet")
  .option("tags", "{\"ProjectTeam\": \"Team-A\", \"FileType\":\"parquet\"}")
  .save("s3://$DATA_BUCKET/$TABLE_NAME")

// Option 2: Dynamic Tagging using partition value
customerDataFrame
  .write
  .partitionBy("country")
  .format("s3.parquet")
  .option("tags", "{\"ProjectTeam\": \"Team-A\", \"Country\":\"${country}\"}")
  .save("s3://$DATA_BUCKET/$TABLE_NAME")

Sample PySpark Job

Our library is built on Apache Spark and is designed to work with very large datasets that typically live in a distributed filesystem. For the sake of simplicity, this example generates just a few records.

from pyspark.sql import Row

rdd = spark.sparkContext.parallelize([
    Row(id=1, name="James Butt", street="627 Walford Ave", city="Dallas", country="USA"),
    Row(id=2, name="Gearldine Gellinger", street="4 Bloomfield Ave", city="Irving", country="USA"),
    Row(id=3, name="Ozell Shealy", street="8 Industry Ln", city="New York", country="USA"),
    Row(id=4, name="Haydee Denooyer", street="25346 New Rd", city="New York", country="USA"),
    Row(id=5, name="Mirta Mallett", street="7 S San Marcos Rd", city="New York", country="USA")
])

# Create a DataFrame from the RDD
customer_data_frame = spark.createDataFrame(rdd)

Write the data to S3 and tag the objects using the additional write options.

# Option 1: Static Tagging
customer_data_frame \
    .write \
    .format("s3.parquet") \
    .option("tags", "{\"ProjectTeam\": \"Team-A\", \"FileType\":\"parquet\"}") \
    .save("s3://$DATA_BUCKET/$TABLE_NAME")

# Option 2: Dynamic Tagging using partition value
customer_data_frame \
    .write \
    .partitionBy("country") \
    .format("s3.parquet") \
    .option("tags", "{\"ProjectTeam\": \"Team-A\", \"Country\":\"${country}\"}") \
    .save("s3://$DATA_BUCKET/$TABLE_NAME")

Contributing Guidelines

See CONTRIBUTING for more information.

License

This library is licensed under the Apache 2.0 License.


amazon-s3-tagging-spark-util's Issues

Tagging util does not work with glue 3

Hello,
I have tried the library built with Scala 2.12 (amazon-s3-tagging-spark-util-assembly_2.12-1.0.jar) in a PySpark Glue 3 job. The job reads Parquet files from a given path and should write them back with the given tags.

df = spark_session.read.parquet(path)

df\
    .write\
    .mode("overwrite")\
    .format("s3.parquet")\
    .option("tags","{\"ProjectTeam\": \"Team-A\", \"FileType\":\"parquet\"}")\
    .save(write_path)

The job fails with the exception "The specified key does not exist."

2022-01-14 09:14:29,904 ERROR [main] glue.ProcessLauncher (Logging.scala:logError(73)): Error from Python:Traceback (most recent call last):
  File "/tmp/data-ingest_a_glue3", line 19, in <module>
    df.write.mode("append").format("s3.parquet").option("tag", "{\"ProjectTeam\": \"Team-A\", \"FileType\":\"parquet\"}").save(write_path)
  File "/opt/amazon/spark/python/lib/pyspark.zip/pyspark/sql/readwriter.py", line 1109, in save
    self._jwrite.save(path)
  File "/opt/amazon/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 1305, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "/opt/amazon/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 111, in deco
    return f(*a, **kw)
  File "/opt/amazon/spark/python/lib/py4j-0.10.9-src.zip/py4j/protocol.py", line 328, in get_return_value
    format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling o80.save.
: org.apache.spark.SparkException: Job aborted.
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:231)
	at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:195)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:108)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:106)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.doExecute(commands.scala:131)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:185)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:223)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:220)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:181)
	at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:134)
	at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:133)
	at org.apache.spark.sql.DataFrameWriter.$anonfun$runCommand$1(DataFrameWriter.scala:989)
	at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
	at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:232)
	at org.apache.spark.sql.execution.SQLExecution$.executeQuery$1(SQLExecution.scala:110)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:135)
	at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
	at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:232)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:135)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:253)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:134)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:772)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:68)
	at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:989)
	at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:438)
	at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:415)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:293)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 2 in stage 1.0 failed 4 times, most recent failure: Lost task 2.3 in stage 1.0 (TID 14) (10.114.116.119 executor 1): org.apache.spark.SparkException: Task failed while writing rows.
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:296)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$write$15(FileFormatWriter.scala:210)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:131)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Caused by: com.amazonaws.services.s3.model.AmazonS3Exception: The specified key does not exist. (Service: Amazon S3; Status Code: 404; Error Code: NoSuchKey; Request ID: GGF611NKETEHN64H; S3 Extended Request ID: S6EGLJzEClNo76hUUKXd+lDNc2zBXVPFPmkp3nDcC38K0g1j/HuA32WwTYJe+zeRyVNYz45ZiwU=; Proxy: null), S3 Extended Request ID: S6EGLJzEClNo76hUUKXd+lDNc2zBXVPFPmkp3nDcC38K0g1j/HuA32WwTYJe+zeRyVNYz45ZiwU=
	at com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleErrorResponse(AmazonHttpClient.java:1819)
	at com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleServiceErrorResponse(AmazonHttpClient.java:1403)
	at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1372)
	at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1145)
	at com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:802)
	at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:770)
	at com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:744)
	at com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:704)
	at com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:686)
	at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:550)
	at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:530)
	at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:5259)
	at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:5206)
	at com.amazonaws.services.s3.AmazonS3Client.setObjectTagging(AmazonS3Client.java:1676)
	at com.amazonaws.s3.utils.S3TaggingHelper$.setTags(S3TaggingHelper.scala:46)
	at org.apache.spark.sql.execution.datasources.parquet.S3ParquetOutputWriter.close(S3ParquetOutputWriter.scala:14)
	at org.apache.spark.sql.execution.datasources.FileFormatDataWriter.releaseResources(FileFormatDataWriter.scala:58)
	at org.apache.spark.sql.execution.datasources.FileFormatDataWriter.commit(FileFormatDataWriter.scala:75)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$executeTask$1(FileFormatWriter.scala:280)
	at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1473)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:286)
	... 9 more

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2465)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2414)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2413)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2413)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1124)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1124)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1124)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2679)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2621)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2610)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:914)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2238)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:200)
	... 38 more
Caused by: org.apache.spark.SparkException: Task failed while writing rows.
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:296)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$write$15(FileFormatWriter.scala:210)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:131)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	... 1 more
Caused by: com.amazonaws.services.s3.model.AmazonS3Exception: The specified key does not exist. (Service: Amazon S3; Status Code: 404; Error Code: NoSuchKey; Request ID: GGF611NKETEHN64H; S3 Extended Request ID: S6EGLJzEClNo76hUUKXd+lDNc2zBXVPFPmkp3nDcC38K0g1j/HuA32WwTYJe+zeRyVNYz45ZiwU=; Proxy: null), S3 Extended Request ID: S6EGLJzEClNo76hUUKXd+lDNc2zBXVPFPmkp3nDcC38K0g1j/HuA32WwTYJe+zeRyVNYz45ZiwU=
	at com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleErrorResponse(AmazonHttpClient.java:1819)
	at com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleServiceErrorResponse(AmazonHttpClient.java:1403)
	at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1372)
	at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1145)
	at com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:802)
	at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:770)
	at com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:744)
	at com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:704)
	at com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:686)
	at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:550)
	at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:530)
	at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:5259)
	at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:5206)
	at com.amazonaws.services.s3.AmazonS3Client.setObjectTagging(AmazonS3Client.java:1676)
	at com.amazonaws.s3.utils.S3TaggingHelper$.setTags(S3TaggingHelper.scala:46)
	at org.apache.spark.sql.execution.datasources.parquet.S3ParquetOutputWriter.close(S3ParquetOutputWriter.scala:14)
	at org.apache.spark.sql.execution.datasources.FileFormatDataWriter.releaseResources(FileFormatDataWriter.scala:58)
	at org.apache.spark.sql.execution.datasources.FileFormatDataWriter.commit(FileFormatDataWriter.scala:75)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$executeTask$1(FileFormatWriter.scala:280)
	at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1473)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:286)
	... 9 more

I was able to identify that it probably fails when it tries to access the temporary file.

2022-01-14 09:14:29,658 DEBUG [Thread-5] handler.RequestIdLogger (RequestIdLogger.java:afterResponse(36)): https://data-ingest-data-dev.s3.amazonaws.com/?list-type=2&delimiter=%2F&max-keys=1&prefix=data-ingest_ingest_s3_csv_prihodav_curated%2Fmytable%2F.spark-staging-5d886f7b-ac47-47fd-8925-fbedccf6d287%2F&fetch-owner=false x-amz-request-id=GGFBQ5RNRV71MJPD x-amz-id-2=z+jnJHCOTRHqc8aK6xjUiemhOb5Z6Nh7a9M+5GNCTeMrTgdkfrbdNgcVWG7dpP/rim0tiWQyQK0=
2022-01-14 09:14:29,658 DEBUG [Thread-5] s3n.S3NativeFileSystem (S3NativeFileSystem.java:getFileStatus(531)): getFileStatus could not find key 'data-ingest_ingest_s3_csv_prihodav_curated/mytable/.spark-staging-5d886f7b-ac47-47fd-8925-fbedccf6d287'
2022-01-14 09:14:29,658 DEBUG [Thread-5] s3n.S3NativeFileSystem (S3NativeFileSystem.java:delete(384)): Delete called for 's3://data-ingest-data-dev/data-ingest_ingest_s3_csv_prihodav_curated/mytable/.spark-staging-5d886f7b-ac47-47fd-8925-fbedccf6d287' but file does not exist, so returning false

Could you please provide support for this issue?

Getting error while using s3 spark tagging util

Hi,

I am running Spark jobs on Glue 3.0 with PySpark, using the Spark tagging util JAR (amazon-s3-tagging-spark-util-assembly_2.12-1.0.jar) downloaded from the releases page https://github.com/awslabs/amazon-s3-tagging-spark-util/releases . I am passing the JAR as an external argument of the Glue job: "--extra-jars" : "s3://$BUCKET/$PREFIX/amazon-s3-tagging-spark-util-assembly_2.12-1.0.jar".

Glue start-job-run command:
$ aws glue start-job-run --job-name "CSV to CSV" --arguments='--extra-jars="s3://$BUCKET/$PREFIX/amazon-s3-tagging-spark-util-assembly_2.12-1.0.jar"'

The JAR registers successfully in the Glue job; I can see it in the Spark config: ('spark.glue.extra-jars', 's3://$BUCKET/$PREFIX/amazon-s3-tagging-spark-util-assembly_2.12-1.0.jar')

First I try reading a file from the S3 bucket, and the file is read successfully.

df = spark.read.csv('s3://file', header=True, inferSchema=True)

and then I write the file back to S3:

df.write \
    .format("s3.csv") \
    .option("tag", "{\"ProjectTeam\": \"Team-A\", \"FileType\":\"parquet\"}") \
    .save("s3://$DATA_BUCKET/$TABLE_NAME")

But I get the following error while writing the file:
File "/opt/amazon/spark/python/lib/py4j-0.10.9-src.zip/py4j/protocol.py", line 328 in get_return_value
format(target_id,".",name),value)

py4j.protocol.Py4JJavaError: An error occurred while calling o165.save
: java.lang.NoClassDefFoundError: org/apache/spark/sql/execution/datasources/csv/CSVOptions

Can you please help me out on this?

Can we use the external JAR on an EMR cluster and call the write method using PySpark?

Can we use the external JAR on an EMR cluster and call the write method using PySpark?

There are two ways:

  1. Add the path to the JAR file in spark-submit
    ./bin/spark-submit --jars jarfile.jar spark_script.py
    or set the environment variable SPARK_CLASSPATH

  2. Add the path to the classpath
    SPARK_CLASSPATH='/path/jarfile.jar:/path/jarfile.jar' spark_script.py

Or, after creating the Spark session:
spark.sparkContext.addPyFile("/path/to/jar/jarfile.jar")

Originally posted by @simranjeet97 in #1 (comment)
