
Distributed Deep Learning, with a focus on distributed training, using Keras and Apache Spark.

Home Page: http://joerihermans.com/work/distributed-keras/

License: GNU General Public License v3.0

Python 69.60% CSS 1.59% JavaScript 1.16% HTML 27.65%
machine-learning deep-learning apache-spark data-parallelism distributed-optimizers keras optimization-algorithms tensorflow data-science hadoop

dist-keras's Introduction

Distributed Keras

Distributed Deep Learning with Apache Spark and Keras.

Introduction

Distributed Keras is a distributed deep learning framework built on top of Apache Spark and Keras, with a focus on "state-of-the-art" distributed optimization algorithms. We designed the framework in such a way that a new distributed optimizer can be implemented with ease, allowing contributors to focus on research. Several distributed methods are supported, including, but not restricted to, the training of ensembles and the training of models with data parallel methods.

Most of the distributed optimizers we provide are based on data parallel methods. A data parallel method, as described in [1], is a learning paradigm where multiple replicas of a single model are used to optimize a single objective. Using this approach, we are able to significantly reduce the training time of a model. Depending on the parametrization, we also observed that it is possible to achieve better statistical model performance compared to a more traditional approach (e.g., the SingleTrainer implementation) while spending less wall-clock time on training. However, this is subject to further research.
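To make the idea concrete, the sketch below simulates data-parallel SGD for a simple linear model with plain NumPy: every replica computes a gradient on its own data shard and pushes an update to a shared center variable. This is purely illustrative and is not dist-keras code; in the framework, the replicas run concurrently on Spark executors and exchange updates with a parameter server over the network.

# Illustrative sketch of data-parallel SGD (not dist-keras internals).
import numpy as np

def gradient(w, X, y):
    # Gradient of the mean squared error of a linear model y ~ X w.
    return 2.0 * X.T.dot(X.dot(w) - y) / len(y)

rng = np.random.RandomState(0)
X = rng.randn(1000, 5)
y = X.dot(np.array([1.0, -2.0, 0.5, 3.0, 0.0])) + 0.01 * rng.randn(1000)

num_workers = 4
shards = np.array_split(np.arange(1000), num_workers)
w_center = np.zeros(5)      # the "parameter server" state
learning_rate = 0.1

for step in range(100):
    for shard in shards:    # in dist-keras these replicas run concurrently
        w_local = w_center.copy()                  # pull the current center variable
        g = gradient(w_local, X[shard], y[shard])  # compute the gradient on the local shard
        w_center -= learning_rate * g              # push the update to the center variable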

Attention: A rather complete introduction to the problem of distributed deep learning is presented in my Master's thesis: http://github.com/JoeriHermans/master-thesis. Furthermore, the thesis includes several novel insights, such as a redefinition of parameter staleness, and several new distributed optimizers such as AGN and ADAG.

Installation

This section guides you through the installation of Distributed Keras. We assume that a working Apache Spark installation is available. In the following subsections, we describe two installation approaches.

pip

If you only require the framework for development purposes, just use pip to install dist-keras.

pip install --upgrade dist-keras

# OR

pip install --upgrade git+https://github.com/JoeriHermans/dist-keras.git

git & pip

However, if you would like to contribute or run some of the examples, it is best to clone the repository directly from GitHub and install it afterwards using pip. This will also resolve any missing dependencies.

git clone https://github.com/JoeriHermans/dist-keras
cd dist-keras
pip install -e .

General notes

.bashrc

Make sure the following variables are set in your .bashrc. Depending on your system configuration, this step may not be necessary.

# Example of a .bashrc configuration.
export SPARK_HOME=/usr/lib/spark
export PYTHONPATH="$SPARK_HOME/python/:$SPARK_HOME/python/lib/py4j-0.9-src.zip:$PYTHONPATH"

Running an example

We would like to refer the reader to the workflow.ipynb notebook in the examples folder. This will give you a complete introduction to the problem of distributed deep learning, and will guide you through the steps that have to be executed.

Furthermore, the examples starting with the prefix example_ show how exactly you should process "big" datasets. Please execute them in the provided sequence. A rough sketch of this kind of preprocessing is given below.
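The following is a minimal preprocessing sketch, not taken from the examples themselves: it reads a hypothetical CSV file with Spark and assembles the numeric columns into the features vector that the trainers expect. The file path and column names are placeholders; the actual examples additionally rely on dist-keras transformers such as OneHotTransformer to encode the label.

from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler

spark = SparkSession.builder.appName("dist-keras preprocessing").getOrCreate()

# Hypothetical input: a CSV file with numeric feature columns and a 'label' column.
raw = spark.read.format("csv").options(header="true", inferSchema="true").load("data/dataset.csv")

feature_columns = [c for c in raw.columns if c != "label"]
assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")
dataset = assembler.transform(raw).select("features", "label")
dataset.printSchema()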

Spark 2.0

If you want to run the examples using Apache Spark 2.0.0 or higher, you will need to remove the line containing sqlContext = SQLContext(sc). This is necessary because in Spark 2.0+ the SQLContext and HiveContext have been merged into the SparkSession.
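For reference, a minimal Spark 2.0+ setup can look as follows; the SparkSession exposes the functionality that previously required a separate SQLContext:

from pyspark.sql import SparkSession

# In Spark 2.0+ the SparkSession replaces the SQLContext / HiveContext pair.
spark = SparkSession.builder \
    .appName("Distributed Keras") \
    .getOrCreate()
sc = spark.sparkContext  # still available if an example expects a SparkContext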

Optimization Algorithms

Sequential Trainer

This optimizer follows the traditional scheme of training a model, i.e., it uses sequential gradient updates to optimize the parameters. It does this by executing the training procedure on a single Spark executor.

SingleTrainer(model, features_col, label_col, batch_size, optimizer, loss, metrics=["accuracy"])

ADAG (Currently Recommended)

A DOWNPOUR variant that is able to achieve significantly better statistical performance while being less sensitive to hyperparameters. This optimizer was developed using insights gained while developing this framework. More research regarding parameter staleness is being conducted to further improve this optimizer.

ADAG(keras_model, worker_optimizer, loss, metrics=["accuracy"], num_workers=2, batch_size=32,
     features_col="features", label_col="label", num_epoch=1, communication_window=12)
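A typical end-to-end usage looks like the sketch below. The toy Keras model, the optimizer settings, and the column names are placeholders, and dataset is assumed to be a prepared Spark DataFrame with a dense features column and an encoded label column; train() returns a regular Keras model.

from keras.models import Sequential
from keras.layers import Dense
from distkeras.trainers import ADAG

# Toy model; in practice this is whatever architecture you want to train.
model = Sequential()
model.add(Dense(64, activation='relu', input_shape=(10,)))
model.add(Dense(2, activation='softmax'))

trainer = ADAG(keras_model=model, worker_optimizer='adam', loss='categorical_crossentropy',
               metrics=["accuracy"], num_workers=4, batch_size=32,
               features_col="features", label_col="label", num_epoch=1,
               communication_window=12)
trained_model = trainer.train(dataset)  # 'dataset' is the prepared Spark DataFrame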

Dynamic SGD

Dynamic SGD dynamically maintains a learning rate for every worker by incorporating parameter staleness. This optimization scheme was introduced in "Heterogeneity-aware Distributed Parameter Servers" at the SIGMOD 2017 conference [5].

DynSGD(keras_model, worker_optimizer, loss, metrics=["accuracy"], num_workers=2, batch_size=32,
       features_col="features", label_col="label", num_epoch=1, communication_window=10)

Asynchronous Elastic Averaging SGD (AEASGD)

The distinctive idea of EASGD is to allow the local workers to perform more exploration (small rho) and the master to perform exploitation. This approach differs from other settings explored in the literature, which focus on how fast the center variable converges [2].

In this section we show the asynchronous version of EASGD. Instead of waiting on the synchronization of other trainers, this method communicates the elastic difference (as described in the paper) to the parameter server. The only synchronization mechanism that has been implemented ensures that no race conditions occur when updating the center variable.

AEASGD(keras_model, worker_optimizer, loss, metrics=["accuracy"], num_workers, batch_size, features_col,
       label_col, num_epoch, communication_window, rho, learning_rate)

Asynchronous Elastic Averaging Momentum SGD (AEAMSGD)

Asynchronous EAMSGD is a variant of asynchronous EASGD. It is based on Nesterov's momentum scheme, where the update of the local worker is modified to incorporate a momentum term [2].

EAMSGD(keras_model, worker_optimizer, loss, metrics=["accuracy"], num_workers, batch_size,
       features_col, label_col, num_epoch, communication_window, rho,
       learning_rate, momentum)

DOWNPOUR

An asynchronous stochastic gradient descent procedure introduced by Dean et al. that supports a large number of model replicas and leverages adaptive learning rates. This implementation is based on the pseudocode provided in [1].

DOWNPOUR(keras_model, worker_optimizer, loss, metrics=["accuracy"], num_workers, batch_size,
         features_col, label_col, num_epoch, learning_rate, communication_window)

Ensemble Training

In ensemble training, we train n models in parallel on the same dataset. The models are trained concurrently, but every individual model is trained sequentially using its Keras optimizer. After training, one can combine the models, for example by averaging their outputs (see the sketch after the constructor below).

EnsembleTrainer(keras_model, worker_optimizer, loss, metrics=["accuracy"], features_col,
                label_col, batch_size, num_ensembles)
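Combining the ensemble members is left to the user. A minimal prediction-averaging sketch with NumPy, assuming models is a Python list holding the trained Keras models (however they were collected) and X is a NumPy array of input samples:

import numpy as np

def average_predictions(models, X):
    # One prediction array per ensemble member, then the element-wise mean.
    predictions = [m.predict(X) for m in models]
    return np.mean(predictions, axis=0)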

Model Averaging

Model averaging is a data parallel technique which will average the trainable parameters of model replicas after every epoch.

AveragingTrainer(keras_model, worker_optimizer, loss, metrics=["accuracy"], features_col,
                 label_col, num_epoch, batch_size, num_workers)

Job deployment

We also support remote job deployment. For example, imagine you are developing your model in a local notebook using a small development set; to train at scale, you would normally have to turn it into a cluster job and run it there. To simplify this process, we have developed a simple interface for submitting large-scale machine learning jobs.

In order to submit a job to a remote cluster, you simply run the following code:

# Define the distributed optimization procedure, and its parameters.
trainer = ADAG(keras_model=mlp, worker_optimizer=optimizer_mlp, loss=loss_mlp, metrics=["accuracy"], num_workers=20,
               batch_size=32, communication_window=15, num_epoch=1,
               features_col="features_normalized_dense", label_col="label_encoded")

# Define the job parameters.
job = Job(secret, job_name, data_path, num_executors, num_processes, trainer)
job.send('http://yourcluster:[port]')
job.wait_completion()
# Fetch the trained model, and history for training evaluation.
trained_model = job.get_trained_model()
history = job.get_history()

Punchcard Server

Job scheduling and execution are handled by our Punchcard server. This server accepts requests from a remote location given a specific secret, which is essentially a long identification string for a specific user. A user can have multiple secrets. At the moment, a job is only executed if no other jobs are running for the specified secret.

In order to submit jobs to Punchcard, we need to specify a secrets file. This is a JSON file with the following structure:

[
    {
        "secret": "secret_of_user_1",
        "identity": "user1"
    },
    {
        "secret": "secret_of_user_2",
        "identity": "user2"
    }
]

After the secrets file has been constructed, the Punchcard server can be started by issuing the following command.

python scripts/punchcard.py --secrets /path/to/secrets.json

Secret Generation

To simplify secret generation, we have added a custom script which generates a unique key for the specified identity. The secret can be generated by running the following command.

python scripts/generate_secret.py --identity userX

Optimization Schemes

TODO

General note

It is known that adding more asynchronous workers deteriorates the statistical performance of the model. Several studies examine this particular effect; some of them conclude that adding more asynchronous workers actually contributes to something they call implicit momentum [3]. However, this is subject to further investigation.

Known issues

  • Python 3 compatibility.

TODO's

List of possible future additions.

  • Save Keras model to HDFS.
  • Load Keras model from HDFS.
  • Compression / decompression of network transmissions.
  • Stop on target loss.
  • Multiple parameter servers for large Deep Networks.
  • Python 3 compatibility.
  • For every worker, spawn an additional thread which is responsible for sending updates to the parameter server. The actual worker thread will just submit tasks to this queue.

Citing

If you use this framework in any academic work, please use the following BibTeX entry.

@misc{dist_keras_joerihermans,
  author = {Joeri R. Hermans, CERN IT-DB},
  title = {Distributed Keras: Distributed Deep Learning with Apache Spark and Keras},
  year = {2016},
  publisher = {GitHub},
  journal = {GitHub Repository},
  howpublished = {\url{https://github.com/JoeriHermans/dist-keras/}},
}

References

  • Dean, J., Corrado, G., Monga, R., Chen, K., Devin, M., Mao, M., ... & Ng, A. Y. (2012). Large scale distributed deep networks. In Advances in Neural Information Processing Systems (pp. 1223-1231). [1]

  • Zhang, S., Choromanska, A. E., & LeCun, Y. (2015). Deep learning with elastic averaging SGD. In Advances in Neural Information Processing Systems (pp. 685-693). [2]

  • Mitliagkas, I., et al. (2016). Asynchrony begets momentum, with an application to deep learning. arXiv preprint arXiv:1605.09774. [3]

Licensing

GPLv3 CERN

dist-keras's People

Contributors

cy-dev, jwang12345, mcamilleri, natbusa


dist-keras's Issues

Transformers cause out of memory errors on executors

I think this is due to utils.new_dataframe_row copying whole rows. Why not use a UDF in the transform method? I observe this while doing a count on a dataset of only 6 million rows after transforming it with OneHotTransformer.

too much stderr on workers

When using ADAG, yarn logs get full of verbose text.
I guess the origin is at

sys.stderr.write("Epoch: " + str(self.current_epoch) + " Iteration: " + str(self.iteration) + " loss:" + str(h) + "\n")

Is it possible to disable it? It's becoming a DOS to our cluster!
In 20 minutes I got more than 3 million entries in my logs!

Model runs in all GPUs

I have user partitioned data (for now 3 users), and want to train a model for every partitioned data.

I used dist-keras with local[*] Spark mode, with 3 executors (8g), each with 1 core, i.e. 1 executor per user. When the script is triggered, I see the model run on all GPUs. Has anyone experienced a similar issue? I can provide more information if asked.

Version
keras - 2.1.3
tensorflow - 1.4.0-rc0
spark - 2.2.1

[1] Tesla K80 | 53'C, 0 % | 11439 / 11439 MB | br(10856M) br(208M) br(285M) br(60M)
[2] Tesla K80 | 49'C, 0 % | 11439 / 11439 MB | br(10856M) br(208M) br(285M) br(60M)
[3] Tesla K80 | 55'C, 0 % | 11439 / 11439 MB | br(10856M) br(208M) br(285M) br(60M)
[4] Tesla K80 | 42'C, 0 % | 11439 / 11439 MB | br(10854M) br(210M) br(285M) br(60M)
[5] Tesla K80 | 49'C, 0 % | 11439 / 11439 MB | br(10854M) br(210M) br(285M) br(60M)
[6] Tesla K80 | 37'C, 0 % | 11439 / 11439 MB | br(10854M) br(210M) br(285M) br(60M)
[7] Tesla K80 | 45'C, 0 % | 11439 / 11439 MB | br(10852M) br(212M) br(285M) br(60M)

Weights not being updated

I'm following this notebook almost exactly. However, after I train my model, my weights aren't getting updated:

model.layers[0].get_weights()
trained_model.layers[0].get_weights()

Both of these give me:

[array([[-0.39513412, 0.26937097, -0.36478603, 0.30427128, -0.13985097,
-0.22316453, 0.13130313, -0.08426034],
[ 0.41418487, -0.46847233, 0.58078319, -0.63027477, -0.45647684,
-0.325973 , 0.22211522, 0.55291325],
[ 0.54379755, -0.30091569, -0.02049094, -0.4734239 , -0.41363743,
-0.38102722, -0.19341171, -0.36358535],
[-0.08354402, 0.39400059, 0.04485017, -0.1212253 , 0.07950532,
0.37202805, 0.30843312, -0.25526762]], dtype=float32),
array([ 0., 0., 0., 0., 0., 0., 0., 0.], dtype=float32)]

Why is this?

tee vs. union for epochs

I see that the method for managing epochs was changed to copy the iterator of the data num_epoch times using tee. The documentation for tee says

This itertool may require significant auxiliary storage (depending on how much temporary data needs to be stored). In general, if one iterator uses most or all of the data before another iterator starts, it is faster to use list() instead of tee().

which makes it seem like this method is not very efficient. Also, if one wants to use a parallelism_factor > 1, then this way of managing the epochs would result in re-training on the data in one partition num_epoch times before seeing any new data. Wouldn't it be better to keep the old way using unionAll (and cache the dataset before you copy it? not so sure about this part...)?

Ambiguous reference in notebook workflow

While running the workflow notebook, running cell 12 presents the following error:

AnalysisException: u"Reference 'label' is ambiguous, could be: label#395, label#399, label#400.;"

Is there a work-around for the problem?

Issues when initializing SparkSession and KafkaUtils.createStream

Hi,

I am running your ml_pipeline example and I have the following configuration:
Python : 3.6
Keras : 2
Spark : 2
Kafka : 2.11
Java : 8

'D:\spark\bin\spark-submit2.cmd" --conf "spark.app.name' is not recognized as an internal or external command, operable program or batch file.
Traceback (most recent call last):
  File "ml_pipeline.py", line 69, in <module>
    sc = SparkSession.builder.config(conf=conf).appName(application_name).getOrCreate()
  File "d:\Anaconda3\lib\site-packages\pyspark\sql\session.py", line 173, in getOrCreate
    sc = SparkContext.getOrCreate(sparkConf)
  File "d:\Anaconda3\lib\site-packages\pyspark\context.py", line 334, in getOrCreate
    SparkContext(conf=conf or SparkConf())
  File "d:\Anaconda3\lib\site-packages\pyspark\context.py", line 115, in __init__
    SparkContext._ensure_initialized(self, gateway=gateway, conf=conf)
  File "d:\Anaconda3\lib\site-packages\pyspark\context.py", line 283, in _ensure_initialized
    SparkContext._gateway = gateway or launch_gateway(conf)
  File "d:\Anaconda3\lib\site-packages\pyspark\java_gateway.py", line 95, in launch_gateway
    raise Exception("Java gateway process exited before sending the driver its port number")
Exception: Java gateway process exited before sending the driver its port number

application_name = "Distributed Keras Kafka Pipeline"
I overcame this error by removing the spaces in the title "Distributed Keras Kafka Pipeline". It was strange to me, but it worked; I'm not sure why.

Now I am stuck on this error: "AttributeError: 'SparkSession' object has no attribute '_getJavaStorageLevel'". I did some research and found that the default storage level has changed to MEMORY_AND_DISK, so I don't think I have to provide any such value in the configuration. I appreciate your help.

EDIT:

I have resolved the above issue by using the relevant spark-streaming-kafka-assembly jar file, and it successfully creates the streaming context now. But now I am facing another issue.

On the first iteration, as control goes into the predict(df) method, I got the following errors:

============================PREPARED DATAFRAME=========================== DataFrame[DER_deltaeta_jet_jet: double, DER_deltar_tau_lep: double, DER_lep_eta_centrality: double, DER_mass_MMC: double, DER_mass_jet_jet: double, DER_mass_transverse_met_lep: double, DER_mass_vis: double, DER_met_phi_centrality: double, DER_prodeta_jet_jet: double, DER_pt_h: double, DER_pt_ratio_lep_tau: double, DER_pt_tot: double, DER_sum_pt: double, EventId: double, PRI_jet_all_pt: double, PRI_jet_leading_eta: double, PRI_jet_leading_phi: double, PRI_jet_leading_pt: double, PRI_jet_num: double, PRI_jet_subleading_eta: double, PRI_jet_subleading_phi: double, PRI_jet_subleading_pt: double, PRI_lep_eta: double, PRI_lep_phi: double, PRI_lep_pt: double, PRI_met: double, PRI_met_phi: double, PRI_met_sumet: double, PRI_tau_eta: double, PRI_tau_phi: double, PRI_tau_pt: double, features: vector, features_normalized: vector] 18/03/01 15:04:51 WARN RandomBlockReplicationPolicy: Expecting 1 replicas with only 0 peer/s. 18/03/01 15:04:51 WARN BlockManager: Block input-0-1519896890400 replicated to only 0 peer(s) instead of 1 peers 18/03/01 15:04:51 WARN RandomBlockReplicationPolicy: Expecting 1 replicas with only 0 peer/s. 18/03/01 15:04:51 WARN BlockManager: Block input-0-1519896890600 replicated to only 0 peer(s) instead of 1 peers 18/03/01 15:04:51 WARN RandomBlockReplicationPolicy: Expecting 1 replicas with only 0 peer/s. 18/03/01 15:04:51 WARN BlockManager: Block input-0-1519896890800 replicated to only 0 peer(s) instead of 1 peers 2018-03-01 15:04:51.914911: W c:\l\tensorflow_1501918863922\work\tensorflow-1.2.1\tensorflow\core\platform\cpu_feature_guard.cc:45] The TensorFlow library wasn't compiled to use SSE instructions, but these are available on your machine and could speed up CPU computations. 18/03/01 15:04:51 WARN RandomBlockReplicationPolicy: Expecting 1 replicas with only 0 peer/s. 18/03/01 15:04:51 WARN BlockManager: Block input-0-1519896891600 replicated to only 0 peer(s) instead of 1 peers 2018-03-01 15:04:51.923321: W c:\l\tensorflow_1501918863922\work\tensorflow-1.2.1\tensorflow\core\platform\cpu_feature_guard.cc:45] The TensorFlow library wasn't compiled to use SSE2 instructions, but these are available on your machine and could speed up CPU computations. 2018-03-01 15:04:51.940857: W c:\l\tensorflow_1501918863922\work\tensorflow-1.2.1\tensorflow\core\platform\cpu_feature_guard.cc:45] The TensorFlow library wasn't compiled to use SSE3 instructions, but these are available on your machine and could speed up CPU computations. 2018-03-01 15:04:51.949033: W c:\l\tensorflow_1501918863922\work\tensorflow-1.2.1\tensorflow\core\platform\cpu_feature_guard.cc:45] The TensorFlow library wasn't compiled to use SSE4.1 instructions, but these are available on your machine and could speed up CPU computations. 2018-03-01 15:04:51.959770: W c:\l\tensorflow_1501918863922\work\tensorflow-1.2.1\tensorflow\core\platform\cpu_feature_guard.cc:45] The TensorFlow library wasn't compiled to use SSE4.2 instructions, but these are available on your machine and could speed up CPU computations. 2018-03-01 15:04:51.970355: W c:\l\tensorflow_1501918863922\work\tensorflow-1.2.1\tensorflow\core\platform\cpu_feature_guard.cc:45] The TensorFlow library wasn't compiled to use AVX instructions, but these are available on your machine and could speed up CPU computations. 
2018-03-01 15:04:51.979551: W c:\l\tensorflow_1501918863922\work\tensorflow-1.2.1\tensorflow\core\platform\cpu_feature_guard.cc:45] The TensorFlow library wasn't compiled to use AVX2 instructions, but these are available on your machine and could speed up CPU computations. 2018-03-01 15:04:51.989807: W c:\l\tensorflow_1501918863922\work\tensorflow-1.2.1\tensorflow\core\platform\cpu_feature_guard.cc:45] The TensorFlow library wasn't compiled to use FMA instructions, but these are available on your machine and could speed up CPU computations. 18/03/01 15:04:52 WARN RandomBlockReplicationPolicy: Expecting 1 replicas with only 0 peer/s. 18/03/01 15:04:52 WARN BlockManager: Block input-0-1519896891800 replicated to only 0 peer(s) instead of 1 peers 18/03/01 15:04:52 WARN RandomBlockReplicationPolicy: Expecting 1 replicas with only 0 peer/s. 18/03/01 15:04:52 WARN BlockManager: Block input-0-1519896892000 replicated to only 0 peer(s) instead of 1 peers 18/03/01 15:04:52 ERROR JobScheduler: Error running job streaming job 1519896870000 ms.0 org.apache.spark.SparkException: An exception was raised by Python: Traceback (most recent call last): File "d:\Anaconda3\lib\site-packages\pyspark\streaming\util.py", line 65, in call r = self.func(t, *rdds) File "d:\Anaconda3\lib\site-packages\pyspark\streaming\dstream.py", line 159, in <lambda> func = lambda t, rdd: old_func(rdd) File "ml_pipeline.py", line 132, in process_instances df = predict(df) # Add the raw Neural Network predictions. File "ml_pipeline.py", line 91, in predict predictor = ModelPredictor(keras_model=model, features_col="features_normalized", output_col="prediction") File "d:\dist-keras\distkeras\predictors.py", line 45, in __init__ super(ModelPredictor, self).__init__(keras_model) File "d:\dist-keras\distkeras\predictors.py", line 23, in __init__ self.model = serialize_keras_model(keras_model) File "d:\dist-keras\distkeras\utils.py", line 84, in serialize_keras_model dictionary['weights'] = model.get_weights() File "d:\Anaconda3\lib\site-packages\keras\models.py", line 699, in get_weights return self.model.get_weights() File "d:\Anaconda3\lib\site-packages\keras\engine\topology.py", line 2008, in get_weights return K.batch_get_value(weights) File "d:\Anaconda3\lib\site-packages\keras\backend\tensorflow_backend.py", line 2320, in batch_get_value return get_session().run(ops) File "d:\Anaconda3\lib\site-packages\tensorflow\python\client\session.py", line 789, in run run_metadata_ptr) File "d:\Anaconda3\lib\site-packages\tensorflow\python\client\session.py", line 927, in _run raise RuntimeError('The Session graph is empty. Add operations to the ' RuntimeError: The Session graph is empty. Add operations to the graph before calling run().

Any help would be appreciated. Am I missing anything here ?

Thanks & Regards
Anish Sharma

something confusing about LabelIndexTransformer's code

Here is part of your LabelIndexTransformer's code:

    def get_index(self, vector):
        """Returns the index with the highest value or with activation threshold."""
        for index in range(0, self.output_dimensionality):
            if vector[index] >= self.activation_threshold:
                return index

The docstring says it returns the index with the highest value or with the activation threshold, but I think it only returns the first index that reaches the activation threshold. It can't return the index with the highest value. Do I misunderstand it?
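For comparison, an implementation that actually returns the index of the highest activation would use an argmax. The sketch below is illustrative only and is not the code that ships with dist-keras:

import numpy as np

def get_index(self, vector):
    # Return the index of the element with the highest activation.
    values = [vector[i] for i in range(self.output_dimensionality)]
    return int(np.argmax(values))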

Spark streaming with dist-keras

Hi,

My problem is that I need to predict on time series data which arrives in real time. So, I was looking for any sort of tutorial on how to leverage Spark Streaming with dist-keras. To my disappointment, I was not able to find any. Can anyone tell me if that is even possible with the latest release of dist-keras? Just to mention that I am asking about Spark Streaming and not Spark, as I know it supports Spark. Any other ideas would be welcome. Please advise.

Thanks & Regards
Anish Sharma

what makes dist-keras different?

Hi, I'm interested in this project. I have a question: what's the difference between dist-keras and elephas, another project which is also built with Spark and Keras?

GPU-compatible release

The current dist-keras release seems to depend on tensorflow (see setup.py). Would it be possible to make a release (on PyPi) that depends on tensorflow-gpu? Thanks!

Address already in use

Sometimes when training a model with a distributed learner implemented in distkeras, an error of the following form shows up:

Exception in thread Thread-22:
Traceback (most recent call last):
File "/usr/lib/anaconda2/lib/python2.7/threading.py", line 801, in __bootstrap_inner
self.run()
File "/usr/lib/anaconda2/lib/python2.7/threading.py", line 754, in run
self.__target(*self.__args, **self.__kwargs)
File "/dist-keras/distkeras/trainers.py", line 458, in service
self.parameter_server.initialize()
File "/dist-keras/distkeras/parameter_servers.py", line 111, in initialize
file_descriptor.bind(('0.0.0.0', self.master_port))
File "/usr/lib/anaconda2/lib/python2.7/socket.py", line 228, in meth
return getattr(self._sock,name)(*args)
error: [Errno 98] Address already in use

How could I avoid this error?

Sparse Vector -Usage

Hi there, I was going to create a Sparse Vector Transformer but noticed you said that sparse vectors don't work at the moment. Can you describe in more detail why? Will it ever be implemented, and is it difficult to implement?

Thanks,

When I run transformer, reported ImportError: No module named distkeras.utils

I ran a transformer on the MNIST data set.

from pyspark.sql.types import IntegerType
changedTypedf = train_df.withColumn("label_int", train_df["label"].cast("integer")).select('features', 'label_int')
changedTypedf.printSchema()
encoder = OneHotTransformer(nb_classes, input_col="label_int", output_col="label_encoded")
encoder.transform(changedTypedf)

I want to run this code.
The schema is
root
|-- features: vector (nullable = true)
|-- label_int: integer (nullable = true)

However, it reported this error.

Traceback (most recent call last):
File "/tmp/zeppelin_pyspark-4755371756168234437.py", line 367, in
raise Exception(traceback.format_exc())
Exception: Traceback (most recent call last):
File "/tmp/zeppelin_pyspark-4755371756168234437.py", line 360, in
exec(code, _zcUserQueryNameSpace)
File "", line 5, in
File "/usr/local/lib/python2.7/site-packages/distkeras/transformers.py", line 299, in transform
return dataframe.rdd.map(self._transform).toDF()
File "/usr/lib/spark/python/pyspark/sql/session.py", line 57, in toDF
return sparkSession.createDataFrame(self, schema, sampleRatio)
File "/usr/lib/spark/python/pyspark/sql/session.py", line 535, in createDataFrame
rdd, schema = self._createFromRDD(data.map(prepare), schema, samplingRatio)
File "/usr/lib/spark/python/pyspark/sql/session.py", line 375, in _createFromRDD
struct = self._inferSchema(rdd, samplingRatio)
File "/usr/lib/spark/python/pyspark/sql/session.py", line 346, in _inferSchema
first = rdd.first()
File "/usr/lib/spark/python/pyspark/rdd.py", line 1361, in first
rs = self.take(1)
File "/usr/lib/spark/python/pyspark/rdd.py", line 1343, in take
res = self.context.runJob(self, takeUpToNumLeft, p)
File "/usr/lib/spark/python/pyspark/context.py", line 992, in runJob
port = self._jvm.PythonRDD.runJob(self._jsc.sc(), mappedRDD._jrdd, partitions)
File "/usr/lib/spark/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py", line 1133, in call
answer, self.gateway_client, self.target_id, self.name)
File "/usr/lib/spark/python/pyspark/sql/utils.py", line 63, in deco
return f(*a, **kw)
File "/usr/lib/spark/python/lib/py4j-0.10.4-src.zip/py4j/protocol.py", line 319, in get_return_value
format(target_id, ".", name), value)
Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.runJob.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 26.0 failed 4 times, most recent failure: Lost task 0.3 in stage 26.0 (TID 67, ip-172-31-22-245.ec2.internal, executor 2): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "/mnt1/yarn/usercache/zeppelin/appcache/application_1507639434255_0007/container_1507639434255_0007_01_000003/pyspark.zip/pyspark/worker.py", line 166, in main
func, profiler, deserializer, serializer = read_command(pickleSer, infile)
File "/mnt1/yarn/usercache/zeppelin/appcache/application_1507639434255_0007/container_1507639434255_0007_01_000003/pyspark.zip/pyspark/worker.py", line 55, in read_command
command = serializer._read_with_length(file)
File "/mnt1/yarn/usercache/zeppelin/appcache/application_1507639434255_0007/container_1507639434255_0007_01_000003/pyspark.zip/pyspark/serializers.py", line 169, in _read_with_length
return self.loads(obj)
File "/mnt1/yarn/usercache/zeppelin/appcache/application_1507639434255_0007/container_1507639434255_0007_01_000003/pyspark.zip/pyspark/serializers.py", line 454, in loads
return pickle.loads(obj)
ImportError: No module named distkeras.utils
at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:193)
at org.apache.spark.api.python.PythonRunner$$anon$1.(PythonRDD.scala:234)
at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:152)
at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:63)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:108)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1690)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1678)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1677)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1677)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:855)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:855)
at scala.Option.foreach(Option.scala:257)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:855)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1905)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1860)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1849)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:671)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2022)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2043)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2062)
at org.apache.spark.api.python.PythonRDD$.runJob(PythonRDD.scala:446)
at org.apache.spark.api.python.PythonRDD.runJob(PythonRDD.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:280)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:214)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "/mnt1/yarn/usercache/zeppelin/appcache/application_1507639434255_0007/container_1507639434255_0007_01_000003/pyspark.zip/pyspark/worker.py", line 166, in main
func, profiler, deserializer, serializer = read_command(pickleSer, infile)
File "/mnt1/yarn/usercache/zeppelin/appcache/application_1507639434255_0007/container_1507639434255_0007_01_000003/pyspark.zip/pyspark/worker.py", line 55, in read_command
command = serializer._read_with_length(file)
File "/mnt1/yarn/usercache/zeppelin/appcache/application_1507639434255_0007/container_1507639434255_0007_01_000003/pyspark.zip/pyspark/serializers.py", line 169, in _read_with_length
return self.loads(obj)
File "/mnt1/yarn/usercache/zeppelin/appcache/application_1507639434255_0007/container_1507639434255_0007_01_000003/pyspark.zip/pyspark/serializers.py", line 454, in loads
return pickle.loads(obj)
ImportError: No module named distkeras.utils
at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:193)
at org.apache.spark.api.python.PythonRunner$$anon$1.(PythonRDD.scala:234)
at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:152)
at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:63)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:108)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
... 1 more

Any hints for me? How can I solve it?

Repartition vs coalesce

In your trainer.py code you use repartition. I suggest coalesce so as not to trigger a shuffle and incur the associated cost.

trainer.py

        if shuffle:
            dataframe = shuffle(dataframe)
        # Indicate the parallelism (number of worker times parallelism factor).
        parallelism = self.parallelism_factor * self.num_workers
        # Check if we need to repartition the dataframe.
        if num_partitions >= parallelism:
            dataframe = dataframe.coalesce(parallelism)
        else:
            dataframe = dataframe.repartition(parallelism)

MNIST with spark standalone cluster mode

@JoeriHermans

Hi Joeri,

I tried to run one of my experiments with pyspark standalone cluster mode, with 3 workers.
I'm getting a ConnectionRefused error on the worker.
Is this expected?

ee207437@pcg-ee207437-1:/usr/lib/spark$ ./bin/spark-submit --master spark://10.51.5.40:7077 examples/src/main/python/gtzanKeras.py gtzan.parquet
Using TensorFlow backend.
17/10/11 14:35:56 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
root
|-- features_normalized: vector (nullable = true)
|-- label_index: double (nullable = true)
|-- label: array (nullable = true)
| |-- element: double (containsNull = true)


_________________________________________________________________
Layer (type)                 Output Shape              Param #
=================================================================
dense_1 (Dense)              (None, 40)                1240
activation_1 (Activation)    (None, 40)                0
dropout_1 (Dropout)          (None, 40)                0
dense_2 (Dense)              (None, 15)                615
activation_2 (Activation)    (None, 15)                0
dropout_2 (Dropout)          (None, 15)                0
dense_3 (Dense)              (None, 10)                160
=================================================================
Total params: 2,015
Trainable params: 2,015
Non-trainable params: 0
_________________________________________________________________


Number of training instances: 887
Number of testing instances: 113
2017-10-11 14:36:03.929908: W tensorflow/core/platform/cpu_feature_guard.cc:45] The TensorFlow library wasn't compiled to use SSE4.1 instructions, but these are available on your machine and could speed up CPU computations.
2017-10-11 14:36:03.929928: W tensorflow/core/platform/cpu_feature_guard.cc:45] The TensorFlow library wasn't compiled to use SSE4.2 instructions, but these are available on your machine and could speed up CPU computations.
2017-10-11 14:36:03.929934: W tensorflow/core/platform/cpu_feature_guard.cc:45] The TensorFlow library wasn't compiled to use AVX instructions, but these are available on your machine and could speed up CPU computations.
2017-10-11 14:36:03.929938: W tensorflow/core/platform/cpu_feature_guard.cc:45] The TensorFlow library wasn't compiled to use AVX2 instructions, but these are available on your machine and could speed up CPU computations.
2017-10-11 14:36:03.929943: W tensorflow/core/platform/cpu_feature_guard.cc:45] The TensorFlow library wasn't compiled to use FMA instructions, but these are available on your machine and could speed up CPU computations.
Exception in thread Thread-2:
Traceback (most recent call last):
File "/usr/lib/python2.7/threading.py", line 801, in __bootstrap_inner
self.run()
File "/usr/lib/python2.7/threading.py", line 754, in run
self.__target(*self.__args, **self.__kwargs)
File "/usr/local/lib/python2.7/dist-packages/distkeras/trainers.py", line 466, in service
self.parameter_server.initialize()
File "/usr/local/lib/python2.7/dist-packages/distkeras/parameter_servers.py", line 111, in initialize
file_descriptor.bind(('0.0.0.0', self.master_port))
File "/usr/lib/python2.7/socket.py", line 228, in meth
return getattr(self._sock,name)(*args)
error: [Errno 98] Address already in use

[Stage 9:> (0 + 3) / 3]17/10/11 14:36:10 WARN TaskSetManager: Lost task 1.0 in stage 9.0 (TID 656, 10.51.5.30, executor 2): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/worker.py", line 177, in main
process()
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/worker.py", line 172, in process
serializer.dump_stream(func(split_index, iterator), outfile)
File "/usr/local/lib/python2.7/dist-packages/distkeras/workers.py", line 261, in train
self.connect()
File "/usr/local/lib/python2.7/dist-packages/distkeras/workers.py", line 197, in connect
self.socket = connect(self.master_host, self.master_port, self.disable_nagle)
File "/usr/local/lib/python2.7/dist-packages/distkeras/networking.py", line 97, in connect
fd.connect((host, port))
File "/usr/lib/python2.7/socket.py", line 228, in meth
return getattr(self._sock,name)(*args)
error: [Errno 111] Connection refused

at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:193)
at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:234)
at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:152)
at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:63)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:108)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)

Thanks,
Manoj

Error in StringIndexer of the example

Hi,

I am getting the following error when I run the example that works with the atlas_higgs data,
in the StringIndexer section:

File "/disk/disk16/hadoop/yarn/local/usercache/319413696/appcache/application_1510179039244_71672/container_e31_1510179039244_71672_01_000004/pyspark.zip/pyspark/worker.py", line 163, in main
    func, profiler, deserializer, serializer = read_command(pickleSer, infile)
  File "/disk/disk16/hadoop/yarn/local/usercache/319413696/appcache/application_1510179039244_71672/container_e31_1510179039244_71672_01_000004/pyspark.zip/pyspark/worker.py", line 54, in read_command
    command = serializer._read_with_length(file)
  File "/disk/disk16/hadoop/yarn/local/usercache/MyID/appcache/application_1510179039244_71672/container_e31_1510179039244_71672_01_000004/pyspark.zip/pyspark/serializers.py", line 169, in _read_with_length
    return self.loads(obj)
  File "/disk/disk16/hadoop/yarn/local/usercache/MyID/appcache/application_1510179039244_71672/container_e31_1510179039244_71672_01_000004/pyspark.zip/pyspark/serializers.py", line 434, in loads
    return pickle.loads(obj)
AttributeError: 'module' object has no attribute 'to_dense_vector'

Progress Bar

Hi, I've been using dist-keras and it's been great so far. I've encountered a problem though: for large datasets (e.g. 1,000,000 training rows) it can take > 12 h. Is there a progress bar like with Keras?

Iteration 1
Epoch 1/1
4500/80460 [=>.............................................] - ETA: 795s - loss 3.5021

Problem with running mnist.py example

I am trying to run mnist.py in standalone cluster mode. For this I set master = "local[*]" to "spark://MY_MASTER_IP:7077" and I submit my task with the following command:

 spark-submit --master spark://192.168.1.101:7077 \
--num-executors 1 \
--executor-memory 8G \
--executor-cores 2 \
--driver-memory 8G PATH_TO/mnist.py

but I get the following error

INFO DAGScheduler: ResultStage 1 (load at NativeMethodAccessorImpl.java:0) failed in 2.200 s due to Job aborted due to stage failure: Task 5 in stage 1.0 failed 4 times, most recent failure: Lost task 5.3 in stage 1.0 (TID 13, 192.168.1.102, executor 1): java.io.FileNotFoundException: File file:/home/semanticslab3/development/spark/spark-2.2.1-bin-hadoop2.7/bin/data/mnist_train.csv does not exist
It is possible the underlying files have been updated. You can explicitly invalidate the cache in Spark by running 'REFRESH TABLE tableName' command in SQL or by recreating the Dataset/DataFrame involved.
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.org$apache$spark$sql$execution$datasources$FileScanRDD$$anon$$readCurrentFile(FileScanRDD.scala:127)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:174)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:105)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:395)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
	at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:461)
	at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:461)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
	at scala.collection.Iterator$class.foreach(Iterator.scala:893)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
	at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
	at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1336)
	at scala.collection.TraversableOnce$class.aggregate(TraversableOnce.scala:214)
	at scala.collection.AbstractIterator.aggregate(Iterator.scala:1336)
	at org.apache.spark.rdd.RDD$$anonfun$aggregate$1$$anonfun$22.apply(RDD.scala:1113)
	at org.apache.spark.rdd.RDD$$anonfun$aggregate$1$$anonfun$22.apply(RDD.scala:1113)
	at org.apache.spark.SparkContext$$anonfun$33.apply(SparkContext.scala:2125)
	at org.apache.spark.SparkContext$$anonfun$33.apply(SparkContext.scala:2125)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
	at org.apache.spark.scheduler.Task.run(Task.scala:108)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

     Driver stacktrace:
    17/12/14 16:29:32 INFO DAGScheduler: Job 1 failed: load at NativeMethodAccessorImpl.java:0, took   2.211758 s

With master = "local[*]" it works nicely, but it uses only one worker. I want to use both of my workers.

Thanks

how to use directly the numpy data loaded to rdd

Hi, thanks for this work. It would also be nice to have some examples with MNIST and CIFAR where we manipulate NumPy arrays, since these are actually small datasets: load them into memory, bring them into the desired format, transform them into an RDD and use them with the optimizers you've provided. Right now the trainer method only takes a DataFrame-like structure with column labels. I know we could easily transform the data into that format, but it would be nice to see an example of how to use the RDD directly.

for instance:

from keras.datasets import mnist
(X_train, y_train), (X_test, y_test) = mnist.load_data()
X_train = X_train.reshape(-1, 784).astype('float32') / 255.
.
.
.
here make them RDD
construct keras model
train by distributing the model
train by distributing the data

Cheers.
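A minimal sketch of that conversion with plain PySpark is shown below. The column names, the subset size, and the vector type are assumptions for illustration; depending on the Spark and dist-keras versions, pyspark.mllib.linalg.Vectors may be required instead of pyspark.ml.linalg.Vectors, and the label typically still needs to be encoded (e.g. with OneHotTransformer) before training.

import numpy as np
from keras.datasets import mnist
from pyspark.sql import SparkSession
from pyspark.ml.linalg import Vectors

spark = SparkSession.builder.appName("mnist-to-dataframe").getOrCreate()

(X_train, y_train), (X_test, y_test) = mnist.load_data()
X_train = X_train.reshape(-1, 784).astype('float32') / 255.

# Build (features, label) rows on the driver; only a subset to keep the driver light.
rows = [(Vectors.dense(x), int(y)) for x, y in zip(X_train[:1000], y_train[:1000])]
dataset = spark.createDataFrame(rows, ["features", "label"])
dataset.printSchema()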

mapPartitions

I found the following error while trying to run your example

Traceback (most recent call last):
  File "/home/dist-keras/examples/single_trainer_example.py", line 78, in <module>
    dataset = labelVectorTransformer.transform(dataset).toDF().select("features_normalized", "label_index", "label")
  File "/home/dist-keras/distkeras/distributed.py", line 54, in transform
    return data.mapPartitions(self._transform)
  File "/home/spark/python/pyspark/sql/dataframe.py", line 844, in __getattr__
    "'%s' object has no attribute '%s'" % (self.__class__.__name__, name))
AttributeError: 'DataFrame' object has no attribute 'mapPartitions'


multi GPU on single machine support

Hi, are there any plans to support multiple GPUs on a single machine? I would like to use DOWNPOUR (asynchronous) SGD in my training, which is not supported in Keras.

AttributeError: 'module' object has no attribute 'clear_session'

When I tried to run workflow.ipynb, I got this error message.
I am using the most recent versions of Keras and Theano.

trained_model = trainer.train(training_set)
File "build/bdist.linux-x86_64/egg/distkeras/trainers.py", line 575, in train
File "build/bdist.linux-x86_64/egg/distkeras/trainers.py", line 425, in allocate_parameter_server
File "build/bdist.linux-x86_64/egg/distkeras/parameter_servers.py", line 229, in init
File "build/bdist.linux-x86_64/egg/distkeras/parameter_servers.py", line 90, in init
File "build/bdist.linux-x86_64/egg/distkeras/parameter_servers.py", line 36, in init
File "build/bdist.linux-x86_64/egg/distkeras/utils.py", line 123, in deserialize_keras_model
AttributeError: 'module' object has no attribute 'clear_session'

Maybe you want to at least mention elephas in your readme

Hi @JoeriHermans,

I like a lot what you did with this library. However, I have to say that it's fairly obvious from your commit log that you started from elephas and slowly refactored the project to make it something else, see e.g.

8578a7b

That is in itself really not a problem, given elephas' license, but it'd only be fair to mention that you did not exactly start from scratch here. Given that you used an Elephas github issue to advertise your project I think you should follow the unwritten rule of OSS and mark a fork as a fork, even if it now evolved into an independent project.

It would have really been great to just work together on one project (e.g. by making you collaborator of elephas), instead of duplicating efforts...

Alright, but as I said in the beginning you did a great job here, so keep it up.

Strange "stateful" behavior

I'm writing a simple model for Dist-Keras.
If I change some parameter of the Neural Network (for example the size of some layer), I get errors like:
ValueError: Cannot feed value of shape (16, 128) for Tensor u'Placeholder_1:0', which has shape '(15, 128)'
In fact, I changed a layer size from 16 to 15, but it seems like the system remembers the old size and raises this error.

Cannot make dist-keras work on cloudera cluster

Hello,

I've been struggling for days with dist-keras and I just can't seem to make it work on our cloudera cluster.

I've been running the ADAG (convolutional network) on the MNIST example (https://github.com/cerndb/dist-keras/blob/master/examples/mnist.ipynb) on my local VM and it just works!

However, when I try to make this example work on our cluster, using several workers, I constantly get the following error:

An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 11.0 failed 4 times, most recent failure: Lost task 0.3 in stage 11.0 (TID 222, datalab-prj-cloudera-slave-3, executor 4): ExecutorLostFailure (executor 4 exited caused by one of the running tasks) Reason: Container killed by YARN for exceeding memory limits. 5.0 GB of 4.5 GB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead.

At first, I tried to increase the memory, but even 10g isn't enough.
I profiled the MNIST example on my VM and it takes close to nothing, so I don't understand why it's using so much memory on the cluster.

Any idea ?

Thank you !

dist-keras not utilizing GPUs on Spark workers when running example notebook

Hi, I'm trying to run the workflow.ipynb example notebook on a GPU-enabled cluster (3 single-GPU AWS p2.xlarge instances). The example runs fine, but when I run nvidia-smi on my machines I don't see any GPU utilization (no memory usage, no running processes).

Including the relevant parts of my pip freeze output below:

dist-keras==0.2.1
Keras==2.1.2
tensorflow==1.4.0
tensorflow-gpu==1.4.0
tensorflow-tensorboard==0.4.0rc3
Theano==1.0.0

My machines are running Ubuntu 16.04 and Spark 2.2.

The discussion in #10 seemed to imply that dist-keras utilizes GPUs; has anybody seen similar behavior, or does anyone know if there are special config settings I need to specify for GPU utilization?

Let me know if there's any other information I can provide that'd help :)

Import Error

I am facing this import error.
I have tried installing distkeras from source and via pip directly, but this error never goes away. Can I please get some insight into this?

distkeras_import_error

two calls on train_on_batch per iteration

Hi Joeri,

Thank you for making this package available! Your package seems like a promising solution for the data loading problem we are facing in our model.

I have a question regarding different implementations of the worker class in workers.py. I suppose there should be only one call to "train_on_batch" for each batch (X, Y) in all worker subclasses, but this is the case only for ADAGWorker. For the worker subclasses implementing all the other algorithms, there are 2 calls to "train_on_batch" and only the second one is recorded in "training_history". Why?

something wrong with my embedding's test

Here is my Python script, run on pyspark.

from keras.layers.embeddings import Embedding
from keras.layers.recurrent import LSTM, GRU
from pyspark.ml.feature import VectorAssembler
from keras.models import Sequential
from keras.layers.core import *
from distkeras.trainers import *
from pyspark import SQLContext


sqlContext = SQLContext(sc)
reader = sqlContext
raw_dataset = reader.read.format('com.databricks.spark.csv').options(header='false', inferSchema='true').load(
    '/home/hpcc/test/11.csv')
# raw_dataset = raw_dataset.repartition(4)
features = raw_dataset.columns
features.remove('C0')
vector_assembler = VectorAssembler(inputCols=features, outputCol="features")
dataset = vector_assembler.transform(raw_dataset)
dataset = dataset.select("features", "C0")
model1 = Sequential()
model1.add(Embedding(input_dim=52965, output_dim=256))
model1.add(LSTM(128))
model1.add(Dropout(0.5))
model1.add(Dense(1))
model1.add(Activation('sigmoid'))
model1.summary()
optimizer_mlp= 'adagrad'
loss_mlp = 'binary_crossentropy'
dataset.cache()
trainer = DOWNPOUR(keras_model=model1, worker_optimizer=optimizer_mlp, loss=loss_mlp, num_workers=4,
                   batch_size=16, communication_window=5, learning_rate=0.1, num_epoch=1,
                   features_col="features", label_col="C0")
trainer.set_parallelism_factor(1)
trained_model = trainer.train(dataset)

But it printed an error like this:

16/12/30 20:53:09 ERROR Executor: Exception in task 1.0 in stage 4.0 (TID 7)
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py", line 111, in main
    process()
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py", line 106, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "build/bdist.linux-x86_64/egg/distkeras/workers.py", line 193, in train
    X = np.asarray([x[self.features_column] for x in feature_iterator])
  File "/root/anaconda2/lib/python2.7/site-packages/numpy/core/numeric.py", line 482, in asarray
    return array(a, dtype, copy=False, order=order)
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/mllib/linalg/__init__.py", line 769, in __getitem__
    raise ValueError("Index %d out of bounds." % index)
ValueError: Index 50 out of bounds.

My 11.csv looks like this:

1,23,5211,871,1223,11,5,355,9999,1,28032,33057,1259,5575,4,1,9604,23790,83,136,18,4695,3,121,1,151,521,2,130,24677,4,1,612,947,612,1645,4534,59,29008,1,21679,1,416,3474,3,4189,1,142,5,11323,3
........

Its C0 column is the label, and C1 to C50 are the features.
Hoping for your answer. Thanks a lot.

Regarding compatibility between Python, Tensorflow and dist-keras

Hi JoeriHermans,

I am using Python 2.7 and have installed Keras 2.1.4. When I try to run your MNIST example, it gives me an "ImportError: No module named tensorflow" error. I went ahead and tried to install TensorFlow via pip. I am on Windows 10 64-bit, and as it turns out, on Windows TensorFlow only supports Python 3. So I am stuck, because the dist-keras description mentions compatibility issues with Python 3. I want my setup to be solid so that I don't face any issues going forward. Also, I am leaning towards Python 2.7 because of better library support; for example, the other day I was installing Cassandra and the server would not start on Python 3 but worked well on Python 2.7. Please advise on how I can make Keras and dist-keras work with TensorFlow on Windows 10 64-bit with Python 2.7.

Thanks & Regards
Anish Sharma

Tensorflow back end not working on GPUs (session lost)

When running the MNIST example on GPU with TensorFlow, it seems to lose the session while trying to train a model. The first part of the code that uses a session executes fine, so I'm perplexed as to what could be going on here. It seems as if the TF session is being lost on the new worker. Any help would be greatly appreciated.

Traceback (most recent call last):
  File "mnist.py", line 268, in <module>
    trained_model = trainer.train(training_set)
  File "lib/python2.7/site-packages/distkeras/trainers.py", line 638, in train
    self.history = dataset.rdd.mapPartitionsWithIndex(worker.train).collect()
  File "spark/python/pyspark/rdd.py", line 808, in collect
    port = self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd())
  File "spark/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py", line 1133, in __call__
  File "spark/python/pyspark/sql/utils.py", line 63, in deco
    return f(*a, **kw)
  File "spark/python/lib/py4j-0.10.4-src.zip/py4j/protocol.py", line 319, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 19.0 failed 4 times, most recent failure: Lost task 1.3 in stage 19.0 (TID 60, 10.30.72.126, executor 0): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "spark/python/lib/pyspark.zip/pyspark/worker.py", line 174, in main
    process()
  File "spark/python/lib/pyspark.zip/pyspark/worker.py", line 169, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "lib/python2.7/site-packages/distkeras/workers.py", line 260, in train
    self.prepare_model()
  File "lib/python2.7/site-packages/distkeras/workers.py", line 97, in prepare_model
    self.model = deserialize_keras_model(self.model)
  File "lib/python2.7/site-packages/distkeras/utils.py", line 126, in deserialize_keras_model
    model.set_weights(weights)
  File "lib/python2.7/site-packages/keras/models.py", line 702, in set_weights
    self.model.set_weights(weights)
  File "lib/python2.7/site-packages/keras/engine/topology.py", line 2004, in set_weights
    K.batch_set_value(tuples)
  File "lib/python2.7/site-packages/keras/backend/tensorflow_backend.py", line 2193, in batch_set_value
    get_session().run(assign_ops, feed_dict=feed_dict)
  File "lib/python2.7/site-packages/keras/backend/tensorflow_backend.py", line 163, in get_session
    _SESSION = tf.Session(config=config)
  File "lib/python2.7/site-packages/tensorflow/python/client/session.py", line 1486, in __init__
    super(Session, self).__init__(target, graph, config=config)
  File "lib/python2.7/site-packages/tensorflow/python/client/session.py", line 621, in __init__
    self._session = tf_session.TF_NewDeprecatedSession(opts, status)
  File "lib/python2.7/contextlib.py", line 24, in __exit__
    self.gen.next()
  File "lib/python2.7/site-packages/tensorflow/python/framework/errors_impl.py", line 466, in raise_exception_on_not_ok_status
    pywrap_tensorflow.TF_GetCode(status)

Small typo in `parameter_servers.py`: 'Sequential' object has no attribute 'get_weight'

Well, I already found the solution, so I am sharing my finding with you:
When running the mnist.ipynb example, namely the optimizer cells, I ran into a strange AttributeError: 'Sequential' object has no attribute 'get_weight'. As I found out later, it was a typo: at line 230 of the parameter_servers.py file it should be get_weights. The Keras Sequential model doesn't have a get_weight method.

After this fix, all the dist-keras optimizers were working properly.

BTW, I ran Keras 2.0.2 with the Theano 0.9.0 backend, Spark 2.1, and Python 2.7.12.

Here is the full traceback:

---------------------------------------------------------------------------
AttributeError                            Traceback (most recent call last)
<ipython-input-21-d05d8e848182> in <module>()
      2                    batch_size=4, communication_window=5, num_epoch=1,
      3                    features_col="features_normalized_dense", label_col="label_encoded")
----> 4 trained_model = trainer.train(training_set)

/home/alexburlacu/MDSC#April/dist-keras/distkeras/trainers.pyc in train(self, dataframe, shuffle)
    573         """
    574         # Allocate the parameter server.
--> 575         self.parameter_server = self.allocate_parameter_server()
    576         # Start the communication service.
    577         self.start_service()

/home/alexburlacu/MDSC#April/dist-keras/distkeras/trainers.pyc in allocate_parameter_server(self)
    423         this implementation.
    424         """
--> 425         parameter_server = DeltaParameterServer(self.master_model, self.master_port)
    426 
    427         return parameter_server

/home/alexburlacu/MDSC#April/dist-keras/distkeras/parameter_servers.pyc in __init__(self, model, master_port)
    228     def __init__(self, model, master_port):
    229         super(DeltaParameterServer, self).__init__(model, master_port)
--> 230         self.center_variable = np.asarray(self.model.get_weight())
    231 
    232     def handle_commit(self, conn, addr):

AttributeError: 'Sequential' object has no attribute 'get_weight'
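
For reference, the corrected line 230 of parameter_servers.py (the fix described above, shown here for clarity) reads:

self.center_variable = np.asarray(self.model.get_weights())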

mnist

How do I run the mnist example? Hoping for your answer.

What is the best way to use the value of mnist.load_data() as inputs for efficient training in Dist-keras?

I have my own data: numpy arrays with the same format as the return value of mnist.load_data().
(X_train, y_train), (X_test, y_test) = myOwnData.load_data()
The shape of x_train is (number of samples, 244, 244, 3).
The shape of y_train is (number of samples, number of classes).

My data has the same format:
the shape of the features (x) -> (number of samples, 244, 244, 3)
the shape of the labels (y) -> (number of samples, number of classes)

(I read image files and convert them to an ndarray for the features, and I also build the label ndarray manually.)

I tested my code on a single machine by using model.fit(x_train, y_train, ...) in plain Keras, and I am now trying to test it with dist-keras, but I am struggling because of the input type.

One solution I thought of is to combine the two ndarrays into a CSV file and then read that CSV file like in the mnist example you made.

The other way I thought of is to use the numpy parsing example you made, BUT I don't know how to combine two Spark DataFrames after converting the two ndarrays to Spark DataFrames. (I also checked one of the closed issues on a similar topic, but I cannot apply it to my case since I am not used to numpy and PySpark.)

Also, I am wondering which of these approaches is more efficient for training.
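
A minimal sketch of the second approach is shown below. It is an illustration under assumptions rather than code from this repository: it creates its own SparkSession, uses random arrays with the shapes described above, flattens each image into a dense vector, and pairs it with its one-hot label in a single DataFrame.

import numpy as np
from pyspark.sql import SparkSession
from pyspark.ml.linalg import Vectors

spark = SparkSession.builder.appName("ndarray-to-dataframe").getOrCreate()

# Hypothetical arrays with the shapes described above (10 samples, 5 classes).
x_train = np.random.rand(10, 244, 244, 3)
y_train = np.eye(5)[np.random.randint(0, 5, size=10)]

# Flatten every image and pair it with its one-hot label vector.
rows = [(Vectors.dense(x.ravel()), Vectors.dense(y)) for x, y in zip(x_train, y_train)]
df = spark.createDataFrame(rows, ["features", "label"])
df.printSchema()

The flattened features column would still have to be reshaped back to (244, 244, 3) before feeding a convolutional model, for example with the ReshapeTransformer mentioned elsewhere in these issues.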

Improved Parameter Server performance

When using more than ~20 workers, the temporal efficiency of the optimizers does not improve. This is because worker requests and updates are handled by Python threads (instead of processes). To make this more efficient, we should allocate a separate process, with the central variable in shared memory, that incorporates the updates in a more concurrent manner.
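
A minimal sketch of that idea (an assumption about a possible design, not the current dist-keras implementation): the central variable lives in a shared-memory buffer and a dedicated process applies incoming worker deltas to it.

import numpy as np
from multiprocessing import Process, Queue, RawArray

def serve_updates(shared_buffer, num_parameters, updates):
    # Runs in a separate process: apply incoming deltas to the central
    # variable, which lives in shared memory rather than behind a thread lock.
    center = np.frombuffer(shared_buffer, dtype=np.float64, count=num_parameters)
    while True:
        delta = updates.get()
        if delta is None:  # shutdown signal
            break
        center += delta

if __name__ == "__main__":
    num_parameters = 1000
    shared_buffer = RawArray('d', num_parameters)
    updates = Queue()
    server = Process(target=serve_updates, args=(shared_buffer, num_parameters, updates))
    server.start()

    updates.put(np.ones(num_parameters))  # an illustrative worker delta
    updates.put(None)
    server.join()
    print(np.frombuffer(shared_buffer, dtype=np.float64)[:5])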

Running out of HDD

Hi, I am running out of HDD space when I use the trainer. The command from the Spark UI is:
javaToPython at NativeMethodAccessorImpl.java:0 +details
RDD: *Project [features_1hot#55, labels_1hot#56] +- Scan ExistingRDD[label_index#53L,features#54,features_1hot#55,labels_1hot#56]

org.apache.spark.sql.Dataset.javaToPython(Dataset.scala:2794)
sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
java.lang.reflect.Method.invoke(Method.java:498)
py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
py4j.Gateway.invoke(Gateway.java:280)
py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
py4j.commands.CallCommand.execute(CallCommand.java:79)
py4j.GatewayConnection.run(GatewayConnection.java:214)
java.lang.Thread.run(Thread.java:748)

It runs until all the nodes run out of HDD space. What's going on here? Thanks.

P.S. This happened when I set EPOCH to 5000.

Difficulties with Convolutional Networks

I have convolutional layers in my network, and I reshape my data set in Keras like this:
dataX = np.reshape(data, (data.shape[0], 30, 40, 1))

When I wanted to use dist-keras, I tried to use ReshapeTransformer to reshape the data, but it was not successful.
I know that one way is to convert the Spark DataFrame to a pandas DataFrame, do the reshaping, and convert it back to RDDs. However, I am looking for a better way. Do you have any solution for it?

Thanks,
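
In case it helps, here is a sketch of what that reshaping could look like, assuming the ReshapeTransformer takes an input column, an output column, and the target shape, as it does in the repository's MNIST example (the column names here are hypothetical):

from distkeras.transformers import ReshapeTransformer

# Reshape the flat feature vectors into 30x40x1 matrices for the convolutional layers.
reshape_transformer = ReshapeTransformer("features", "matrix", (30, 40, 1))
dataset = reshape_transformer.transform(dataset)

The trainer would then be given features_col="matrix" instead of the original flat column.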

read spark data frame

Why not use the DataFrame API to read the data in your example? Have you tried these lines?

from pyspark import SparkConf
from pyspark.sql import SparkSession

conf = SparkConf()
conf.set("spark.executor.memory", "4G")
conf.set("spark.driver.memory", "2G")
conf.set("spark.executor.cores", "7")
conf.set("spark.python.worker.memory", "4G")
conf.set("spark.driver.maxResultSize", "0")
conf.set("spark.sql.crossJoin.enabled", "true")
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
conf.set("spark.default.parallelism", "4")
spark = SparkSession \
    .builder.config(conf=conf) \
    .appName("test-spark").getOrCreate()
df = spark.read.csv("../input/train_numeric.csv", header="true", inferSchema="true", mode="DROPMALFORMED")
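
As a follow-up sketch (under assumptions: a label column named "label" and all remaining columns being numeric features), the resulting DataFrame could then be assembled into the single features column that the trainers expect, just as in the snippet at the top of these issues:

from pyspark.ml.feature import VectorAssembler

label_col = "label"  # hypothetical name of the label column in train_numeric.csv
feature_cols = [c for c in df.columns if c != label_col]
assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")
dataset = assembler.transform(df).select("features", label_col)

This dataset can then be passed to one of the dist-keras trainers with features_col="features" and label_col set accordingly.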

Will distributed.py become available?

Hello!

I'm trying to run the data_preprocessing.ipynb and analysis.ipynb examples, but distributed.py is not available. Namely, the LabelVectorTransformer method is missing. Will this import come back? Is there some temporary solution I could use to run these Python notebooks?

Regards,
Vera

how can I solve this issue?

I really love your project and tried to use it, but I am running into trouble.
(On Ubuntu) I downloaded spark-2.2.0-hadoop2.7 and unzipped it; after that, I installed dist-keras.
Then I faced this error:
"# In Windows, ensure the Java child processes do not linger after Python has exited.
Exception: Java gateway process exited before sending the driver its port number"

at "sc = SparkContext(conf=conf)".
I did some googling but I can't find a solution. Can you let me know how to solve it?
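
One common workaround for this error (offered here as an assumption, not a confirmed fix for this particular setup) is to point the Python process at the unpacked Spark distribution before creating the SparkContext, for example with findspark:

import os
import findspark

os.environ["SPARK_HOME"] = "/path/to/spark-2.2.0-bin-hadoop2.7"  # hypothetical path
findspark.init()

from pyspark import SparkConf, SparkContext

conf = SparkConf().setAppName("dist-keras-test").setMaster("local[*]")
sc = SparkContext(conf=conf)

Making sure JAVA_HOME points to a working JDK is usually part of the same fix.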
