
amazon-s3-tagging-spark-util's Issues

Tagging util does not work with Glue 3

Hello,
I have tried the library built with Scala 2.12 (amazon-s3-tagging-spark-util-assembly_2.12-1.0.jar) on a PySpark Glue 3 job. The job reads Parquet files from a given path and should write them back with the given tags.

df = spark_session.read.parquet(path)

df\
    .write\
    .mode("overwrite")\
    .format("s3.parquet")\
    .option("tags","{\"ProjectTeam\": \"Team-A\", \"FileType\":\"parquet\"}")\
    .save(write_path)

The job fails with the exception "The specified key does not exist."

2022-01-14 09:14:29,904 ERROR [main] glue.ProcessLauncher (Logging.scala:logError(73)): Error from Python:Traceback (most recent call last):
  File "/tmp/data-ingest_a_glue3", line 19, in <module>
    df.write.mode("append").format("s3.parquet").option("tag", "{\"ProjectTeam\": \"Team-A\", \"FileType\":\"parquet\"}").save(write_path)
  File "/opt/amazon/spark/python/lib/pyspark.zip/pyspark/sql/readwriter.py", line 1109, in save
    self._jwrite.save(path)
  File "/opt/amazon/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 1305, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "/opt/amazon/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 111, in deco
    return f(*a, **kw)
  File "/opt/amazon/spark/python/lib/py4j-0.10.9-src.zip/py4j/protocol.py", line 328, in get_return_value
    format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling o80.save.
: org.apache.spark.SparkException: Job aborted.
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:231)
	at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:195)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:108)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:106)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.doExecute(commands.scala:131)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:185)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:223)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:220)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:181)
	at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:134)
	at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:133)
	at org.apache.spark.sql.DataFrameWriter.$anonfun$runCommand$1(DataFrameWriter.scala:989)
	at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
	at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:232)
	at org.apache.spark.sql.execution.SQLExecution$.executeQuery$1(SQLExecution.scala:110)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:135)
	at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
	at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:232)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:135)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:253)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:134)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:772)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:68)
	at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:989)
	at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:438)
	at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:415)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:293)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 2 in stage 1.0 failed 4 times, most recent failure: Lost task 2.3 in stage 1.0 (TID 14) (10.114.116.119 executor 1): org.apache.spark.SparkException: Task failed while writing rows.
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:296)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$write$15(FileFormatWriter.scala:210)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:131)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Caused by: com.amazonaws.services.s3.model.AmazonS3Exception: The specified key does not exist. (Service: Amazon S3; Status Code: 404; Error Code: NoSuchKey; Request ID: GGF611NKETEHN64H; S3 Extended Request ID: S6EGLJzEClNo76hUUKXd+lDNc2zBXVPFPmkp3nDcC38K0g1j/HuA32WwTYJe+zeRyVNYz45ZiwU=; Proxy: null), S3 Extended Request ID: S6EGLJzEClNo76hUUKXd+lDNc2zBXVPFPmkp3nDcC38K0g1j/HuA32WwTYJe+zeRyVNYz45ZiwU=
	at com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleErrorResponse(AmazonHttpClient.java:1819)
	at com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleServiceErrorResponse(AmazonHttpClient.java:1403)
	at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1372)
	at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1145)
	at com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:802)
	at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:770)
	at com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:744)
	at com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:704)
	at com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:686)
	at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:550)
	at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:530)
	at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:5259)
	at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:5206)
	at com.amazonaws.services.s3.AmazonS3Client.setObjectTagging(AmazonS3Client.java:1676)
	at com.amazonaws.s3.utils.S3TaggingHelper$.setTags(S3TaggingHelper.scala:46)
	at org.apache.spark.sql.execution.datasources.parquet.S3ParquetOutputWriter.close(S3ParquetOutputWriter.scala:14)
	at org.apache.spark.sql.execution.datasources.FileFormatDataWriter.releaseResources(FileFormatDataWriter.scala:58)
	at org.apache.spark.sql.execution.datasources.FileFormatDataWriter.commit(FileFormatDataWriter.scala:75)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$executeTask$1(FileFormatWriter.scala:280)
	at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1473)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:286)
	... 9 more

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2465)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2414)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2413)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2413)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1124)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1124)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1124)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2679)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2621)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2610)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:914)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2238)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:200)
	... 38 more
Caused by: org.apache.spark.SparkException: Task failed while writing rows.
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:296)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$write$15(FileFormatWriter.scala:210)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:131)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	... 1 more
Caused by: com.amazonaws.services.s3.model.AmazonS3Exception: The specified key does not exist. (Service: Amazon S3; Status Code: 404; Error Code: NoSuchKey; Request ID: GGF611NKETEHN64H; S3 Extended Request ID: S6EGLJzEClNo76hUUKXd+lDNc2zBXVPFPmkp3nDcC38K0g1j/HuA32WwTYJe+zeRyVNYz45ZiwU=; Proxy: null), S3 Extended Request ID: S6EGLJzEClNo76hUUKXd+lDNc2zBXVPFPmkp3nDcC38K0g1j/HuA32WwTYJe+zeRyVNYz45ZiwU=
	at com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleErrorResponse(AmazonHttpClient.java:1819)
	at com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleServiceErrorResponse(AmazonHttpClient.java:1403)
	at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1372)
	at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1145)
	at com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:802)
	at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:770)
	at com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:744)
	at com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:704)
	at com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:686)
	at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:550)
	at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:530)
	at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:5259)
	at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:5206)
	at com.amazonaws.services.s3.AmazonS3Client.setObjectTagging(AmazonS3Client.java:1676)
	at com.amazonaws.s3.utils.S3TaggingHelper$.setTags(S3TaggingHelper.scala:46)
	at org.apache.spark.sql.execution.datasources.parquet.S3ParquetOutputWriter.close(S3ParquetOutputWriter.scala:14)
	at org.apache.spark.sql.execution.datasources.FileFormatDataWriter.releaseResources(FileFormatDataWriter.scala:58)
	at org.apache.spark.sql.execution.datasources.FileFormatDataWriter.commit(FileFormatDataWriter.scala:75)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$executeTask$1(FileFormatWriter.scala:280)
	at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1473)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:286)
	... 9 more

I was able to identify that it probably fails when it tries to access the temporary file.

2022-01-14 09:14:29,658 DEBUG [Thread-5] handler.RequestIdLogger (RequestIdLogger.java:afterResponse(36)): https://data-ingest-data-dev.s3.amazonaws.com/?list-type=2&delimiter=%2F&max-keys=1&prefix=data-ingest_ingest_s3_csv_prihodav_curated%2Fmytable%2F.spark-staging-5d886f7b-ac47-47fd-8925-fbedccf6d287%2F&fetch-owner=false x-amz-request-id=GGFBQ5RNRV71MJPD x-amz-id-2=z+jnJHCOTRHqc8aK6xjUiemhOb5Z6Nh7a9M+5GNCTeMrTgdkfrbdNgcVWG7dpP/rim0tiWQyQK0=
2022-01-14 09:14:29,658 DEBUG [Thread-5] s3n.S3NativeFileSystem (S3NativeFileSystem.java:getFileStatus(531)): getFileStatus could not find key 'data-ingest_ingest_s3_csv_prihodav_curated/mytable/.spark-staging-5d886f7b-ac47-47fd-8925-fbedccf6d287'
2022-01-14 09:14:29,658 DEBUG [Thread-5] s3n.S3NativeFileSystem (S3NativeFileSystem.java:delete(384)): Delete called for 's3://data-ingest-data-dev/data-ingest_ingest_s3_csv_prihodav_curated/mytable/.spark-staging-5d886f7b-ac47-47fd-8925-fbedccf6d287' but file does not exist, so returning false

Could you please provide support with this issue?
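From the stack trace, the failure happens in com.amazonaws.s3.utils.S3TaggingHelper.setTags, called from S3ParquetOutputWriter.close, and the DEBUG log below shows the key being looked up is the .spark-staging path, which no longer exists by the time the tagging call runs. A possible workaround, not a fix to the library itself, is to write with the plain parquet format and apply the tags afterwards over the final output prefix with boto3. A minimal sketch, assuming boto3 is available in the job; bucket and prefix names are placeholders:

import boto3

def tag_written_objects(bucket, prefix, tags):
    # Apply the given tag dict to every object under bucket/prefix.
    s3 = boto3.client("s3")
    tag_set = [{"Key": k, "Value": v} for k, v in tags.items()]
    paginator = s3.get_paginator("list_objects_v2")
    for page in paginator.paginate(Bucket=bucket, Prefix=prefix):
        for obj in page.get("Contents", []):
            s3.put_object_tagging(
                Bucket=bucket,
                Key=obj["Key"],
                Tagging={"TagSet": tag_set},
            )

# After df.write.mode("overwrite").format("parquet").save(write_path):
# tag_written_objects("my-bucket", "my/output/prefix/",
#                     {"ProjectTeam": "Team-A", "FileType": "parquet"})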

Files over 100 MB are not getting tagged

Hi,
I'm trying to tag my DataFrame output using s3.parquet. What I observe:
output file size < 100 MB: the files are tagged
output file size > 100 MB: the files are NOT tagged

Infra:
EMR 6.14.0
Spark 3.4.1

  df \
  .repartition(600) \
  .write \
  .partitionBy(YEAR_COLUMN, MONTH_COLUMN, DAY_COLUMN) \
  .mode("overwrite") \
  .format("s3.parquet") \
  .option("tags", tags) \
  .save(path)

LMK if any extra info is needed
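One diagnostic that might help narrow this down is to list the written objects and print each object's size alongside its tags; if the untagged objects are exactly the ones above a size threshold, that would suggest the large-file upload path is bypassing the tagging hook. A minimal sketch, assuming boto3 and placeholder bucket/prefix values:

import boto3

s3 = boto3.client("s3")
paginator = s3.get_paginator("list_objects_v2")
for page in paginator.paginate(Bucket="my-output-bucket", Prefix="my/output/prefix/"):
    for obj in page.get("Contents", []):
        # Fetch the tags actually attached to each written object.
        tags = s3.get_object_tagging(Bucket="my-output-bucket", Key=obj["Key"])["TagSet"]
        print(f"{obj['Key']}: {obj['Size'] / (1024 * 1024):.1f} MB, tags={tags}")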

Can we use the external jar on an EMR cluster and call the write method using PySpark?

Can we use the external jar on an EMR cluster and call the write method using PySpark?

There are two ways:

  1. Pass the jar to spark-submit:
    ./bin/spark-submit --jars jarfile.jar spark_script.py

  2. Set the SPARK_CLASSPATH environment variable:
    SPARK_CLASSPATH='/path/jarfile.jar:/path/jarfile.jar' spark_script.py

Alternatively, after creating the Spark session:
spark.sparkContext.addPyFile("/path/to/jar/jarfile.jar")

Originally posted by @simranjeet97 in #1 (comment)
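Note that addPyFile is intended for Python dependencies; since this library is a JVM data source, the jar needs to be on the driver and executor classpath, which --jars at spark-submit time or the spark.jars config handles. A minimal sketch of a PySpark job that picks up the jar via the session config and then writes with the tagging format shown in the issues above; the paths are placeholders, and it assumes the cluster can read the jar from the given location:

from pyspark.sql import SparkSession

# Placeholder jar location; on EMR this could be a local path or an S3 path.
JAR_PATH = "s3://my-bucket/jars/amazon-s3-tagging-spark-util-assembly_2.12-1.0.jar"

spark = (
    SparkSession.builder
    .appName("s3-tagging-example")
    # spark.jars distributes the jar to the driver and executors.
    .config("spark.jars", JAR_PATH)
    .getOrCreate()
)

df = spark.read.parquet("s3://my-bucket/input/")

(df.write
    .mode("overwrite")
    .format("s3.parquet")
    .option("tags", '{"ProjectTeam": "Team-A", "FileType": "parquet"}')
    .save("s3://my-bucket/output/"))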

Getting an error while using the S3 Spark tagging util

Hi,

I am running Spark jobs on Glue 3.0 with PySpark, using the tagging util jar downloaded from the release page https://github.com/awslabs/amazon-s3-tagging-spark-util/releases (amazon-s3-tagging-spark-util-assembly_2.12-1.0.jar). I am passing the jar as an external argument of the Glue job: "--extra-jars": "s3://$BUCKET/$PREFIX/amazon-s3-tagging-spark-util-assembly_2.12-1.0.jar".

Glue start-job-run command:
$ aws glue start-job-run --job-name "CSV to CSV" --arguments='--extra-jars="s3://$BUCKET/$PREFIX/amazon-s3-tagging-spark-util-assembly_2.12-1.0.jar"'

The jar registers successfully in the Glue job; I can see it in the Spark config ('spark.glue.extra-jars', 's3://$BUCKET/$PREFIX/amazon-s3-tagging-spark-util-assembly_2.12-1.0.jar').

First I read a file from the S3 bucket, which succeeds:

df = spark.read.csv('s3://file', header=True, inferSchema=True)

Then I write the file back to S3:

df.write \
    .format("s3.csv") \
    .option("tag", "{\"ProjectTeam\": \"Team-A\", \"FileType\":\"parquet\"}") \
    .save("s3://$DATA_BUCKET/$TABLE_NAME")

But I get the following error while writing the file:
File "/opt/amazon/spark/python/lib/py4j-0.10.9-src.zip/py4j/protocol.py", line 328 in get_return_value
format(target_id,".",name),value)

py4j.protocol.Py4JJavaError: An error occurred while calling o165.save
: java.lang.NoClassDefFoundError: org/apache/spark/sql/execution/datasources/csv/CSVOptions

Can you please help me out on this?
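The NoClassDefFoundError for org.apache.spark.sql.execution.datasources.csv.CSVOptions usually points to a mismatch between the Spark version the assembly jar was built against and the Spark runtime in Glue 3.0 (Spark 3.1): that internal class was relocated between Spark releases, so a jar compiled against an older Spark will not find it at runtime. A quick check, sketched below, is to print the runtime Spark and Scala versions and compare them with the versions the jar targets; reaching the Scala version through _jvm relies on the py4j gateway being available in the Glue job:

print("Spark version:", spark.version)
# Scala version of the running JVM, retrieved through the py4j gateway.
print("Scala version:", spark.sparkContext._jvm.scala.util.Properties.versionNumberString())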
