Git Product home page Git Product logo

s3committer's Introduction

Iceberg

I currently recommend using Iceberg tables instead of using these committers. Check out the Apache Iceberg project, apache/iceberg for details.

S3 output committers

This project has Hadoop OutputCommitter implementations for S3.

There are 3 main classes:

  • S3MultipartOutputCommitter is a base committer class that handles commit logic. This should not be used directly.
  • S3DirectoryOutputCommitter for writing unpartitioned data to S3 with conflict resolution.
  • S3PartitionedOutputCommitter for writing partitioned data to S3 with conflict resolution.

Callers should use S3DirectoryOutputCommitter for single-directory outputs, or S3PartitionedOutputCommitter for partitioned data.

Commit logic

These S3 committers work by writing task outputs to a temporary directory on the local FS. Task outputs are directed to the local FS by getTaskAttemptPath and getWorkPath.

On task commit, the committers look for files in the task attempt directory (ignoring hidden files). Each file is uploaded to S3 using the multi-part upload API, but the upload is not completed in S3. After task commit, files are not visible in S3. Instead the information for the upload is serialized to be completed during job commit. Information about a task's pending uploads is written to HDFS and committed using a FileOutputCommitter.

On job commit, the committer completes each multi-part upload started by tasks. This is safe because job commit only runs if all tasks completed successfully, and using the FileOutputCommitter to pass information to the job committer ensures that only one task attempt's output will be committed.

Conflict resolution

The single-directory and partitioned committers handle conflict resolution by checking whether target paths exist in S3 before uploading any data. There are 3 conflict resolution modes, controlled by setting s3.multipart.committer.conflict-mode:

  • fail: Fail a task if an output directory or partition already exists. (Default)
  • append: Upload data files without checking whether directories or partitions already exist.
  • replace: If an output directory exists, delete it so the new data replaces the current content.

The partitioned committer enforces the conflict mode when a conflict is detected with output data, not before the job runs. Conflict resolution differs from an output mode because it does not enforce the mode when there is no conflict. For example, overwriting a partition should remove all sub-partitions and data it contains, whether or not new output is created. Conflict resolution will only replace partitions that have output data.

When the conflict mode is replace, conflicting directories are removed during job commit. Data is only deleted if all tasks have completed successfully.

A UUID that identifies a write is added to filenames that are uploaded to S3. This allows rolling back data from jobs that fail during job commit (see failure cases below) and avoids file-level conflicts when appending data to existing directories.

Failure scenarios

Task commits delegate to the FileOutputCommitter to ensure that only one task's output reaches the job commit. If an upload fails, tasks will abort content already uploaded to S3 and will remove temporary files on the local FS. Similarly, if a task is aborted, temporary output on the local FS is removed.

If a task dies while the committer is running, it is possible for data to be left on the local FS or as unfinished parts in S3. Unfinished upload parts in S3 are not visible to table readers and are cleaned up following the rules in the target bucket's life-cycle policy.

Failures during job commit are handled by deleting any files that have already been completed and aborting the remaining uploads. Because uploads are completed individually, the files that are deleted were visible to readers.

If the process dies while the job committer is running, there are two possible failures:

  1. Some directories that would be replaced have been deleted, but no new data is visible.
  2. Some new data is visible but is not complete, and all replaced directories have been removed. Only complete files are visible.

If the process dies during job commit, cleaning up is a manual process. File names include a UUID for each write so that files can be identified and removed.

Configuration

  • s3.multipart.committer.conflict-mode -- how to resolve directory conflicts during commit: fail, append, or replace; defaults to fail.
  • s3.multipart.committer.uuid -- a UUID that identifies a write; spark.sql.sources.writeJobUUID is used if not set
  • s3.multipart.committer.upload.size -- size, in bytes, to use for parts of the upload to S3; defaults to 10MB.
  • s3.multipart.committer.num-threads -- number of threads to use to complete S3 uploads during job commit; defaults to 8.

Build

This project uses gradle. To build and test, run gradle build.

s3committer's People

Contributors

rdblue avatar scravy avatar

Stargazers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

Watchers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

s3committer's Issues

Caused by: org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException)

Hi,

I'm using this committer in spark. When I run one job everything is perfect. When start multiple jobs (from different threads) I get this error so often (not always).

`An error occurred while calling o923066.parquet.
: org.apache.spark.SparkException: Job aborted.
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply$mcV$sp(FileFormatWriter.scala:213)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:166)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:166)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:65)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:166)
at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:145)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:58)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:56)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:74)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:138)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:135)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:116)
at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:92)
at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:92)
at org.apache.spark.sql.execution.datasources.DataSource.writeInFileFormat(DataSource.scala:435)
at org.apache.spark.sql.execution.datasources.DataSource.write(DataSource.scala:471)
at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:50)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:58)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:56)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:74)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:138)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:135)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:116)
at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:92)
at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:92)
at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:609)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:233)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:217)
at org.apache.spark.sql.DataFrameWriter.parquet(DataFrameWriter.scala:508)
at sun.reflect.GeneratedMethodAccessor78.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:280)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:214)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 7122.0 failed 4 times, most recent failure: Lost task 1.3 in stage 7122.0 (TID 785331, ip-10-5-6-91.dc.us-west-2.onap.io, executor 150): org.apache.spark.SparkException: Task failed while writing rows
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:270)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1$$anonfun$apply$mcV$sp$1.apply(FileFormatWriter.scala:189)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1$$anonfun$apply$mcV$sp$1.apply(FileFormatWriter.scala:188)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:108)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException): No lease on /tmp/application_1522760972051_0002/pending-uploads/_temporary/0/temporary/attempt_20180403150236_7122_m_000001_3 (inode 33261): File does not exist. Holder DFSClient_attempt_20180403145621_6558_m_000000_0-137396736_48 does not have any open files.
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkLease(FSNamesystem.java:3432)
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.analyzeFileState(FSNamesystem.java:3233)
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getNewBlockTargets(FSNamesystem.java:3071)
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getAdditionalBlock(FSNamesystem.java:3031)
at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.addBlock(NameNodeRpcServer.java:725)
at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.addBlock(ClientNamenodeProtocolServerSideTranslatorPB.java:492)
at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:616)
at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:982)
at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2049)
at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2045)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1698)
at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2045)

at org.apache.hadoop.ipc.Client.call(Client.java:1475)
at org.apache.hadoop.ipc.Client.call(Client.java:1412)
at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:229)
at com.sun.proxy.$Proxy34.addBlock(Unknown Source)
at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.addBlock(ClientNamenodeProtocolTranslatorPB.java:418)
at sun.reflect.GeneratedMethodAccessor64.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:191)
at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
at com.sun.proxy.$Proxy35.addBlock(Unknown Source)
at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.locateFollowingBlock(DFSOutputStream.java:1455)
at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.nextBlockOutputStream(DFSOutputStream.java:1251)
at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:448)

Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1708)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1696)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1695)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1695)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:855)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:855)
at scala.Option.foreach(Option.scala:257)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:855)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1923)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1878)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1867)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:671)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2029)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply$mcV$sp(FileFormatWriter.scala:186)
... 44 more
Caused by: org.apache.spark.SparkException: Task failed while writing rows
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:270)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1$$anonfun$apply$mcV$sp$1.apply(FileFormatWriter.scala:189)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1$$anonfun$apply$mcV$sp$1.apply(FileFormatWriter.scala:188)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:108)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
... 1 more
Caused by: org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException): No lease on /tmp/application_1522760972051_0002/pending-uploads/_temporary/0/temporary/attempt_20180403150236_7122_m_000001_3 (inode 33261): File does not exist. Holder DFSClient_attempt_20180403145621_6558_m_000000_0-137396736_48 does not have any open files.
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkLease(FSNamesystem.java:3432)
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.analyzeFileState(FSNamesystem.java:3233)
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getNewBlockTargets(FSNamesystem.java:3071)
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getAdditionalBlock(FSNamesystem.java:3031)
at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.addBlock(NameNodeRpcServer.java:725)
at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.addBlock(ClientNamenodeProtocolServerSideTranslatorPB.java:492)
at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:616)
at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:982)
at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2049)
at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2045)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1698)
at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2045)

at org.apache.hadoop.ipc.Client.call(Client.java:1475)
at org.apache.hadoop.ipc.Client.call(Client.java:1412)
at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:229)
at com.sun.proxy.$Proxy34.addBlock(Unknown Source)
at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.addBlock(ClientNamenodeProtocolTranslatorPB.java:418)
at sun.reflect.GeneratedMethodAccessor64.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:191)
at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
at com.sun.proxy.$Proxy35.addBlock(Unknown Source)
at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.locateFollowingBlock(DFSOutputStream.java:1455)
at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.nextBlockOutputStream(DFSOutputStream.java:1251)
at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:448)`

Files are uploaded, but not visible in S3

Thanks for this wonderful library. We want to try S3Committer to write Spark output directly to S3. And during our testing, we noticed that the job creates output files in /tmp/hadoop-root/mapred folder and S3 committer uploads them to S3 and says success, but these uploaded files are not visible in S3, event after few hours. My suspicion is that we may be missing some required config and went through the code, but couldn't find one. Here is the config we use in our Spark job.

--conf spark.hadoop.s3.multipart.committer.conflict-mode=APPEND --conf spark.hadoop.spark.sql.parquet.output.committer.class=com.netflix.bdp.s3.S3PartitionedOutputCommitter

Is there anything else required to be able to make this work?

Thanks,
Jegan

failing test TestMRJob

Assert.assertTrue("MR job should succeed", mrJob.waitForCompletion(true)) returns a false

java.lang.AssertionError: MR job should succeed
	at org.junit.Assert.fail(Assert.java:88)
	at org.junit.Assert.assertTrue(Assert.java:41)
	at com.netflix.bdp.s3.TestMRJob.testMRJob(TestMRJob.java:141)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
	at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
	at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
	at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
	at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
	at org.junit.rules.RunRules.evaluate(RunRules.java:20)
	at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
	at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
	at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
	at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
	at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
	at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
	at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
	at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
	at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
	at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
	at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
	at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecuter.runTestClass(JUnitTestClassExecuter.java:105)
	at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecuter.execute(JUnitTestClassExecuter.java:56)
	at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassProcessor.processTestClass(JUnitTestClassProcessor.java:64)
	at org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:50)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.gradle.messaging.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:35)
	at org.gradle.messaging.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
	at org.gradle.messaging.dispatch.ContextClassLoaderDispatch.dispatch(ContextClassLoaderDispatch.java:32)
	at org.gradle.messaging.dispatch.ProxyDispatchAdapter$DispatchingInvocationHandler.invoke(ProxyDispatchAdapter.java:93)
	at com.sun.proxy.$Proxy2.processTestClass(Unknown Source)
	at org.gradle.api.internal.tasks.testing.worker.TestWorker.processTestClass(TestWorker.java:106)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.gradle.messaging.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:35)
	at org.gradle.messaging.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
	at org.gradle.messaging.remote.internal.hub.MessageHub$Handler.run(MessageHub.java:360)
	at org.gradle.internal.concurrent.ExecutorPolicy$CatchAndRecordFailures.onExecute(ExecutorPolicy.java:54)
	at org.gradle.internal.concurrent.StoppableExecutorImpl$1.run(StoppableExecutorImpl.java:40)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:748)

When I remove this assertion I get the a FileNotFoundException:

java.io.FileNotFoundException: File file:/tmp/junit3425734455093789338/committer.bin does not exist

failure when attempting to use the committer when writing into a hive table

I hated the slow nature of hive on S3 on EMR so I wanted to use this committer.
Unfortunately , I am hitting the issue shown below .

Any idea why?

Channel is closed. Failed to relay message
com.netflix.bdp.s3.AlreadyExistsException: Output path already exists: s3://perfteam/ftlitmustest/tpch_s3_tgt_102/.hive-staging_hive_2017-07-18_05-17-39_761_511309325071892267-1/-ext-10000
at com.netflix.bdp.s3.S3DirectoryOutputCommitter.commitJob(S3DirectoryOutputCommitter.java:72)
at com.infa.s3.committer.FTCustomCommitter.commitJob(FTCustomCommitter.java:33)
at org.apache.spark.sql.hive.SparkHiveWriterContainer.commitJob(hiveWriterContainers.scala:121)
at org.apache.spark.sql.hive.execution.InsertIntoHiveTable.saveAsHiveFile(InsertIntoHiveTable.scala:160)
at org.apache.spark.sql.hive.execution.InsertIntoHiveTable.sideEffectResult$lzycompute(InsertIntoHiveTable.scala:259)
at org.apache.spark.sql.hive.execution.InsertIntoHiveTable.sideEffectResult(InsertIntoHiveTable.scala:170)
at org.apache.spark.sql.hive.execution.InsertIntoHiveTable.doExecute(InsertIntoHiveTable.scala:347)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:114)

failing test TestS3PartitionedFileListing

Assert.assertEquals("File sets should match", expectedFiles, actualFiles);

expectedFiles have the same order as the actualFiles

java.lang.AssertionError: File sets should match expected:

<[dateint=20161115/hour=13/620acc75-14f1-44a8-8cdf-c7853d816c31.parquet, 
dateint=20161115/hour=14/059f4eab-cc21-4ab5-a4e8-ceedbc9e52cf.parquet, 
dateint=20161116/hour=13/8e87ab9d-f40f-4a5f-b61e-8f613b97fd6a.parquet, 
dateint=20161116/hour=14/af18c4db-5ade-40bc-add1-c9984c22b445.parquet]> 

but was:
<[dateint=20161116/hour=14/af18c4db-5ade-40bc-add1-c9984c22b445.parquet, 
dateint=20161116/hour=13/8e87ab9d-f40f-4a5f-b61e-8f613b97fd6a.parquet, 
dateint=20161115/hour=14/059f4eab-cc21-4ab5-a4e8-ceedbc9e52cf.parquet, 
dateint=20161115/hour=13/620acc75-14f1-44a8-8cdf-c7853d816c31.parquet]>

What Hadoop/Spark configuration should be set to use s3committer?

Hi,
The README does not state which configurations should be set to use s3committer.

I have tried using the following properties and nothing seems to work:
val conf = new SparkConf().set("spark.hadoop.mapred.output.committer.class","com.netflix.bdp.s3.S3DirectoryOutputCommitter ")
conf.set("spark.sql.sources.outputCommitterClass","com.netflix.bdp.s3.S3DirectoryOutputCommitter")
conf.set("spark.hadoop.mapreduce.use.directfileoutputcommitter","false”)
spark.sql(“SET spark.hadoop.mapred.output.committer.class= com.netflix.bdp.s3.S3DirectoryOutputCommitter”)

Please advise.

Thank you.
Daniel

Missing dependency nebula.javadoc-jar

running a gradle build will crash since its missing a dependency.

Updating the build.gradle with

buildscript {
  repositories { 
    jcenter() 
    maven {
      url "https://plugins.gradle.org/m2/"
    }
  }
  dependencies {
    classpath 'com.github.jengelman.gradle.plugins:shadow:1.2.3'
    classpath "com.netflix.nebula:nebula-publishing-plugin:4.9.1"
  }
}

Seems to fix this

Multipart upload of one large file instead of a directory with per-partition files

Just wondering if my interpretation of some relevant docs and the code here is correct:

  • it seems like uploading a single large file in parts is possible / supported by S3 (docs):

    In a distributed development environment, it is possible for your application to initiate several updates on the same object at the same time. Your application might initiate several multipart uploads using the same object key. For each of these uploads, your application can then upload parts and send a complete upload request to Amazon S3 to create the object

  • However, my reading of the code here is that s3committer doesn't support this; it only cares about jobs that output a directory containing one output file per task.

Is that all correct?

Anonymous users cannot initiate multipart uploads. Please authenticate.

Channel is closed. Failed to relay message
17/07/20 02:34:53 WARN TaskSetManager: Lost task 22.2 in stage 0.0 (TID 87, ip-172-31-22-106.us-west-2.compute.internal, executor 6): com.azonS3Exception: Anonymous users cannot initiate multipart uploads. Please authenticate. (Service: Amazon S3; Status Code: 403; Error Code:E1A0113F5ED10), S3 Extended Request ID: 7dGGvSrKI4bN9gIVjJYsPlmgF2Am5hP70eUGzJn1ihLPUBumgjPyRry1AK6f2tHg9LN+RT76ZYI=
at com.amazonaws.http.AmazonHttpClient.handleErrorResponse(AmazonHttpClient.java:1389)
at com.amazonaws.http.AmazonHttpClient.executeOneRequest(AmazonHttpClient.java:902)
at com.amazonaws.http.AmazonHttpClient.executeHelper(AmazonHttpClient.java:607)
at com.amazonaws.http.AmazonHttpClient.doExecute(AmazonHttpClient.java:376)
at com.amazonaws.http.AmazonHttpClient.executeWithTimer(AmazonHttpClient.java:338)
at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:287)
at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:3826)
at com.amazonaws.services.s3.AmazonS3Client.initiateMultipartUpload(AmazonS3Client.java:2708)
at com.netflix.bdp.s3.S3Util.multipartUpload(S3Util.java:73)
at com.netflix.bdp.s3.S3MultipartOutputCommitter$8.run(S3MultipartOutputCommitter.java:438)
at com.netflix.bdp.s3.S3MultipartOutputCommitter$8.run(S3MultipartOutputCommitter.java:426)
at com.netflix.bdp.s3.Tasks$Builder.runSingleThreaded(Tasks.java:107)
at com.netflix.bdp.s3.Tasks$Builder.run(Tasks.java:93)

wrapped FileOutputCommitter used in the s3 committer leads to a execution failure with algorithm version [mapreduce.fileoutputcommitter.algorithm.version] of 2.

Caused by: org.apache.hadoop.ipc.RemoteException: No lease on /tmp/application_1500052999350_0147/pending-uploads/_temporary/0/temporary/attempt_20170722012104_0000_m_000003_3 (inode 84330): File does not exist. Holder DFSClient_NONMAPREDUCE-1813506953_55 does not have any open files.
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkLease(FSNamesystem.java:3432)
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.analyzeFileState(FSNamesystem.java:3233)
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getNewBlockTargets(FSNamesystem.java:3071)
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getAdditionalBlock(FSNamesystem.java:3031)
at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.addBlock(NameNodeRpcServer.java:725)
at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.addBlock(ClientNamenodeProtocolServerSideTranslatorPB.java:492)
at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:616)
at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:982)
at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2049)
at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2045)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1698)
at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2045)

at org.apache.hadoop.ipc.Client.call(Client.java:1475)
at org.apache.hadoop.ipc.Client.call(Client.java:1412)
at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:229)
at com.sun.proxy.$Proxy15.addBlock(Unknown Source)
at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.addBlock(ClientNamenodeProtocolTranslatorPB.java:418)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:191)
at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
at com.sun.proxy.$Proxy16.addBlock(Unknown Source)
at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.locateFollowingBlock(DFSOutputStream.java:1455)
at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.nextBlockOutputStream(DFSOutputStream.java:1251)
at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:448)
17/07/22 01:22:01 WARN TaskSetManager: Lost task 0.3 in stage 0.0 (TID 28, ip-172-31-22-106.us-west-2.compute.internal, executor 5): org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException): No lease on /tmp/application_1500052999350_0147/pending-uploads/_temporary/0/temporary/attempt_20170722012104_0000_m_000000_3 (inode 84328): File does not exist. Holder DFSClient_NONMAPREDUCE-1120817883_55 does not have any open files.
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkLease(FSNamesystem.java:3432)
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.analyzeFileState(FSNamesystem.java:3233)
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getNewBlockTargets(FSNamesystem.java:3071)
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getAdditionalBlock(FSNamesystem.java:3031)
at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.addBlock(NameNodeRpcServer.java:725)
at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.addBlock(ClientNamenodeProtocolServerSideTranslatorPB.java:492)

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. 📊📈🎉

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❤️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.