Git Product home page Git Product logo

nomad-spark's Introduction

DEPRECATED

The Nomad Spark integration is deprecated and will no longer be maintained by HashiCorp. Spark lacks a pluggable scheduler interface, necessitating this independent fork. Maintaining this fork requires significant effort to keep current with upstream Spark, persisting correct behavior of our integration while tracking Spark's constantly-advancing feature set. Ultimately, we felt that the level of effort did not justify the result: a second-class Spark experience inconsistent with HashiCorp standards.

The Nomad team continues to be excited about the future of our ecosystem. We’re currently focusing our efforts on building first-class experiences, with features such as autoscaling and integrations with Spinnaker and Airflow on the horizon. As always, feel free to reach out on our discussion forums at https://discuss.hashicorp.com/c/nomad.

The previous source is available on the master branch.

nomad-spark's People

Contributors

cgbaker avatar

Stargazers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

Watchers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

nomad-spark's Issues

Passing Executor extraJavaOptions from Spark Submit

We've been attempting to collect executor metrics by running a .jar using the following configuration in our Spark submit:

--conf "spark.executor.extraJavaOptions=-javaagent:jmx_prometheus_javaagent-0.2.0.jar=7072:spark.yml"

It seems as if this configuration is being completely ignored by Nomad Spark and not being passed to the executor. We have to manually edit the job plan's SPARK_EXECUTOR_OPTS environment variable to include the javaagent statement to get the metrics to work.

Is there actually a way for us to pass this .jar to the executors from the spark submit without resorting to editing the job .json and doing an update? Moreover, if the Spark job gets killed, the environment variable is overwritten again, so doing it this way isn't very sustainable for us.

Executors do not scale down in dynamic allocation

Nomad 0.9.1
Pyspark 2.4.3

example pyspark-shell command running against Nomad cluster:

    --conf spark.nomad.sparkDistribution=local:/usr/lib/spark \
    --conf spark.dynamicAllocation.enabled=true \
    --conf spark.shuffle.service.enabled=true \
    --conf spark.dynamicAllocation.minExecutors=1 \
    --conf spark.dynamicAllocation.maxExecutors=30

In pyspark-shell load the following:

After spark.dynamicAllocation.executorIdleTimeout executors do not get killed and the following logs appear instead:

19/07/28 13:01:05 WARN NomadClusterSchedulerBackend: Ignoring request to kill 15 executor(s): {{EXECUTORS IDS SKIPPED}} (not yet implemented for Nomad)  
19/07/28 13:01:05 WARN ExecutorAllocationManager: Unable to reach the cluster manager to kill executor/s {{EXECUTORS IDS SKIPPED}} or no executor eligible to kill!

which is poiting to https://github.com/hashicorp/nomad-spark/blob/nomad-spark-2.4.3/resource-managers/nomad/src/main/scala/org/apache/spark/scheduler/cluster/nomad/NomadClusterSchedulerBackend.scala#L188 missing implementation

similar issue #20

[error] jackson-databind lib must be added manually

When i try to use the hashicorp/spark-nomad docker image (i.e. when i use cluster mode), i'm getting the following error

Exception in thread "dispatcher-event-loop-0" java.lang.NoSuchFieldError: UPPER_CAMEL_CASE
	at com.hashicorp.nomad.javasdk.NomadJson.<clinit>(NomadJson.java:26)
...
Exception in thread "main" java.lang.NoClassDefFoundError: Could not initialize class com.hashicorp.nomad.javasdk.NomadJson
	at com.hashicorp.nomad.javasdk.AllocationsApi.info(AllocationsApi.java:46)

These errors don't appear when I run directly on my spark distro (non-cluster mode). Could this be caused by a version mismatch?

full stack trace - https://pastebin.com/eDPxqNtc

Simple script to sinchronize spark mainline with spark on nomad fork

Due this fork not actively maintained, we want to contribute some of our work:

This simple script allow to quickly synchronize spark on nomad with minor spark mainline

#!/bin/bash

SPARKVERSION=2.3.4
SPARKVERSIONDIFF=-2 #SPARKONNOMADVERSION
SPARKBUILDDIR=/home/vagrant/build/spark-on-nomad-${SPARKVERSION}

# https://stackoverflow.com/questions/8653126/how-to-increment-version-number-in-a-shell-script
increment_version ()
{
  declare -a part=( ${1//\./ } )
  declare    new
  declare -i carry=$2

  for (( CNTR=${#part[@]}-1; CNTR>=0; CNTR-=1 )); do
    len=${#part[CNTR]}
    new=$((part[CNTR]+carry))
    [ ${#new} -gt $len ] && carry=1 || carry=0
    [ $CNTR -gt 0 ] && part[CNTR]=${new: -len} || part[CNTR]=${new}
  done
  new="${part[*]}"
  echo -e "${new// /.}"
}

if [ -z $SPARKONNOMADVERSION ]; then
	SPARKONNOMADVERSION=`increment_version $SPARKVERSION $SPARKVERSIONDIFF`
fi

echo Original spark: $SPARKVERSION, patches will by applyed from $SPARKONNOMADVERSION nomad spark fork

if [ -e $SPARKBUILDDIR ]; then
	echo  "Removing prev build dir"
	rm -fr $SPARKBUILDDIR
fi

git config --global user.email "[email protected]"
git config --global user.name "Playrix LLC"

mkdir -p /home/vagrant/build
git clone https://github.com/hashicorp/nomad-spark.git ${SPARKBUILDDIR}

cd ${SPARKBUILDDIR}
git remote add upstream https://github.com/apache/spark.git
git fetch upstream
git checkout origin/nomad-on-spark-${SPARKONNOMADVERSION}
git checkout -b nomad-on-spark-${SPARKVERSION}
git merge -m "merge with mainlain spark ${SPARKVERSION}" tags/v${SPARKVERSION}

cp /home/vagrant/build/spark-on-nomad/build.sh ${SPARKBUILDDIR}
chmod 0755 ${SPARKBUILDDIR}/build.sh

sed -ri "s!${SPARKONNOMADVERSION}!${SPARKVERSION}!" ${SPARKBUILDDIR}/resource-managers/nomad/pom.xml
sed -ri "s!${SPARKONNOMADVERSION}!${SPARKVERSION}!" ${SPARKBUILDDIR}/resource-managers/nomad/test-apps/pom.xml

sed -ri "s!0.8.6.1!0.7.0.2!" ${SPARKBUILDDIR}/resource-managers/nomad/pom.xml
sed -ri "s!0.8.6.1!0.7.0.2!" ${SPARKBUILDDIR}/resource-managers/nomad/test-apps/pom.xml

${SPARKBUILDDIR}/build.sh
cp ${SPARKBUILDDIR}/spark-${SPARKVERSION}-bin-nohadoop-nomad.tgz /home/vagrant/build/spark-on-nomad/

and this allow to build it build.sh (without inline hadoop, we use separate hadoop installation of version 2.9.2):

#!/bin/bash

./dev/make-distribution.sh --name nohadoop-nomad --tgz -Pmesos -Pnomad -Psparkr -Phive -Phadoop-2.7 -Phadoop-provided -Phive-thriftserver -DskipTests
#./build/mvn -Pmesos -Pnomad -Psparkr -Phive -Phadoop-2.7 -Phive-thriftserver -DskipTests clean package 

and here is our Vagrant file

# -*- mode: ruby -*-
# vi: set ft=ruby :

Vagrant.configure("2") do |config|
	config.vm.box = "ubuntu/xenial64"

	config.vm.define "build-spark-on-nomad" do |node|
		node.vm.hostname = "build-spark-on-nomad"
		node.vm.synced_folder "./", "/home/vagrant/build/spark-on-nomad/"

		node.vm.provider "virtualbox" do |vb|
			vb.memory = "4096"
		end

		node.vm.provision "shell", inline: <<-SHELL
			add-apt-repository ppa:webupd8team/java -y
			echo oracle-java8-installer shared/accepted-oracle-license-v1-1 select true | /usr/bin/debconf-set-selections
			apt-get update
			apt-get install -y oracle-java8-installer
			apt-key adv --keyserver keyserver.ubuntu.com --recv-keys E298A3A825C0D65DFD57CBB651716619E084DAB9
			add-apt-repository 'deb [arch=amd64,i386] https://cran.rstudio.com/bin/linux/ubuntu xenial/'
			apt-get update
			apt-get install -y r-base
			apt-get install -y git
			apt-get install -y mc
			ln -s "$(which python3)" /usr/local/python
		SHELL
	end
end

Is it abandoned?

Just wondered, if nomad-spark officially abandoned?
If so, could you please elaborate on reasons, it is really interesting

spark-submit (python) reports "Error: Unrecognized option: --driver-class-path="

spark-submit --master nomad:{addr of master} --deploy-mode cluster --conf spark.nomad.sparkDistribution=https://s3.amazonaws.com/nomad-spark/spark-2.1.0-bin-nomad.tgz --py-files {addr of hello.py} {addr of hello.py}

where hello.py just print("hello")

The batch job got scheduled by Nomad but then the container exited with code 1. The logs (nomad logs -stderr) show the following error:

Error: Unrecognized option: --driver-class-path=

Usage: spark-submit [options] <app jar | python file> [app arguments]
Usage: spark-submit --kill [submission ID] --master [spark://...]
Usage: spark-submit --status [submission ID] --master [spark://...]
Usage: spark-submit run-example [options] example-class [example args]

Options:
.......

As a comparison, the java example works with our setup:

spark-submit --master nomad:{addr of master} --deploy-mode cluster --conf spark.nomad.sparkDistribution=https://s3.amazonaws.com/nomad-spark/spark-2.1.0-bin-nomad.tgz --class org.apache.spark.examples.JavaSparkPi https://s3.amazonaws.com/nomad-spark/spark-examples_2.11-2.1.0-SNAPSHOT.jar

Executors still queued although job computation finished in dynamic allocation and exhausted resources

Nomad 0.9.1
Pyspark 2.4.3

example pyspark-shell command running against Nomad cluster:

    --conf spark.nomad.sparkDistribution=local:/usr/lib/spark \
    --conf spark.dynamicAllocation.enabled=true \
    --conf spark.shuffle.service.enabled=true \
    --conf spark.dynamicAllocation.minExecutors=1

In pyspark-shell load the following:

When NOT setting spark.dynamicAllocation.maxExecutors and cluster exhausts resources, executors are still queued although all calculation finished.

image

Kerberos support in nomad mode

Does the Nomad mode support Kerberos?

When I try to run a Spark job that reads from a kerberized HDFS cluster, I get the following error:

py4j.protocol.Py4JJavaError: An error occurred while calling o31.collectToPython.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 3, 10.149.251.90, executor 2fa1c572-4942-56ad-30a3-1b53dc30194a-1545137273210): java.io.IOException: Failed on local exception: java.io.IOException: org.apache.hadoop.security.AccessControlException: Client cannot authenticate via:[TOKEN, KERBEROS]; Host Details : local host is: "/"; destination host is: "":8020;

On https://spark.apache.org/docs/latest/security.html#kerberos I read that "Delegation token support is currently only supported in YARN and Mesos modes." Does this mean that Kerberos is not supported in Nomad mode?

Using dynamic allocation hangs Spark job after autoscaling down Spark workers

Hi there I have been trying out nomad-spark for both versions v2.3.2 and v2.4.0 for Nomad 0.8.6 for dynamic allocation, but kept encountering issues when Nomad tries to perform auto-downscaling after the Spark executors have been idling for some time.

Basically what happened was that Nomad was able to auto-upscale fine when I ran spark-shell to read some large parquet files, e.g. 1 executor -> 5 executors pretty quickly, and the Spark job can be completed normally. Other jobs can also be completed if I were to purposely keep the executors busy.

However once I leave it for some time (maybe 60s based on my executorIdleTimeout and cachedExecutorIdleTimeout settings?), I believe Nomad tries to auto-downscale the number of executors, and I get the following warning messages within spark-shell:

19/01/26 16:34:32 WARN NomadClusterSchedulerBackend: Ignoring request to kill 1 executor(s): 3c508b0a-c46d-f8a4-6f52-99d28a4d250a-1548520286383 (not yet implemented for Nomad)
19/01/26 16:34:32 WARN ExecutorAllocationManager: Unable to reach the cluster manager to kill executor/s 3c508b0a-c46d-f8a4-6f52-99d28a4d250a-1548520286383 or no executor eligible to kill!
19/01/26 16:34:34 WARN NomadClusterSchedulerBackend: Ignoring request to kill 1 executor(s): 92755867-efb3-3abe-429a-1015c697a624-1548520286556 (not yet implemented for Nomad)
19/01/26 16:34:34 WARN ExecutorAllocationManager: Unable to reach the cluster manager to kill executor/s 92755867-efb3-3abe-429a-1015c697a624-1548520286556 or no executor eligible to kill!

I understand that from the source code, NomadClusterSchedulerBackend.scala doesn't actually implement the down-scaling part in doKillExecutors method (although it would really have been preferable to truly downscale the executors too), but strangely what happens after that new Spark jobs can still be submitted to the Spark driver and be observed via the Spark Web UI, but the so-called killed (but not killed) executors never pick up the new jobs any more, and neither will Nomad spawn new executors to run the new jobs, and so basically all new jobs will just pend forever.

Any anyone encountered this issue and know what could be the issue? Thank you.

The following are some of my configs (some are just consul-template values):

spark-defaults.conf

spark.master                                       nomad
spark.driver.bindAddress                           0.0.0.0
spark.driver.host                                  {{ with node }}{{ .Node.Address }}{{ end }}

spark.sql.catalogImplementation                    hive
spark.shuffle.service.enabled                      true
spark.dynamicAllocation.enabled                    true
spark.dynamicAllocation.initialExecutors           1
spark.dynamicAllocation.minExecutors               1
spark.dynamicAllocation.maxExecutors               50

spark.dynamicAllocation.executorIdleTimeout        60s
spark.dynamicAllocation.cachedExecutorIdleTimeout  60s

spark.executor.instances                           1
spark.executor.cores                               2
spark.executor.memory                              3g

spark.nomad.dockerImage                            {{ key "${spark_nomad_docker_image_key}" }}
spark.nomad.sparkDistribution                      {{ key "${spark_nomad_spark_distribution_key}" }}
spark.nomad.datacenters                            {{ key "${spark_nomad_datacenters_key}" }}
spark.nomad.job.template                           {{ key "${spark_nomad_job_template_key}" }}

job_template.json

{
    "Job": {
        "Constraints": [
            {
                "LTarget": "$${node.class}",
                "RTarget": "${node_class}",
                "Operand": "="
            }
        ],

        "Datacenters": ${az},
        "Region": "${region}",
        "Type": "service"
    }
}

spark 2.3.1

Hello

Is any plans to do this? or this project is dead?

Problem with dynamic allocation and external shuffle service

If we enable dynamic allocation and external shuffle service we get following error:

Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources

We are able to run the below job only with dynamic allocation and external shuffle service disabled:

spark-submit --class com.company.app.runners.SingleContainerRunner --master nomad --deploy-mode cluster --conf spark.nomad.sparkDistribution=http://internal.repo/spark-2.3.2-bin-nomad-0.8.6-20181220.tgz --files http://internal.repo/app.yaml,http://internal.repo/app.properties,http://internal.repo/app-interactive-session.yaml http://internal.repo/app-executors-1.0.4-runtime.jar -container interactive -properties rel:app.properties -config rel:app.yaml

setExecutorCount doesn't allow set group count to 0, so spark job can't stop naturally

In last changes setExecutorCount was changed as:

  def setExecutorCount(count: Int): Unit = {
    jobManipulator.updateJob(startIfNotYetRunning = count > 0) { job =>
      val executorGroup = SparkNomadJob.find(job, ExecutorTaskGroup).get
      if (count > 0 && executorGroup.getCount < count) {
        executorGroup.setCount(count)
      }
    }
  }

but this is wrong because, no any chance to set nomad task group count to 0(this used to stop spark job), and right changes should be as follow

  def setExecutorCount(count: Int): Unit = {
    jobManipulator.updateJob(startIfNotYetRunning = count > 0) { job =>
      val executorGroup = SparkNomadJob.find(job, ExecutorTaskGroup).get
      if (count > 0 && executorGroup.getCount < count) {
        executorGroup.setCount(count)
      } else if (count == 0) {
        executorGroup.setCount(0)
      }
    }
  }

[bug] driver task group must be 'driver' for job monitoring to work correctly

I was noticing that my scheduling method (the one that calls spark-submit -monitorUntil=complete was never successfully detecting the job was complete. I traced this down to me using an old default template for submitting spark-nomad jobs, which has the driver a group name as driver-group-name.

I noticed that this is corrected in the template provided here: https://www.nomadproject.io/guides/spark/customizing.html

Perhaps the documentation should be updated to state that the group being named driver is a requirement for this to work. My impression from the docs is that only the appropriate task meta is a requirement.

I tracked it down to hard coded driver strings here

jobUtils.pollTaskGroupAllocation(evaluation.getJobId, "driver", waitForever) {

jobUtils.traceTask(evaluation.getJobId, "driver", "driver", waitForever) {

Executor page does not display on 2.2.0

When running a spark streaming application on nomad 2.2.0 the executor page does not display. When running the same application on 2.1.0, it displays as expected.

Upon inspecting the network calls, it seems the <service>/api/v1/applications returns the following exception.

HTTP ERROR 500
Problem accessing /api/v1/applications. Reason:

    Server Error
Caused by:
java.lang.NullPointerException
	at org.glassfish.jersey.servlet.ServletContainer.service(ServletContainer.java:388)
	at org.glassfish.jersey.servlet.ServletContainer.service(ServletContainer.java:341)
	at org.glassfish.jersey.servlet.ServletContainer.service(ServletContainer.java:228)
	at org.spark_project.jetty.servlet.ServletHolder.handle(ServletHolder.java:845)
	at org.spark_project.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:583)
	at org.spark_project.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:1180)
	at org.spark_project.jetty.servlet.ServletHandler.doScope(ServletHandler.java:511)
	at org.spark_project.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:1112)
	at org.spark_project.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:141)
	at org.spark_project.jetty.server.handler.gzip.GzipHandler.handle(GzipHandler.java:461)
	at org.spark_project.jetty.server.handler.ContextHandlerCollection.handle(ContextHandlerCollection.java:213)
	at org.spark_project.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:134)
	at org.spark_project.jetty.server.Server.handle(Server.java:524)
	at org.spark_project.jetty.server.HttpChannel.handle(HttpChannel.java:319)
	at org.spark_project.jetty.server.HttpConnection.onFillable(HttpConnection.java:253)
	at org.spark_project.jetty.io.AbstractConnection$ReadCallback.succeeded(AbstractConnection.java:273)
	at org.spark_project.jetty.io.FillInterest.fillable(FillInterest.java:95)
	at org.spark_project.jetty.io.SelectChannelEndPoint$2.run(SelectChannelEndPoint.java:93)
	at org.spark_project.jetty.util.thread.strategy.ExecuteProduceConsume.executeProduceConsume(ExecuteProduceConsume.java:303)
	at org.spark_project.jetty.util.thread.strategy.ExecuteProduceConsume.produceConsume(ExecuteProduceConsume.java:148)
	at org.spark_project.jetty.util.thread.strategy.ExecuteProduceConsume.run(ExecuteProduceConsume.java:136)
	at org.spark_project.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:671)
	at org.spark_project.jetty.util.thread.QueuedThreadPool$2.run(QueuedThreadPool.java:589)
	at java.lang.Thread.run(Thread.java:748)

Add abylity to pass driver Env vars in cluster mode

Now this is not prossible, despite mesos,k8s,yarn can do this throw follow params

spark.mesos.driverEnv.<ENVVAR> ----> mesos
spark.kubernetes.driverEnv.<ENVVAR> ---> k8s
spark.yarn.appMasterEnv.<ENVVAR> ---> yarn

With follow little patch we add like params to nomad (spark.nomad.driverEnv.<ENVVAR>)

From 0cc50b06410ee425017f34838bfe0967ddc4358d Mon Sep 17 00:00:00 2001
From: tantra35 <[email protected]>
Date: Thu, 19 Sep 2019 11:58:41 +0300
Subject: [PATCH] add ability to pass driver Env vars in cluster mode

---
 .../org/apache/spark/scheduler/cluster/nomad/DriverTask.scala  | 3 +++
 1 file changed, 3 insertions(+)

diff --git a/resource-managers/nomad/src/main/scala/org/apache/spark/scheduler/cluster/nomad/DriverTask.scala b/resource-managers/nomad/src/main/scala/org/apache/spark/scheduler/cluster/nomad/DriverTask.scala
index b691e75c35..2f24802bf9 100644
--- a/resource-managers/nomad/src/main/scala/org/apache/spark/scheduler/cluster/nomad/DriverTask.scala
+++ b/resource-managers/nomad/src/main/scala/org/apache/spark/scheduler/cluster/nomad/DriverTask.scala
@@ -33,6 +33,7 @@ import org.apache.spark.scheduler.cluster.nomad.SparkNomadJob.{JOB_TEMPLATE, SPA
 import org.apache.spark.util.Utils
 
 private[spark] object DriverTask extends SparkNomadTaskType("driver", "driver", DRIVER_MEMORY) {
+  private val NOMAD_DRIVER_ENV_KEY = "spark.nomad.driverEnv."
 
   private val PROPERTIES_NOT_TO_FORWARD = scala.collection.Set(
     // spark: not appropriate/relevant
@@ -69,6 +70,8 @@ private[spark] object DriverTask extends SparkNomadTaskType("driver", "driver",
 
     super.configure(jobConf, conf, task, ports, "spark-submit")
 
+    conf.getAllWithPrefix(NOMAD_DRIVER_ENV_KEY).toSeq.foreach((task.addEnv _).tupled)
+
     val additionalJarUrls = Utils.getUserJars(conf).map(asFileIn(jobConf, task))
     if (additionalJarUrls.nonEmpty) {
       conf.set("spark.jars", additionalJarUrls.mkString(","))
-- 
2.23.0.windows.1

Distributing this fork as diff or patch

This allow very quickly build custom spark distributions with required versions of spark, and take to account mainline(for example spark 2.3.1 already upcoming, but this fork not ready for it)

Feature Request: Support Rkt containers.

rkt v 1.30 has just been released, and it would be great to be able to use Rkt containers to launch spark cluster. Something like:

spark.nomad.rktImage=taqtiqa.io/spark

Initial assumption could be that insecure images are not supported, and the image has been trusted by something like:

rkt trust --prefix=taqtiqa.io/spark

Seems that nomad on spark broken when build with nomad-scala-sdk 0.8.6.1

When we build lastest version spark on nomad when we launch spark job we got follow exeption

Py4JJavaError: An error occurred while calling None.org.apache.spark.api.java.JavaSparkContext.
: com.hashicorp.nomad.javasdk.ResponseParsingException: Unable to parse [simple type, class com.hashicorp.nomad.apimodel.Node] from response body JSON: {"Attributes":{"unique.consul.name":"ip-172-24-0-115","consul.version":"1.3.1","platform.aws.placement.availability-zone":"us-east-1a","unique.platform.aws.local-ipv4":"172.24.0.115","driver.docker.version":"18.09.2","driver.exec":"1","vault.version":"0.10.4","cpu.arch":"amd64","driver.docker":"1","nomad.revision":"fcc4149c55399eb4979cd3fe3fb983cfe6c8449a+CHANGES","consul.server":"false","unique.storage.volume":"/dev/xvda1","kernel.name":"linux","driver.docker.volumes.enabled":"1","unique.cgroup.mountpoint":"/sys/fs/cgroup","platform.aws.instance-type":"t2.xlarge","cpu.totalcompute":"9200","os.version":"16.04","nomad.version":"0.8.6","vault.accessible":"true","unique.platform.aws.local-hostname":"ip-172-24-0-115.ec2.internal","os.signals":"SIGBUS,SIGCONT,SIGALRM,SIGTRAP,SIGTSTP,SIGINT,SIGHUP,SIGSEGV,SIGSYS,SIGTTIN,SIGUSR1,SIGXCPU,SIGXFSZ,SIGFPE,SIGKILL,SIGQUIT,SIGIO,SIGCHLD,SIGILL,SIGSTOP,SIGTTOU,SIGWINCH,SIGABRT,SIGPIPE,SIGUSR2,SIGIOT,SIGPROF,SIGURG,SIGTERM","cpu.numcores":"4","unique.platform.aws.hostname":"ip-172-24-0-115.ec2.internal","kernel.version":"4.4.0-1075-aws","cpu.frequency":"2300","os.name":"ubuntu","vault.cluster_id":"61e38844-c8f3-eb77-1aa2-dc40390a5039","consul.datacenter":"amrv01","unique.storage.bytestotal":"16586264576","consul.revision":"f2b13f302","unique.network.ip-address":"172.24.0.115","vault.cluster_name":"vault-cluster-1546079e","memory.totalbytes":"16825647104","cpu.modelname":"Intel(R) Xeon(R) CPU E5-2686 v4 @ 2.30GHz","unique.storage.bytesfree":"14180642816","unique.hostname":"ip-172-24-0-115","unique.platform.aws.instance-id":"i-0b3259c60b40c49f2","platform.aws.ami-id":"ami-0eace97d273ebb5c0","driver.docker.bridge_ip":"172.17.0.1","driver.raw_exec":"1"},"ComputedClass":"v1:10821272756959570835","CreateIndex":5370372,"Datacenter":"jupiterhub","Drain":false,"DrainStrategy":null,"Drivers":{"qemu":{"Attributes":null,"Detected":false,"HealthDescription":"Driver qemu is not detected","Healthy":false,"UpdateTime":"2019-03-19T23:19:03.629626062+03:00"},"rkt":{"Attributes":null,"Detected":false,"HealthDescription":"Driver rkt is not detected","Healthy":false,"UpdateTime":"2019-03-19T23:20:03.63414575+03:00"},"docker":{"Attributes":{"driver.docker.volumes.enabled":"1","driver.docker.bridge_ip":"172.17.0.1","driver.docker.version":"18.09.2"},"Detected":true,"HealthDescription":"Driver is available and responsive","Healthy":true,"UpdateTime":"2019-03-19T23:20:03.637791458+03:00"},"exec":{"Attributes":null,"Detected":true,"HealthDescription":"Driver exec is detected","Healthy":true,"UpdateTime":"2019-03-19T23:20:03.637143678+03:00"},"raw_exec":{"Attributes":null,"Detected":true,"HealthDescription":"Driver raw_exec is detected","Healthy":true,"UpdateTime":"2019-03-19T23:19:03.635327084+03:00"},"java":{"Attributes":null,"Detected":false,"HealthDescription":"Driver java is not detected","Healthy":false,"UpdateTime":"2019-03-19T23:19:03.635446043+03:00"}},"Events":[{"CreateIndex":0,"Details":null,"Message":"Node registered","Subsystem":"Cluster","Timestamp":"2019-03-19T23:19:03+03:00"}],"HTTPAddr":"172.24.0.115:4646","ID":"87a9491f-14ff-5465-8752-88fea65840ff","Links":{"consul":"amrv01.ip-172-24-0-115","aws.ec2":"us-east-1a.i-0b3259c60b40c49f2"},"Meta":{"CONSUL_ADDR":"172.24.0.115"},"ModifyIndex":5370381,"Name":"ip-172-24-0-115","NodeClass":"jupiterhub-dockerworker","Reserved":{"CPU":100,"DiskMB":0,"IOPS":0,"MemoryMB":256,"Networks":null},"Resources":{"CPU":9200,"DiskMB":13523,"IOPS":0,"MemoryMB":16046,"Networks":[{"CIDR":"172.24.0.115/32","Device":"eth0","DynamicPorts":null,"IP":"172.24.0.115","MBits":750,"ReservedPorts":null}]},"SchedulingEligibility":"eligible","SecretID":"","Status":"ready","StatusDescription":"","StatusUpdatedAt":1553027073,"TLSEnabled":false}
	at com.hashicorp.nomad.javasdk.JsonParser.extractValue(JsonParser.java:26)
	at com.hashicorp.nomad.javasdk.ResponseAdapter.apply(ResponseAdapter.java:32)
	at com.hashicorp.nomad.javasdk.NomadApiClient.execute(NomadApiClient.java:376)
	at com.hashicorp.nomad.javasdk.ApiBase.executeServerQueryRaw(ApiBase.java:203)
	at com.hashicorp.nomad.javasdk.ApiBase.executeServerQuery(ApiBase.java:116)
	at com.hashicorp.nomad.javasdk.ApiBase.executeServerQuery(ApiBase.java:101)
	at com.hashicorp.nomad.javasdk.NodesApi.info(NodesApi.java:102)
	at com.hashicorp.nomad.scalasdk.ScalaNodesApi.info(ScalaNodesApi.scala:43)
	at org.apache.spark.scheduler.cluster.nomad.NomadJobManipulator.fetchLogUrlsForTask(NomadJobManipulator.scala:63)
	at org.apache.spark.scheduler.cluster.nomad.SparkNomadJobController$$anonfun$fetchDriverLogUrls$1$$anonfun$apply$1.apply(SparkNomadJobController.scala:54)
	at org.apache.spark.scheduler.cluster.nomad.SparkNomadJobController$$anonfun$fetchDriverLogUrls$1$$anonfun$apply$1.apply(SparkNomadJobController.scala:53)
	at scala.Option.map(Option.scala:146)
	at org.apache.spark.scheduler.cluster.nomad.SparkNomadJobController$$anonfun$fetchDriverLogUrls$1.apply(SparkNomadJobController.scala:53)
	at org.apache.spark.scheduler.cluster.nomad.SparkNomadJobController$$anonfun$fetchDriverLogUrls$1.apply(SparkNomadJobController.scala:52)
	at scala.Option.flatMap(Option.scala:171)
	at org.apache.spark.scheduler.cluster.nomad.SparkNomadJobController.fetchDriverLogUrls(SparkNomadJobController.scala:52)
	at org.apache.spark.scheduler.cluster.nomad.NomadClusterSchedulerBackend.getDriverLogUrls(NomadClusterSchedulerBackend.scala:98)
	at org.apache.spark.SparkContext.postApplicationStart(SparkContext.scala:2384)
	at org.apache.spark.SparkContext.<init>(SparkContext.scala:556)
	at org.apache.spark.api.java.JavaSparkContext.<init>(JavaSparkContext.scala:58)
	at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
	at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
	at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
	at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:247)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:238)
	at py4j.commands.ConstructorCommand.invokeConstructor(ConstructorCommand.java:80)
	at py4j.commands.ConstructorCommand.execute(ConstructorCommand.java:69)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)
Caused by: com.fasterxml.jackson.databind.exc.InvalidFormatException: Can not construct instance of java.util.Date from String value ("2019-03-19T23:19:03.629626062+03:00"): not a valid representation (error: Failed to parse Date value '2019-03-19T23:19:03.629626062+03:00': Can not parse date "2019-03-19T23:19:03.629626062+0300": while it seems to fit format 'yyyy-MM-dd'T'HH:mm:ss.SSSZ', parsing fails (leniency? null))
 at [Source: {"Attributes":{"unique.consul.name":"ip-172-24-0-115","consul.version":"1.3.1","platform.aws.placement.availability-zone":"us-east-1a","unique.platform.aws.local-ipv4":"172.24.0.115","driver.docker.version":"18.09.2","driver.exec":"1","vault.version":"0.10.4","cpu.arch":"amd64","driver.docker":"1","nomad.revision":"fcc4149c55399eb4979cd3fe3fb983cfe6c8449a+CHANGES","consul.server":"false","unique.storage.volume":"/dev/xvda1","kernel.name":"linux","driver.docker.volumes.enabled":"1","unique.cgroup.mountpoint":"/sys/fs/cgroup","platform.aws.instance-type":"t2.xlarge","cpu.totalcompute":"9200","os.version":"16.04","nomad.version":"0.8.6","vault.accessible":"true","unique.platform.aws.local-hostname":"ip-172-24-0-115.ec2.internal","os.signals":"SIGBUS,SIGCONT,SIGALRM,SIGTRAP,SIGTSTP,SIGINT,SIGHUP,SIGSEGV,SIGSYS,SIGTTIN,SIGUSR1,SIGXCPU,SIGXFSZ,SIGFPE,SIGKILL,SIGQUIT,SIGIO,SIGCHLD,SIGILL,SIGSTOP,SIGTTOU,SIGWINCH,SIGABRT,SIGPIPE,SIGUSR2,SIGIOT,SIGPROF,SIGURG,SIGTERM","cpu.numcores":"4","unique.platform.aws.hostname":"ip-172-24-0-115.ec2.internal","kernel.version":"4.4.0-1075-aws","cpu.frequency":"2300","os.name":"ubuntu","vault.cluster_id":"61e38844-c8f3-eb77-1aa2-dc40390a5039","consul.datacenter":"amrv01","unique.storage.bytestotal":"16586264576","consul.revision":"f2b13f302","unique.network.ip-address":"172.24.0.115","vault.cluster_name":"vault-cluster-1546079e","memory.totalbytes":"16825647104","cpu.modelname":"Intel(R) Xeon(R) CPU E5-2686 v4 @ 2.30GHz","unique.storage.bytesfree":"14180642816","unique.hostname":"ip-172-24-0-115","unique.platform.aws.instance-id":"i-0b3259c60b40c49f2","platform.aws.ami-id":"ami-0eace97d273ebb5c0","driver.docker.bridge_ip":"172.17.0.1","driver.raw_exec":"1"},"ComputedClass":"v1:10821272756959570835","CreateIndex":5370372,"Datacenter":"jupiterhub","Drain":false,"DrainStrategy":null,"Drivers":{"qemu":{"Attributes":null,"Detected":false,"HealthDescription":"Driver qemu is not detected","Healthy":false,"UpdateTime":"2019-03-19T23:19:03.629626062+03:00"},"rkt":{"Attributes":null,"Detected":false,"HealthDescription":"Driver rkt is not detected","Healthy":false,"UpdateTime":"2019-03-19T23:20:03.63414575+03:00"},"docker":{"Attributes":{"driver.docker.volumes.enabled":"1","driver.docker.bridge_ip":"172.17.0.1","driver.docker.version":"18.09.2"},"Detected":true,"HealthDescription":"Driver is available and responsive","Healthy":true,"UpdateTime":"2019-03-19T23:20:03.637791458+03:00"},"exec":{"Attributes":null,"Detected":true,"HealthDescription":"Driver exec is detected","Healthy":true,"UpdateTime":"2019-03-19T23:20:03.637143678+03:00"},"raw_exec":{"Attributes":null,"Detected":true,"HealthDescription":"Driver raw_exec is detected","Healthy":true,"UpdateTime":"2019-03-19T23:19:03.635327084+03:00"},"java":{"Attributes":null,"Detected":false,"HealthDescription":"Driver java is not detected","Healthy":false,"UpdateTime":"2019-03-19T23:19:03.635446043+03:00"}},"Events":[{"CreateIndex":0,"Details":null,"Message":"Node registered","Subsystem":"Cluster","Timestamp":"2019-03-19T23:19:03+03:00"}],"HTTPAddr":"172.24.0.115:4646","ID":"87a9491f-14ff-5465-8752-88fea65840ff","Links":{"consul":"amrv01.ip-172-24-0-115","aws.ec2":"us-east-1a.i-0b3259c60b40c49f2"},"Meta":{"CONSUL_ADDR":"172.24.0.115"},"ModifyIndex":5370381,"Name":"ip-172-24-0-115","NodeClass":"jupiterhub-dockerworker","Reserved":{"CPU":100,"DiskMB":0,"IOPS":0,"MemoryMB":256,"Networks":null},"Resources":{"CPU":9200,"DiskMB":13523,"IOPS":0,"MemoryMB":16046,"Networks":[{"CIDR":"172.24.0.115/32","Device":"eth0","DynamicPorts":null,"IP":"172.24.0.115","MBits":750,"ReservedPorts":null}]},"SchedulingEligibility":"eligible","SecretID":"","Status":"ready","StatusDescription":"","StatusUpdatedAt":1553027073,"TLSEnabled":false}; line: 1, column: 1973] (through reference chain: com.hashicorp.nomad.apimodel.Node["Drivers"]->java.util.LinkedHashMap["qemu"]->com.hashicorp.nomad.apimodel.DriverInfo["UpdateTime"])
	at com.fasterxml.jackson.databind.exc.InvalidFormatException.from(InvalidFormatException.java:74)
	at com.fasterxml.jackson.databind.DeserializationContext.weirdStringException(DeserializationContext.java:1022)
	at com.fasterxml.jackson.databind.deser.std.StdDeserializer._parseDate(StdDeserializer.java:788)
	at com.fasterxml.jackson.databind.deser.std.DateDeserializers$DateBasedDeserializer._parseDate(DateDeserializers.java:175)
	at com.fasterxml.jackson.databind.deser.std.DateDeserializers$DateDeserializer.deserialize(DateDeserializers.java:261)
	at com.fasterxml.jackson.databind.deser.std.DateDeserializers$DateDeserializer.deserialize(DateDeserializers.java:245)
	at com.fasterxml.jackson.databind.deser.SettableBeanProperty.deserialize(SettableBeanProperty.java:495)
	at com.fasterxml.jackson.databind.deser.impl.MethodProperty.deserializeAndSet(MethodProperty.java:95)
	at com.fasterxml.jackson.databind.deser.BeanDeserializer.vanillaDeserialize(BeanDeserializer.java:260)
	at com.fasterxml.jackson.databind.deser.BeanDeserializer.deserialize(BeanDeserializer.java:125)
	at com.fasterxml.jackson.databind.deser.std.MapDeserializer._readAndBindStringMap(MapDeserializer.java:496)
	at com.fasterxml.jackson.databind.deser.std.MapDeserializer.deserialize(MapDeserializer.java:342)
	at com.fasterxml.jackson.databind.deser.std.MapDeserializer.deserialize(MapDeserializer.java:26)
	at com.fasterxml.jackson.databind.deser.SettableBeanProperty.deserialize(SettableBeanProperty.java:495)
	at com.fasterxml.jackson.databind.deser.impl.MethodProperty.deserializeAndSet(MethodProperty.java:95)
	at com.fasterxml.jackson.databind.deser.BeanDeserializer.vanillaDeserialize(BeanDeserializer.java:260)
	at com.fasterxml.jackson.databind.deser.BeanDeserializer.deserialize(BeanDeserializer.java:125)
	at com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:3807)
	at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:2835)
	at com.hashicorp.nomad.javasdk.NomadJson.deserialize(NomadJson.java:105)
	at com.hashicorp.nomad.javasdk.JsonParser.extractValue(JsonParser.java:24)
	... 30 more

As you may see sdk cant parse field with type Date, because it try to apply format
yyyy-MM-dd'T'HH:mm:ss.SSSZ, to string 2019-03-19T23:19:03.629626062+03:00, but as we can see in doc https://docs.oracle.com/javase/7/docs/api/java/text/SimpleDateFormat.html, last symbol in format string Z will pars only strings like this +0300 whish is differs from what we actualy get from nomad +03:00, so real format string must looks like this yyyy-MM-dd'T'HH:mm:ss.SSSX. So we made a conclusion, that nomad-scala-sdk 0.8.6 is broken. As workaround for this we simple downgrade nomad scala sdk to version 0.7.0.2

When job with dynamic resource allocations are near to finish many ERROR TaskSchedulerImpl: Lost executor appears

when we use follow spark conf options in spark-submit(i.e. use dynamic allocations)

spark.dynamicAllocation.enabled          true
spark.dynamicAllocation.initialExecutors  2
spark.shuffle.service.enabled            true
spark.executor.instances                  2 

When job near at it finish we see many

ERROR TaskSchedulerImpl: Lost executor

errors. As a result spark begins lost tasks

WARN TaskSetManager: Lost task 371.3 in stage

and sometimes spark-job will fail due lost task after 4 attempts (default for spark)
As i understand this happens because at near job finish moment, nomad(through spark) begins shrink spark-executors pool, but spark at this moment still placing task on spark-executors, as result some times may happen that some spark task exceeded all attempts when spark try to place them on killed executors

For now workaround for us is increase spark.task.maxFailures to 20, or doesn't use dynamic resource allocations, due nomad can't shrink job by simple remove allocations by id

It's not using my docker driver settings if i don't include --conf spark.nomad.dockerImage

I would like to use a docker driver stanza to set my image information, but if i don't include --conf spark.nomad.dockerImage in my spark-submit it acts like it's trying to run directly on a nomad resource node. For example it will just repeat the following over and over:

18/07/02 15:28:34 WARN TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources

More details:
I have a template that looks something like this:

job "template" {
  meta {
    "spark.nomad.role" = "application"
  }
  group "executor-template" {
    task "executor-template" {
      meta {
        "spark.nomad.role" = "executor"
      }
      driver = "docker"
      config {
        image = "java"
        auth{
          server_address ="hub.docker.com"
        }
      }
    }
  }

i want to be able to just run:

spark-submit \
--class org.apache.spark.examples.JavaSparkPi \
--master nomad
--conf spark.nomad.datacenters=us-east-1a,us-east-1b,us-east-1c \
--conf spark.nomad.job.template=/spark.json \
--conf spark.nomad.sparkDistribution=https://github.com/hashicorp/nomad-spark/releases/download/v2.3.0-nomad-0.7.0-20180618/spark-2.3.0-bin-nomad-0.7.0-20180618.tgz \
--verbose \
spark-examples_2.11-2.1.0-SNAPSHOT.jar 

but as said that doesn't work and i'm forced to instead run:

spark-submit \
--class org.apache.spark.examples.JavaSparkPi \
--master nomad \
--conf spark.nomad.docker.serverAddress=hub.docker.com \
--conf spark.nomad.dockerImage=java \
--conf spark.nomad.datacenters=us-east-1a,us-east-1b,us-east-1c \
--conf spark.nomad.job.template=/spark.json \
--conf spark.nomad.sparkDistribution=https://github.com/hashicorp/nomad-spark/releases/download/v2.3.0-nomad-0.7.0-20180618/spark-2.3.0-bin-nomad-0.7.0-20180618.tgz \
--verbose \
spark-examples_2.11-2.1.0-SNAPSHOT.jar 

This will overwrite anything i have set in the docker driver stanza which means that i can not use any type of interpolation.

[question] [feature-request] Passing vault-token to spark-submit

I've been using the (wonderful) spark-nomad featureset, and I've hit a bit of a stumbling block. I want to be able to pass a vault token to my job to enable policy blocks in the template file. for example, my template

job "template" {
    group "executor-group-name" {
        task "executor-task-name" {
            # grant this task the permission to access certain policies
            # this will hopefully inject a VAULT_TOKEN environment var into each jvm executor
            vault {
                policies = ["s3-databucket-readonly"]
            }
            meta {
                "spark.nomad.role" = "executor" 
            } 
        } 
    }
}

And I schedule my job via

/usr/local/spark/bin/spark-submit --class com.demo.spark.ReadDataFromS3 \
    --master nomad:http://nomad.service.consul:4646 \
    --conf spark.nomad.sparkDistribution=local:///usr/local/spark   \
    --conf spark.nomad.datacenters=dc1  \
    --conf spark.nomad.job.template=/home/ubuntu/test.template \
    /opt/deployments/sparkjobs-assembly-1.0.jar

However, I couldn't see an option to include a VAULT_TOKEN into this method. Perhaps I can suggest something like

--conf spark.nomad.vaultToken=<TOKEN>

[request] Provide source for dockerfile

I was wondering if someone could release the dockerfile for hashicorp/spark-nomad? I've having some issues due to different versions of driver/executor distros.

Executors killed prematurely in dynamic allocation

Nomad 0.9.1
Pyspark 2.4.3

example pyspark-shell command running against Nomad cluster:

    --conf spark.nomad.sparkDistribution=local:/usr/lib/spark \
    --conf spark.dynamicAllocation.enabled=true \
    --conf spark.shuffle.service.enabled=true

In pyspark-shell load the following:

Behavior does not follow documentation https://www.nomadproject.io/guides/analytical-workloads/spark/dynamic.html, mainly When the executor exits, the shuffle service continues running so that it can serve any results produced by the executor.

Nearly immediately when executors are done with execution all executors get cancelled ERROR CoarseGrainedExecutorBackend: RECEIVED SIGNAL INT and all shuffle services get cancelled ERROR ExternalShuffleService: RECEIVED SIGNAL INT

This causes many logs like these:

19/07/28 13:08:11 ERROR TaskSchedulerImpl: Lost executor 979643d9-7e5e-4d76-6f5d-bb10c39ba767-1564319271483 on 173.208.60.207: Remote RPC client disassociated. Likely due to containers exceeding thresholds, or network issues. Check driver logs for WARN messages.
19/07/28 13:08:11 WARN TaskSetManager: Lost task 4.0 in stage 6.3 (TID 2861, 173.208.60.207, executor 979643d9-7e5e-4d76-6f5d-bb10c39ba767-1564319271483): ExecutorLostFailure (executor 979643d9-7e5e-4d76-6f5d-bb10c39ba767-1564319271483 exited caused by one of the running tasks) Reason: Remote RPC client disassociated. Likely due to containers exceeding thresholds, or network issues. Check driver logs for WARN messages.
19/07/28 13:08:11 ERROR TaskSchedulerImpl: Lost executor 1f0a3afe-a317-4b5d-38cf-2ab49b57799b-1564319271165 on 173.208.60.207: Remote RPC client disassociated. Likely due to containers exceeding thresholds, or network issues. Check driver logs for WARN messages.

Executors are dying because they cannot access shuffle services.
All persistent data is lost.

To properly replicate, in most of the cases, the last command df.show() needs to be repeated.

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. 📊📈🎉

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❤️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.