
raydp's Introduction

RayDP provides simple APIs for running Spark on Ray and integrating Spark with AI libraries, making it simple to build distributed data and AI pipelines in a single Python program.

INTRODUCTION

Problem Statement

A large-scale AI workflow usually involves multiple systems, for example Spark for data processing and PyTorch or Tensorflow for distributed training. A common setup is to use two separate clusters and stitch together multiple programs using glue code or a workflow orchestrator such as AirFlow or KubeFlow. However, in many cases this adds cost in terms of system efficiency and operations. The setup overhead of the workflow tasks adds latency. Data exchange among frameworks has to rely on an external storage system, which also adds latency. On the operations side, managing two separate clusters introduces additional cost. Writing the pipeline with a workflow orchestrator is usually also more complex than writing a single Python program.

Solution with Ray and RayDP

To solve the above challenges, more and more companies have adopted Ray as a single substrate for data processing, model training, serving, and more. Ray makes it simple to build a data and AI pipeline in a single Python program and scale it from a laptop to a cluster seamlessly. Ray has built a rich ecosystem by providing high-quality libraries and integrating with other popular ones.

Spark, as a popular big data framework, plays an important role in data and AI pipelines. RayDP brings Spark to the Ray ecosystem by supporting running Spark on top of Ray. With RayDP, you can easily write PySpark code together with other Ray libraries in the same Python program, which improves productivity and expressiveness. RayDP makes it simple to build distributed end-to-end data analytics and AI pipelines. RayDP supports exchanging data between Spark and other frameworks using Ray's in-memory object store to provide the best performance.

Who will use RayDP

  • ML infrastructure teams can build a modern ML platform on top of Ray, use RayDP to run Spark on Ray, and unify it with other AI components.
  • Data scientists can use RayDP to write PySpark code together with other AI libraries and scale from a laptop to the cloud seamlessly.
  • Data engineers can use RayDP to run on-demand Spark jobs in the cloud without needing to set up a Spark cluster manually. The Ray cluster launcher helps start a Ray cluster in the cloud, and RayDP allows you to run Spark in that cluster with auto scaling.

Presentations

ARCHITECTURE

RayDP provides simple APIs for running Spark on Ray and APIs for converting a Spark DataFrame to a Ray Dataset which can be consumed by XGBoost, Ray Train, Horovod on Ray, etc. RayDP also provides high level scikit-learn style Estimator APIs for distributed training with PyTorch or Tensorflow.

RayDP supports Ray as a Spark resource manager and runs Spark executors in Ray actors. The communication between Spark executors still uses Spark's internal protocol.

(architecture diagram)

QUICK START

Installation

You can install the latest RayDP release using pip. RayDP requires Ray and PySpark. Please also make sure Java is installed and JAVA_HOME is set properly.

pip install raydp

Or you can install RayDP nightly build:

pip install --pre raydp

NOTICE: the formerly used raydp-nightly package will no longer be updated.

If you'd like to build and install the latest master, use the following command:

./build.sh
pip install dist/raydp*.whl

Spark on Ray

RayDP provides an API for starting a Spark job on Ray. To create a Spark session, call the raydp.init_spark API. After that, you can use any Spark API as usual. For example:

import ray
import raydp

# connect to ray cluster
ray.init(address='auto')

# create a Spark cluster with specified resource requirements
spark = raydp.init_spark(app_name='RayDP Example',
                         num_executors=2,
                         executor_cores=2,
                         executor_memory='4GB')

# normal data processing with Spark
df = spark.createDataFrame([('look',), ('spark',), ('tutorial',), ('spark',), ('look', ), ('python', )], ['word'])
df.show()
word_count = df.groupBy('word').count()
word_count.show()

# stop the spark cluster
raydp.stop_spark()

Spark features such as dynamic resource allocation, the spark-submit script, etc. are also supported. Please refer to Spark on Ray for more details.
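For example, additional Spark settings can be passed through the configs argument of raydp.init_spark. The sketch below uses standard Spark dynamic allocation keys; whether any RayDP-specific settings are also required is covered in the Spark on Ray documentation.

import ray
import raydp

ray.init(address='auto')
spark = raydp.init_spark(app_name='RayDP Dynamic Allocation Example',
                         num_executors=2,
                         executor_cores=2,
                         executor_memory='4GB',
                         configs={
                             # standard Spark dynamic allocation settings
                             'spark.dynamicAllocation.enabled': 'true',
                             'spark.dynamicAllocation.shuffleTracking.enabled': 'true',
                             'spark.dynamicAllocation.maxExecutors': '10',
                         })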

Spark + AI Pipeline on Ray

RayDP provides APIs for converting a Spark DataFrame to a Ray Dataset which can be consumed by XGBoost, Ray Train, Horovod on Ray, etc. RayDP also provides high level scikit-learn style Estimator APIs for distributed training with PyTorch or Tensorflow. To get started with an end-to-end Spark + AI pipeline, the easiest way is to run the following tutorials on Google Colab. More examples are also available in the examples folder.

Spark DataFrame & Ray Dataset conversion

You can use ray.data.from_spark and ds.to_spark to convert between Spark DataFrame and Ray Dataset.

import ray
import raydp

ray.init()
spark = raydp.init_spark(app_name="RayDP Example",
                         num_executors=2,
                         executor_cores=2,
                         executor_memory="4GB")

# Spark Dataframe to Ray Dataset
df1 = spark.range(0, 1000)
ds1 = ray.data.from_spark(df1)

# Ray Dataset to Spark Dataframe
ds2 = ray.data.from_items([{"id": i} for i in range(1000)])
df2 = ds2.to_spark(spark)

A Ray Dataset converted from a Spark DataFrame in this way will no longer be accessible after raydp.stop_spark(). If you want to access the data after Spark is shut down, please use raydp.stop_spark(cleanup_data=False).
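A minimal sketch of that pattern, reusing df1 from the snippet above:

ds1 = ray.data.from_spark(df1)
# keep the Ray objects backing ds1 alive after Spark shuts down
raydp.stop_spark(cleanup_data=False)
print(ds1.count())  # still accessible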

Please refer to Spark+XGBoost on Ray for a full example.

Estimator API

The Estimator APIs allow you to train a deep neural network directly on a Spark DataFrame, leveraging Ray’s ability to scale out across the cluster. The Estimator APIs are wrappers of Ray Train and hide the complexity of converting a Spark DataFrame to a PyTorch/Tensorflow dataset and distributing the training. RayDP provides raydp.torch.TorchEstimator for PyTorch and raydp.tf.TFEstimator for Tensorflow. The following is an example of using TorchEstimator.

import ray
import torch
import raydp
from raydp.torch import TorchEstimator

ray.init(address="auto")
spark = raydp.init_spark(app_name="RayDP Example",
                         num_executors=2,
                         executor_cores=2,
                         executor_memory="4GB")
                         
# Spark DataFrame Code 
df = spark.read.parquet(…) 
train_df = df.withColumn(…)

# PyTorch Code 
model = torch.nn.Sequential(torch.nn.Linear(2, 1)) 
optimizer = torch.optim.Adam(model.parameters())

estimator = TorchEstimator(model=model, optimizer=optimizer, ...) 
estimator.fit_on_spark(train_df)

raydp.stop_spark()

Please refer to NYC Taxi PyTorch Estimator and NYC Taxi Tensorflow Estimator for full examples.
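For reference, a slightly fuller sketch of the same flow, including retrieving the trained model with estimator.get_model(); the column names here are hypothetical, and the exact constructor argument names should be checked against the RayDP API docs:

import torch
import raydp
from raydp.torch import TorchEstimator

# assumes `spark` was created with raydp.init_spark and `train_df` has
# numeric feature columns "x1", "x2" and a numeric label column "y" (hypothetical names)
model = torch.nn.Sequential(torch.nn.Linear(2, 1))
optimizer = torch.optim.Adam(model.parameters())
loss = torch.nn.MSELoss()

estimator = TorchEstimator(model=model,
                           optimizer=optimizer,
                           loss=loss,
                           num_workers=2,
                           num_epochs=5,
                           feature_columns=["x1", "x2"],
                           label_column="y")
estimator.fit_on_spark(train_df)
trained_model = estimator.get_model()

raydp.stop_spark()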

Fault Tolerance

The Ray dataset converted from a Spark dataframe as above is not fault-tolerant. This is because we implement it using ray.put combined with Spark's mapPartitions. Objects created by ray.put are not recoverable in Ray.

RayDP now supports converting data in a way such that the resulting ray dataset is fault-tolerant. This feature is currently experimental. Here is how to use it:

import ray
import raydp

ray.init(address="auto")
# set fault_tolerance_mode to True to enable the feature
# this will connect pyspark driver to ray cluster
spark = raydp.init_spark(app_name="RayDP Example",
                         num_executors=2,
                         executor_cores=2,
                         executor_memory="4GB",
                         fault_tolerance_mode=True)
# df should be large enough so that result will be put into plasma
df = spark.range(100000)
# use this API instead of ray.data.from_spark
ds = raydp.spark.from_spark_recoverable(df)
# ds is now fault-tolerant.

Notice that from_spark_recoverable will persist the converted dataframe. You can provide the storage level through the keyword parameter storage_level. In addition, this feature is not available in Ray client mode. If you need to use the Ray client, please wrap your application in a Ray actor, as described in the Ray client chapter.
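A minimal sketch of passing the storage level, assuming PySpark's standard StorageLevel values are accepted by this keyword:

from pyspark import StorageLevel

# persist the converted dataframe to memory and spill to disk if needed
ds = raydp.spark.from_spark_recoverable(df, storage_level=StorageLevel.MEMORY_AND_DISK)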

Getting Involved

To report bugs or request new features, please open a GitHub issue.

raydp's People

Contributors

augray, bowen0729, buaazhwb, carsonwang, clarkzinzow, coneyliu, deegue, edoakes, gptbert, guseggert, guykhazma, harborn, hezhaozhao-git, jiafuzha, jjyao, kepingyan, kira-lin, kiranp-d11, marin-ma, max-509, minmingzhu, mjschock, n1cks4x0, pang-wu, raviranak, romeokienzler, wuisawesome, wybryan, yard1, zuston


raydp's Issues

SLF4J: Class path contains multiple SLF4J bindings

It seems like there are multiple versions of SLF4J coming from Ray and PySpark, which lead to the following error being shown:

SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/home/ubuntu/anaconda3/lib/python3.7/site-packages/ray/jars/ray_dist.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/home/ubuntu/anaconda3/lib/python3.7/site-packages/pyspark/jars/slf4j-log4j12-1.7.30.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]

I'm using raydp in conjunction with the sparkmonitor project, which displays a progress bar for spark jobs, and this warning generates a lot of output which pollutes the results. Any suggestions for how to suppress this output or prevent it from happening in the first place?


Auto determine the number of processes for data loading

Ideally, we could get the best performance when the time of data loading overlaps with the forward+backward time. This is a little similar to LR scheduling. We could use history metrics to determine a reasonable number of processes for data loading. This could also be added to Horovod.

Error when running estimator.fit_on_spark()

I was able to run all the steps following the pytorch example, until:

# Train the model
estimator.fit_on_spark(train_df, test_df)

Got this error:

2020-12-01 00:03:26,525	WARNING worker.py:1031 -- Failed to unpickle actor class 'DistributedTorchRunner' for actor ID 62223d8501000000. Traceback:
Traceback (most recent call last):
  File "/Users/dr6jl/anaconda3/lib/python3.8/site-packages/ray/function_manager.py", line 494, in _load_actor_class_from_gcs
    actor_class = pickle.loads(pickled_class)
  File "/Users/dr6jl/anaconda3/lib/python3.8/site-packages/ray/util/sgd/__init__.py", line 1, in <module>
    from ray.util.sgd.torch import TorchTrainer
  File "/Users/dr6jl/anaconda3/lib/python3.8/site-packages/ray/util/sgd/torch/__init__.py", line 12, in <module>
    from ray.util.sgd.torch.torch_trainer import (TorchTrainer,
  File "/Users/dr6jl/anaconda3/lib/python3.8/site-packages/ray/util/sgd/torch/torch_trainer.py", line 13, in <module>
    from ray.tune import Trainable
  File "/Users/dr6jl/anaconda3/lib/python3.8/site-packages/ray/tune/__init__.py", line 2, in <module>
    from ray.tune.tune import run_experiments, run
  File "/Users/dr6jl/anaconda3/lib/python3.8/site-packages/ray/tune/tune.py", line 13, in <module>
    from ray.tune.ray_trial_executor import RayTrialExecutor
  File "/Users/dr6jl/anaconda3/lib/python3.8/site-packages/ray/tune/ray_trial_executor.py", line 15, in <module>
    from ray.tune.durable_trainable import DurableTrainable
  File "/Users/dr6jl/anaconda3/lib/python3.8/site-packages/ray/tune/durable_trainable.py", line 5, in <module>
    from ray.tune.syncer import get_cloud_sync_client
  File "/Users/dr6jl/anaconda3/lib/python3.8/site-packages/ray/tune/syncer.py", line 90, in <module>
    class SyncConfig:
  File "/Users/dr6jl/anaconda3/lib/python3.8/site-packages/dataclasses.py", line 958, in dataclass
    return wrap(_cls)
  File "/Users/dr6jl/anaconda3/lib/python3.8/site-packages/dataclasses.py", line 950, in wrap
    return _process_class(cls, init, repr, eq, order, unsafe_hash, frozen)
  File "/Users/dr6jl/anaconda3/lib/python3.8/site-packages/dataclasses.py", line 800, in _process_class
    cls_fields = [_get_field(cls, name, type)
  File "/Users/dr6jl/anaconda3/lib/python3.8/site-packages/dataclasses.py", line 800, in <listcomp>
    cls_fields = [_get_field(cls, name, type)
  File "/Users/dr6jl/anaconda3/lib/python3.8/site-packages/dataclasses.py", line 659, in _get_field
    if (_is_classvar(a_type, typing)
  File "/Users/dr6jl/anaconda3/lib/python3.8/site-packages/dataclasses.py", line 550, in _is_classvar
    return type(a_type) is typing._ClassVar
AttributeError: module 'typing' has no attribute '_ClassVar'

raydp.init_spark error

I'm following the example at: examples/pytorch/pytorch_nyctaxi.ipynb

Got this error when execute the raydp.init_spark

---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
<ipython-input-4-cb3a24832a0a> in <module>
      4 cores_per_executor = 1
      5 memory_per_executor = "2GB"
----> 6 spark = raydp.init_spark(app_name, num_executors, cores_per_executor, memory_per_executor)

~/anaconda3/lib/python3.8/site-packages/raydp/context.py in init_spark(app_name, num_executors, executor_cores, executor_memory, configs)
    120             _global_spark_context = _SparkContext(
    121                 app_name, num_executors, executor_cores, executor_memory, configs)
--> 122             return _global_spark_context.get_or_create_session()
    123         else:
    124             raise Exception("The spark environment has inited.")

~/anaconda3/lib/python3.8/site-packages/raydp/context.py in get_or_create_session(self)
     67         if self._spark_session is not None:
     68             return self._spark_session
---> 69         spark_cluster = self._get_or_create_spark_cluster()
     70         self._spark_session = spark_cluster.get_spark_session(
     71             self._app_name,

~/anaconda3/lib/python3.8/site-packages/raydp/context.py in _get_or_create_spark_cluster(self)
     61         if self._spark_cluster is not None:
     62             return self._spark_cluster
---> 63         self._spark_cluster = RayCluster()
     64         return self._spark_cluster
     65 

~/anaconda3/lib/python3.8/site-packages/raydp/spark/ray_cluster.py in __init__(self)
     37         super().__init__(None)
     38         self._app_master_bridge = None
---> 39         self._set_up_master(None, None)
     40         self._spark_session: SparkSession = None
     41 

~/anaconda3/lib/python3.8/site-packages/raydp/spark/ray_cluster.py in _set_up_master(self, resources, kwargs)
     43         # TODO: specify the app master resource
     44         self._app_master_bridge = RayClusterMaster()
---> 45         self._app_master_bridge.start_up()
     46 
     47     def _set_up_worker(self, resources: Dict[str, float], kwargs: Dict[str, str]):

~/anaconda3/lib/python3.8/site-packages/raydp/spark/ray_cluster_master.py in start_up(self, popen_kwargs)
     48         self._set_properties()
     49         self._host = ray.services.get_node_ip_address()
---> 50         self._create_app_master(extra_classpath)
     51         self._started_up = True
     52 

~/anaconda3/lib/python3.8/site-packages/raydp/spark/ray_cluster_master.py in _create_app_master(self, extra_classpath)
    156         if self._started_up:
    157             return
--> 158         self._app_master_java_bridge.startUpAppMaster(extra_classpath)
    159 
    160     def get_master_url(self):

~/anaconda3/lib/python3.8/site-packages/py4j/java_gateway.py in __call__(self, *args)
   1302 
   1303         answer = self.gateway_client.send_command(command)
-> 1304         return_value = get_return_value(
   1305             answer, self.gateway_client, self.target_id, self.name)
   1306 

~/anaconda3/lib/python3.8/site-packages/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    324             value = OUTPUT_CONVERTER[type](answer[2:], gateway_client)
    325             if answer[1] == REFERENCE_TYPE:
--> 326                 raise Py4JJavaError(
    327                     "An error occurred while calling {0}{1}{2}.\n".
    328                     format(target_id, ".", name), value)

Py4JJavaError: An error occurred while calling o0.startUpAppMaster.
: java.lang.NoSuchMethodError: java.nio.ByteBuffer.flip()Ljava/nio/ByteBuffer;
	at io.ray.api.id.JobId.fromInt(JobId.java:47)
	at io.ray.runtime.gcs.GcsClient.nextJobId(GcsClient.java:186)
	at io.ray.runtime.RayNativeRuntime.start(RayNativeRuntime.java:105)
	at io.ray.runtime.DefaultRayRuntimeFactory.createRayRuntime(DefaultRayRuntimeFactory.java:38)
	at io.ray.api.Ray.init(Ray.java:42)
	at io.ray.api.Ray.init(Ray.java:28)
	at org.apache.spark.deploy.raydp.AppMasterJavaBridge.startUpAppMaster(AppMasterJavaBridge.scala:41)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)

conda-forge package?

Hi, are there any plans to automatically publish conda-forge packages beside of pip packages?

not stable when using ray client mode

I'm using raydp in Ray's client mode, trying to submit a raydp operation with ray.remote.
It works during the initial test, but when I re-run this code, it doesn't print anything and seems to fail quietly.

import ray
import raydp
import logging
example_cluster = "xxx.xxx.xxx.xxx:50051" # set this to your ray client address
try:
    ray.util.connect(example_cluster)
    logging.info(f"Connected to pre-provisioned cluster:{example_cluster}")
except Exception as e:
    logging.info(f"Error in client connection:{e}")
    pass

class SparkOp:
    def __init__(self):
        self.spark = None
        self.started = False
    def start(self, config= None):
        try:
            #todo: pass user config into init_spark
            self.spark = raydp.init_spark('word_count',
                                       num_executors=2,
                                       executor_cores=1,
                                       executor_memory='1G')
            self.started = True
        except Exception as e:
            logging.info(f"starting spark failed:{e}")
            self.started = False    

    def run(self, input_data=None):
        if not self.started:
            self.start()
            logging.info("starting spark now...")
        if not self.started:
            logging.info("spark is not started, exit")
            return
        try:
            df = self.spark.createDataFrame([('look',), ('spark',), ('tutorial',), ('spark',), ('look', ), ('python', )], ['word'])
            df.show()
            word_count = df.groupBy('word').count()
            word_count.show()
            logging.info("returnning dataframe")
            #todo: add support for RayFlow Tensor Cache
            #return df.collect() 
        except Exception as e:
            logging.info(f"running spark job failed:{e}, stopping spark now")
            self.stop()
    def stop(self):
        raydp.stop_spark()
        
spark = ray.remote(SparkOp).remote()
ray.get(spark.run.remote())
ray.get(spark.stop.remote())

Possible unhandled error from worker: ray::ParallelIteratorWorker.par_iter_next_batch()

The following errors are just error prints. This is a bug in Ray and will be fixed in the future.

2020-12-01 20:44:59,081	ERROR worker.py:977 -- Possible unhandled error from worker: ray::ParallelIteratorWorker.par_iter_next_batch() (pid=24362, ip=192.168.3.6)
  File "python/ray/_raylet.pyx", line 464, in ray._raylet.execute_task
  File "python/ray/_raylet.pyx", line 419, in ray._raylet.execute_task.function_executor
  File "/Users/xianyang/miniconda3/envs/torch/lib/python3.7/site-packages/ray/util/iter.py", line 1158, in par_iter_next_batch
    batch.append(self.par_iter_next())
  File "/Users/xianyang/miniconda3/envs/torch/lib/python3.7/site-packages/ray/util/iter.py", line 1152, in par_iter_next
    return next(self.local_it)
StopIteration

Can't init spark on multi-nodes ray cluster on k8s

Hi, I was able to run raydp on my laptop and on K8S (by initializing a single-node Ray cluster, e.g., ray.init()),
but on K8S, if I run raydp by connecting to a multi-node existing Ray cluster, e.g., ray.init(address="auto"), I get the errors below and the Ray head crashes.

Here is my test code:

import ray,raydp
ray.init(address="auto")
raydp.init_spark(app_name, num_executors, cores_per_executor, memory_per_executor)
(base) root@ray-head-7f4bc4b6df-nx697:/ray-raydp# python spark_init.py 1
2020-12-23 13:14:59,426	INFO worker.py:655 -- Connecting to existing Ray cluster at address: 10.40.1.13:6379
number of workers:1
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/ray-raydp/ray/python/ray/jars/ray_dist.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/root/anaconda3/lib/python3.7/site-packages/pyspark/jars/slf4j-log4j12-1.7.30.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
(pid=raylet) [2020-12-23 13:15:01,006 C 35 35] worker_pool.cc:952:  Check failed: state != states_by_lang_.end() Required Language isn't supported.
(pid=raylet) [2020-12-23 13:15:01,006 E 35 35] logging.cc:414: *** Aborted at 1608758101 (unix time) try "date -d @1608758101" if you are using GNU date ***
(pid=raylet) [2020-12-23 13:15:01,006 E 35 35] logging.cc:414: PC: @                0x0 (unknown)
(pid=raylet) [2020-12-23 13:15:01,006 E 35 35] logging.cc:414: *** SIGABRT (@0x23) received by PID 35 (TID 0x7f14e82f7800) from PID 35; stack trace: ***
(pid=raylet) [2020-12-23 13:15:01,006 E 35 35] logging.cc:414:     @     0x556a2af32d97 google::(anonymous namespace)::FailureSignalHandler()
(pid=raylet) [2020-12-23 13:15:01,007 E 35 35] logging.cc:414:     @     0x7f14e85293c0 (unknown)
(pid=raylet) [2020-12-23 13:15:01,007 E 35 35] logging.cc:414:     @     0x7f14e834218b gsignal
(pid=raylet) [2020-12-23 13:15:01,007 E 35 35] logging.cc:414:     @     0x7f14e8321859 abort
(pid=raylet) [2020-12-23 13:15:01,007 E 35 35] logging.cc:414:     @     0x556a2a9e00ad _ZN3ray6RayLogD2Ev.cold
(pid=raylet) [2020-12-23 13:15:01,009 E 35 35] logging.cc:414:     @     0x556a2aa66a3c ray::raylet::WorkerPool::GetStateForLanguage()
(pid=raylet) [2020-12-23 13:15:01,010 E 35 35] logging.cc:414:     @     0x556a2aa6e003 ray::raylet::WorkerPool::RegisterDriver()
(pid=raylet) [2020-12-23 13:15:01,012 E 35 35] logging.cc:414:     @     0x556a2aac9096 ray::raylet::NodeManager::ProcessRegisterClientRequestMessage()
(pid=raylet) [2020-12-23 13:15:01,014 E 35 35] logging.cc:414:     @     0x556a2aadcce2 ray::raylet::NodeManager::ProcessClientMessage()
(pid=raylet) [2020-12-23 13:15:01,014 E 35 35] logging.cc:414:     @     0x556a2aa29625 _ZNSt17_Function_handlerIFvSt10shared_ptrIN3ray16ClientConnectionEElRKSt6vectorIhSaIhEEEZNS1_6raylet6Raylet12HandleAcceptERKN5boost6system10error_codeEEUlS3_lS8_E0_E9_M_invokeERKSt9_Any_dataOS3_OlS8_
(pid=raylet) [2020-12-23 13:15:01,015 E 35 35] logging.cc:414:     @     0x556a2ae81792 ray::ClientConnection::ProcessMessage()
(pid=raylet) [2020-12-23 13:15:01,016 E 35 35] logging.cc:414:     @     0x556a2ae7eabc boost::asio::detail::reactive_socket_recv_op<>::do_complete()
(pid=raylet) [2020-12-23 13:15:01,016 E 35 35] logging.cc:414:     @     0x556a2b26b701 boost::asio::detail::scheduler::do_run_one()
(pid=raylet) [2020-12-23 13:15:01,017 E 35 35] logging.cc:414:     @     0x556a2b26ce41 boost::asio::detail::scheduler::run()
(pid=raylet) [2020-12-23 13:15:01,019 E 35 35] logging.cc:414:     @     0x556a2b26f4bb boost::asio::io_context::run()
(pid=raylet) [2020-12-23 13:15:01,020 E 35 35] logging.cc:414:     @     0x556a2a9fbdc2 main
(pid=raylet) [2020-12-23 13:15:01,020 E 35 35] logging.cc:414:     @     0x7f14e83230b3 __libc_start_main
(pid=raylet) [2020-12-23 13:15:01,022 E 35 35] logging.cc:414:     @     0x556a2aa1189e _start
ERROR:root:Exception while sending command.
Traceback (most recent call last):
  File "/root/anaconda3/lib/python3.7/site-packages/py4j/java_gateway.py", line 1207, in send_command
    raise Py4JNetworkError("Answer from Java side is empty")
py4j.protocol.Py4JNetworkError: Answer from Java side is empty

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/root/anaconda3/lib/python3.7/site-packages/py4j/java_gateway.py", line 1033, in send_command
    response = connection.send_command(command)
  File "/root/anaconda3/lib/python3.7/site-packages/py4j/java_gateway.py", line 1212, in send_command
    "Error while receiving", e, proto.ERROR_ON_RECEIVE)
py4j.protocol.Py4JNetworkError: Error while receiving
An error occurred while calling o0.startUpAppMaster
ERROR:py4j.java_gateway:An error occurred while trying to connect to the Java server (127.0.0.1:25333)
Traceback (most recent call last):
  File "/root/anaconda3/lib/python3.7/site-packages/py4j/java_gateway.py", line 977, in _get_connection
    connection = self.deque.pop()
IndexError: pop from an empty deque

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/root/anaconda3/lib/python3.7/site-packages/py4j/java_gateway.py", line 1115, in start
    self.socket.connect((self.address, self.port))
ConnectionRefusedError: [Errno 111] Connection refused

Java runtime compatibility

I am playing raydp inside head pod on k8s.

import ray
import raydp

# connect to the cluster
ray.init(address='auto')

2021-04-08 15:26:01,060	INFO worker.py:654 -- Connecting to existing Ray cluster at address: 192.168.13.21:6379
{'node_ip_address': '192.168.13.21', 'raylet_ip_address': '192.168.13.21', 'redis_address': '192.168.13.21:6379', 'object_store_address': '/tmp/ray/session_2021-04-08_15-02-34_751440_101/sockets/plasma_store', 'raylet_socket_name': '/tmp/ray/session_2021-04-08_15-02-34_751440_101/sockets/raylet', 'webui_url': '0.0.0.0:8265', 'session_dir': '/tmp/ray/session_2021-04-08_15-02-34_751440_101', 'metrics_export_port': 49804, 'node_id': 'adb6bbd4e36d880a70ee156fd05895c0b8a4bad92e88f14a733b035c'}

ray.nodes()

[{'NodeID': 'bbf00a25f2fc5fecb28245c63b57d18700e5819532233dbc0bc1d936', 'Alive': True, 'NodeManagerAddress': '192.168.13.22', 'NodeManagerHostname': 'example-cluster-ray-worker-jr8js', 'NodeManagerPort': 41061, 'ObjectManagerPort': 43811, 'ObjectStoreSocketName': '/tmp/ray/session_2021-04-08_15-02-34_751440_101/sockets/plasma_store', 'RayletSocketName': '/tmp/ray/session_2021-04-08_15-02-34_751440_101/sockets/raylet', 'MetricsExportPort': 50616, 'alive': True, 'Resources': {'memory': 3006477107.0, 'CPU': 2.0, 'object_store_memory': 1273641369.0, 'bar': 1.0, 'foo': 1.0, 'node:192.168.13.22': 1.0}}, {'NodeID': 'adb6bbd4e36d880a70ee156fd05895c0b8a4bad92e88f14a733b035c', 'Alive': True, 'NodeManagerAddress': '192.168.13.21', 'NodeManagerHostname': 'example-cluster-ray-head-xnk64', 'NodeManagerPort': 35547, 'ObjectManagerPort': 45743, 'ObjectStoreSocketName': '/tmp/ray/session_2021-04-08_15-02-34_751440_101/sockets/plasma_store', 'RayletSocketName': '/tmp/ray/session_2021-04-08_15-02-34_751440_101/sockets/raylet', 'MetricsExportPort': 49804, 'alive': True, 'Resources': {'memory': 3006477107.0, 'node:192.168.13.21': 1.0, 'CPU': 2.0, 'object_store_memory': 1273423872.0}}, {'NodeID': '02df037ac63f257c289e1d16762cadff26c9f8b0e5714738416c09f9', 'Alive': True, 'NodeManagerAddress': '192.168.13.23', 'NodeManagerHostname': 'example-cluster-ray-worker-5rknz', 'NodeManagerPort': 42665, 'ObjectManagerPort': 36957, 'ObjectStoreSocketName': '/tmp/ray/session_2021-04-08_15-02-34_751440_101/sockets/plasma_store', 'RayletSocketName': '/tmp/ray/session_2021-04-08_15-02-34_751440_101/sockets/raylet', 'MetricsExportPort': 64861, 'alive': True, 'Resources': {'memory': 3006477107.0, 'object_store_memory': 1273514803.0, 'foo': 1.0, 'node:192.168.13.23': 1.0, 'bar': 1.0, 'CPU': 2.0}}]

The Ray environment is correct; however, it encounters some issues when I use raydp.
It seems the Java runtime is not compatible with the current Ray.

Java Version: openjdk 11.0.10 2021-01-19
Ray Cluster: 2.0.0-dev
Raydp: 0.2.0


spark = raydp.init_spark("K8S test", num_executors=2, executor_cores=1, executor_memory="512M")


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
.....
2021-04-08 15:27:23 ERROR Inbox:94 - An error happened while processing message in the inbox for RAY_APP_MASTER
java.lang.NoSuchMethodError: 'io.ray.api.call.ActorCreator io.ray.api.call.ActorCreator.setJvmOptions(java.lang.String)'
	at org.apache.spark.raydp.AppMasterJavaUtils.createExecutorActor(AppMasterJavaUtils.java:47)
	at org.apache.spark.deploy.raydp.RayAppMaster$RayAppMasterEndpoint.org$apache$spark$deploy$raydp$RayAppMaster$RayAppMasterEndpoint$$requestNewExecutor(RayAppMaster.scala:190)
	at org.apache.spark.deploy.raydp.RayAppMaster$RayAppMasterEndpoint.$anonfun$schedule$1(RayAppMaster.scala:177)
	at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:158)
	at org.apache.spark.deploy.raydp.RayAppMaster$RayAppMasterEndpoint.org$apache$spark$deploy$raydp$RayAppMaster$RayAppMasterEndpoint$$schedule(RayAppMaster.scala:176)
	at org.apache.spark.deploy.raydp.RayAppMaster$RayAppMasterEndpoint$$anonfun$receive$1.applyOrElse(RayAppMaster.scala:104)
	at org.apache.spark.rpc.netty.Inbox.$anonfun$process$1(Inbox.scala:115)
	at org.apache.spark.rpc.netty.Inbox.safelyCall(Inbox.scala:213)
	at org.apache.spark.rpc.netty.Inbox.process(Inbox.scala:100)
	at org.apache.spark.rpc.netty.MessageLoop.org$apache$spark$rpc$netty$MessageLoop$$receiveLoop(MessageLoop.scala:75)
	at org.apache.spark.rpc.netty.MessageLoop$$anon$1.run(MessageLoop.scala:41)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:834)
Exception in thread "dispatcher-event-loop-1" java.lang.NoSuchMethodError: 'io.ray.api.call.ActorCreator io.ray.api.call.ActorCreator.setJvmOptions(java.lang.String)'
	at org.apache.spark.raydp.AppMasterJavaUtils.createExecutorActor(AppMasterJavaUtils.java:47)
	at org.apache.spark.deploy.raydp.RayAppMaster$RayAppMasterEndpoint.org$apache$spark$deploy$raydp$RayAppMaster$RayAppMasterEndpoint$$requestNewExecutor(RayAppMaster.scala:190)
	at org.apache.spark.deploy.raydp.RayAppMaster$RayAppMasterEndpoint.$anonfun$schedule$1(RayAppMaster.scala:177)
	at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:158)
	at org.apache.spark.deploy.raydp.RayAppMaster$RayAppMasterEndpoint.org$apache$spark$deploy$raydp$RayAppMaster$RayAppMasterEndpoint$$schedule(RayAppMaster.scala:176)
	at org.apache.spark.deploy.raydp.RayAppMaster$RayAppMasterEndpoint$$anonfun$receive$1.applyOrElse(RayAppMaster.scala:104)
	at org.apache.spark.rpc.netty.Inbox.$anonfun$process$1(Inbox.scala:115)
	at org.apache.spark.rpc.netty.Inbox.safelyCall(Inbox.scala:213)
	at org.apache.spark.rpc.netty.Inbox.process(Inbox.scala:100)
	at org.apache.spark.rpc.netty.MessageLoop.org$apache$spark$rpc$netty$MessageLoop$$receiveLoop(MessageLoop.scala:75)
	at org.apache.spark.rpc.netty.MessageLoop$$anon$1.run(MessageLoop.scala:41)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:834)

The action doesn't get any resources and keeps waiting here. I can confirm that my cluster has enough resources. I think this is still related to Spark session initialization.

>>> spark.range(0, 100).count()
21/04/08 15:28:52 WARN TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources
21/04/08 15:29:07 WARN TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources
21/04/08 15:29:22 WARN TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources
...

Support for spark.jars.packages parameter

Hi,

I am trying to specify custom jars in the spark init using the spark.jars.packages for example:

spark = raydp.init_spark(
    "Sample APP",
    1,
    2,
    "2G", 
    {
        "spark.jars.packages": "com.ibm.stocator:stocator:1.1.3"
    })

The jar is downloaded, but it seems it isn't propagated to the driver/executor classpath, so I get java.lang.ClassNotFoundException.

If I use spark.jars with a local jar, things work fine.

Have you seen the same behaviour?
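For reference, a minimal sketch of the working alternative described above, using the spark.jars configuration with a local jar; the jar path here is hypothetical:

spark = raydp.init_spark(
    "Sample APP",
    1,
    2,
    "2G",
    {
        # a local jar already present on the node (hypothetical path)
        "spark.jars": "/path/to/stocator-1.1.3.jar"
    })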

Investigate store pyarrow.Table data in ray object store

Currently, we serialize the pandas DataFrame and store it in the Ray object store from a Spark pandas UDF, then get the object from Ray (zero-copy) and deserialize it back to a pandas DataFrame in the TorchTrainer. For each block of data, we have to:

  1. serialize the DataFrame into a byte array.
  2. copy the byte array to the Ray object store.
  3. zero-copy the byte array and deserialize it back to a pandas DataFrame. The deserialization could trigger an extra copy.

So we need a serialize/deserialize round trip plus two memory copies. This is not efficient.

We can leverage pyarrow to exchange the data between pandas DataFrame <--> Ray object store <--> pandas DataFrame. With pyarrow:

  1. convert the pandas DataFrame to a pyarrow Table; this should be fast and supports multiple threads.
  2. store the pyarrow Table in the Ray object store; we need to investigate whether this requires serializing the Table or just a memory copy.
  3. zero-copy the Table and convert it back to a pandas DataFrame. This should be fast too.

Converting a pandas DataFrame to/from a pyarrow Table in zero-copy mode seems to work only for a limited set of data types; however, deep learning frameworks only need number-like types, so this should work.
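A minimal sketch of the proposed pyarrow round trip, using a small toy DataFrame:

import pandas as pd
import pyarrow as pa
import ray

ray.init()

pdf = pd.DataFrame({"feature": [1.0, 2.0, 3.0]})
table = pa.Table.from_pandas(pdf)     # 1. pandas -> pyarrow Table
ref = ray.put(table)                  # 2. store the Table in the Ray object store
restored = ray.get(ref).to_pandas()   # 3. fetch (zero-copy) and convert back to pandas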

PandasUDFType has no attribute "MAP_ITER"

In the function save_to_ray, the pandas UDF type "MAP_ITER" is not supported in Spark 3.0:

    def save_to_ray(df: Any) -> BlockSet:
        ...
        @pandas_udf(return_type, PandasUDFType.MAP_ITER)
        def save(batch_iter):
            ...
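For reference, Spark 3.0's documented replacement for this UDF type is DataFrame.mapInPandas. A minimal sketch of that pattern (not necessarily the fix RayDP actually adopted; df and return_type are the objects from the snippet above):

from typing import Iterator
import pandas as pd

def save(batch_iter: Iterator[pd.DataFrame]) -> Iterator[pd.DataFrame]:
    for batch in batch_iter:
        # ... store each pandas batch, then yield it (or a status frame) back
        yield batch

result = df.mapInPandas(save, schema=return_type)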

Resource availability issues

I am running the PyTorch_nyctaxi example on a node with 80 CPUs, each with 20 physical cores. If I set the number of executors to a number larger than 16 and the number of workers in TorchEstimator to more than 20, the code won't run and I would get resource unavailable errors, such as these ones:

(raylet) terminate called after throwing an instance of 'std::system_error'
(raylet) what(): Resource temporarily unavailable
2021-02-26 11:42:09,982 WARNING worker.py:1090 -- The node with node id e7d998b5c895816e6446685482cb0392ad76245a8dab474562b232d9 has been marked dead because the detector has missed too many heartbeats from it. This can happen when a raylet crashes unexpectedly or has lagging heartbeats.

Obviously, I am utilizing a small portion of the computing resources I have available on the node. I do not specify the num_cpus to use in ray.init(), so it should automatically detect all 80.

Any idea why is this happening?

Can not build raydp with Java

When I enable Java for Ray, I can't build it successfully on macOS.

cd oap-raydp/dev/.tmp_dir/ray/python
export RAY_INSTALL_JAVA=1
python3 setup.py -q bdist_wheel

The errors are:

INFO: Analyzed 2 targets (2 packages loaded, 484 targets configured).
INFO: Found 2 targets...
ERROR: /oap-raydp/dev/.tmp_dir/ray/BUILD.bazel:1818:10: C++ compilation of rule '//:libcore_worker_library_java.so' failed (Exit 1): cc_wrapper.sh failed: error executing command

and

Use --sandbox_debug to see verbose messages from the sandbox
In file included from src/ray/core_worker/lib/java/io_ray_runtime_object_NativeObjectStore.cc:17:
src/ray/core_worker/lib/java/jni_utils.h:341:20: error: loop variable 'item' is always a copy because the range of type 'const std::vector<bool>' does not return a reference [-Werror,-Wrange-loop-analysis]
1 error generated.
INFO: Elapsed time: 24.630s, Critical Path: 24.28s
INFO: 13 processes: 10 internal, 3 darwin-sandbox.
FAILED: Build did NOT complete successfully
Traceback (most recent call last):
  File "setup.py", line 440, in <module>
    setuptools.setup(
  File "/Users/dr6jl/anaconda3/lib/python3.8/site-packages/setuptools/__init__.py", line 165, in setup
    return distutils.core.setup(**attrs)
  File "/Users/dr6jl/anaconda3/lib/python3.8/distutils/core.py", line 148, in setup
    dist.run_commands()
  File "/Users/dr6jl/anaconda3/lib/python3.8/distutils/dist.py", line 966, in run_commands
    self.run_command(cmd)
  File "/Users/dr6jl/anaconda3/lib/python3.8/distutils/dist.py", line 985, in run_command
    cmd_obj.run()
  File "/Users/dr6jl/anaconda3/lib/python3.8/site-packages/wheel/bdist_wheel.py", line 223, in run
    self.run_command('build')
  File "/Users/dr6jl/anaconda3/lib/python3.8/distutils/cmd.py", line 313, in run_command
    self.distribution.run_command(command)
  File "/Users/dr6jl/anaconda3/lib/python3.8/distutils/dist.py", line 985, in run_command
    cmd_obj.run()
  File "/Users/dr6jl/anaconda3/lib/python3.8/distutils/command/build.py", line 135, in run
    self.run_command(cmd_name)
  File "/Users/dr6jl/anaconda3/lib/python3.8/distutils/cmd.py", line 313, in run_command
    self.distribution.run_command(command)
  File "/Users/dr6jl/anaconda3/lib/python3.8/distutils/dist.py", line 985, in run_command
    cmd_obj.run()
  File "setup.py", line 433, in run
    return pip_run(self)
  File "setup.py", line 342, in pip_run
    build(True, BUILD_JAVA)
  File "setup.py", line 299, in build
    return bazel_invoke(
  File "setup.py", line 185, in bazel_invoke
    result = invoker([cmd] + cmdline, *args, **kwargs)
  File "/Users/dr6jl/anaconda3/lib/python3.8/subprocess.py", line 364, in check_call
    raise CalledProcessError(retcode, cmd)
subprocess.CalledProcessError: Command '['bazel', 'build', '--verbose_failures', '--', '//:ray_pkg', '//java:ray_java_pkg']' returned non-zero exit status 1.

If I disable Java, the wheel can be built. But in the examples, I couldn't run this:

ray.init(include_java=True)

Cannot start Spark session on my laptop

I was using very simple code:

import ray
import raydp                                                                
ray.init()
spark = raydp.init_spark(app_name="RayDP example", num_executors=2, executor_cores=2, executor_memory="4GB")

My laptop is a MacBook (Big Sur). I tried Ray nightly and 1.2, with PySpark 3.0.1, and Java 11 and 15.

Got the below error:

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/ray/jars/ray_dist.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/pyspark/jars/slf4j-log4j12-1.7.30.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
Exception in thread "main" py4j.Py4JNetworkException
	at py4j.GatewayServer.startSocket(GatewayServer.java:788)
	at py4j.GatewayServer.start(GatewayServer.java:763)
	at py4j.GatewayServer.start(GatewayServer.java:746)
	at org.apache.spark.deploy.raydp.AppMasterEntryPoint$.main(AppMasterEntryPoint.scala:39)
	at org.apache.spark.deploy.raydp.AppMasterEntryPoint.main(AppMasterEntryPoint.scala)
Caused by: java.net.BindException: Address already in use
	at java.base/sun.nio.ch.Net.bind0(Native Method)
	at java.base/sun.nio.ch.Net.bind(Net.java:550)
	at java.base/sun.nio.ch.Net.bind(Net.java:539)
	at java.base/sun.nio.ch.NioSocketImpl.bind(NioSocketImpl.java:643)
	at java.base/java.net.ServerSocket.bind(ServerSocket.java:396)
	at py4j.GatewayServer.startSocket(GatewayServer.java:786)
	... 4 more

model.train() reports an error when running estimator.fit_on_spark(df)

Thanks to the author's efforts, this is a very good project.
I tried to run the following brief test program.
------------code-------------
import raydp
from raydp.torch import TorchEstimator

# start Spark job on Ray
spark = raydp.init_spark(
    app_name="example test",
    num_executors=3,
    executor_cores=3,
    executor_memory="10GB"
)

## prepare the data
customers = [(1, 'James', 21, 6), (2, "Liz", 25, 8), (3, "John", 31, 6),
             (4, "Jennifer", 45, 7), (5, "Robert", 41, 5), (6, "Sandra", 45, 8)]
df = spark.createDataFrame(customers, ["cID", "name", "age", "grade"])
df.show()

import torch
## create model
model = torch.nn.Sequential(torch.nn.Linear(2, 1))
optimizer = torch.optim.Adam(model.parameters())
loss = torch.nn.MSELoss()

# config
estimator = TorchEstimator(
    model=model,
    optimizer=optimizer,
    loss=loss,
    num_workers=3,
    num_epochs=5,
    feature_columns=["age"],
    label_column=["grade"]
)
print("before fit on spark, the type of model:", type(model))
estimator.fit_on_spark(df)
## get the trained model
pytorch_model = estimator.get_model()
------------code-------------
But when performing model training (model.train() in training_operator.py), an error message was reported: AttributeError: 'list' object has no attribute 'train'.
--------error-----------
Traceback (most recent call last):
File "", line 1, in
File "/root/miniconda3/lib/python3.7/site-packages/raydp/torch/estimator.py", line 311, in fit_on_spark
train_ds, evaluate_ds, num_steps, profile, reduce_results, max_retries, info)
File "/root/miniconda3/lib/python3.7/site-packages/raydp/torch/estimator.py", line 284, in fit
info=info)
File "/root/miniconda3/lib/python3.7/site-packages/ray/util/sgd/torch/torch_trainer.py", line 410, in train
num_steps=num_steps, profile=profile, info=info, dataset=dataset)
File "/root/miniconda3/lib/python3.7/site-packages/ray/util/sgd/torch/worker_group.py", line 325, in train
success = check_for_failure(remote_worker_stats)
File "/root/miniconda3/lib/python3.7/site-packages/ray/util/sgd/utils.py", line 244, in check_for_failure
finished = ray.get(finished)
File "/root/miniconda3/lib/python3.7/site-packages/ray/worker.py", line 1379, in get
raise value.as_instanceof_cause()
ray.exceptions.RayTaskError(AttributeError): ray::DistributedTorchRunner.train_epoch() (pid=54182, ip=10.3.68.117)
File "python/ray/_raylet.pyx", line 463, in ray._raylet.execute_task
File "python/ray/_raylet.pyx", line 415, in ray._raylet.execute_task.function_executor
File "/root/miniconda3/lib/python3.7/site-packages/ray/util/sgd/torch/distributed_torch_runner.py", line 112, in train_epoch
num_steps=num_steps, profile=profile, info=info, iterator=iterator)
File "/root/miniconda3/lib/python3.7/site-packages/ray/util/sgd/torch/torch_runner.py", line 140, in train_epoch
train_stats = self.training_operator.train_epoch(iterator, info)
File "/root/miniconda3/lib/python3.7/site-packages/ray/util/sgd/torch/training_operator.py", line 504, in train_epoch
model.train()
AttributeError: 'list' object has no attribute 'train'
--------error-----------
I was confused, so I added print(type(model)) just before model.train() (line 504 in training_operator.py) and found that the output was <class 'list'>.
But when I printed type(model) after model = outer._model (line 174 in estimator.py), the output was <class 'torch.nn.modules.container.Sequential'>.
It seems that the type of model has changed.
But I cannot see where the change happens; can you help me figure it out?
You can run the above program directly.
Looking forward to your reply~~

raydp.init_spark fails

This line of code fails:
spark = raydp.init_spark(app_name="RayDP example",
                         num_executors=2,
                         executor_cores=2,
                         executor_memory="4GB")

I am getting the following errors (linux server with a standalone Spark cluster):

Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/spark/internal/Logging
at java.lang.ClassLoader.defineClass1(Native Method)
at java.lang.ClassLoader.defineClass(ClassLoader.java:756)
at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
at java.net.URLClassLoader.defineClass(URLClassLoader.java:468)
at java.net.URLClassLoader.access$100(URLClassLoader.java:74)
at java.net.URLClassLoader$1.run(URLClassLoader.java:369)
at java.net.URLClassLoader$1.run(URLClassLoader.java:363)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:362)
at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:352)
at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
at org.apache.spark.deploy.raydp.AppMasterEntryPoint.main(AppMasterEntryPoint.scala)
Caused by: java.lang.ClassNotFoundException: org.apache.spark.internal.Logging
at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:352)
at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
... 13 more
Traceback (most recent call last):
File "/home/guryaniv/try_raydp_simple.py", line 4, in
spark = raydp.init_spark(app_name="RayDP example", num_executors=2, executor_cores=2, executor_memory="4GB")
File "/opt/anaconda3/envs/raydp/lib/python3.6/site-packages/raydp/context.py", line 122, in init_spark
return _global_spark_context.get_or_create_session()
File "/opt/anaconda3/envs/raydp/lib/python3.6/site-packages/raydp/context.py", line 68, in get_or_create_session
spark_cluster = self._get_or_create_spark_cluster()
File "/opt/anaconda3/envs/raydp/lib/python3.6/site-packages/raydp/context.py", line 62, in _get_or_create_spark_cluster
self._spark_cluster = SparkCluster()
File "/opt/anaconda3/envs/raydp/lib/python3.6/site-packages/raydp/spark/ray_cluster.py", line 31, in init
self._set_up_master(None, None)
File "/opt/anaconda3/envs/raydp/lib/python3.6/site-packages/raydp/spark/ray_cluster.py", line 37, in _set_up_master
self._app_master_bridge.start_up()
File "/opt/anaconda3/envs/raydp/lib/python3.6/site-packages/raydp/spark/ray_cluster_master.py", line 52, in start_up
self._gateway = self._launch_gateway(extra_classpath, popen_kwargs)
File "/opt/anaconda3/envs/raydp/lib/python3.6/site-packages/raydp/spark/ray_cluster_master.py", line 115, in _launch_gateway
raise Exception("Java gateway process exited before sending its port number")
Exception: Java gateway process exited before sending its port number

To run the code I am using spark-submit with the spark master:
spark-submit --master spark://:7077 raydp_example.py

Upgrade to Ray 1.3

We can create and maintain a branch to support Ray 2.0.0-dev. The Ray 2.0.0-dev Java package is not available in Maven, so users need to build Ray before building RayDP.

Unable to create nodes in an autoscaling cluster

Hi there! I'm trying to figure out how to put together a proper Spark config in an auto-scaling cluster.

I'm trying something very simple:

import ray
import raydp

ray.init(address='auto')

spark = raydp.init_spark(
    'word_count',
     num_executors=16,
     executor_cores=5,
     executor_memory='1G'
)

I'm running this with an autoscaling config that starts with a single m4.16xlarge node (64 cores, 160GB RAM) and can auto-scale a few more worker nodes. However, I'm seeing the following error from the autoscaler:

Demands:
 {'memory': 20.0, 'CPU': 5.0}: 4+ pending tasks/actors
2021-03-12 23:24:25,447 WARNING resource_demand_scheduler.py:642 -- The autoscaler could 
not find a node type to satisfy the request: [{'CPU': 5.0, 'memory': 20.0}, {'CPU': 5.0, 'memory': 20.0}, 
{'CPU': 5.0, 'memory': 20.0}, {'CPU': 5.0, 'memory': 20.0}]. If this request is related to placement 
groups the resource request will resolve itself, otherwise please specify a node type with the
 necessary resource https://docs.ray.io/en/master/cluster/autoscaling.html#multiple-node-type-autoscaling.
2021-03-12 23:24:25,559 INFO autoscaler.py:305 --
======== Autoscaler status: 2021-03-12 23:24:25.559754 ========
Node status
---------------------------------------------------------------
Healthy:
 1 ray-legacy-head-node-type
Pending:
 (no pending nodes)
Recent failures:
 (no failures)

Resources
---------------------------------------------------------------

Usage:
 60.0/64.0 CPU
 11.72/163.184 GiB memory
 0.00/50.977 GiB object_store_memory

Demands:
 {'memory': 20.0, 'CPU': 5.0}: 4+ pending tasks/actors

I guess I have 2 questions about this:

  • How does the requested amount of RAM translate into Ray demands? I asked for 1GB, but that seems to have translated to 20GB
  • Any idea why the autoscaler can't fulfill the request? I've successfully used autoscaling on this cluster with Ray Tune.

Thanks for all the awesome work!

RayDP : Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources

Hi ,

First of all thanks for this implementation which is of great use .

I am a beginner to this, and I am facing the issue below when trying to execute raydp on minikube with 10 CPUs and 10GB of memory.

Is there any explicit memory that needs to be set? For any action execution in Spark I get the following issue:
21/04/07 05:02:09 WARN TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources

Any help is greatly appreciated 👍

Spark 3.1.0+ not compatible

Setting the Spark version to 3.1.0+ in core's pom.xml will result in this:

[ERROR] /home/lzhi/Projects/raydp/core/src/main/scala/org/apache/spark/scheduler/cluster/raydp/RayCoarseGrainedSchedulerBackend.scala:196: error: method doRequestTotalExecutors overrides nothing.
[ERROR] Note: the super classes of class RayCoarseGrainedSchedulerBackend contain the following, non final members named doRequestTotalExecutors:
[ERROR] protected def doRequestTotalExecutors(resourceProfileToTotalExecs: Map[org.apache.spark.resource.ResourceProfile,Int]): scala.concurrent.Future[Boolean]
[ERROR]   override protected def doRequestTotalExecutors(requestedTotal: Int): Future[Boolean] = {

@ConeyLiu

Accessing files in S3 with Spark's S3A connector

Hi there,

I'm trying to use RayDP on an EC2 Ray cluster. It took me some time to figure out how to access S3 correctly. The solution involved downloading a custom version of Spark (with Hadoop 3), and then downloading some AWS jars:

# Install a custom version of PySpark which plays nicely with S3
wget 'https://mirrors.ocf.berkeley.edu/apache/spark/spark-3.0.2/spark-3.0.2-bin-hadoop3.2.tgz'
tar -xvzf spark-3.0.2-bin-hadoop3.2.tgz
pushd spark-3.0.2-bin-hadoop3.2/python/
python setup.py sdist
pip install dist/pyspark-3.0.2.tar.gz
popd

# Install the Spark-on-Ray project
pip install raydp-nightly

# Get the S3 connector for Spark
pushd "$(python -c 'import os, pyspark; print(os.path.dirname(pyspark.__file__))')/jars"
wget https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aws/3.2.0/hadoop-aws-3.2.0.jar
wget https://repo1.maven.org/maven2/com/amazonaws/aws-java-sdk-bundle/1.11.375/aws-java-sdk-bundle-1.11.375.jar
popd

As a result of this, I am able to use the S3A connector to do things like spark.read.parquet("s3a://....."). This connector automatically figures credentials from the head node's instance role, so no explicit authentication is needed.

However, when I try to do the same in an auto-scaling cluster, the worker nodes are not able to get the credentials. The following error shows up:

Py4JJavaError: An error occurred while calling o166.json.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 71 in stage 0.0 failed 4 times, 
most recent failure: Lost task 71.3 in stage 0.0 (TID 88, 172.31.38.110, executor 12): 
java.nio.file.AccessDeniedException: <<BUCKET_NAME>>: org.apache.hadoop.fs.s3a.auth.NoAuthWithAWSException: 
No AWS Credentials provided by SimpleAWSCredentialsProvider EnvironmentVariableCredentialsProvider 
InstanceProfileCredentialsProvider : com.amazonaws.SdkClientException: The requested metadata is not
 found at http://xxx.xxx.xxx.xxx/latest/meta-data/iam/security-credentials/

I guess this problem is related to this one: ray-project/ray#3115.

However, I am wondering if there's any workaround specific to RayDP that could help with this issue.

Spark executor memory is not set correctly

In [3]: import raydp

In [4]: spark = raydp.init_spark(
   ...:   app_name = "example test",
   ...:   num_executors = 3,
   ...:   executor_cores = 3,
   ...:   executor_memory="5GB")

with jinfo:

VM Flags:
Non-default VM flags: -XX:CICompilerCount=18 -XX:InitialHeapSize=2147483648 -XX:MaxHeapSize=32210157568 -XX:MaxNewSize=10736369664 -XX:MinHeapDeltaBytes=524288 -XX:NewSize=715653120 -XX:OldSize=1431830528 -XX:+UseCompressedClassPointers -XX:+UseCompressedOops -XX:+UseParallelGC

The -XX:MaxHeapSize=32210157568 is not correct. It seems like the Ray Java actor has not set the VM flags correctly.

Having worker nodes specific for Spark jobs

Hi there, I am continuing to enjoy using this project, it's great!

I wish to have a heterogeneous clusters where I have some GPU worker nodes, a few CPU-heavy worker nodes for HPO jobs, and many smaller instances for Spark jobs. I know that it's possible to declare custom resource markers on nodes in Ray. However, I don't think I see a way to get raydp to request those custom resources. Am I missing something, or is this not possible?

Thanks!

Failed to create RayClusterMaster since global worker doesn't have node attr

Problem

I submitted a simple Spark example to a RayCluster on Kubernetes and hit this problem.

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Traceback (most recent call last):
  File "raydp-spark.py", line 15, in <module>
    executor_memory='1G')
  File "/home/ray/anaconda3/lib/python3.7/site-packages/raydp/context.py", line 122, in init_spark
    return _global_spark_context.get_or_create_session()
  File "/home/ray/anaconda3/lib/python3.7/site-packages/raydp/context.py", line 68, in get_or_create_session
    spark_cluster = self._get_or_create_spark_cluster()
  File "/home/ray/anaconda3/lib/python3.7/site-packages/raydp/context.py", line 62, in _get_or_create_spark_cluster
    self._spark_cluster = SparkCluster()
  File "/home/ray/anaconda3/lib/python3.7/site-packages/raydp/spark/ray_cluster.py", line 32, in __init__
    self._set_up_master(None, None)
  File "/home/ray/anaconda3/lib/python3.7/site-packages/raydp/spark/ray_cluster.py", line 38, in _set_up_master
    self._app_master_bridge.start_up()
  File "/home/ray/anaconda3/lib/python3.7/site-packages/raydp/spark/ray_cluster_master.py", line 55, in start_up
    self._set_properties()
  File "/home/ray/anaconda3/lib/python3.7/site-packages/raydp/spark/ray_cluster_master.py", line 144, in _set_properties
    options["ray.node-ip"] = node.node_ip_address
AttributeError: 'NoneType' object has no attribute 'node_ip_address'

Looks like ray.worker.global_worker.node returns None, and _set_properties fails afterwards.

node = ray.worker.global_worker.node

I am new to Ray and not sure whether this is a RayDP issue or a Ray environment issue.

Environment

I followed the Ray instructions and used the Ray operator to spin up an example cluster in my environment.

Job Spec

apiVersion: batch/v1
kind: Job
metadata:
  generateName: raydp-spark-
  namespace: ray
spec:
  backoffLimit: 1
  template:
    spec:
      restartPolicy: Never
      containers:
        - name: ray
          image: seedjeffwan/ray:0.1
          imagePullPolicy: Always
          command: [ "/bin/bash", "-c", "--" ]
          args:
            - "wget https://gist.githubusercontent.com/Jeffwan/e10c08dcd4aa2751c361e136896bc35f/raw/9d7a8558290768b042f85695aa509aa35874571f/raydp-spark.py &&
              python raydp-spark.py"
          resources:
            requests:
              cpu: 100m
              memory: 256Mi

RayDP code

import os
import ray
import raydp

HEAD_SERVICE_IP_ENV = "EXAMPLE_CLUSTER_RAY_HEAD_SERVICE_HOST"
HEAD_SERVICE_CLIENT_PORT_ENV = "EXAMPLE_CLUSTER_RAY_HEAD_SERVICE_PORT_CLIENT"

head_service_ip = os.environ[HEAD_SERVICE_IP_ENV]
client_port = os.environ[HEAD_SERVICE_CLIENT_PORT_ENV]
ray.util.connect(f"{head_service_ip}:{client_port}")

spark = raydp.init_spark('word_count',
                         num_executors=2,
                         executor_cores=2,
                         executor_memory='1G')

df = spark.createDataFrame([('look',), ('spark',), ('tutorial',), ('spark',), ('look', ), ('python', )], ['word'])
df.show()
word_count = df.groupBy('word').count()
word_count.show()

raydp.stop_spark()
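
Not a confirmed fix, but a possible workaround sketch: because the script connects through the Ray client (ray.util.connect), init_spark runs on the client side, where ray.worker.global_worker may have no node attached. Moving the init_spark call into a remote task so it executes on a cluster node might avoid that. This assumes raydp is installed on the worker nodes, and the address values are placeholders.

import ray
import raydp

# Untested workaround sketch: run init_spark inside a remote task so it executes on a
# cluster node with a full Ray worker context, instead of on the Ray client driver.
# Requires raydp on the worker nodes; the address values are placeholders.
ray.util.connect("<head_service_ip>:<client_port>")

@ray.remote
def run_word_count():
    spark = raydp.init_spark('word_count',
                             num_executors=2,
                             executor_cores=2,
                             executor_memory='1G')
    df = spark.createDataFrame([('look',), ('spark',), ('tutorial',)], ['word'])
    result = df.groupBy('word').count().collect()
    raydp.stop_spark()
    return result

print(ray.get(run_word_count.remote()))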

refactor Dataset API

Refactor Dataset API into multiple subclasses:

  1. GeneralDataset. This class can process any Python object and supports arbitrary transform functions. The user needs to provide a function to convert the GeneralDataset to a torch.utils.data.Dataset or tf.data.Dataset.
  2. TextDataset. This class aims to process structured data. Spark is a good choice for processing structured data, so this class is a wrapper around a Spark DataFrame.
  3. ImageDataset. This class aims to process image data.

Eg:
GeneralDataset

import raydp

general_ds: GeneralDataset = raydp.from_generator(data_generator, num_shards)
# data process

...
def create_torch_ds(ds: GeneralDataset):
    #...
    return torch.utils.data.Dataset(...)

torch_ds = general_ds.to_torch(create_torch_ds)
estimator.fit(torch_ds)

TextDataset

import raydp

text_ds: TextDataset = raydp.read_parquet(...)
# data process

...
torch_ds = text_ds.to_torch(...)
estimator.fit(torch_ds)

ImageDataset

import raydp

img_ds: ImageDataset = raydp.read_imgs(...)
# data process

...
torch_ds = img_ds.to_torch(...)
estimator.fit(torch_ds)

raydp.modin module to integrate Modin?

TL;DR:
How does one zero-copy convert a PySpark dataframe to a Modin dataframe?

I am currently searching for a way to manipulate PySpark dataframes without materializing them as a Pandas dataframe.
Since my experience with Ray has been quite good, I wonder whether it would be possible to solve this with Modin.

If yes, I think a raydp.modin module would be a perfect addition to this project :)
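
Not an official RayDP feature, but one indirect route that may work with recent Ray releases: hand the Spark DataFrame to Ray Datasets and let it produce a Modin DataFrame. Both ray.data.from_spark() and Dataset.to_modin() are assumed to exist in the installed Ray version, and whether the conversion is truly zero-copy depends on those versions; treat this as a sketch rather than a guarantee.

import ray
import raydp

ray.init()
spark = raydp.init_spark("modin_example",
                         num_executors=2,
                         executor_cores=1,
                         executor_memory="1GB")

spark_df = spark.range(0, 1000)

# Assumes a Ray version whose Datasets API provides from_spark() and to_modin();
# both may differ or be missing in older releases.
ds = ray.data.from_spark(spark_df)
modin_df = ds.to_modin()

print(modin_df.head())
raydp.stop_spark()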

Not working on Mac

I followed the README to install RayDP.
Got an error when running ./build.sh:

+ python setup.py bdist_wheel
  File "setup.py", line 31
    JARS_PATH = glob.glob(os.path.join(CORE_DIR, f"target/raydp-*.jar"))
                                                                     ^
SyntaxError: invalid syntax

I have:

  • Python: 3.8
  • Bazel 3.7.0-homebrew
  • Jdk-15.0.1.jdk
  • Mac: 10.15

Then I just manually re-ran python setup.py and it seemed OK:

python setup.py bdist_wheel

And I copied all the wheels in dist/:

ls 
pyspark-3.0.0-py2.py3-none-any.whl        spark
raydp-0.1.dev0-py2.py3-none-any.whl

And installed RayDP and PySpark with pip:

pip install raydp-0.1.dev0-py2.py3-none-any.whl
pip install pyspark-3.0.0-py2.py3-none-any.whl

Then I ran the following code in a terminal:

import ray
import os
import pandas as pd, numpy as np

import torch
import torch.nn as nn
import torch.nn.functional as F

from pyspark.sql.functions import *

import raydp
from raydp.torch.estimator import TorchEstimator
from raydp.utils import random_split

ray.init()
app_name = "NYC Taxi Fare Prediction with RayDP"
num_executors = 4
cores_per_executor = 1
memory_per_executor = "1GB"
spark = raydp.init_spark(app_name, num_executors, cores_per_executor, memory_per_executor)

Got this error:

>>> spark = raydp.init_spark(app_name, num_executors, cores_per_executor, memory_per_executor)
File descriptor limit 256 is too low for production servers and may result in connection errors. At least 8192 is recommended. --- Fix with 'ulimit -n 8192'
2020-11-30 11:56:36,039	INFO services.py:1169 -- View the Ray dashboard at http://127.0.0.1:8265
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/Users/dr6jl/anaconda3/lib/python3.8/site-packages/ray/jars/ray_dist.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/Users/dr6jl/anaconda3/lib/python3.8/site-packages/pyspark/jars/slf4j-log4j12-1.7.30.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
WARNING: An illegal reflective access operation has occurred
WARNING: Illegal reflective access by org.apache.spark.unsafe.Platform (file:/Users/dr6jl/anaconda3/lib/python3.8/site-packages/pyspark/jars/spark-unsafe_2.12-3.0.0.jar) to constructor java.nio.DirectByteBuffer(long,int)
WARNING: Please consider reporting this to the maintainers of org.apache.spark.unsafe.Platform
WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
WARNING: All illegal access operations will be denied in a future release
2020-11-30 11:56:40 WARN  NativeCodeLoader:62 - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
WARNING: An illegal reflective access operation has occurred
WARNING: Illegal reflective access by org.apache.spark.unsafe.Platform (file:/usr/local/Cellar/apache-spark/3.0.1/libexec/jars/spark-unsafe_2.12-3.0.1.jar) to constructor java.nio.DirectByteBuffer(long,int)
WARNING: Please consider reporting this to the maintainers of org.apache.spark.unsafe.Platform
WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
WARNING: All illegal access operations will be denied in a future release
Exception in thread "main" org.apache.spark.SparkException: Master must either be yarn or start with spark, mesos, k8s, or local
	at org.apache.spark.deploy.SparkSubmit.error(SparkSubmit.scala:936)
	at org.apache.spark.deploy.SparkSubmit.prepareSubmitEnvironment(SparkSubmit.scala:238)
	at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:871)
	at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:180)
	at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:203)
	at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:90)
	at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1007)
	at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1016)
	at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Error in sys.excepthook:
Traceback (most recent call last):
  File "/Users/dr6jl/anaconda3/lib/python3.8/site-packages/ray/worker.py", line 856, in custom_excepthook
    ray.state.state.add_worker(worker_id, worker_type, worker_info)
  File "/Users/dr6jl/anaconda3/lib/python3.8/site-packages/ray/state.py", line 733, in add_worker
    return self.global_state_accessor.add_worker_info(
AttributeError: 'NoneType' object has no attribute 'add_worker_info'

Original exception was:
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/Users/dr6jl/anaconda3/lib/python3.8/site-packages/raydp/context.py", line 121, in init_spark
    return _global_spark_context.get_or_create_session()
  File "/Users/dr6jl/anaconda3/lib/python3.8/site-packages/raydp/context.py", line 69, in get_or_create_session
    self._spark_session = spark_cluster.get_spark_session(
  File "/Users/dr6jl/anaconda3/lib/python3.8/site-packages/raydp/spark/ray_cluster.py", line 64, in get_spark_session
    spark_builder.appName(app_name).master(self.get_cluster_url()).getOrCreate()
  File "/Users/dr6jl/anaconda3/lib/python3.8/site-packages/pyspark/sql/session.py", line 186, in getOrCreate
    sc = SparkContext.getOrCreate(sparkConf)
  File "/Users/dr6jl/anaconda3/lib/python3.8/site-packages/pyspark/context.py", line 371, in getOrCreate
    SparkContext(conf=conf or SparkConf())
  File "/Users/dr6jl/anaconda3/lib/python3.8/site-packages/pyspark/context.py", line 128, in __init__
    SparkContext._ensure_initialized(self, gateway=gateway, conf=conf)
  File "/Users/dr6jl/anaconda3/lib/python3.8/site-packages/pyspark/context.py", line 320, in _ensure_initialized
    SparkContext._gateway = gateway or launch_gateway(conf)
  File "/Users/dr6jl/anaconda3/lib/python3.8/site-packages/pyspark/java_gateway.py", line 105, in launch_gateway
    raise Exception("Java gateway process exited before sending its port number")
Exception: Java gateway process exited before sending its port number

Raydp package is not installed on ray:1.3.0 image

Hi,

I'm trying to use raydp with Ray client mode and I found the below issue as a guide.
#130

The issue suggests I should instantiate the SparkSession within a Ray worker's context, but when I tried it, an error occurred telling me that raydp is not installed on the worker nodes.

Is this intended? Should I install the raydp package on all Ray worker nodes?
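
Not an authoritative answer, but the package generally does need to be importable on every node that runs RayDP code. Two common approaches are baking raydp into the image (or installing it via the cluster launcher's setup commands) and shipping it through a runtime environment. A minimal sketch of the latter, assuming a Ray release new enough to support runtime_env over the Ray client (the ray:1.3.0 image predates this), with the head node address as a placeholder:

import ray

# Ship raydp to the worker nodes through a runtime environment. This assumes a Ray
# release that supports runtime_env over the Ray client; otherwise, upgrading Ray or
# installing raydp in the image itself may be necessary.
ray.init(address="ray://<head-node-ip>:10001",
         runtime_env={"pip": ["raydp"]})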

Py4JJavaError

Hi,
while trying out the example "Titanic survival prediction with RayDp", I ran into the following problem when running the step:

app_name = "Titanic survival prediction with RayDp" num_executors = 2 cores_per_executor = 1 memory_per_executor = "1GB" spark = raydp.init_spark(app_name, num_executors, cores_per_executor, memory_per_executor)

Py4JJavaError                             Traceback (most recent call last)
<ipython-input-3-478be455a428> in <module>
      3 cores_per_executor = 1
      4 memory_per_executor = "1GB"
----> 5 spark = raydp.init_spark(app_name, num_executors, cores_per_executor, memory_per_executor)

/usr/local/lib/python3.8/dist-packages/raydp/context.py in init_spark(app_name, num_executors, executor_cores, executor_memory, configs)
    120                 _global_spark_context = _SparkContext(
    121                     app_name, num_executors, executor_cores, executor_memory, configs)
--> 122                 return _global_spark_context.get_or_create_session()
    123             except:
    124                 _global_spark_context = None

/usr/local/lib/python3.8/dist-packages/raydp/context.py in get_or_create_session(self)
     66         if self._spark_session is not None:
     67             return self._spark_session
---> 68         spark_cluster = self._get_or_create_spark_cluster()
     69         self._spark_session = spark_cluster.get_spark_session(
     70             self._app_name,

/usr/local/lib/python3.8/dist-packages/raydp/context.py in _get_or_create_spark_cluster(self)
     60         if self._spark_cluster is not None:
     61             return self._spark_cluster
---> 62         self._spark_cluster = SparkCluster()
     63         return self._spark_cluster
     64 

/usr/local/lib/python3.8/dist-packages/raydp/spark/ray_cluster.py in __init__(self)
     29         super().__init__(None)
     30         self._app_master_bridge = None
---> 31         self._set_up_master(None, None)
     32         self._spark_session: SparkSession = None
     33 

/usr/local/lib/python3.8/dist-packages/raydp/spark/ray_cluster.py in _set_up_master(self, resources, kwargs)
     35         # TODO: specify the app master resource
     36         self._app_master_bridge = RayClusterMaster()
---> 37         self._app_master_bridge.start_up()
     38 
     39     def _set_up_worker(self, resources: Dict[str, float], kwargs: Dict[str, str]):

/usr/local/lib/python3.8/dist-packages/raydp/spark/ray_cluster_master.py in start_up(self, popen_kwargs)
     54         self._set_properties()
     55         self._host = ray._private.services.get_node_ip_address()
---> 56         self._create_app_master(extra_classpath)
     57         self._started_up = True
     58 

/usr/local/lib/python3.8/dist-packages/raydp/spark/ray_cluster_master.py in _create_app_master(self, extra_classpath)
    162         if self._started_up:
    163             return
--> 164         self._app_master_java_bridge.startUpAppMaster(extra_classpath)
    165 
    166     def get_master_url(self):

/usr/local/lib/python3.8/dist-packages/py4j/java_gateway.py in __call__(self, *args)
   1302 
   1303         answer = self.gateway_client.send_command(command)
-> 1304         return_value = get_return_value(
   1305             answer, self.gateway_client, self.target_id, self.name)
   1306 

/usr/local/lib/python3.8/dist-packages/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    324             value = OUTPUT_CONVERTER[type](answer[2:], gateway_client)
    325             if answer[1] == REFERENCE_TYPE:
--> 326                 raise Py4JJavaError(
    327                     "An error occurred while calling {0}{1}{2}.\n".
    328                     format(target_id, ".", name), value)

Py4JJavaError: An error occurred while calling o0.startUpAppMaster.
: java.lang.RuntimeException: Failed to initialize Ray runtime.
	at io.ray.api.Ray.init(Ray.java:27)
	at org.apache.spark.deploy.raydp.AppMasterJavaBridge.startUpAppMaster(AppMasterJavaBridge.scala:41)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.RuntimeException: Failed to initialize ray runtime
	at io.ray.runtime.DefaultRayRuntimeFactory.createRayRuntime(DefaultRayRuntimeFactory.java:43)
	at io.ray.api.Ray.init(Ray.java:38)
	at io.ray.api.Ray.init(Ray.java:25)
	... 12 more
Caused by: java.lang.RuntimeException: Failed to get address info. Output: null
	at io.ray.runtime.runner.RunManager.getAddressInfoAndFillConfig(RunManager.java:90)
	at io.ray.runtime.RayNativeRuntime.start(RayNativeRuntime.java:88)
	at io.ray.runtime.DefaultRayRuntimeFactory.createRayRuntime(DefaultRayRuntimeFactory.java:39)
	... 14 more
Caused by: java.io.IOException: Cannot run program "python": error=2, No such file or directory
	at java.lang.ProcessBuilder.start(ProcessBuilder.java:1048)
	at io.ray.runtime.runner.RunManager.runCommand(RunManager.java:105)
	at io.ray.runtime.runner.RunManager.getAddressInfoAndFillConfig(RunManager.java:80)
	... 16 more
Caused by: java.io.IOException: error=2, No such file or directory
	at java.lang.UNIXProcess.forkAndExec(Native Method)
	at java.lang.UNIXProcess.<init>(UNIXProcess.java:247)
	at java.lang.ProcessImpl.start(ProcessImpl.java:134)
	at java.lang.ProcessBuilder.start(ProcessBuilder.java:1029)
	... 18 more

I checked the closed issues before, and I already changed my JDK version to
openjdk version "1.8.0_282"
OpenJDK Runtime Environment (build 1.8.0_282-8u282-b08-0ubuntu1~20.04-b08)
OpenJDK 64-Bit Server VM (build 25.282-b08, mixed mode)

Can you please help me?
Thank you!
