@maxpumperla Hi,
When I run the following example:
```
# Train a simple MLP on MNIST with elephas (distributed Keras on Spark).
# NOTE: the double underscores in `__future__` were lost in the original
# paste (`from future import ...` is a SyntaxError / wrong package);
# likewise every comment line below had lost its leading `#`.
from __future__ import absolute_import
from __future__ import print_function
from keras.datasets import mnist
from keras.models import Sequential
from keras.layers.core import Dense, Dropout, Activation
from keras.optimizers import RMSprop
from keras.utils import np_utils
from elephas.spark_model import SparkMLlibModel
from elephas.utils.rdd_utils import to_labeled_point, lp_to_simple_rdd
from elephas import optimizers as elephas_optimizers
from pyspark import SparkContext, SparkConf

# Define basic parameters
batch_size = 64
nb_classes = 10
nb_epoch = 3

# Load data and flatten each 28x28 image to a 784-vector
(x_train, y_train), (x_test, y_test) = mnist.load_data()
x_train = x_train.reshape(60000, 784)
x_test = x_test.reshape(10000, 784)
x_train = x_train.astype("float32")
x_test = x_test.astype("float32")
# Scale pixel values from [0, 255] into [0, 1]
x_train /= 255
x_test /= 255
print(x_train.shape[0], 'train samples')
print(x_test.shape[0], 'test samples')

# Convert class vectors to binary class matrices (one-hot encoding)
y_train = np_utils.to_categorical(y_train, nb_classes)
y_test = np_utils.to_categorical(y_test, nb_classes)

# Build a simple MLP: 784 -> 128 -> 128 -> 10 with ReLU and dropout
model = Sequential()
model.add(Dense(128, input_dim=784))
model.add(Activation('relu'))
model.add(Dropout(0.2))
model.add(Dense(128))
model.add(Activation('relu'))
model.add(Dropout(0.2))
model.add(Dense(10))
model.add(Activation('softmax'))

# Compile model (optimizer used as the master optimizer below)
rms = RMSprop()

# Create Spark context
conf = SparkConf().setAppName('Mnist_Spark_MLP').setMaster('local[8]')
sc = SparkContext(conf=conf)

# Build RDD from numpy features and labels
lp_rdd = to_labeled_point(sc, x_train, y_train, categorical=True)
# NOTE(review): `rdd` is built but never used — `lp_rdd` is passed to
# train() below; presumably one of the two was intended. Confirm against
# the elephas example this was copied from.
rdd = lp_to_simple_rdd(lp_rdd, True, nb_classes)

# Initialize SparkModel from Keras model and Spark context
adadelta = elephas_optimizers.Adadelta()
spark_model = SparkMLlibModel(sc, model, optimizer=adadelta, frequency='batch', mode='asynchronous', num_workers=2, master_optimizer=rms)

# Train Spark model
spark_model.train(lp_rdd, nb_epoch=20, batch_size=32, verbose=0,
validation_split=0.1, categorical=True, nb_classes=nb_classes)

# Evaluate Spark model by evaluating the underlying master network
score = spark_model.master_network.evaluate(x_test, y_test, verbose=2)
print('Test accuracy:', score[1])
`
which is in the elephas examples folder, I constantly run into the following error:
Process Process-8:
Traceback (most recent call last):
File "/usr/lib/python3.4/multiprocessing/process.py", line 254, in _bootstrap
self.run()
File "/usr/lib/python3.4/multiprocessing/process.py", line 93, in run
self._target(*self._args, **self._kwargs)
File "/usr/local/lib/python3.4/dist-packages/elephas/spark_model.py", line 165, in start_service
threaded=True, use_reloader=False)
File "/usr/local/lib/python3.4/dist-packages/flask/app.py", line 843, in run
run_simple(host, port, self, **options)
File "/usr/local/lib/python3.4/dist-packages/werkzeug/serving.py", line 694, in run_simple
inner()
File "/usr/local/lib/python3.4/dist-packages/werkzeug/serving.py", line 656, in inner
fd=fd)
File "/usr/local/lib/python3.4/dist-packages/werkzeug/serving.py", line 544, in make_server
passthrough_errors, ssl_context, fd=fd)
File "/usr/local/lib/python3.4/dist-packages/werkzeug/serving.py", line 464, in init
HTTPServer.init(self, (host, int(port)), handler)
File "/usr/lib/python3.4/socketserver.py", line 430, in init
self.server_bind()
File "/usr/lib/python3.4/http/server.py", line 133, in server_bind
socketserver.TCPServer.server_bind(self)
File "/usr/lib/python3.4/socketserver.py", line 444, in server_bind
self.socket.bind(self.server_address)
OSError: [Errno 98] Address already in use
Traceback (most recent call last):
File "/usr/local/lib/python3.4/dist-packages/IPython/core/interactiveshell.py", line 2881, in run_code
exec(code_obj, self.user_global_ns, self.user_ns)
File "", line 12, in
validation_split=0.1, categorical=True, nb_classes=nb_classes)
File "/usr/local/lib/python3.4/dist-packages/elephas/spark_model.py", line 343, in train
self._train(rdd, nb_epoch, batch_size, verbose, validation_split)
File "/usr/local/lib/python3.4/dist-packages/elephas/spark_model.py", line 208, in _train
rdd.mapPartitions(worker.train).collect()
File "/opt/spark/python/pyspark/rdd.py", line 776, in collect
port = self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd())
File "/opt/spark/python/pyspark/rdd.py", line 2403, in _jrdd
self._jrdd_deserializer, profiler)
File "/opt/spark/python/pyspark/rdd.py", line 2336, in _wrap_function
pickled_command, broadcast_vars, env, includes = _prepare_for_python_RDD(sc, command)
File "/opt/spark/python/pyspark/rdd.py", line 2315, in _prepare_for_python_RDD
pickled_command = ser.dumps(command)
File "/opt/spark/python/pyspark/serializers.py", line 428, in dumps
return cloudpickle.dumps(obj, 2)
File "/opt/spark/python/pyspark/cloudpickle.py", line 657, in dumps
cp.dump(obj)
File "/opt/spark/python/pyspark/cloudpickle.py", line 107, in dump
return Pickler.dump(self, obj)
File "/usr/lib/python3.4/pickle.py", line 412, in dump
self.save(obj)
File "/usr/lib/python3.4/pickle.py", line 479, in save
f(self, obj) # Call unbound method with explicit self
File "/usr/lib/python3.4/pickle.py", line 744, in save_tuple
save(element)
File "/usr/lib/python3.4/pickle.py", line 479, in save
f(self, obj) # Call unbound method with explicit self
File "/opt/spark/python/pyspark/cloudpickle.py", line 204, in save_function
self.save_function_tuple(obj)
File "/opt/spark/python/pyspark/cloudpickle.py", line 241, in save_function_tuple
save((code, closure, base_globals))
File "/usr/lib/python3.4/pickle.py", line 479, in save
f(self, obj) # Call unbound method with explicit self
File "/usr/lib/python3.4/pickle.py", line 729, in save_tuple
save(element)
File "/usr/lib/python3.4/pickle.py", line 479, in save
f(self, obj) # Call unbound method with explicit self
File "/usr/lib/python3.4/pickle.py", line 774, in save_list
self._batch_appends(obj)
File "/usr/lib/python3.4/pickle.py", line 801, in _batch_appends
save(tmp[0])
File "/usr/lib/python3.4/pickle.py", line 479, in save
f(self, obj) # Call unbound method with explicit self
File "/opt/spark/python/pyspark/cloudpickle.py", line 398, in save_instancemethod
self.save_reduce(types.MethodType, (obj.func, obj.self), obj=obj)
File "/opt/spark/python/pyspark/cloudpickle.py", line 535, in save_reduce
save(args)
File "/usr/lib/python3.4/pickle.py", line 479, in save
f(self, obj) # Call unbound method with explicit self
File "/usr/lib/python3.4/pickle.py", line 729, in save_tuple
save(element)
File "/usr/lib/python3.4/pickle.py", line 524, in save
self.save_reduce(obj=obj, *rv)
File "/opt/spark/python/pyspark/cloudpickle.py", line 553, in save_reduce
save(state)
File "/usr/lib/python3.4/pickle.py", line 479, in save
f(self, obj) # Call unbound method with explicit self
File "/usr/lib/python3.4/pickle.py", line 814, in save_dict
self._batch_setitems(obj.items())
File "/usr/lib/python3.4/pickle.py", line 840, in _batch_setitems
save(v)
File "/usr/lib/python3.4/pickle.py", line 524, in save
self.save_reduce(obj=obj, *rv)
File "/opt/spark/python/pyspark/cloudpickle.py", line 553, in save_reduce
save(state)
File "/usr/lib/python3.4/pickle.py", line 479, in save
f(self, obj) # Call unbound method with explicit self
File "/usr/lib/python3.4/pickle.py", line 814, in save_dict
self._batch_setitems(obj.items())
File "/usr/lib/python3.4/pickle.py", line 840, in _batch_setitems
save(v)
File "/usr/lib/python3.4/pickle.py", line 524, in save
self.save_reduce(obj=obj, *rv)
File "/opt/spark/python/pyspark/cloudpickle.py", line 553, in save_reduce
save(state)
File "/usr/lib/python3.4/pickle.py", line 479, in save
f(self, obj) # Call unbound method with explicit self
File "/usr/lib/python3.4/pickle.py", line 814, in save_dict
self._batch_setitems(obj.items())
File "/usr/lib/python3.4/pickle.py", line 840, in _batch_setitems
save(v)
File "/usr/lib/python3.4/pickle.py", line 524, in save
self.save_reduce(obj=obj, *rv)
File "/opt/spark/python/pyspark/cloudpickle.py", line 553, in save_reduce
save(state)
File "/usr/lib/python3.4/pickle.py", line 479, in save
f(self, obj) # Call unbound method with explicit self
File "/usr/lib/python3.4/pickle.py", line 814, in save_dict
self._batch_setitems(obj.items())
File "/usr/lib/python3.4/pickle.py", line 840, in _batch_setitems
save(v)
File "/usr/lib/python3.4/pickle.py", line 479, in save
f(self, obj) # Call unbound method with explicit self
File "/usr/lib/python3.4/pickle.py", line 774, in save_list
self._batch_appends(obj)
File "/usr/lib/python3.4/pickle.py", line 798, in _batch_appends
save(x)
File "/usr/lib/python3.4/pickle.py", line 524, in save
self.save_reduce(obj=obj, *rv)
File "/opt/spark/python/pyspark/cloudpickle.py", line 553, in save_reduce
save(state)
File "/usr/lib/python3.4/pickle.py", line 479, in save
f(self, obj) # Call unbound method with explicit self
File "/usr/lib/python3.4/pickle.py", line 814, in save_dict
self._batch_setitems(obj.items())
File "/usr/lib/python3.4/pickle.py", line 840, in _batch_setitems
save(v)
File "/usr/lib/python3.4/pickle.py", line 479, in save
f(self, obj) # Call unbound method with explicit self
File "/usr/lib/python3.4/pickle.py", line 774, in save_list
self._batch_appends(obj)
File "/usr/lib/python3.4/pickle.py", line 798, in _batch_appends
save(x)
File "/usr/lib/python3.4/pickle.py", line 524, in save
self.save_reduce(obj=obj, *rv)
File "/opt/spark/python/pyspark/cloudpickle.py", line 553, in save_reduce
save(state)
File "/usr/lib/python3.4/pickle.py", line 479, in save
f(self, obj) # Call unbound method with explicit self
File "/usr/lib/python3.4/pickle.py", line 814, in save_dict
self._batch_setitems(obj.items())
File "/usr/lib/python3.4/pickle.py", line 840, in _batch_setitems
save(v)
File "/usr/lib/python3.4/pickle.py", line 524, in save
self.save_reduce(obj=obj, *rv)
File "/opt/spark/python/pyspark/cloudpickle.py", line 553, in save_reduce
save(state)
File "/usr/lib/python3.4/pickle.py", line 479, in save
f(self, obj) # Call unbound method with explicit self
File "/usr/lib/python3.4/pickle.py", line 814, in save_dict
self._batch_setitems(obj.items())
File "/usr/lib/python3.4/pickle.py", line 840, in _batch_setitems
save(v)
File "/usr/lib/python3.4/pickle.py", line 479, in save
f(self, obj) # Call unbound method with explicit self
File "/usr/lib/python3.4/pickle.py", line 774, in save_list
self._batch_appends(obj)
File "/usr/lib/python3.4/pickle.py", line 798, in _batch_appends
save(x)
File "/usr/lib/python3.4/pickle.py", line 479, in save
f(self, obj) # Call unbound method with explicit self
File "/usr/lib/python3.4/pickle.py", line 744, in save_tuple
save(element)
File "/usr/lib/python3.4/pickle.py", line 479, in save
f(self, obj) # Call unbound method with explicit self
File "/usr/lib/python3.4/pickle.py", line 814, in save_dict
self._batch_setitems(obj.items())
File "/usr/lib/python3.4/pickle.py", line 840, in _batch_setitems
save(v)
File "/usr/lib/python3.4/pickle.py", line 479, in save
f(self, obj) # Call unbound method with explicit self
File "/usr/lib/python3.4/pickle.py", line 814, in save_dict
self._batch_setitems(obj.items())
File "/usr/lib/python3.4/pickle.py", line 840, in _batch_setitems
save(v)
File "/usr/lib/python3.4/pickle.py", line 479, in save
f(self, obj) # Call unbound method with explicit self
File "/opt/spark/python/pyspark/cloudpickle.py", line 398, in save_instancemethod
self.save_reduce(types.MethodType, (obj.func, obj.self), obj=obj)
File "/opt/spark/python/pyspark/cloudpickle.py", line 535, in save_reduce
save(args)
File "/usr/lib/python3.4/pickle.py", line 479, in save
f(self, obj) # Call unbound method with explicit self
File "/usr/lib/python3.4/pickle.py", line 729, in save_tuple
save(element)
File "/usr/lib/python3.4/pickle.py", line 524, in save
self.save_reduce(obj=obj, *rv)
File "/opt/spark/python/pyspark/cloudpickle.py", line 553, in save_reduce
save(state)
File "/usr/lib/python3.4/pickle.py", line 479, in save
f(self, obj) # Call unbound method with explicit self
File "/usr/lib/python3.4/pickle.py", line 814, in save_dict
self._batch_setitems(obj.items())
File "/usr/lib/python3.4/pickle.py", line 840, in _batch_setitems
save(v)
File "/usr/lib/python3.4/pickle.py", line 479, in save
f(self, obj) # Call unbound method with explicit self
File "/usr/lib/python3.4/pickle.py", line 774, in save_list
self._batch_appends(obj)
File "/usr/lib/python3.4/pickle.py", line 798, in _batch_appends
save(x)
File "/usr/lib/python3.4/pickle.py", line 524, in save
self.save_reduce(obj=obj, *rv)
File "/opt/spark/python/pyspark/cloudpickle.py", line 553, in save_reduce
save(state)
File "/usr/lib/python3.4/pickle.py", line 479, in save
f(self, obj) # Call unbound method with explicit self
File "/usr/lib/python3.4/pickle.py", line 814, in save_dict
self._batch_setitems(obj.items())
File "/usr/lib/python3.4/pickle.py", line 840, in _batch_setitems
save(v)
File "/usr/lib/python3.4/pickle.py", line 479, in save
f(self, obj) # Call unbound method with explicit self
File "/usr/lib/python3.4/pickle.py", line 814, in save_dict
self._batch_setitems(obj.items())
File "/usr/lib/python3.4/pickle.py", line 840, in _batch_setitems
save(v)
File "/usr/lib/python3.4/pickle.py", line 524, in save
self.save_reduce(obj=obj, *rv)
File "/opt/spark/python/pyspark/cloudpickle.py", line 553, in save_reduce
save(state)
File "/usr/lib/python3.4/pickle.py", line 479, in save
f(self, obj) # Call unbound method with explicit self
File "/usr/lib/python3.4/pickle.py", line 814, in save_dict
self._batch_setitems(obj.items())
File "/usr/lib/python3.4/pickle.py", line 840, in _batch_setitems
save(v)
File "/usr/lib/python3.4/pickle.py", line 524, in save
self.save_reduce(obj=obj, *rv)
File "/opt/spark/python/pyspark/cloudpickle.py", line 553, in save_reduce
save(state)
File "/usr/lib/python3.4/pickle.py", line 479, in save
f(self, obj) # Call unbound method with explicit self
File "/usr/lib/python3.4/pickle.py", line 814, in save_dict
self._batch_setitems(obj.items())
File "/usr/lib/python3.4/pickle.py", line 840, in _batch_setitems
save(v)
File "/usr/lib/python3.4/pickle.py", line 479, in save
f(self, obj) # Call unbound method with explicit self
File "/opt/spark/python/pyspark/cloudpickle.py", line 321, in save_builtin_function
return self.save_function(obj)
File "/opt/spark/python/pyspark/cloudpickle.py", line 196, in save_function
if islambda(obj) or obj.code.co_filename == '' or themodule is None:
AttributeError: 'builtin_function_or_method' object has no attribute 'code'
Why does this happen?
I searched the web but could not find a real explanation; it is only mentioned that it might be due to issue apache/spark#8707, which is explained in a bit more detail here: https://issues.apache.org/jira/browse/SPARK-10542
The main problem, as I see it, is in the train/RDD section — it seems as if we have to define a lambda function there.