
elephas's Introduction

Hi there, I'm Max 👋

I'm a Data Science & Engineering practitioner from Hamburg, Germany. I'm an avid open source contributor, author of machine learning & technology books, speaker and Coursera instructor.

I specialize in Deep Learning and its applications and can build machine learning solutions and data products from first prototype to production. As a Ray contributor, DL4J core developer, Keras contributor, Hyperopt maintainer and author of a range of open source libraries, I have a distinct view on ML engineering and the data science ecosystem.

Here's what I'm up to right now

  • 📖 Check out my new book with O'Reilly called "Learning Ray", which helps you get started with distributed Python with Ray. Most content is available on my homepage.
  • 🔭 Some of the materials from my courses as Data Science professor at IU can be found here. My teaching centers around programming Python, big data technologies and reinforcement learning.
  • 🔭 If you have interesting freelance opportunities, don't hesitate to get in touch with me.
  • 💬 Ask me about anything Python or Data Science related. I'm happy to discuss!
  • ⚡ Fun fact: I get asked for directions by total strangers all the time, even in foreign countries (and I have yet to figure out why).

Interesting things I've been doing in the past

And here's how you can get in touch with me

elephas's People

Contributors

a-fayez92, abishek-murali, albe91, anamf, andybergon, danielenricocahall, dependabot[bot], elgalu, gauravyeole, icyblade, ivanmontero, jomivega, kmader, maxpumperla, n4nagappan, oscardpan, sinjax, spencerimp, tsingjyujing, viirya

elephas's Issues

hyperparam_optimization.py: Error when checking model target: expected activation_2 to have shape (None, 512) but got array with shape (60000, 10)

Hi,

A couple of weeks ago, in late November, I could get the example to work fine. However, when I run the hyperparam_optimization.py example this week with the most recent elephas, I get the following:

File "/usr/spark-2.0.2/python/lib/pyspark.zip/pyspark/worker.py", line 172, in main
process()
File "/usr/spark-2.0.2/python/lib/pyspark.zip/pyspark/worker.py", line 167, in process
serializer.dump_stream(func(split_index, iterator), outfile)
File "/usr/spark-2.0.2/python/lib/pyspark.zip/pyspark/serializers.py", line 263, in dump_stream
vs = list(itertools.islice(iterator, batch))
File "/usr/local/lib/python2.7/dist-packages/elephas/hyperparam.py", line 86, in minimize
trials=trials, full_model_string=self.model_string, rseed=rand_seed)
File "/usr/local/lib/python2.7/dist-packages/hyperas/optim.py", line 90, in base_minimizer
rstate=np.random.RandomState(rseed))
File "/usr/local/lib/python2.7/dist-packages/hyperopt/fmin.py", line 307, in fmin
return_argmin=return_argmin,
File "/usr/local/lib/python2.7/dist-packages/hyperopt/base.py", line 635, in fmin
return_argmin=return_argmin)
File "/usr/local/lib/python2.7/dist-packages/hyperopt/fmin.py", line 320, in fmin
rval.exhaust()
File "/usr/local/lib/python2.7/dist-packages/hyperopt/fmin.py", line 199, in exhaust
self.run(self.max_evals - n_done, block_until_done=self.async)
File "/usr/local/lib/python2.7/dist-packages/hyperopt/fmin.py", line 173, in run
self.serial_evaluate()
File "/usr/local/lib/python2.7/dist-packages/hyperopt/fmin.py", line 92, in serial_evaluate
result = self.domain.evaluate(spec, ctrl)
File "/usr/local/lib/python2.7/dist-packages/hyperopt/base.py", line 840, in evaluate
rval = self.fn(pyll_rval)
File "temp_model.py", line 98, in keras_fmin_fnct
File "/usr/local/lib/python2.7/dist-packages/keras/models.py", line 652, in fit
sample_weight=sample_weight)
File "/usr/local/lib/python2.7/dist-packages/keras/engine/training.py", line 1038, in fit
batch_size=batch_size)
File "/usr/local/lib/python2.7/dist-packages/keras/engine/training.py", line 967, in _standardize_user_data
exception_prefix='model target')
File "/usr/local/lib/python2.7/dist-packages/keras/engine/training.py", line 111, in standardize_input_data
str(array.shape))
Exception: Error when checking model target: expected activation_2 to have shape (None, 512) but got array with shape (60000, 10)

    at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:193)
    at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:234)
    at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:152)
    at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:63)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
    at org.apache.spark.scheduler.Task.run(Task.scala:86)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

16/12/06 02:00:22 WARN scheduler.TaskSetManager: Lost task 3.0 in stage 1.0 (TID 11, localhost): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "/usr/spark-2.0.2/python/lib/pyspark.zip/pyspark/worker.py", line 172, in main
process()
File "/usr/spark-2.0.2/python/lib/pyspark.zip/pyspark/worker.py", line 167, in process
serializer.dump_stream(func(split_index, iterator), outfile)
File "/usr/spark-2.0.2/python/lib/pyspark.zip/pyspark/serializers.py", line 263, in dump_stream
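For reference, this error usually means that the model's output layer and the label encoding disagree: activation_2 sits on top of a 512-unit Dense layer, while the targets are 10-class one-hot vectors with shape (60000, 10). A minimal hedged sketch (a toy model, not the actual hyperas template) of an output that would match the MNIST targets:

from keras.models import Sequential
from keras.layers.core import Dense, Activation

nb_classes = 10
model = Sequential()
model.add(Dense(512, input_dim=784))
model.add(Activation('relu'))        # activation_1
model.add(Dense(nb_classes))         # output width must match y.shape[1]
model.add(Activation('softmax'))     # activation_2 in the traceback above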

get_hyperopt_model_string needs extra parameters

I built up a newer system using the directions I have created, and I noticed a new error message about this function needing more parameters.

The parameters I added seemed to alleviate the issue, though I'm not comfortable with the defaults. They are similar to the ones I have seen in the code base.

elephas/hyperparam.py
@@ -17,7 +17,7 @@ def __init__(self, sc, num_workers=4):
         self.num_workers = num_workers

     def compute_trials(self, model, data, max_evals):
-        model_string = get_hyperopt_model_string(model, data)
+        model_string = get_hyperopt_model_string(model, data, notebook_name=None, verbose=True, stack=3)
         bc_model = self.spark_context.broadcast(model_string)
         bc_max_evals = self.spark_context.broadcast(max_evals)

What does *data parallel* mean here?

There are some data-parallel examples: here and here.

I am wondering what "data parallel" means here.

Does it mean that, during each epoch, the batches are distributed and trained on different workers, and that after a worker finishes its job on one batch, it updates the model weights?

In other words, the master distributes the batches across the workers, right? The total training is still based on the full training set, not just part of it.
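To the best of my understanding, yes: the full training set is split into partitions, every worker trains the same model on its own partition, and only weight deltas are shipped back to the driver, so training still covers the whole data set. A plain-NumPy sketch of that idea (a toy linear model, not elephas internals):

import numpy as np

def worker_train(weights, x_part, y_part, lr=0.1):
    # toy linear model: one mean-squared-error gradient step on this partition
    grad = 2.0 * np.dot(x_part.T, np.dot(x_part, weights) - y_part) / len(x_part)
    new_weights = weights - lr * grad
    return new_weights - weights   # only the delta travels back to the driver

# driver side: the data set is split into partitions, one per worker, and the
# deltas are applied as they arrive (in asynchronous mode the order is not
# guaranteed, so different runs can give slightly different weights)
np.random.seed(0)
x, y = np.random.normal(size=(60, 3)), np.random.normal(size=60)
weights = np.zeros(3)
for x_part, y_part in zip(np.array_split(x, 4), np.array_split(y, 4)):
    weights += worker_train(weights, x_part, y_part)
print(weights)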

Different weights on driver after applying delta with one worker

Hello, let me begin by saying that I admit some inexperience with both Keras and Elephas.

I am experimenting in standalone mode with one worker, using 'asynchronous' mode with updates per epoch. I was tracking the updates on the driver in particular.

Correct me if I am wrong, but if there is only one worker, shouldn't applying the delta that is sent to the driver result in the parameters

  1. weights_after_training of AsynchronousSparkWorker
  2. self.weights of the MLlibModel, that is, the driver, after running put_deltas_to_server(..)

being the same after each epoch?

This does not seem to be the case, and I am wondering whether some aspect of Elephas or Keras is eluding me.
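One possible explanation, offered as an assumption rather than a verified reading of the elephas code: if the driver pushes the incoming delta through its own elephas optimizer (Adagrad, Adadelta, ...) instead of applying it verbatim, its weights will generally differ from weights_after_training even with a single worker. A toy NumPy illustration of that difference:

import numpy as np

worker_before = np.array([1.0, 2.0, 3.0])
worker_after = np.array([0.9, 1.8, 2.7])
delta = worker_before - worker_after               # what the worker sends back

# applying the delta verbatim reproduces the worker's result ...
driver_plain = worker_before - delta
print(np.allclose(driver_plain, worker_after))      # True

# ... but an optimizer-style update (a hypothetical Adagrad-like rescaling
# of the delta) does not
cache = delta ** 2
driver_rescaled = worker_before - 0.01 * delta / (np.sqrt(cache) + 1e-7)
print(np.allclose(driver_rescaled, worker_after))   # False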

AttributeError: 'builtin_function_or_method' object has no attribute '__code__'

@maxpumperla Hi,
When I run the following example,

from __future__ import absolute_import
from __future__ import print_function

from keras.datasets import mnist
from keras.models import Sequential
from keras.layers.core import Dense, Dropout, Activation
from keras.optimizers import RMSprop
from keras.utils import np_utils

from elephas.spark_model import SparkMLlibModel
from elephas.utils.rdd_utils import to_labeled_point, lp_to_simple_rdd
from elephas import optimizers as elephas_optimizers

from pyspark import SparkContext, SparkConf

# Define basic parameters

batch_size = 64
nb_classes = 10
nb_epoch = 3

# Load data

(x_train, y_train), (x_test, y_test) = mnist.load_data()

x_train = x_train.reshape(60000, 784)
x_test = x_test.reshape(10000, 784)
x_train = x_train.astype("float32")
x_test = x_test.astype("float32")
x_train /= 255
x_test /= 255
print(x_train.shape[0], 'train samples')
print(x_test.shape[0], 'test samples')

# Convert class vectors to binary class matrices

y_train = np_utils.to_categorical(y_train, nb_classes)
y_test = np_utils.to_categorical(y_test, nb_classes)

model = Sequential()
model.add(Dense(128, input_dim=784))
model.add(Activation('relu'))
model.add(Dropout(0.2))
model.add(Dense(128))
model.add(Activation('relu'))
model.add(Dropout(0.2))
model.add(Dense(10))
model.add(Activation('softmax'))

# Compile model

rms = RMSprop()

# Create Spark context

conf = SparkConf().setAppName('Mnist_Spark_MLP').setMaster('local[8]')
sc = SparkContext(conf=conf)

# Build RDD from numpy features and labels

lp_rdd = to_labeled_point(sc, x_train, y_train, categorical=True)
rdd = lp_to_simple_rdd(lp_rdd, True, nb_classes)

# Initialize SparkModel from Keras model and Spark context

adadelta = elephas_optimizers.Adadelta()
spark_model = SparkMLlibModel(sc, model, optimizer=adadelta, frequency='batch', mode='asynchronous', num_workers=2, master_optimizer=rms)

# Train Spark model

spark_model.train(lp_rdd, nb_epoch=20, batch_size=32, verbose=0,
validation_split=0.1, categorical=True, nb_classes=nb_classes)

# Evaluate Spark model by evaluating the underlying model

score = spark_model.master_network.evaluate(x_test, y_test, verbose=2)
print('Test accuracy:', score[1])
which is in the elephas examples folder, I am constantly faced with the following error:

Process Process-8:
Traceback (most recent call last):
File "/usr/lib/python3.4/multiprocessing/process.py", line 254, in _bootstrap
self.run()
File "/usr/lib/python3.4/multiprocessing/process.py", line 93, in run
self._target(*self._args, **self._kwargs)
File "/usr/local/lib/python3.4/dist-packages/elephas/spark_model.py", line 165, in start_service
threaded=True, use_reloader=False)
File "/usr/local/lib/python3.4/dist-packages/flask/app.py", line 843, in run
run_simple(host, port, self, **options)
File "/usr/local/lib/python3.4/dist-packages/werkzeug/serving.py", line 694, in run_simple
inner()
File "/usr/local/lib/python3.4/dist-packages/werkzeug/serving.py", line 656, in inner
fd=fd)
File "/usr/local/lib/python3.4/dist-packages/werkzeug/serving.py", line 544, in make_server
passthrough_errors, ssl_context, fd=fd)
File "/usr/local/lib/python3.4/dist-packages/werkzeug/serving.py", line 464, in init
HTTPServer.init(self, (host, int(port)), handler)
File "/usr/lib/python3.4/socketserver.py", line 430, in init
self.server_bind()
File "/usr/lib/python3.4/http/server.py", line 133, in server_bind
socketserver.TCPServer.server_bind(self)
File "/usr/lib/python3.4/socketserver.py", line 444, in server_bind
self.socket.bind(self.server_address)
OSError: [Errno 98] Address already in use
Traceback (most recent call last):
File "/usr/local/lib/python3.4/dist-packages/IPython/core/interactiveshell.py", line 2881, in run_code
exec(code_obj, self.user_global_ns, self.user_ns)
File "", line 12, in
validation_split=0.1, categorical=True, nb_classes=nb_classes)
File "/usr/local/lib/python3.4/dist-packages/elephas/spark_model.py", line 343, in train
self._train(rdd, nb_epoch, batch_size, verbose, validation_split)
File "/usr/local/lib/python3.4/dist-packages/elephas/spark_model.py", line 208, in _train
rdd.mapPartitions(worker.train).collect()
File "/opt/spark/python/pyspark/rdd.py", line 776, in collect
port = self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd())
File "/opt/spark/python/pyspark/rdd.py", line 2403, in _jrdd
self._jrdd_deserializer, profiler)
File "/opt/spark/python/pyspark/rdd.py", line 2336, in _wrap_function
pickled_command, broadcast_vars, env, includes = _prepare_for_python_RDD(sc, command)
File "/opt/spark/python/pyspark/rdd.py", line 2315, in _prepare_for_python_RDD
pickled_command = ser.dumps(command)
File "/opt/spark/python/pyspark/serializers.py", line 428, in dumps
return cloudpickle.dumps(obj, 2)
File "/opt/spark/python/pyspark/cloudpickle.py", line 657, in dumps
cp.dump(obj)
File "/opt/spark/python/pyspark/cloudpickle.py", line 107, in dump
return Pickler.dump(self, obj)
File "/usr/lib/python3.4/pickle.py", line 412, in dump
self.save(obj)
File "/usr/lib/python3.4/pickle.py", line 479, in save
f(self, obj) # Call unbound method with explicit self
File "/usr/lib/python3.4/pickle.py", line 744, in save_tuple
save(element)
File "/usr/lib/python3.4/pickle.py", line 479, in save
f(self, obj) # Call unbound method with explicit self
File "/opt/spark/python/pyspark/cloudpickle.py", line 204, in save_function
self.save_function_tuple(obj)
File "/opt/spark/python/pyspark/cloudpickle.py", line 241, in save_function_tuple
save((code, closure, base_globals))
File "/usr/lib/python3.4/pickle.py", line 479, in save
f(self, obj) # Call unbound method with explicit self
File "/usr/lib/python3.4/pickle.py", line 729, in save_tuple
save(element)
File "/usr/lib/python3.4/pickle.py", line 479, in save
f(self, obj) # Call unbound method with explicit self
File "/usr/lib/python3.4/pickle.py", line 774, in save_list
self._batch_appends(obj)
File "/usr/lib/python3.4/pickle.py", line 801, in _batch_appends
save(tmp[0])
File "/usr/lib/python3.4/pickle.py", line 479, in save
f(self, obj) # Call unbound method with explicit self
File "/opt/spark/python/pyspark/cloudpickle.py", line 398, in save_instancemethod
self.save_reduce(types.MethodType, (obj.__func__, obj.__self__), obj=obj)
File "/opt/spark/python/pyspark/cloudpickle.py", line 535, in save_reduce
save(args)
File "/usr/lib/python3.4/pickle.py", line 479, in save
f(self, obj) # Call unbound method with explicit self
File "/usr/lib/python3.4/pickle.py", line 729, in save_tuple
save(element)
File "/usr/lib/python3.4/pickle.py", line 524, in save
self.save_reduce(obj=obj, *rv)
File "/opt/spark/python/pyspark/cloudpickle.py", line 553, in save_reduce
save(state)
File "/usr/lib/python3.4/pickle.py", line 479, in save
f(self, obj) # Call unbound method with explicit self
File "/usr/lib/python3.4/pickle.py", line 814, in save_dict
self._batch_setitems(obj.items())
File "/usr/lib/python3.4/pickle.py", line 840, in _batch_setitems
save(v)
File "/usr/lib/python3.4/pickle.py", line 524, in save
self.save_reduce(obj=obj, *rv)
File "/opt/spark/python/pyspark/cloudpickle.py", line 553, in save_reduce
save(state)
File "/usr/lib/python3.4/pickle.py", line 479, in save
f(self, obj) # Call unbound method with explicit self
File "/usr/lib/python3.4/pickle.py", line 814, in save_dict
self._batch_setitems(obj.items())
File "/usr/lib/python3.4/pickle.py", line 840, in _batch_setitems
save(v)
File "/usr/lib/python3.4/pickle.py", line 524, in save
self.save_reduce(obj=obj, *rv)
File "/opt/spark/python/pyspark/cloudpickle.py", line 553, in save_reduce
save(state)
File "/usr/lib/python3.4/pickle.py", line 479, in save
f(self, obj) # Call unbound method with explicit self
File "/usr/lib/python3.4/pickle.py", line 814, in save_dict
self._batch_setitems(obj.items())
File "/usr/lib/python3.4/pickle.py", line 840, in _batch_setitems
save(v)
File "/usr/lib/python3.4/pickle.py", line 524, in save
self.save_reduce(obj=obj, *rv)
File "/opt/spark/python/pyspark/cloudpickle.py", line 553, in save_reduce
save(state)
File "/usr/lib/python3.4/pickle.py", line 479, in save
f(self, obj) # Call unbound method with explicit self
File "/usr/lib/python3.4/pickle.py", line 814, in save_dict
self._batch_setitems(obj.items())
File "/usr/lib/python3.4/pickle.py", line 840, in _batch_setitems
save(v)
File "/usr/lib/python3.4/pickle.py", line 479, in save
f(self, obj) # Call unbound method with explicit self
File "/usr/lib/python3.4/pickle.py", line 774, in save_list
self._batch_appends(obj)
File "/usr/lib/python3.4/pickle.py", line 798, in _batch_appends
save(x)
File "/usr/lib/python3.4/pickle.py", line 524, in save
self.save_reduce(obj=obj, *rv)
File "/opt/spark/python/pyspark/cloudpickle.py", line 553, in save_reduce
save(state)
File "/usr/lib/python3.4/pickle.py", line 479, in save
f(self, obj) # Call unbound method with explicit self
File "/usr/lib/python3.4/pickle.py", line 814, in save_dict
self._batch_setitems(obj.items())
File "/usr/lib/python3.4/pickle.py", line 840, in _batch_setitems
save(v)
File "/usr/lib/python3.4/pickle.py", line 479, in save
f(self, obj) # Call unbound method with explicit self
File "/usr/lib/python3.4/pickle.py", line 774, in save_list
self._batch_appends(obj)
File "/usr/lib/python3.4/pickle.py", line 798, in _batch_appends
save(x)
File "/usr/lib/python3.4/pickle.py", line 524, in save
self.save_reduce(obj=obj, *rv)
File "/opt/spark/python/pyspark/cloudpickle.py", line 553, in save_reduce
save(state)
File "/usr/lib/python3.4/pickle.py", line 479, in save
f(self, obj) # Call unbound method with explicit self
File "/usr/lib/python3.4/pickle.py", line 814, in save_dict
self._batch_setitems(obj.items())
File "/usr/lib/python3.4/pickle.py", line 840, in _batch_setitems
save(v)
File "/usr/lib/python3.4/pickle.py", line 524, in save
self.save_reduce(obj=obj, *rv)
File "/opt/spark/python/pyspark/cloudpickle.py", line 553, in save_reduce
save(state)
File "/usr/lib/python3.4/pickle.py", line 479, in save
f(self, obj) # Call unbound method with explicit self
File "/usr/lib/python3.4/pickle.py", line 814, in save_dict
self._batch_setitems(obj.items())
File "/usr/lib/python3.4/pickle.py", line 840, in _batch_setitems
save(v)
File "/usr/lib/python3.4/pickle.py", line 479, in save
f(self, obj) # Call unbound method with explicit self
File "/usr/lib/python3.4/pickle.py", line 774, in save_list
self._batch_appends(obj)
File "/usr/lib/python3.4/pickle.py", line 798, in _batch_appends
save(x)
File "/usr/lib/python3.4/pickle.py", line 479, in save
f(self, obj) # Call unbound method with explicit self
File "/usr/lib/python3.4/pickle.py", line 744, in save_tuple
save(element)
File "/usr/lib/python3.4/pickle.py", line 479, in save
f(self, obj) # Call unbound method with explicit self
File "/usr/lib/python3.4/pickle.py", line 814, in save_dict
self._batch_setitems(obj.items())
File "/usr/lib/python3.4/pickle.py", line 840, in _batch_setitems
save(v)
File "/usr/lib/python3.4/pickle.py", line 479, in save
f(self, obj) # Call unbound method with explicit self
File "/usr/lib/python3.4/pickle.py", line 814, in save_dict
self._batch_setitems(obj.items())
File "/usr/lib/python3.4/pickle.py", line 840, in _batch_setitems
save(v)
File "/usr/lib/python3.4/pickle.py", line 479, in save
f(self, obj) # Call unbound method with explicit self
File "/opt/spark/python/pyspark/cloudpickle.py", line 398, in save_instancemethod
self.save_reduce(types.MethodType, (obj.__func__, obj.__self__), obj=obj)
File "/opt/spark/python/pyspark/cloudpickle.py", line 535, in save_reduce
save(args)
File "/usr/lib/python3.4/pickle.py", line 479, in save
f(self, obj) # Call unbound method with explicit self
File "/usr/lib/python3.4/pickle.py", line 729, in save_tuple
save(element)
File "/usr/lib/python3.4/pickle.py", line 524, in save
self.save_reduce(obj=obj, *rv)
File "/opt/spark/python/pyspark/cloudpickle.py", line 553, in save_reduce
save(state)
File "/usr/lib/python3.4/pickle.py", line 479, in save
f(self, obj) # Call unbound method with explicit self
File "/usr/lib/python3.4/pickle.py", line 814, in save_dict
self._batch_setitems(obj.items())
File "/usr/lib/python3.4/pickle.py", line 840, in _batch_setitems
save(v)
File "/usr/lib/python3.4/pickle.py", line 479, in save
f(self, obj) # Call unbound method with explicit self
File "/usr/lib/python3.4/pickle.py", line 774, in save_list
self._batch_appends(obj)
File "/usr/lib/python3.4/pickle.py", line 798, in _batch_appends
save(x)
File "/usr/lib/python3.4/pickle.py", line 524, in save
self.save_reduce(obj=obj, *rv)
File "/opt/spark/python/pyspark/cloudpickle.py", line 553, in save_reduce
save(state)
File "/usr/lib/python3.4/pickle.py", line 479, in save
f(self, obj) # Call unbound method with explicit self
File "/usr/lib/python3.4/pickle.py", line 814, in save_dict
self._batch_setitems(obj.items())
File "/usr/lib/python3.4/pickle.py", line 840, in _batch_setitems
save(v)
File "/usr/lib/python3.4/pickle.py", line 479, in save
f(self, obj) # Call unbound method with explicit self
File "/usr/lib/python3.4/pickle.py", line 814, in save_dict
self._batch_setitems(obj.items())
File "/usr/lib/python3.4/pickle.py", line 840, in _batch_setitems
save(v)
File "/usr/lib/python3.4/pickle.py", line 524, in save
self.save_reduce(obj=obj, *rv)
File "/opt/spark/python/pyspark/cloudpickle.py", line 553, in save_reduce
save(state)
File "/usr/lib/python3.4/pickle.py", line 479, in save
f(self, obj) # Call unbound method with explicit self
File "/usr/lib/python3.4/pickle.py", line 814, in save_dict
self._batch_setitems(obj.items())
File "/usr/lib/python3.4/pickle.py", line 840, in _batch_setitems
save(v)
File "/usr/lib/python3.4/pickle.py", line 524, in save
self.save_reduce(obj=obj, *rv)
File "/opt/spark/python/pyspark/cloudpickle.py", line 553, in save_reduce
save(state)
File "/usr/lib/python3.4/pickle.py", line 479, in save
f(self, obj) # Call unbound method with explicit self
File "/usr/lib/python3.4/pickle.py", line 814, in save_dict
self._batch_setitems(obj.items())
File "/usr/lib/python3.4/pickle.py", line 840, in _batch_setitems
save(v)
File "/usr/lib/python3.4/pickle.py", line 479, in save
f(self, obj) # Call unbound method with explicit self
File "/opt/spark/python/pyspark/cloudpickle.py", line 321, in save_builtin_function
return self.save_function(obj)
File "/opt/spark/python/pyspark/cloudpickle.py", line 196, in save_function
if islambda(obj) or obj.__code__.co_filename == '' or themodule is None:
AttributeError: 'builtin_function_or_method' object has no attribute '__code__'

Why?

I searched the web but could not find a real explanation; it is only mentioned that it might be due to this issue apache/spark#8707, which is explained a bit more here: https://issues.apache.org/jira/browse/SPARK-10542

The main problem, as I see it, is in the train and rdd section... it seems as if we had to define a lambda function.

Empty rdd throws an error while calling worker.train

Hi, thanks for writing elephas, it looks very interesting.
I was just playing around with the code.

I observed that when the rdd is repartitioned, there can be empty partitions; they throw an error in worker.train because Keras tries to fit arrays with zero data.
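A hedged sketch of one way to guard a mapPartitions worker against such empty partitions (the names train/x_train below are placeholders, not the exact elephas API):

import numpy as np

def train(data_iterator):
    pairs = list(data_iterator)
    if not pairs:                  # empty partition after repartitioning
        return iter([])            # yield nothing instead of fitting zero rows
    x_train = np.asarray([x for x, _ in pairs])
    y_train = np.asarray([y for _, y in pairs])
    # ... rebuild the Keras model and call model.fit(x_train, y_train, ...) here
    return iter([len(x_train)])    # e.g. report how many samples were seen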

Understanding matrix as features

I have a convolutional network that runs on a 2D numpy matrix, which I'm trying to move onto Spark/Elephas.

The pipeline is numpy array -> pyspark.mllib.linalg.DenseMatrix -> Convolutional Network prediction.

This gives me the error:
TypeError: Cannot convert type <class 'pyspark.mllib.linalg.DenseMatrix'> into Vector

Can a multidimensional numpy array be formatted in a way that Keras Convolution1D or Convolution2D layers will understand?
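One workaround that may help, offered as a hedged sketch (it assumes Theano-style channel-first image ordering): MLlib vectors are one-dimensional, so flatten each 2-D sample before building the RDD and add a Reshape layer so the convolution sees the original shape again.

import numpy as np
from keras.models import Sequential
from keras.layers import Reshape, Convolution2D, Flatten, Dense

height, width = 28, 28                       # placeholder sample dimensions
sample_2d = np.random.rand(height, width).astype('float32')
flat_features = sample_2d.ravel()            # 1-D, so it fits an MLlib vector

model = Sequential()
model.add(Reshape((1, height, width), input_shape=(height * width,)))
model.add(Convolution2D(8, 3, 3, activation='relu'))   # Keras 1 style arguments
model.add(Flatten())
model.add(Dense(10, activation='softmax'))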

Massive dataset + data_generator

Hello guys,

Before I begin, I want to thank you for this amazing tool. It's truly awesome and enables true distributed deep learning out of the box. Now for some questions.

  1. From what I've seen, I need to load the dataset into memory before I send it to elephas for distributed processing. When the dataset is massive, as in several times larger than my RAM, how can I use an HDF5 file so that each worker can load parts of the dataset from disk and do the processing? Please let me know (one possible pattern is sketched after this list).
  2. Is there any support for Keras' data_generator functionality, which enables real-time data augmentation?

Thank you for your support.
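On question 1, a hedged sketch of one pattern that avoids loading everything into driver memory: distribute only index ranges with Spark and let every worker read its own slice from an HDF5 file it can reach locally. This is not an elephas feature; H5_PATH and the dataset names below are placeholders.

import h5py

H5_PATH = '/shared/data.h5'    # placeholder: must be readable on every worker

def train_on_slice(index_iterator):
    idx = sorted(index_iterator)
    if not idx:
        return iter([])
    with h5py.File(H5_PATH, 'r') as f:
        x = f['features'][idx[0]:idx[-1] + 1]   # contiguous slice read from disk
        y = f['labels'][idx[0]:idx[-1] + 1]
    # ... build/compile a Keras model and call model.fit(x, y, ...) here ...
    return iter([x.shape[0]])

# driver side: only integer indices go through Spark, not the data itself
# sc.parallelize(range(num_samples), num_workers).mapPartitions(train_on_slice).collect()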

Dockerfile needs update

Apache Spark <= 2.1.0 does not support Python 3.6; also, tensorflow should be updated to 1.1.0 and keras to 2.0.2. However, after I changed the version numbers and tried to run the example files, elephas crashes, as it does not support the latest Spark version yet. I don't mind helping to update the Dockerfile once elephas is updated.

Tasks are not being distributed through Spark Cluster

Hi there,

I have been trying to test elephas with the mnist_mlp_spark.py example on a Spark cluster with 2 slaves, and apparently the model/parameter training is only being handled by the master node (driver) while the slaves do nothing in this process.

I have monitored the memory usage of the worker nodes and see no changes while the model is running.

I am wondering whether elephas only works with Spark in standalone mode, or perhaps I am missing something.

Note that I used mnist_mlp_spark and only changed setMaster('local[8]') to setMaster('YARN'), then ran the spark-submit command in cluster mode.
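A hedged configuration note in case it helps: for YARN, the master is usually passed on the spark-submit command line rather than hard-coded in setMaster(), and the conventional value is the lowercase string 'yarn' (or 'yarn-cluster' on older Spark versions), not 'YARN'. A minimal sketch:

# spark-submit --master yarn --deploy-mode cluster mnist_mlp_spark.py
from pyspark import SparkConf, SparkContext

conf = SparkConf().setAppName('Mnist_Spark_MLP')   # no setMaster() in the script
sc = SparkContext(conf=conf)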

Frozen after submitting code

I'm running elephas on a cluster, but it froze after I submitted my code.
16/11/22 13:54:06 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on spark-ubuntu-02:34639 (size: 1555.0 B, free: 511.1 MB)
I'm running spark on two machines in standalone mode.
My code:

from __future__ import absolute_import
from __future__ import print_function
import time
from keras.datasets import mnist
from keras.models import Sequential
from keras.layers.core import Dense, Dropout, Activation
from keras.optimizers import SGD
from keras.utils import np_utils

from elephas.spark_model import SparkModel
from elephas.utils.rdd_utils import to_simple_rdd
from elephas import optimizers as elephas_optimizers

from pyspark import SparkContext, SparkConf
start = time.clock()
# Define basic parameters
batch_size = 64
nb_classes = 10
nb_epoch = 1

# Create Spark context
conf = SparkConf().setAppName('Mnist_Spark_MLP').setMaster("spark://222.19.197.70:7077")
sc = SparkContext(conf=conf)
# Load data
(x_train, y_train), (x_test, y_test) = mnist.load_data()

x_train = x_train.reshape(60000, 784)
x_test = x_test.reshape(10000, 784)
x_train = x_train.astype("float32")
x_test = x_test.astype("float32")
x_train /= 255
x_test /= 255
print(x_train.shape[0], 'train samples')
print(x_test.shape[0], 'test samples')

# Convert class vectors to binary class matrices
y_train = np_utils.to_categorical(y_train, nb_classes)
y_test = np_utils.to_categorical(y_test, nb_classes)

model = Sequential()
model.add(Dense(128, input_dim=784))
model.add(Activation('relu'))
model.add(Dropout(0.2))
model.add(Dense(128))
model.add(Activation('relu'))
model.add(Dropout(0.2))
model.add(Dense(10))
model.add(Activation('softmax'))

sgd = SGD(lr=0.1)

# Build RDD from numpy features and labels
rdd = to_simple_rdd(sc, x_train, y_train)

# Initialize SparkModel from Keras model and Spark context
adagrad = elephas_optimizers.Adagrad()
spark_model = SparkModel(sc,
                         model,
                         optimizer=adagrad,
                         frequency='epoch',
                         mode='asynchronous',
                         num_workers=2,master_optimizer=sgd)

# Train Spark model
spark_model.train(rdd, nb_epoch=nb_epoch, batch_size=batch_size, verbose=1, validation_split=0.1)

# Evaluate Spark model by evaluating the underlying model
score = spark_model.master_network.evaluate(x_test, y_test, verbose=1)
end = time.clock()
print ("read: %f s" % (end - start))
print('Test accuracy:', score[1])

Support for python3 and Spark2

Hi,
Could you please update your excellent package to support Python 3 and Spark 2 as well? As far as I know, you utilize hyperas and hyperopt here; hence, updating all of these packages would be a great help for everyone who uses them extensively.

Cannot run synchronous mode

It looks like SparkWorker needs 8 parameters but is only given 4:

    elif self.mode == 'synchronous':
        init = self.master_network.get_weights()
        parameters = self.spark_context.broadcast(init)
        worker = SparkWorker(yaml, parameters, train_config)

class SparkWorker(object):
    '''
    Synchronous Spark worker. This code will be executed on workers.
    '''
    def __init__(self, yaml, parameters, train_config, master_optimizer, master_loss, master_metrics, custom_objects):
        self.yaml = yaml
        self.parameters = parameters
        self.train_config = train_config
        self.master_optimizer = master_optimizer
        self.master_loss = master_loss
        self.master_metrics = master_metrics
        self.custom_objects = custom_objects
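For reference, a hedged sketch of how that call site would presumably have to look so the constructor above receives all of its arguments (a fragment mirroring the snippet quoted above, untested):

    elif self.mode == 'synchronous':
        init = self.master_network.get_weights()
        parameters = self.spark_context.broadcast(init)
        worker = SparkWorker(yaml, parameters, train_config,
                             self.master_optimizer, self.master_loss,
                             self.master_metrics, self.custom_objects)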

ImportError: No module named theano_utils for elephas_optimizer import

First, thanks for bringing Keras to Spark.

It seems very interesting.

I'm trying to implement the example from the README.md, but I get the following error:

Traceback (most recent call last):
File "..pathToFile/File.py", line 12, in
from elephas import optimizers as elephas_optimizers
File "/Users/XXX/anaconda/lib/python2.7/site-packages/elephas/optimizers.py", line 11, in
from keras.utils.theano_utils import shared_zeros, shared_scalar, floatX
ImportError: No module named theano_utils

I also tried changing the Keras backend to TensorFlow. However, elephas still looks for (and cannot find) "theano_utils".

Any ideas on why this doesn't work?

Thanks

installation problem with anaconda python

Hello,
I ran into installation issues with elephas on Anaconda Python:

Collecting hyperopt (from hyperas->elephas)
Downloading hyperopt-0.0.2.tar.gz (85kB)
100% |████████████████████████████████| 92kB 3.1MB/s
Complete output from command python setup.py egg_info:
DEBUG:root:distribute_setup.py not found, defaulting to system setuptools
Traceback (most recent call last):
File "", line 1, in
File "/private/tmp/pip-build-f5ry_pgi/hyperopt/setup.py", line 119, in
if package_data is None: package_data = find_package_data(packages)
File "/private/tmp/pip-build-f5ry_pgi/hyperopt/setup.py", line 102, in find_package_data
for subdir in find_subdirectories(package):
File "/private/tmp/pip-build-f5ry_pgi/hyperopt/setup.py", line 73, in find_subdirectories
subdirectories = os.walk(package_to_path(package)).next()[1]
AttributeError: 'generator' object has no attribute 'next'

----------------------------------------

Command "python setup.py egg_info" failed with error code 1 in /private/tmp/pip-build-f5ry_pgi/hyperopt/

Thank you in advance
Moshiour

Exception: It appears that you are attempting to reference SparkContext from a broadcast variable, action, or transformation. SparkContext can only be used on the driver, not in code that it run on workers. For more information, see SPARK-5063.

Hi, I am getting an error while trying to run a model using PySpark:

from elephas.spark_model import SparkModel
import elephas.spark_model
import keras
from elephas import optimizers as elephas_optimizers

adagrad = elephas_optimizers.Adagrad()

spark_model = SparkModel(sc,model, optimizer=adagrad, frequency='epoch', mode='asynchronous')
spark_model.train(rdd, nb_epoch=20, batch_size=32, verbose=0, validation_split=0.1)

Exception Traceback (most recent call last)
in ()
7
8 spark_model = SparkModel(sc,model, optimizer=adagrad, frequency='epoch', mode='asynchronous')
----> 9 spark_model.train(rdd, nb_epoch=20, batch_size=32, verbose=0, validation_split=0.1)

.\Anaconda3\envs\tensorflow\lib\site-packages\elephas\spark_model.py in train(self, rdd, nb_epoch, batch_size, verbose, validation_split)
186
187 if self.mode in ['asynchronous', 'synchronous', 'hogwild']:
--> 188 self._train(rdd, nb_epoch, batch_size, verbose, validation_split, master_url)
189 else:
190 print("""Choose from one of the modes: asynchronous, synchronous or hogwild""")

.\Anaconda3\envs\tensorflow\lib\site-packages\elephas\spark_model.py in _train(self, rdd, nb_epoch, batch_size, verbose, validation_split, master_url)
197 self.master_network.compile(optimizer=self.master_optimizer, loss=self.master_loss, metrics=self.master_metrics)
198 if self.mode in ['asynchronous', 'hogwild']:
--> 199 self.start_server()
200 yaml = self.master_network.to_yaml()
201 train_config = self.get_train_config(nb_epoch, batch_size,

.\Anaconda3\envs\tensorflow\lib\site-packages\elephas\spark_model.py in start_server(self)
122 ''' Start parameter server'''
123 self.server = Process(target=self.start_service)
--> 124 self.server.start()
125
126 def stop_server(self):

.\Anaconda3\envs\tensorflow\lib\multiprocessing\process.py in start(self)
103 'daemonic processes are not allowed to have children'
104 _cleanup()
--> 105 self._popen = self._Popen(self)
106 self._sentinel = self._popen.sentinel
107 _children.add(self)

.\Anaconda3\envs\tensorflow\lib\multiprocessing\context.py in _Popen(process_obj)
210 @staticmethod
211 def _Popen(process_obj):
--> 212 return _default_context.get_context().Process._Popen(process_obj)
213
214 class DefaultContext(BaseContext):

.\Anaconda3\envs\tensorflow\lib\multiprocessing\context.py in _Popen(process_obj)
311 def _Popen(process_obj):
312 from .popen_spawn_win32 import Popen
--> 313 return Popen(process_obj)
314
315 class SpawnContext(BaseContext):

.\Anaconda3\envs\tensorflow\lib\multiprocessing\popen_spawn_win32.py in init(self, process_obj)
64 try:
65 reduction.dump(prep_data, to_child)
---> 66 reduction.dump(process_obj, to_child)
67 finally:
68 context.set_spawning_popen(None)

.\Anaconda3\envs\tensorflow\lib\multiprocessing\reduction.py in dump(obj, file, protocol)
57 def dump(obj, file, protocol=None):
58 '''Replacement for pickle.dump() using ForkingPickler.'''
---> 59 ForkingPickler(file, protocol).dump(obj)
60
61 #

.\spark\spark-2.1.0-bin-hadoop2.7\python\pyspark\context.py in getnewargs(self)
277 # This method is called when attempting to pickle SparkContext, which is always an error:
278 raise Exception(
--> 279 "It appears that you are attempting to reference SparkContext from a broadcast "
280 "variable, action, or transformation. SparkContext can only be used on the driver, "
281 "not in code that it run on workers. For more information, see SPARK-5063."

Exception: It appears that you are attempting to reference SparkContext from a broadcast variable, action, or transformation. SparkContext can only be used on the driver, not in code that it run on workers. For more information, see SPARK-5063.

Could anyone please help me with this.

index 0 is out of bounds for axis 0 with size 0

It seems that while executing the train method of AsynchronousSparkWorker, the "X_train" numpy array has zero length, which causes some of the tasks to fail with the error mentioned in the subject line. I have fixed this on my local system by adding the lines below inside the train method:

if len(X_train) == 0:
    return

The spark task is stuck even training job finished

Hi @maxpumperla,
I have run into an inexplicable problem: my Spark task gets stuck when fit() or train_on_batch() finishes.
At first I thought the lock might cause this problem in "asynchronous" mode, but even when I try "hogwild" mode my Spark task is still stuck.
Do you think there could be another reason for this problem?

Java heap error while using a relatively small dataset

Hi,

I am training a CNN model using elephas to distribute the parameter training, and I am getting an "out of memory - Spark java.lang.OutOfMemoryError: Java heap space" error.

This model uses a dataset of 2.6 GB, and I am using a Spark cluster with 2 workers. All the nodes have 15 GB of RAM and 8 CPU cores. I use the following command to run the program:

spark-submit --num-executors 8 --executor-cores 5 --executor-memory 4.5G ./myprogram.py

Before getting to the parameter-update transactions I get the OutOfMemoryError. Can you suggest how much memory I should allocate?

Note that I have tried this with different values of num_workers and got the same error.

spark_model.py: determine_master always returns 127.0.0.1

Hi there,

I am running Spark on a virtual machine, and determine_master in spark_model.py always returns 127.0.0.1, so I use something like this:

import socket
import fcntl
import struct

def get_ip_address(ifname):
    s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    return socket.inet_ntoa(fcntl.ioctl(
        s.fileno(),
        0x8915,  # SIOCGIFADDR
        struct.pack('256s', ifname[:15])
    )[20:24])

def determine_master():
    '''
    Get URL of parameter server, running on master
    '''
    # master_url = socket.gethostbyname(socket.gethostname()) + ':5000'
    master_url = get_ip_address('eth0') + ':5000'
    # print("master_url: " + master_url)
    return master_url

Elephas and RNN

I have a few concerns about using elephas to implement RNNs.
First, can the Graph model be used?
Second, how do you deal with variable sequence lengths in the training set? Does elephas currently support this?
Third, how do you save and load model weights? Is it the same as in Keras?
Thanks! Looking forward to your reply.
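On the third question, a hedged sketch assuming a spark_model built as in the other examples in this thread: the trained Keras model is reachable as spark_model.master_network, so the usual Keras weight I/O should apply (the file name is a placeholder).

# after training, on the driver
spark_model.master_network.save_weights('rnn_weights.h5')

# later: rebuild the same architecture as a plain Keras model, then
# model.load_weights('rnn_weights.h5')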

Serialization fails with error "TypeError: can't pickle ellipsis objects"

I tried to run Elephas, but it fails every time with the follow stack trace:

TypeError: can't pickle ellipsis objects
Traceback (most recent call last):
  File "/home/willem/projects/elephas/examples/./mnist_mlp_spark.py", line 66, in <module>
    spark_model.train(rdd, nb_epoch=nb_epoch, batch_size=batch_size, verbose=2, validation_split=0.1)
  File "/home/willem/.virtualenvs/elephas/local/lib/python2.7/site-packages/elephas/spark_model.py", line 197, in train
    self._train(rdd, nb_epoch, batch_size, verbose, validation_split, master_url)
  File "/home/willem/.virtualenvs/elephas/local/lib/python2.7/site-packages/elephas/spark_model.py", line 217, in _train
    rdd.mapPartitions(worker.train).collect()
  File "/home/willem/spark-2.2.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/rdd.py", line 809, in collect
  File "/home/willem/spark-2.2.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/rdd.py", line 2455, in _jrdd
  File "/home/willem/spark-2.2.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/rdd.py", line 2388, in _wrap_function
  File "/home/willem/spark-2.2.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/rdd.py", line 2374, in _prepare_for_python_RDD
  File "/home/willem/spark-2.2.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/serializers.py", line 460, in dumps
  File "/home/willem/spark-2.2.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 704, in dumps
  File "/home/willem/spark-2.2.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 162, in dump
pickle.PicklingError: Could not serialize object: TypeError: can't pickle ellipsis objects

I think it has to do with how the Worker types and SparkModel are implemented.
Can you shed some light on this?

Incorrectly dealing with batch normalization layers

Elephas calculates the deltas of the weights by simple subtraction. This strategy works for conv2d and dense layers. However, modern neural networks often contain batch normalization layers, for which the subtraction strategy fails. Thus, elephas cannot be used for distributed training of some popular networks such as Google's Inception or YOLO.

Can this be solved?

The rdd is not distributed very well by repartition in spark_model.py

@maxpumperla I found that my training rdd is not partitioned very well, so I commented out "repartition(self.num_workers)" in spark_model.py and used a random mechanism to do the repartitioning in to_simple_rdd or in my PySpark script.
With that, my training data is distributed across my executors very well.
Could you provide an entry point to do the repartitioning in rdd_utils.py instead of spark_model.py?
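A hedged sketch of doing the repartitioning at RDD-construction time instead of inside spark_model.py (it assumes sc, x_train and y_train as in the MNIST examples above; this is not an elephas API change):

from elephas.utils.rdd_utils import to_simple_rdd

num_workers = 4
rdd = to_simple_rdd(sc, x_train, y_train).repartition(num_workers)
# repartition() triggers a full shuffle, which spreads the records evenly
# across executors before spark_model.train(rdd, ...) is called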

Running a single model per node

From what I understand, it's currently possible to run the same model on multiple workers, where each worker gets a different batch of data. But is it possible to run one model per worker, where every model may have different parameters but is trained on the same batch of data? This could really help with hyperparameter search.

Elephas slice_X error

Elephas encounters this error with the latest Theano. Can you help me fix it?

Traceback (most recent call last):
  File "/home/zeus/workspace/./KerasOnSparkElephas.py", line 28, in <module>
    from elephas.spark_model import SparkModel
  File "/home/zeus/anaconda2/lib/python2.7/site-packages/elephas/spark_model.py", line 18, in <module>
    from keras.models import model_from_yaml, slice_X
ImportError: cannot import name slice_X

Building model on workers costly

Do you find that this step takes a long time? Is the model recompiling the Theano functions here? Would there be ways to speed that up (somehow have the precompiled model persist in memory for each task and just send the weights from the master model in with the map)?

model = model_from_yaml(self.yaml)
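A hedged sketch of one workaround idea (not how elephas is implemented today): keep the rebuilt, compiled model in a process-level cache on each executor so that repeated tasks only refresh the weights instead of rebuilding everything:

from keras.models import model_from_yaml

_cached_model = None

def get_model(yaml_string, weights):
    global _cached_model
    if _cached_model is None:
        # rebuild and compile once per executor process (the expensive part)
        _cached_model = model_from_yaml(yaml_string)
        _cached_model.compile(optimizer='sgd', loss='categorical_crossentropy')
    _cached_model.set_weights(weights)   # cheap: just copies numpy arrays
    return _cached_model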

Custom layers

It seems that custom layers are not recognized in the train phase of the Spark model, specifically when the model is loaded from YAML (model_from_yaml). I tried it with a custom activation function and it didn't work; the get_from_module function raised the exception "Invalid activation function: ...".
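For illustration, a hedged sketch of the plain-Keras mechanism involved (DoubleIt is a made-up custom layer): rebuilding from YAML only works when the custom object is supplied via custom_objects, so the open question is whether elephas forwards such a mapping to its internal model_from_yaml call.

from keras.engine.topology import Layer
from keras.layers import Dense
from keras.models import Sequential, model_from_yaml

class DoubleIt(Layer):                 # made-up custom layer
    def call(self, x, mask=None):
        return 2.0 * x

model = Sequential()
model.add(Dense(8, input_dim=4))
model.add(DoubleIt())
yaml_string = model.to_yaml()

# rebuilding needs to be told about the custom class explicitly
rebuilt = model_from_yaml(yaml_string, custom_objects={'DoubleIt': DoubleIt})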

Parallelized Predictions

Hi

It looks like the current predict method of a SparkModel doesn't run in a parallelized fashion. Is this indeed the case and, if so, are there plans to add that functionality?

Thanks

predict_classes method missing in SparkModel class

Keras' Sequential model provides a predict_classes method that returns the label for a given test feature. The lines of code below would help achieve the desired result:

def predict_classes(self, data):
    return self.master_network.predict_classes(data)

Example with pre-trained model

Apologies if this is hidden somewhere in the examples, but I am trying to simply use a pre-trained model in spark. From what I have read, it looks like I should be able to implement a transform object from a keras model like this:

transformer = ElephasTransformer(labelCol  = 'features',                                                                                                                  
                                 outputCol = 'preds',                                                                                                                     
                                 keras_model_config=model.get_config(),                                                                                                   
                                 weights = model.get_weights()) 

Then I should be able to call: res = transformer._transform(df). I realize this isn't by design, so I'm simply hacking this together... Does this seem plausible / on the right path?

My input data are length-50 feature vectors that I insert into the dataframe further upstream as a list. The error I get might be related to the datatype choice:

17/07/24 23:28:31 ERROR executor.Executor: Exception in task 0.0 in stage 2.0 (TID 2)
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/worker.py", line 111, in main
    process()
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/worker.py", line 106, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 263, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "/home/mattr/spark_test2/spark_env/lib/python2.7/site-packages/elephas/ml_model.py", line 106, in <lambda>
    features = np.asarray(rdd.map(lambda x: from_vector(x.features)).collect())
  File "/home/mattr/spark_test2/spark_env/lib/python2.7/site-packages/elephas/mllib/adapter.py", line 23, in from_vector
    return vector.array
AttributeError: 'list' object has no attribute 'array'

	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:166)
	at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:207)
	at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:125)
	at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:70)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
	at org.apache.spark.scheduler.Task.run(Task.scala:89)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)

Any help would be much appreciated!
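A hedged guess at the datatype issue: the adapter in the traceback calls vector.array, which exists on MLlib vectors but not on a plain Python list, so wrapping the feature list in Vectors.dense before building the DataFrame may avoid the AttributeError (untested sketch; the column names here are placeholders):

from pyspark.mllib.linalg import Vectors

features = [0.1] * 50                     # stand-in for one length-50 feature list
row = (Vectors.dense(features), 0.0)      # (features, label) as MLlib-friendly types
# df = sqlContext.createDataFrame([row], ['features', 'label'])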

Keras 2.0.2 ImportError: cannot import name 'get_from_module'

I'm trying to use elephas with Keras 2.0.2 (my model is only compatible with the new version) and get the following error: ImportError: cannot import name 'get_from_module'. This is likely because there is no get_from_module in Keras 2.0.2. Is anyone aware of a quick fix to get it working with 2.0.2?

Is there a way not to use the Master as a Worker?

Hi

First off, thanks for the great library! So I've been trying to use Elephas to train some very GPU-memory-intensive networks, and I've been running into lots of memory issues specifically on the master node, as it's being used to manage the process, do training, and predict. Is there a way not to use the master for training?

Thanks

error when importing elephas

Hi, thanks a lot for your great work. I was wondering if you could give me some hints on this error, which happens every time that I import elephas.

ImportError: No module named utils.rdd_utils

Also, in the example from the front-page README here on GitHub (shown below), I think the line with the arrow contains an argument that is not used any more, namely num_workers. Am I right?

from elephas.spark_model import SparkModel
from elephas import optimizers as elephas_optimizers

adagrad = elephas_optimizers.Adagrad()
spark_model = SparkModel(sc,model, optimizer=adagrad, frequency='epoch', mode='asynchronous', num_workers=2)
---> spark_model.train(rdd, nb_epoch=20, batch_size=32, verbose=0, validation_split=0.1, num_workers=8)

Another thing I noticed is that, by default, importing elephas uses Theano as the backend instead of TensorFlow, even if you have explicitly specified TensorFlow in your .keras/keras.json.

Does elephas play nicely with TensorFlow and Keras 1.2.2?

Cheers!

support spark-2

@maxpumperla Hey Max,
I think there are errors like "... has no attribute ..." which might be caused by incompatibilities between Spark 2 and previous versions... does hyperas support Spark 2?

Running elephas with 2 machines

I'm trying to run elephas on a cluster with 2 physical machines.
Machine 1 runs the spark master, machine 2 runs the spark worker (1 worker, 1 core).
With this configuration I can run the "pi.py" from the spark official examples successfully.
But an error occurred while running the "mnist_mlp_spark.py" demo from elephas.
The code is modified like this:
sc = SparkContext()
and submitted using
spark-submit --master spark://MASTER_IP:7077

16/03/22 16:08:38 INFO TaskSetManager: Starting task 0.0 in stage 1.0 (TID 2, 192.168.1.116, partition 0,NODE_LOCAL, 2212 bytes)
16/03/22 16:08:39 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on 192.168.1.116:44060 (size: 4.9 KB, free: 1983.0 MB)
16/03/22 16:08:42 INFO MapOutputTrackerMasterEndpoint: Asked to send map output locations for shuffle 0 to 192.168.1.116:42611
16/03/22 16:08:42 INFO MapOutputTrackerMaster: Size of output statuses for shuffle 0 is 151 bytes
16/03/22 16:08:53 INFO TaskSetManager: Starting task 1.0 in stage 1.0 (TID 3, 192.168.1.116, partition 1,NODE_LOCAL, 2212 bytes)
16/03/22 16:08:53 WARN TaskSetManager: Lost task 0.0 in stage 1.0 (TID 2, 192.168.1.116): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py", line 111, in main
process()
File "/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py", line 106, in process
serializer.dump_stream(func(split_index, iterator), outfile)
File "/usr/local/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 263, in dump_stream
vs = list(itertools.islice(iterator, batch))
File "/home/lingban/github/elephas/elephas/spark_model.py", line 262, in train
File "/home/lingban/elephas/elephas/spark_model.py", line 33, in get_server_weights
return pickle.loads(urllib2.urlopen(request).read())
File "/usr/lib/python2.7/urllib2.py", line 127, in urlopen
return _opener.open(url, data, timeout)
File "/usr/lib/python2.7/urllib2.py", line 404, in open
response = self._open(req, data)
File "/usr/lib/python2.7/urllib2.py", line 422, in _open
'_open', req)
File "/usr/lib/python2.7/urllib2.py", line 382, in _call_chain
result = func(*args)
File "/usr/lib/python2.7/urllib2.py", line 1214, in http_open
return self.do_open(httplib.HTTPConnection, req)
File "/usr/lib/python2.7/urllib2.py", line 1184, in do_open
raise URLError(err)
URLError: <urlopen error [Errno 111] Connection refused>

The mnist_mlp_spark demo runs successfully with the setMaster('local[8]') setting.
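The URLError suggests the worker cannot reach the parameter server, a small Flask app that elephas starts on the driver and that serves endpoints such as /parameters on port 5000. A hedged debugging sketch to run from the worker machine (MASTER_IP is a placeholder, as in the spark-submit line above):

import urllib2   # Python 2, matching the traceback above

url = 'http://MASTER_IP:5000/parameters'
print(urllib2.urlopen(url).read()[:80])   # should return pickled weights, not a refusal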

UnicodeEncodeError

Hello,

When I try to run elephas, I get the following errors.

Command:
../spark/bin/spark-submit --driver-memory 2G --executor-memory 30G ./run_distributed.py -v

Output:
172.20.20.111 - - [14/Dec/2015 17:48:18] "GET /parameters HTTP/1.1" 200 -
15/12/14 17:48:56 WARN TaskSetManager: Lost task 7.0 in stage 1.0 (TID 43, 172.20.20.107): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "/home/torque/mucahit/spark-1.5.2-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/worker.py", line 111, in main
process()
File "/home/torque/mucahit/spark-1.5.2-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/worker.py", line 106, in process
serializer.dump_stream(func(split_index, iterator), outfile)
File "/home/torque/mucahit/spark-1.5.2-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/serializers.py", line 263, in dump_stream
vs = list(itertools.islice(iterator, batch))
File "/home/torque/mucahit/keras_venv/local/lib/python2.7/site-packages/elephas/spark_model.py", line 222, in train
put_deltas_to_server(deltas, self.master_url)
File "/home/torque/mucahit/keras_venv/local/lib/python2.7/site-packages/elephas/spark_model.py", line 36, in put_deltas_to_server
return urllib2.urlopen(request).read()
File "/usr/lib/python2.7/urllib2.py", line 127, in urlopen
return _opener.open(url, data, timeout)
File "/usr/lib/python2.7/urllib2.py", line 401, in open
response = self._open(req, data)
File "/usr/lib/python2.7/urllib2.py", line 419, in _open
'_open', req)
File "/usr/lib/python2.7/urllib2.py", line 379, in _call_chain
result = func(*args)
File "/usr/lib/python2.7/urllib2.py", line 1211, in http_open
return self.do_open(httplib.HTTPConnection, req)
File "/usr/lib/python2.7/urllib2.py", line 1178, in do_open
h.request(req.get_method(), req.get_selector(), req.data, headers)
File "/home/torque/mucahit/keras_venv/lib/python2.7/httplib.py", line 963, in request
File "/home/torque/mucahit/keras_venv/lib/python2.7/httplib.py", line 997, in _send_request
File "/home/torque/mucahit/keras_venv/lib/python2.7/httplib.py", line 959, in endheaders
File "/home/torque/mucahit/keras_venv/lib/python2.7/httplib.py", line 819, in _send_output
File "/home/torque/mucahit/keras_venv/lib/python2.7/httplib.py", line 795, in send
File "/usr/lib/python2.7/socket.py", line 224, in meth
return getattr(self._sock,name)(*args)
UnicodeEncodeError: 'ascii' codec can't encode character u'\ufffd' in position 183: ordinal not in range(128)

at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:166)
at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:207)
at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:125)
at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:70)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:88)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)

15/12/14 17:48:56 INFO TaskSetManager: Starting task 7

Elephas in Mesos

@maxpumperla We are also experimenting with elephas on Mesos. Have you tried submitting to the Spark Mesos dispatcher, i.e., in cluster deploy mode using Docker containers?
