thunder's Introduction

thunder

scalable analysis of image and time series data in python

Thunder is an ecosystem of tools for the analysis of image and time series data in Python. It provides data structures and algorithms for loading, processing, and analyzing these data, and can be useful in a variety of domains, including neuroscience, medical imaging, video processing, and geospatial and climate analysis. It can be used locally, but also supports large-scale analysis through the distributed computing engine Spark. All data structures and analyses in Thunder are designed to run identically and with the same API whether local or distributed.

Thunder is designed around modularity and composability — the core thunder package, in this repository, only defines common data structures and read/write patterns, and most functionality is broken out into several related packages. Each one is independently versioned, with its own GitHub repository for organizing issues and contributions.

This readme provides an overview of the core thunder package, its data types, and methods for loading and saving. Tutorials, detailed API documentation, and info about all associated packages can be found at the documentation site.

install

The core thunder package defines data structures and read/write patterns for images and series data. It is built on numpy, scipy, scikit-learn, and scikit-image, and is compatible with Python 2.7+ and 3.4+. You can install it using:

pip install thunder-python

related packages

Much of the functionality in Thunder, especially for specific types of analyses, is broken out into separate packages.

You can install the ones you want with pip, for example

pip install thunder-regression
pip install thunder-registration

example

Here's a short snippet showing how to load an image sequence (in this case random data), median filter it, transform it to a series, detrend and compute a Fourier transform on each pixel, then convert it to an array.

import thunder as td

data = td.images.fromrandom()
ts = data.median_filter(3).toseries()
frequencies = ts.detrend().fourier(freq=3).toarray()

usage

Most workflows in Thunder begin by loading data, which can come from a variety of sources and locations, and can be either local or distributed (see below).

The two primary data types are images and series. images are used for collections or sequences of images, and are especially useful when working with movie data. series are used for collections of one-dimensional arrays, often representing time series.

Once loaded, each data type can be manipulated through a variety of statistical operators, including simple statistical aggregations like mean, min, and max, or more complex operations like gaussian_filter, detrend, and subsample. Both images and series objects are wrappers for ndarrays: either a local numpy ndarray or a distributed ndarray using bolt and spark. Calling toarray() on an images or series object at any time returns a local numpy ndarray, which is an easy way to move between Thunder and other Python data analysis tools, like pandas and scikit-learn.

For a full list of methods on image and series data, see the documentation site.

loading data

Both images and series can be loaded from a variety of data types and locations. For all loading methods, the optional argument engine allows you to specify whether data should be loaded in 'local' mode, which is backed by a numpy array, or in 'spark' mode, which is backed by an RDD.

All loading methods are available on the module for the corresponding data type, for example

import thunder as td

data = td.images.fromtif('/path/to/tifs')
data = td.series.fromarray(somearray)
data_distributed = td.series.fromarray(somearray, engine=sc)

The argument engine can be either None for local use or a SparkContext for distributed use with Spark. In either case, methods that load from files (e.g. fromtif or frombinary) can load from either a local filesystem or Amazon S3, with the optional argument credentials for S3 credentials. See the documentation site for a full list of data loading methods.

using with spark

Thunder doesn't require Spark and can run locally without it, but Spark and Thunder work great together! To install and configure a Spark cluster, consult the official Spark documentation. Thunder supports Spark version 1.5+ (currently tested against 2.0.0), and uses the Python API PySpark. If you have Spark installed, you can install Thunder just by calling pip install thunder-python on both the master node and all worker nodes of your cluster. Alternatively, you can clone this GitHub repository, and make sure it is on the PYTHONPATH of both the master and worker nodes.

Once you have a running cluster with a valid SparkContext — this is created automatically as the variable sc if you call the pyspark executable — you can pass it as the engine to any of Thunder's loading methods, and this will load your data in distributed 'spark' mode. In this mode, all operations will be parallelized, and chained operations will be lazily executed.

contributing

Thunder is a community effort! The codebase so far is due to the excellent work of the following individuals:

Andrew Osheroff, Ben Poole, Chris Stock, Davis Bennett, Jascha Swisher, Jason Wittenbach, Jeremy Freeman, Josh Rosen, Kunal Lillaney, Logan Grosenick, Matt Conlen, Michael Broxton, Noah Young, Ognen Duzlevski, Richard Hofer, Owen Kahn, Ted Fujimoto, Tom Sainsbury, Uri Laserson, W J Liddy

If you run into a problem, have a feature request, or want to contribute, submit an issue or a pull request, or come talk to us in the chatroom!

thunder's Issues

Python source shouldn't use mixed tabs and spaces for indentation

Thunder's Python sources use tabs for indentation and some of the files use both tabs and spaces. This is non-standard and clashes with the default Python settings in many editors. For example, IntelliJ displays lots of PEP8 warnings:

[screenshot: PEP8 warnings shown in IntelliJ]

This makes it difficult to contribute changes without accidentally introducing spacing issues. I suggest that you switch to indenting with 4 spaces; there are some scripts that can automatically convert the existing sources and you can configure your editor to automatically perform the proper formatting for new code.

Can not load 'zebrafish-optomotor-response' data on EC2

Hi
I tried to run the following sample on Amazon cloud service EC2 using IPython notebook: http://nbviewer.ipython.org/url/research.janelia.org/zebrafish/notebooks/optomotor-response-PCA.ipynb

from thunder.utils import save, pack, subset
from thunder.regression import RegressionModel
from thunder.factorization import PCA
from thunder.viz import Colorize
import seaborn as sns
# load the data and the model parameters (in this case, a design matrix to perform trial-averaging)
data, params = tsc.loadExampleEC2('zebrafish-optomotor-response')

But I got the following error messages when it read the data 'zebrafish-optomotor-response'.

Py4JJavaError                             Traceback (most recent call last)
<ipython-input-18-8420625e6184> in <module>()
      6 # load the data and the model parameters (in this case, a design matrix to perform trial-averaging)
      7 #data, designmatrix = tsc.loadExampleEC2('zebrafish-optomotor-response')
----> 8 data, params = tsc.loadExampleEC2('zebrafish-optomotor-response')

/root/thunder/python/thunder/utils/context.pyc in loadExampleEC2(self, dataset)
    200             data = self.loadText("s3n://" + path + 'data/dat_plane*.txt', filter='dff', minPartitions=1000)
    201             paramfile = self._sc.textFile("s3n://" + path + "params.json")
--> 202             params = json.loads(paramfile.first())
    203             modelfile = asarray(params['trials'])
    204             return data, modelfile

/root/spark/python/pyspark/rdd.pyc in first(self)
    963         2
    964         """
--> 965         return self.take(1)[0]
    966 
    967     def saveAsPickleFile(self, path, batchSize=10):

/root/spark/python/pyspark/rdd.pyc in take(self, num)
    923         """
    924         items = []
--> 925         totalParts = self._jrdd.partitions().size()
    926         partsScanned = 0
    927 

/root/spark/python/lib/py4j-0.8.1-src.zip/py4j/java_gateway.py in __call__(self, *args)
    535         answer = self.gateway_client.send_command(command)
    536         return_value = get_return_value(answer, self.gateway_client,
--> 537                 self.target_id, self.name)
    538 
    539         for temp_arg in temp_args:

/root/spark/python/lib/py4j-0.8.1-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    298                 raise Py4JJavaError(
    299                     'An error occurred while calling {0}{1}{2}.\n'.
--> 300                     format(target_id, '.', name), value)
    301             else:
    302                 raise Py4JError(

Py4JJavaError: An error occurred while calling o62.partitions.
: org.apache.hadoop.fs.s3.S3Exception: org.jets3t.service.S3ServiceException: S3 HEAD request failed for '/optomotor-response%2F1%2Fparams.json' - ResponseCode=403, ResponseMessage=Forbidden
    at org.apache.hadoop.fs.s3native.Jets3tNativeFileSystemStore.retrieveMetadata(Jets3tNativeFileSystemStore.java:122)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:82)
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:59)
    at org.apache.hadoop.fs.s3native.$Proxy8.retrieveMetadata(Unknown Source)
    at org.apache.hadoop.fs.s3native.NativeS3FileSystem.getFileStatus(NativeS3FileSystem.java:326)
    at org.apache.hadoop.fs.FileSystem.getFileStatus(FileSystem.java:1337)
    at org.apache.hadoop.fs.FileSystem.globStatusInternal(FileSystem.java:1045)
    at org.apache.hadoop.fs.FileSystem.globStatus(FileSystem.java:987)
    at org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:177)
    at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:208)
    at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:176)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:203)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:201)
    at scala.Option.getOrElse(Option.scala:120)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:201)
    at org.apache.spark.rdd.MappedRDD.getPartitions(MappedRDD.scala:28)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:203)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:201)
    at scala.Option.getOrElse(Option.scala:120)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:201)
    at org.apache.spark.api.java.JavaRDDLike$class.partitions(JavaRDDLike.scala:50)
    at org.apache.spark.api.java.JavaRDD.partitions(JavaRDD.scala:32)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)
    at py4j.Gateway.invoke(Gateway.java:259)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:207)
    at java.lang.Thread.run(Thread.java:745)
Caused by: org.jets3t.service.S3ServiceException: S3 HEAD request failed for '/optomotor-response%2F1%2Fparams.json' - ResponseCode=403, ResponseMessage=Forbidden
    at org.jets3t.service.impl.rest.httpclient.RestS3Service.performRequest(RestS3Service.java:477)
    at org.jets3t.service.impl.rest.httpclient.RestS3Service.performRestHead(RestS3Service.java:718)
    at org.jets3t.service.impl.rest.httpclient.RestS3Service.getObjectImpl(RestS3Service.java:1599)
    at org.jets3t.service.impl.rest.httpclient.RestS3Service.getObjectDetailsImpl(RestS3Service.java:1535)
    at org.jets3t.service.S3Service.getObjectDetails(S3Service.java:1987)
    at org.jets3t.service.S3Service.getObjectDetails(S3Service.java:1332)
    at org.apache.hadoop.fs.s3native.Jets3tNativeFileSystemStore.retrieveMetadata(Jets3tNativeFileSystemStore.java:111)
    ... 36 more
Caused by: org.jets3t.service.impl.rest.HttpException
    at org.jets3t.service.impl.rest.httpclient.RestS3Service.performRequest(RestS3Service.java:475)
    ... 42 more

Regards,
Yizhi Wang

Deserialization error in NMF

example_data = tsc.loadExample('fish-series')
nmf = thunder.NMF(k=5, method='als')
nmf.fit(example_data)

Gives me the following error:

<ipython-input-18-4c8d4d054a3d> in <module>()
      1 example_data = tsc.loadExample('fish-series')
      2 nmf = thunder.NMF(k=5, method='als')
----> 3 nmf.fit(example_data)

/mnt/spark/spark-1841c1f0-1813-49ed-ab38-36a1bb4eabd0/spark-eb4f748e-97a3-423c-81fd-326f1c953cc6/thunder_python-0.5.0_dev-py2.6.egg/thunder/factorization/nmf.py in fit(self, mat)
    160 
    161                 # update H using least squares row-wise with inv(W' * W) * W * R (same as pinv(W) * R)
--> 162                 h = pinvW.values().zip(mat.values()).map(lambda (x, y): outer(x, y)).reduce(add)
    163 
    164                 # clip negative values of H

/root/spark/python/pyspark/rdd.pyc in reduce(self, f)
    723             yield reduce(f, iterator, initial)
    724 
--> 725         vals = self.mapPartitions(func).collect()
    726         if vals:
    727             return reduce(f, vals)

/root/spark/python/pyspark/rdd.pyc in collect(self)
    684         """
    685         with SCCallSiteSync(self.context) as css:
--> 686             bytesInJava = self._jrdd.collect().iterator()
    687         return list(self._collect_iterator_through_file(bytesInJava))
    688 

/root/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py in __call__(self, *args)
    536         answer = self.gateway_client.send_command(command)
    537         return_value = get_return_value(answer, self.gateway_client,
--> 538                 self.target_id, self.name)
    539 
    540         for temp_arg in temp_args:

/root/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    298                 raise Py4JJavaError(
    299                     'An error occurred while calling {0}{1}{2}.\n'.
--> 300                     format(target_id, '.', name), value)
    301             else:
    302                 raise Py4JError(

Py4JJavaError: An error occurred while calling o355.collect.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 24.0 failed 4 times, most recent failure: Lost task 0.3 in stage 24.0 (TID 39, ip-10-168-156-186.ec2.internal): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/root/spark/python/pyspark/worker.py", line 107, in main
    process()
  File "/root/spark/python/pyspark/worker.py", line 98, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/root/spark/python/pyspark/serializers.py", line 231, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "/root/spark/python/pyspark/rdd.py", line 723, in func
    yield reduce(f, iterator, initial)
  File "/root/spark/python/pyspark/serializers.py", line 301, in load_stream
    " in pair: (%d, %d)" % (len(keys), len(vals)))
ValueError: Can not deserialize RDD with different number of items in pair: (128, 64)

I haven't run the NMF unit test, but I think it might be missing this because it only uses 1D keys.

Using 2af3738 and spark 1.2.1

Calling sortBy doesn't work on a PipelinedRDD, causes error in Images.toSeries()

Calling tseries = Images(my_volume_stack_rdd).toSeries() leads to the following error (only bottom of stack shown).

/usr/lib/python2.7/site-packages/thunder/rdds/imageblocks.pyc in _groupIntoSeriesBlocks(self)
     87         # sort must come after group, b/c group will mess with ordering.
     88         # return self.rdd.groupByKey().sortByKey(ascending=True).mapValues(lambda v: ImageBlockValue.fromPlanarBlocks(v, 0))
---> 89         sortedRdd = self.rdd.groupByKey().sortBy(lambda (k, _): k[::-1])
     90         return sortedRdd.mapValues(lambda v: ImageBlockValue.fromPlanarBlocks(v, 0))
     91 

AttributeError: 'PipelinedRDD' object has no attribute 'sortBy'

PipelinedRDD subclasses RDD and should have a sortBy method, but evidently this method goes missing from whatever rdd.groupByKey returns. This may be a bug in Spark, but I haven't seen anything from anyone else with this issue.
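For reference, here is a small sketch of the ordering that the failing sortBy call is meant to produce. It is an illustration only: a plain Python list stands in for the RDD, and the sample keys and the helper name reversed_key are made up for this example.

```python
# Illustration only: a plain list stands in for the RDD of (key, value) records.
blocks = [((1, 0), "b"), ((0, 1), "c"), ((0, 0), "a"), ((1, 1), "d")]

def reversed_key(record):
    """Return the record's key with its dimensions reversed, e.g. (x, y) -> (y, x)."""
    key, _ = record
    return key[::-1]

# Sorting on the reversed key makes the last key dimension vary slowest.
ordered = sorted(blocks, key=reversed_key)
print([k for k, _ in ordered])
```

If sortBy really is unavailable in the installed PySpark, one possible workaround (an assumption, not tested against this setup) would be rdd.keyBy(f).sortByKey().values(), which sorts on the same derived key.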

AWS credentials lost when starting a cluster from stopped state

Initial login after thunder-ec2 launch is fine, but after stopping and starting, I get this error whenever accessing a resource on S3:

---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
<ipython-input-2-c4e9f34a2087> in <module>()
----> 1 tseries = tsc.loadSeries(series_path)

/mnt/spark/spark-5fd29201-6834-4f40-9a8d-3ad01253881b/spark-c1b91f8e-af1c-4978-8043-3b3c7b160518/thunder_python-0.5.0_dev-py2.6.egg/thunder/utils/context.pyc in loadSeries(self, dataPath, nkeys, nvalues, inputFormat, minPartitions, confFilename, keyType, valueType)
     92             # must be either 'text' or 'binary'
     93             data = loader.fromBinary(dataPath, confFilename=confFilename, nkeys=nkeys, nvalues=nvalues,
---> 94                                      keyType=keyType, valueType=valueType)
     95         return data
     96 

/mnt/spark/spark-5fd29201-6834-4f40-9a8d-3ad01253881b/spark-c1b91f8e-af1c-4978-8043-3b3c7b160518/thunder_python-0.5.0_dev-py2.6.egg/thunder/rdds/fileio/seriesloader.pyc in fromBinary(self, dataPath, ext, confFilename, nkeys, nvalues, keyType, valueType, newDtype, casting)
    210                                          'org.apache.hadoop.io.LongWritable',
    211                                          'org.apache.hadoop.io.BytesWritable',
--> 212                                          conf={'recordLength': str(recordSize)})
    213 
    214         data = lines.map(lambda (_, v):

/root/spark/python/pyspark/context.pyc in newAPIHadoopFile(self, path, inputFormatClass, keyClass, valueClass, keyConverter, valueConverter, conf, batchSize)
    510         jrdd = self._jvm.PythonRDD.newAPIHadoopFile(self._jsc, path, inputFormatClass, keyClass,
    511                                                     valueClass, keyConverter, valueConverter,
--> 512                                                     jconf, batchSize)
    513         return RDD(jrdd, self)
    514 

/root/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py in __call__(self, *args)
    536         answer = self.gateway_client.send_command(command)
    537         return_value = get_return_value(answer, self.gateway_client,
--> 538                 self.target_id, self.name)
    539 
    540         for temp_arg in temp_args:

/root/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    298                 raise Py4JJavaError(
    299                     'An error occurred while calling {0}{1}{2}.\n'.
--> 300                     format(target_id, '.', name), value)
    301             else:
    302                 raise Py4JError(

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.newAPIHadoopFile.
: java.lang.IllegalArgumentException: AWS Access Key ID and Secret Access Key must be specified as the username or password (respectively) of a s3n URL, or by setting the fs.s3n.awsAccessKeyId or fs.s3n.awsSecretAccessKey properties (respectively).

This is fixed by manually setting AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY in the current terminal session. The expected behavior is for the AWS access information to persist on the remote, or to be resent with thunder-ec2 start.
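As a stopgap, a quick check like the one below could confirm whether the two variables are visible in the current session before touching S3. This is a minimal sketch; the helper name missing_aws_credentials is hypothetical and not part of Thunder or thunder-ec2.

```python
import os

REQUIRED_VARS = ("AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY")

def missing_aws_credentials(environ=None):
    """Return the names of required AWS variables that are unset or empty."""
    env = os.environ if environ is None else environ
    return [name for name in REQUIRED_VARS if not env.get(name)]

missing = missing_aws_credentials()
if missing:
    print("Set these before accessing S3: " + ", ".join(missing))
```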

failure loading thunder on python 2.6

Launching using the current master branch of thunder fails on python 2.6 with the following error:

Traceback (most recent call last):
  File "./bin/thunder", line 3, in <module>
    import thunder
  File "/root/thunder/python/thunder/__init__.py", line 13, in <module>
    from thunder.imgprocessing.registration import Registration, RegistrationModel
  File "/root/thunder/python/thunder/imgprocessing/registration.py", line 233
    transformations = {int(k): transClass(**v) for k, v in input['transformations'].iteritems()}
                                                 ^
SyntaxError: invalid syntax

Dictionary comprehensions were only introduced in python 2.7.

This affects the thunder-ec2 script, which launches the basic spark AMIs (which have python 2.6 installed) and pulls in the current head of master.
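One way to keep the line compatible with Python 2.6 would be to build the dictionary with dict() over a generator expression. The sketch below shows the pattern with stand-in data; the Displacement class and the saved dictionary are hypothetical, and items() is used instead of iteritems() so the example also runs on Python 3.

```python
class Displacement(object):
    """Hypothetical stand-in for a transformation class; not Thunder's own."""
    def __init__(self, delta):
        self.delta = delta

saved = {"transformations": {"0": {"delta": [1, 2]}, "1": {"delta": [0, -3]}}}

# dict() over a generator expression parses on Python 2.6, unlike {k: v for ...}.
transformations = dict(
    (int(k), Displacement(**v)) for k, v in saved["transformations"].items()
)
print(sorted(transformations))
```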

thunder-ec2 EC2 Response Error

I get the following error when running the basic thunder-ec2 example. I am able to run spark-ec2, however.

After googling the error, it appears that some people had the same issue w/ spark a few months ago, which was fixed by manually deleting old security groups in AWS. This did not fix the issue for me.

If there is a mistake on my part, please let me know.

Kevin$ thunder-ec2 -k kevin -i ~/kevin.pem -s 2 launch foo
Creating security group foo-master
Setting up security groups...
Creating security group foo-slaves
Traceback (most recent call last):
  File "/Users/Kevin/code/thunder/python/thunder/utils/ec2.py", line 167, in <module>
    (master_nodes, slave_nodes) = launch_cluster(conn, opts, cluster_name)
  File "/Users/Kevin/downloads/spark-1.0.2-bin-hadoop1/ec2/spark_ec2.py", line 250, in launch_cluster
    master_group.authorize(src_group=master_group)
  File "/Users/Kevin/downloads/spark-1.0.2-bin-hadoop1/ec2/third_party/boto-2.4.1.zip/boto-2.4.1/boto/ec2/securitygroup.py", line 184, in authorize
  File "/Users/Kevin/downloads/spark-1.0.2-bin-hadoop1/ec2/third_party/boto-2.4.1.zip/boto-2.4.1/boto/ec2/connection.py", line 2181, in authorize_security_group
  File "/Users/Kevin/downloads/spark-1.0.2-bin-hadoop1/ec2/third_party/boto-2.4.1.zip/boto-2.4.1/boto/connection.py", line 944, in get_status
boto.exception.EC2ResponseError: EC2ResponseError: 400 Bad Request
<?xml version="1.0" encoding="UTF-8"?>
<Response><Errors><Error><Code>InvalidParameterValue</Code><Message>Invalid value 'null' for protocol. VPC security group rules must specify protocols explicitly.</Message></Error></Errors><RequestID>0d231d54-3112-42a3-afe2-612e462d553e</RequestID></Response>

Image subset

Hi, I am trying to test the software right now for use in a calcium imaging project. Currently I am just going through examples on my desktop to get a feel for what one can do with the software. I am trying to define a subset of the fish-images series I loaded in the tutorial, and I am getting the error AttributeError: 'Images' object has no attribute 'subset'. Can you tell me what I am doing wrong? Thanks.

varying numbers of images per file lead to incorrect Images keys

We now support (in master) single image files that contain multiple logical images via the nplanes argument. The keys of the resulting Images objects are calculated assuming that each file will contain the same number of logical indexes - lines 254 and 255 in the version of ImagesLoader that I have in my tmp branch:

nvals = len(values)
keys = [idx*nvals + timepoint for timepoint in xrange(nvals)]

This implicitly assumes that nvals will be the same for all image files. Idx is the zero-based count of the current image file, which allows idx*nvals to accurately determine the starting offset of the current file within the overall set of logical images, so long as nvals is fixed and identical for every file.

When this assumption is violated, as for instance if a single file contains all images with a variable-length trial, then the keys for the resulting Images object get all screwed up. This leads to problems downstream when converting to Series, because the Images keys are assumed to accurately reflect position of the image record within the overall RDD.

Fixing this I think will require another small set of jobs to calculate the total number of images and the number of images per file. This will only have to be run when nplanes is passed, because otherwise we know that a single file has a single image.
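To make the proposed fix concrete, here is a minimal sketch of how keys could be derived once the per-file image counts are known. It runs on a plain list of counts rather than an RDD, and the function names are hypothetical, not taken from ImagesLoader.

```python
def naive_keys(counts):
    """Keys under the current assumption that every file holds the same number of images."""
    return [[idx * n + t for t in range(n)] for idx, n in enumerate(counts)]

def offset_keys(counts):
    """Keys from a running offset, so files may hold different numbers of images."""
    keys, offset = [], 0
    for nvals in counts:
        keys.append(list(range(offset, offset + nvals)))
        offset += nvals  # the next file starts where this one ended
    return keys

counts = [3, 1, 2]  # images per file, in file order
print(naive_keys(counts))
print(offset_keys(counts))
```

With counts of 3, 1, and 2, the current formula assigns key 1 twice and never assigns key 3, while the running offset gives each image a unique, contiguous key.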

thunder and Numba?

Hi,

maybe you guys saw this already, Numba recently introduced some (experimental) support for Spark - basically enabling pickling of Numba JIT functions: https://groups.google.com/a/continuum.io/forum/#!topic/numba-users/TM83TUqIdCs

I think that might open a lot of interesting possibilities. As an example, here is a NMF implementation in Numba written by Mathieu Blondel: https://gist.github.com/mblondel/09648344984565f9477a

Not sure though, how easily something like that could be incorporated into thunder?

Best,
Matthias

Error in Series.subToInd doc-string

The doc-string of the subToInd method on the Series object (and thus the API doc) lists dims as a parameter, but no such parameter exists. This is likely the result of a copy-paste from the indToSub method, which does have such a parameter.

Error using Series.subset() with NumPy 1.9

Thunder's Series.subset() method relies on PySpark's rdd.takeSample(). Due to a recent patch to NumPy (numpy/numpy@6b1a120), takeSample is broken on NumPy 1.9 installations because it generates random seeds that frequently exceed the maximum allowed value of 2 ** 32 - 1.

As a result, the following example code:

data = tsc.makeExample('pca')
data.subset(10)

Will almost always produce:

ValueError: Seed must be between 0 and 4294967295

The underlying issue needs to be fixed in PySpark, but for now we can avoid the problem by explicitly specifying a seed in the correct range.
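A minimal sketch of drawing a seed in the accepted range is shown below. The constant and helper names are hypothetical, and how the seed gets threaded through Series.subset() is left open.

```python
import random

MAX_SEED = 2 ** 32 - 1  # 4294967295, the upper bound named in the error message

def bounded_seed(rng=random):
    """Draw a seed that lies in the inclusive range [0, MAX_SEED]."""
    return rng.randint(0, MAX_SEED)

seed = bounded_seed()
print(0 <= seed <= MAX_SEED)
```

The resulting value could then be passed through the seed argument of rdd.takeSample(), assuming the installed PySpark version accepts it there.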

PCA fails when calling center() on a Series

Calling model = PCA(k=2).fit(my_tseries) gets me the error below in Thunder 0.5.0-dev.

---------------------------------------------------------------------------
AttributeError                            Traceback (most recent call last)
<ipython-input-29-7677bfcc2299> in <module>()
----> 1 mode = PCA(k=2).fit(tseries)

/mnt/spark/spark-d25b3fe2-8240-4440-960a-0dc18dff7381/spark-c3bdb8fb-135d-4d5a-93e4-3f2059082b88/thunder_python-0.5.0_dev-py2.6.egg/thunder/factorization/pca.pyc in fit(self, data)
     61             data = data.toRowMatrix()
     62 
---> 63         mat = data.center(0)
     64 
     65         svd = SVD(k=self.k, method=self.svdMethod)

/mnt/spark/spark-d25b3fe2-8240-4440-960a-0dc18dff7381/spark-c3bdb8fb-135d-4d5a-93e4-3f2059082b88/thunder_python-0.5.0_dev-py2.6.egg/thunder/rdds/series.pyc in center(self, axis)
    219         """
    220         if axis == 0:
--> 221             return self.applyValues(lambda x: x - mean(x))
    222         elif axis == 1:
    223             meanVec = self.mean()

/mnt/spark/spark-d25b3fe2-8240-4440-960a-0dc18dff7381/spark-c3bdb8fb-135d-4d5a-93e4-3f2059082b88/thunder_python-0.5.0_dev-py2.6.egg/thunder/rdds/data.pyc in applyValues(self, func, **kwargs)
    378         Series.apply
    379         """
--> 380         return self.apply(lambda (k, v): (k, func(v)), **kwargs)
    381 
    382     def collect(self, sorting=False):

/mnt/spark/spark-d25b3fe2-8240-4440-960a-0dc18dff7381/spark-c3bdb8fb-135d-4d5a-93e4-3f2059082b88/thunder_python-0.5.0_dev-py2.6.egg/thunder/rdds/data.pyc in apply(self, func, keepDtype, keepIndex)
    357         if keepIndex is False:
    358             noprop += ('_index',)
--> 359         return self._constructor(self.rdd.map(func)).__finalize__(self, noPropagate=noprop)
    360 
    361     def applyKeys(self, func, **kwargs):

/mnt/spark/spark-d25b3fe2-8240-4440-960a-0dc18dff7381/spark-c3bdb8fb-135d-4d5a-93e4-3f2059082b88/thunder_python-0.5.0_dev-py2.6.egg/thunder/rdds/data.pyc in __finalize__(self, other, noPropagate)
    100                     otherAttr = getattr(other, name, None)
    101                     if (otherAttr is not None) and (getattr(self, name, None) is None):
--> 102                         object.__setattr__(self, name, otherAttr)
    103         return self
    104 

AttributeError: can't set attribute

Calling Series.center() gives a similar AttributeError.

Fails even on the PCA example

However ICA works fine.

Include egg with library

Currently, when running on a standalone cluster, it is impossible to distribute Thunder's egg file without cloning the entire repository. A pip installation will not include setup.py, so building will fail. We should support a usage model in which the user calls pip install thunder-python and is then able to run on a cluster. To do this, we can include a copy of the currently built egg as a file inside python/lib.

In addition, we should move the addPyFile step from the launch scripts to the ThunderContext creation so that users can have those files ship even without calling the thunder executable.

Idea from @laserson , thanks!

error loading floating point tifs

Attempting to load 32-bit floating point tifs causes an error due to the following exception:

ValueError: Thunder only supports luminance / greyscale images; got unknown image mode: 'F'

To replicate:

imgs = tsc.loadImages("/mnt/tmpram/f_tifs/", inputformat="tif-stack")  # assume some floating point tifs are in this directory
imgs.first()

"unexpected keyword argument" after loadImagesAsSeries on s3

Running on an ec2 cluster launched via the thunder-ec2 script...

series = tsc.loadImagesAsSeries("s3n://thunder.datasets/test/tif-stack-2/", inputformat='tif-stack')
series.first()

some output happens, then:

 org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/root/spark/python/pyspark/worker.py", line 79, in main
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/root/spark/python/pyspark/serializers.py", line 196, in dump_stream
    self.serializer.dump_stream(self._batched(iterator), stream)
  File "/root/spark/python/pyspark/serializers.py", line 127, in dump_stream
    for obj in iterator:
  File "/root/spark/python/pyspark/serializers.py", line 185, in _batched
    for item in iterator:
  File "/root/spark/python/pyspark/rdd.py", line 1148, in takeUpToNumLeft
    yield next(iterator)
  File "thunder/rdds/fileio/seriesloader.py", line 362, in readblockfromtif
  File "build/bdist.linux-x86_64/egg/thunder/rdds/fileio/readers.py", line 448, in close
    self._key.close(fast=True)
TypeError: close() got an unexpected keyword argument 'fast'

Support for conf file loading binary images

The binary image format requires one specification (dims), and frequently requires specifying the data type (dtype). We should make it possible to load these from a conf.json file in the same folder as the .bin files, just as we do for Series data. The format can be very simple:

{
     "dims": [X, Y, Z],
     "dtype": "int16"
}
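Reading such a file could look roughly like the following. This is only a sketch: the function name `load_binary_conf` is hypothetical, and it assumes the conf file sits on a local file system next to the .bin files.

```python
import json
import os


def load_binary_conf(folder, filename="conf.json"):
    """Return (dims, dtype) from a conf file in `folder`, or (None, None) if absent."""
    path = os.path.join(folder, filename)
    if not os.path.isfile(path):
        return None, None
    with open(path) as f:
        conf = json.load(f)
    dims = conf.get("dims")
    # dims is stored as a JSON list; convert to a tuple for use as an array shape
    return (tuple(dims) if dims is not None else None), conf.get("dtype")
```

Values passed explicitly by the user (dims=..., dtype=...) would presumably still take precedence over whatever the file says.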

example data: design matrix and stimulus parameter?

Hi,

The examples and the documentation look amazing. After reading the paper and visiting the webpage it's hard to resist the temptation to run the examples. I saw that thunder comes with some example data in utils/data, however I couldn't find the design matrix & stimulus parameter to run e.g. this example: http://research.janelia.org/zebrafish/tuning.html.

I just want to make sure I'm not looking in the wrong place. Are these available somewhere?

I hope you don't mind me asking.

Much appreciated,
Matthias

Move check for folder existence earlier during conversion to series

When calling tsc.convertImagesToSeries with shuffle=True and overwrite=False the entire operation will fail half-way through if the desired save location already exists. However, the failure occurs after an expensive groupByKey and sortByKey have already taken place. We can raise an exception due to the folder check immediately, so might as well do that check up front.
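A sketch of the up-front check, assuming a local file system path (the helper name `check_output_path` is hypothetical, and S3 or HDFS locations would need their own existence test):

```python
import os


def check_output_path(path, overwrite=False):
    """Raise immediately if `path` exists and overwriting was not requested."""
    if os.path.exists(path) and not overwrite:
        raise ValueError("Path %s already exists and overwrite is False" % path)
```

Calling this as the first statement of the conversion means the failure happens before the groupByKey and sortByKey are scheduled.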

error when run subset: Py4JJavaError: An error occurred while calling o102.collect.

Dr. Freeman,

Thank you for the wonderful Thunder program. I'm trying to learn it by running the example code in the tutorial. I keep getting a Py4JJavaError when I run code related to subset:

pts = model.scores.subset(500, thresh=0.01, stat='norm')


Py4JJavaError Traceback (most recent call last)
in ()
1 # error at subset: Py4JJavaError: An error occurred while calling o102.collect.
----> 2 pts = model.scores.subset(500, thresh=0.01, stat='norm')

/Users/xuh2/anaconda/lib/python2.7/site-packages/thunder/rdds/series.pyc in subset(self, nsamples, thresh, stat)
506 if thresh is not None:
507 func = stat_dict[stat]
--> 508 result = array(self.rdd.values().filter(lambda x: func(x) > thresh).takeSample(False, nsamples))
509 else:
510 result = array(self.rdd.values().takeSample(False, nsamples))

/Users/xuh2/Downloads/software/spark-1.1.0-bin-hadoop1/python/pyspark/rdd.pyc in takeSample(self, withReplacement, num, seed)
465 fraction = RDD._computeFractionForSampleSize(
466 num, initialCount, withReplacement)
--> 467 samples = self.sample(withReplacement, fraction, seed).collect()
468
469 # If the first sample didn't turn out large enough, keep trying to take samples;

/Users/xuh2/Downloads/software/spark-1.1.0-bin-hadoop1/python/pyspark/rdd.pyc in collect(self)
721 """
722 with _JavaStackTrace(self.context) as st:
--> 723 bytesInJava = self._jrdd.collect().iterator()
724 return list(self._collect_iterator_through_file(bytesInJava))
725

/Users/xuh2/Downloads/software/spark-1.1.0-bin-hadoop1/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py in call(self, *args)
536 answer = self.gateway_client.send_command(command)
537 return_value = get_return_value(answer, self.gateway_client,
--> 538 self.target_id, self.name)
539
540 for temp_arg in temp_args:

/Users/xuh2/Downloads/software/spark-1.1.0-bin-hadoop1/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
298 raise Py4JJavaError(
299 'An error occurred while calling {0}{1}{2}.\n'.
--> 300 format(target_id, '.', name), value)
301 else:
302 raise Py4JError(

Py4JJavaError: An error occurred while calling o117.collect.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 10.0 failed 1 times, most recent failure: Lost task 0.0 in stage 10.0 (TID 10, localhost): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "/Users/xuh2/Downloads/software/spark-1.1.0-bin-hadoop1/python/pyspark/worker.py", line 79, in main
serializer.dump_stream(func(split_index, iterator), outfile)
File "/Users/xuh2/Downloads/software/spark-1.1.0-bin-hadoop1/python/pyspark/serializers.py", line 196, in dump_stream
self.serializer.dump_stream(self._batched(iterator), stream)
File "/Users/xuh2/Downloads/software/spark-1.1.0-bin-hadoop1/python/pyspark/serializers.py", line 127, in dump_stream
for obj in iterator:
File "/Users/xuh2/Downloads/software/spark-1.1.0-bin-hadoop1/python/pyspark/serializers.py", line 185, in _batched
for item in iterator:
File "/Users/xuh2/Downloads/software/spark-1.1.0-bin-hadoop1/python/pyspark/rddsampler.py", line 115, in func
if self.getUniformSample(split) <= self._fraction:
File "/Users/xuh2/Downloads/software/spark-1.1.0-bin-hadoop1/python/pyspark/rddsampler.py", line 57, in getUniformSample
self.initRandomGenerator(split)
File "/Users/xuh2/Downloads/software/spark-1.1.0-bin-hadoop1/python/pyspark/rddsampler.py", line 43, in initRandomGenerator
self._random = numpy.random.RandomState(self._seed)
File "mtrand.pyx", line 610, in mtrand.RandomState.init (numpy/random/mtrand/mtrand.c:7397)
File "mtrand.pyx", line 646, in mtrand.RandomState.seed (numpy/random/mtrand/mtrand.c:7697)
ValueError: Seed must be between 0 and 4294967295

    org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:124)
    org.apache.spark.api.python.PythonRDD$$anon$1.<init>(PythonRDD.scala:154)
    org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:87)
    org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
    org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
    org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
    org.apache.spark.scheduler.Task.run(Task.scala:54)
    org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
    java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:895)
    java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:918)
    java.lang.Thread.run(Thread.java:695)

Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1185)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1174)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1173)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1173)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:688)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:688)
at scala.Option.foreach(Option.scala:236)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:688)
at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1391)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
at akka.actor.ActorCell.invoke(ActorCell.scala:456)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
at akka.dispatch.Mailbox.run(Mailbox.scala:219)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

Is the `zebrafish-optomotor-response` S3 bucket not public?

I cannot access the example data from S3:

$ aws --region us-east-1 s3 ls s3://zebrafish.datasets/optomotor-response/1/data
A client error (AccessDenied) occurred when calling the ListObjects operation: Access Denied

I'd love to download the data to be able to work on my own internal cluster.

'thunder' is not recognized as an internal or external command, operable program or batch file

I tried to install thunder on a computer running 64-bit Windows 7. I have set up pyspark.

Using pip install thunder-python, the install is OK, but there are some warnings: "package init file 'thunder\lib\__init__.py' not found (or not a regular file", "package init file 'thunder\standalone\__init__.py' not found (or not a regular file",

and when I run thunder in the python shell, it says 'thunder' is not recognized as an internal or external command, operable program or batch file.

Could you provide some clue about this issue?

Thanks!

Cropping fails to squeeze empty dimensions

When cropping an image, if an empty selection is provided for one or more axes, it is not handled appropriately. It currently yields an empty selection over the entire image or volume, but should instead squeeze out the dropped dimension(s) and leave the others intact. For example:

data = tsc.loadExample('fish-images')
img = data.crop([0,0,0],[20,20,0]).first()[1]
img
>> array([], shape=(20, 20, 0), dtype=uint8)
img.shape
>> (20,20,0)

This should instead yield something with shape (20,20), as in:

img = data.first()[1][0:20,0:20,0]
img.shape
>> (20,20)

The problem is that slice indexing with, for example, [x:xx, y:yy, 0:0] does not give the same output as [x:xx, y:yy, 0], which is what we want.
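The difference can be reproduced with plain numpy. The `crop` helper below is a hypothetical sketch of one possible fix, not the library's implementation: it uses an integer index on any axis where the lower and upper bounds are equal, and a slice otherwise.

```python
import numpy as np


def crop(arr, min_bound, max_bound):
    """Crop arr to [lo, hi) per axis, dropping any axis where lo == hi."""
    index = tuple(lo if lo == hi else slice(lo, hi)
                  for lo, hi in zip(min_bound, max_bound))
    return arr[index]


vol = np.zeros((30, 30, 2), dtype=np.uint8)
print(vol[0:20, 0:20, 0:0].shape)  # empty slice keeps the axis, with length 0
print(vol[0:20, 0:20, 0].shape)    # integer index drops the axis
print(crop(vol, [0, 0, 0], [20, 20, 0]).shape)
```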

Error when using thunder-ec2 with spark 1.3.0

Calling thunder-ec2 start gives me

Starting slaves...
Starting master...
Traceback (most recent call last):
  File "/usr/local/lib/python2.7/site-packages/thunder_python-0.5.0_dev-py2.7.egg/thunder/utils/ec2.py", line 563, in <module>
    opts=opts)
TypeError: wait_for_cluster_state() takes exactly 4 arguments (3 given)

This is because spark_ec2.py:wait_for_cluster_state now accepts an additional connection argument as of 1.3.0.

Adding conn=conn to the argument list in both calls to wait_for_cluster_state in thunder/utils/ec2.py fixes this error for me.
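If the script needs to keep working with releases on either side of this change, one option is to pass only the keyword arguments that the installed function declares. This is a sketch only: `call_with_supported_args` is a hypothetical helper, and `wait_old` / `wait_new` are stand-ins written for this example, not the real spark_ec2 functions.

```python
import inspect


def call_with_supported_args(func, **kwargs):
    """Call func with only those keyword arguments its signature declares."""
    accepted = inspect.signature(func).parameters
    return func(**{k: v for k, v in kwargs.items() if k in accepted})


# Hypothetical stand-ins for the two signatures.
def wait_old(cluster_instances, cluster_state, opts):
    return ("old", cluster_state)


def wait_new(conn, cluster_instances, cluster_state, opts):
    return ("new", conn, cluster_state)


args = dict(conn="conn", cluster_instances=[], cluster_state="ssh-ready", opts=None)
print(call_with_supported_args(wait_old, **args))
print(call_with_supported_args(wait_new, **args))
```

Note that `inspect.signature` requires Python 3; under Python 2.7 `inspect.getargspec` would be the nearest equivalent.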

update thunder-ec2 script for spark 1.2.0

In testing against the 1.2.0 release candidate, the thunder-ec2 script fails with the following stack trace:

Traceback (most recent call last):
  File "/mnt/data/src/thunder_fork_1409/python/thunder/utils/ec2.py", line 14, in <module>
    from spark_ec2 import launch_cluster, get_existing_cluster, wait_for_cluster, stringify_command, \
ImportError: cannot import name wait_for_cluster

Indeed, wait_for_cluster no longer exists in spark 1.2.0; instead, we have a wait_for_cluster_state with a different signature. Although we expect to be migrating to spark 1.2.0 by default, the code ideally should support both versions.
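One way to support both versions is to look up whichever function the installed spark_ec2 module provides, instead of importing a fixed name. A minimal sketch, with `resolve_wait_function` as a hypothetical helper; the caller would still need to adapt the arguments to the signature it gets back.

```python
def resolve_wait_function(module):
    """Return (name, function) for the first known wait function on `module`."""
    for name in ("wait_for_cluster_state", "wait_for_cluster"):
        func = getattr(module, name, None)
        if callable(func):
            return name, func
    raise ImportError("no known wait function found in %r" % (module,))
```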

Exposing options on Images.toBlocks()

The new toBlocks() conversion, under development on the new_imageblocks branch, currently takes as input a BlockingStrategy. While this will be the preferred method for constructing blocks by developers, it would be useful to expose a simpler set of options to users. In particular, an option to specify a block size (e.g. 100mb), or the block splits (e.g. [50,50,36]). The function would look something like the following pseudocode:

def toBlocks(input):

    if input is not a strategy:
        if input is string:
            assume input is a byte size
            construct the appropriate simple block strategy
        if input is a tuple:
            assume input is a split
            construct the appropriate simple block strategy
            warn users if likely block size exceeds some bound
    else:
        strategy = input

    return data.toBlocks(strategy)
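The input-handling half of that pseudocode could be sketched as below. Everything here is an assumption for illustration: the name `parse_block_spec`, the accepted unit suffixes, and the use of decimal (1000-based) units are not taken from the branch. Constructing the actual strategy object is left out, since that API is still under development.

```python
import re

# Assumed unit suffixes, decimal multiples.
_UNITS = {"": 1, "k": 1000, "kb": 1000, "m": 1000 ** 2, "mb": 1000 ** 2,
          "g": 1000 ** 3, "gb": 1000 ** 3}


def parse_block_spec(spec):
    """Classify a user-facing block spec.

    Returns ("size", nbytes) for strings such as "100mb", or
    ("splits", tuple_of_ints) for sequences such as [50, 50, 36].
    """
    if isinstance(spec, str):
        match = re.match(r"^\s*(\d+(?:\.\d+)?)\s*([a-zA-Z]*)\s*$", spec)
        if match is None or match.group(2).lower() not in _UNITS:
            raise ValueError("unrecognized block size: %r" % (spec,))
        return "size", int(float(match.group(1)) * _UNITS[match.group(2).lower()])
    if isinstance(spec, (tuple, list)):
        return "splits", tuple(int(s) for s in spec)
    raise TypeError("expected a size string or a sequence of splits")
```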

Better auto-partitioning during image loading

Now that image loading supports an optional argument for the number of partitions, we should set the default in a smarter way. Rather than the default being one partition per image, it should probably be related to the number of workers, as given by sc.defaultParallelism.

For some operations there can be dramatic performance decrements when over-partitioning. For example, when loading these image data and computing a mean:

path = 's3n://neuro.datasets/svoboda.lab/tactile.navigation/1/'
data = tsc.loadImages(path + 'images-binary', recursive=True, dims=[512,512,4], ext='bin')
img = data.mean()

takes several minutes with npartitions set to the default (the number of images, in this case 9000), but only a few seconds with npartitions=50.

Doing some testing with @andrewosh, we found that the optimal number of partitions for this particular reduce operation and data is around half the number of executors. However, for other workflows (e.g. primarily with map operations) the level of parallelism should be set to 2-3x the number of executors. This difference, we think, is due to the fact that for images the records are particularly large, increasing the strain on data communication and serialization during the reduce.

Our proposal is to set the default npartitions=sc.defaultParallelism during image loading, which strikes a balance between these concerns.
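In code, the proposed default might look like this sketch. The function name `choose_npartitions` is hypothetical, and capping the result at the number of records is an extra assumption, added so that a small data set does not end up with empty partitions.

```python
def choose_npartitions(nrecords, default_parallelism, npartitions=None):
    """Pick a partition count: the user's value if given, else the cluster default."""
    if npartitions is None:
        npartitions = default_parallelism
    # never fewer than one partition, never more partitions than records
    return max(1, min(npartitions, nrecords))
```

With the example above, `choose_npartitions(9000, sc.defaultParallelism)` would return the cluster default rather than 9000.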

@industrial-sloth ?

"unrecognized mode" from multipage 16-bit signed tiff files

Attempting to load a 16-bit signed tiff file leads to a ValueError: unrecognized mode. This appears to be a problem with the underlying PIL / Pillow library (Pillow 2.7.0 currently on ec2) rather than with thunder itself:

from PIL import Image
Image.VERSION  # returns "1.1.7"
Image.PILLOW_VERSION  # returns 2.7.0
im = Image.open("an216166_2013_07_17_run_01_sbv_01_main_002.tif")  # example 16 bit signed integer tiff file
im.mode  # returns "I;16S"
im.seek(1)  # throws ValueError

various memory errors running stats functions on moderately large images

A couple of different errors can occur when running statistics methods (such as mean()) on Images data.

In one version, the stack trace will include the error: java.lang.OutOfMemoryError: Requested array size exceeds VM limit; in another, related version, the error occurs up at the python level:

File "/mnt/installs/spark_110_hadoop1/spark-1.1.0-bin-hadoop1/python/pyspark/serializers.py", line 469, in write_int
    stream.write(struct.pack("!i", value))
error: 'i' format requires -2147483648 <= number <= 2147483647

Both these errors are caused, in large part, by relatively profligate memory usage in Spark's statscounter.py, and both can be addressed with the same fix.
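The Python-level error can be reproduced in isolation. The traceback shows write_int packing its value with the "!i" format, a signed 32-bit integer, so any value above 2147483647 (presumably the byte length of an oversized serialized record) cannot be written:

```python
import struct

INT32_MAX = 2 ** 31 - 1

packed = struct.pack("!i", INT32_MAX)  # largest value that still fits in 4 bytes

try:
    struct.pack("!i", INT32_MAX + 1)   # one past the limit
    overflowed = False
except struct.error:
    overflowed = True

print(len(packed), overflowed)
```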

To replicate on ec2:
First, modify /root/spark/conf/spark-defaults.conf to replace the existing max kryo buffer size with something like the following:
spark.kryoserializer.buffer.max.mb=6000
(Without this, the error message will be something about the max kryo buffer size being exceeded, which is fundamentally uninteresting.)

The following should yield an OOM before the fix:

from thunder.rdds.fileio.imagesloader import ImagesLoader
import numpy as np
ary = np.arange(71500000, dtype='int16')
img = ImagesLoader(tsc._sc).fromArrays([ary])
mn = img.mean()

data methods not working?

Hi,

thanks for the great library. This looks extremely promising. I only started to play around with thunder so I'm probably missing something trivial here.

from thunder.utils import DataSets
data = DataSets.make(sc, "ica")
data.max() # works fine
data.variance() # fails with the traceback posted below

Best,
Matthias

My installation:

In [83]: thunder.__version__
Out[83]: '0.2.0'

In [84]: %watermark -v -m -p numpy,scipy
CPython 2.7.8
IPython 2.2.0

numpy 1.8.2
scipy 0.14.0

compiler   : GCC 4.1.2 20080704 (Red Hat 4.1.2-54)
system     : Linux
release    : 3.2.0-57-generic
machine    : x86_64
processor  : x86_64
interpreter: 64bit

Error

14/08/20 14:34:38 INFO Executor: Executor killed task 95
14/08/20 14:34:38 WARN TaskSetManager: Task 95 was killed.
14/08/20 14:34:38 INFO TaskSchedulerImpl: Removed TaskSet 11.0, whose tasks have all completed, from pool 
---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
<ipython-input-69-dc9e47b989bd> in <module>()
----> 1 data.variance()

/vol/data/matekm/dev/spark-1.0.2-bin-hadoop1/python/pyspark/rdd.pyc in variance(self)
    763         0.666...
    764         """
--> 765         return self.stats().variance()
    766 
    767     def stdev(self):

/vol/data/matekm/dev/spark-1.0.2-bin-hadoop1/python/pyspark/rdd.pyc in stats(self)
    745             return left_counter.mergeStats(right_counter)
    746 
--> 747         return self.mapPartitions(lambda i: [StatCounter(i)]).reduce(redFunc)
    748 
    749     def mean(self):

/vol/data/matekm/dev/spark-1.0.2-bin-hadoop1/python/pyspark/rdd.pyc in reduce(self, f)
    646             if acc is not None:
    647                 yield acc
--> 648         vals = self.mapPartitions(func).collect()
    649         return reduce(f, vals)
    650 

/vol/data/matekm/dev/spark-1.0.2-bin-hadoop1/python/pyspark/rdd.pyc in collect(self)
    610         """
    611         with _JavaStackTrace(self.context) as st:
--> 612           bytesInJava = self._jrdd.collect().iterator()
    613         return list(self._collect_iterator_through_file(bytesInJava))
    614 

/vol/data/matekm/dev/spark-1.0.2-bin-hadoop1/python/lib/py4j-0.8.1-src.zip/py4j/java_gateway.py in __call__(self, *args)
    535         answer = self.gateway_client.send_command(command)
    536         return_value = get_return_value(answer, self.gateway_client,
--> 537                 self.target_id, self.name)
    538 
    539         for temp_arg in temp_args:

/vol/data/matekm/dev/spark-1.0.2-bin-hadoop1/python/lib/py4j-0.8.1-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    298                 raise Py4JJavaError(
    299                     'An error occurred while calling {0}{1}{2}.\n'.
--> 300                     format(target_id, '.', name), value)
    301             else:
    302                 raise Py4JError(

Py4JJavaError: An error occurred while calling o214.collect.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 11.0:8 failed 1 times, most recent failure: Exception failure in TID 98 on host localhost: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/vol/data/matekm/dev/spark-1.0.2-bin-hadoop1/python/pyspark/worker.py", line 77, in main
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/vol/data/matekm/dev/spark-1.0.2-bin-hadoop1/python/pyspark/rdd.py", line 1417, in pipeline_func
    return func(split, prev_func(split, iterator))
  File "/vol/data/matekm/dev/spark-1.0.2-bin-hadoop1/python/pyspark/rdd.py", line 312, in func
    def func(s, iterator): return f(iterator)
  File "/vol/data/matekm/dev/spark-1.0.2-bin-hadoop1/python/pyspark/rdd.py", line 747, in <lambda>
    return self.mapPartitions(lambda i: [StatCounter(i)]).reduce(redFunc)
  File "/vol/data/matekm/dev/spark-1.0.2-bin-hadoop1/python/pyspark/statcounter.py", line 33, in __init__
    self.merge(v)
  File "/vol/data/matekm/dev/spark-1.0.2-bin-hadoop1/python/pyspark/statcounter.py", line 37, in merge
    delta = value - self.mu
TypeError: unsupported operand type(s) for -: 'tuple' and 'float'

        org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:115)
        org.apache.spark.api.python.PythonRDD$$anon$1.<init>(PythonRDD.scala:145)
        org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:78)
        org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
        org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
        org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:111)
        org.apache.spark.scheduler.Task.run(Task.scala:51)
        org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:183)
        java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
        java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
        java.lang.Thread.run(Unknown Source)
Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1049)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1033)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1031)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1031)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:635)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:635)
    at scala.Option.foreach(Option.scala:236)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:635)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1234)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
    at akka.actor.ActorCell.invoke(ActorCell.scala:456)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
    at akka.dispatch.Mailbox.run(Mailbox.scala:219)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

wildcard patterns not respected when reading from s3 with shuffle=False

Attempting to load files like the following:
series = tsc.loadImagesAsSeries("s3n://thunder.datasets/test/stack-big/TM*.stack", dims=(2048, 1172, 31), startidx=0, stopidx=7, shuffle=False)
results in files that do not match the TM*.stack pattern being read.

This can lead to all kinds of weirdness downstream, the most obvious of which is errors like the following:

S3ResponseError: S3ResponseError: 416 Requested Range Not Satisfiable
<?xml version="1.0" encoding="UTF-8"?>
<Error><Code>InvalidRange</Code><Message>The requested range is not satisfiable</Message><RangeRequested>bytes=81608704-86409215</RangeRequested><ActualObjectSize>1178752</ActualObjectSize><RequestId>996784B06735CE68</RequestId><HostId>ZK3/el+P1VbLGkO5XhYezPZTh+IhBB4WuYw29Xp6WbvUqdxJCtHm3+6JSs9YiR8J59ZinKg75hQ=</HostId></Error>
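One possible way to enforce the pattern is to filter the listed key names explicitly before reading. This is a sketch under the assumption that the reader has a flat list of key names available; `filter_keys` is a hypothetical helper, not part of thunder or boto.

```python
import fnmatch
import posixpath


def filter_keys(key_names, pattern):
    """Keep only key names whose final path component matches the wildcard pattern."""
    return [k for k in key_names
            if fnmatch.fnmatchcase(posixpath.basename(k), pattern)]


keys = ["test/stack-big/TM0000.stack",
        "test/stack-big/conf.json",
        "test/stack-big/TM0001.stack"]
print(filter_keys(keys, "TM*.stack"))
```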

SSL times out pulling from S3

I'm getting these errors when running

tsc.convertImagesToSeries(vol_path, vol_path + '/series', dims=(1184, 2048, 48), inputFormat='stack', dtype='uint16', blockSize="150M", shuffle=True, overwrite=False)

Using thunder 0.4.1 + spark 1.1.0 and thunder master + spark 1.2.1.

It looks like it's timing out when reading from S3. I increased boto's http timeout, but it didn't help. Can something be done at the Thunder level to set the SSL timeout, load things in chunks, or perhaps try-catch-retry?

org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/root/spark-1.2.1-bin-hadoop2.4/python/pyspark/worker.py", line 107, in main
    process()
  File "/root/spark-1.2.1-bin-hadoop2.4/python/pyspark/worker.py", line 98, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/root/spark-1.2.1-bin-hadoop2.4/python/pyspark/rdd.py", line 2081, in pipeline_func
    return func(split, prev_func(split, iterator))
  File "/root/spark-1.2.1-bin-hadoop2.4/python/pyspark/rdd.py", line 2081, in pipeline_func
    return func(split, prev_func(split, iterator))
  File "/root/spark-1.2.1-bin-hadoop2.4/python/pyspark/rdd.py", line 258, in func
    return f(iterator)
  File "/root/spark-1.2.1-bin-hadoop2.4/python/pyspark/rdd.py", line 1571, in combineLocally
    merger.mergeValues(iterator)
  File "/root/spark-1.2.1-bin-hadoop2.4/python/pyspark/shuffle.py", line 252, in mergeValues
    for k, v in iterator:
  File "/usr/local/lib/python2.7/dist-packages/thunder_python-0.4.1-py2.7.egg/thunder/rdds/fileio/readers.py", line 306, in readSplitFromS3
    buf = key.get_contents_as_string()
  File "/usr/local/lib/python2.7/dist-packages/boto/s3/key.py", line 1780, in get_contents_as_string
    response_headers=response_headers)
  File "/usr/local/lib/python2.7/dist-packages/boto/s3/key.py", line 1648, in get_contents_to_file
    response_headers=response_headers)
  File "/usr/local/lib/python2.7/dist-packages/boto/s3/key.py", line 1480, in get_file
    query_args=None)
  File "/usr/local/lib/python2.7/dist-packages/boto/s3/key.py", line 1533, in _get_file_internal
    for bytes in self:
  File "/usr/local/lib/python2.7/dist-packages/boto/s3/key.py", line 386, in next
    data = self.resp.read(self.BufferSize)
  File "/usr/local/lib/python2.7/dist-packages/boto/connection.py", line 413, in read
    return http_client.HTTPResponse.read(self, amt)
  File "/usr/lib/python2.7/httplib.py", line 567, in read
    s = self.fp.read(amt)
  File "/usr/lib/python2.7/socket.py", line 380, in read
    data = self._sock.recv(left)
  File "/usr/lib/python2.7/ssl.py", line 341, in recv
    return self.read(buflen)
  File "/usr/lib/python2.7/ssl.py", line 260, in read
    return self._sslobj.read(len)
SSLError: The read operation timed out
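The try-catch-retry idea raised above could be sketched as a small wrapper with exponential backoff. The name `retry` and its parameters are hypothetical; in the reader, `func` might wrap the `key.get_contents_as_string()` call seen in the traceback, with `exceptions` narrowed to the SSL error.

```python
import time


def retry(func, attempts=3, delay=1.0, backoff=2.0,
          exceptions=(Exception,), sleep=time.sleep):
    """Call func() up to `attempts` times, sleeping between failed attempts."""
    for attempt in range(attempts):
        try:
            return func()
        except exceptions:
            if attempt == attempts - 1:
                raise  # out of attempts: surface the last error
            sleep(delay)
            delay *= backoff
```

The `sleep` parameter exists only so that the wait can be replaced in tests.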

Recursive + wildcard directory listing

When loading images, it is possible to use wildcards, and it is possible to search a directory recursively, but you cannot combine the two, at least on a parallel local file system, for example, by calling:

tsc.loadImages('/path/to/images/*channel1.tif', recursive=True)

The problem is that os.walk, which is used for the recursive file listing, does not accept wildcards, see:

http://stackoverflow.com/questions/6987123/search-in-wildcard-folders-recursively-in-python

The simplest solution is probably to add a dependency on and use the glob2 library.
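As an alternative that avoids the extra dependency, os.walk can be combined with fnmatch from the standard library. A minimal sketch, where `list_recursive` is a hypothetical helper that applies the wildcard to file names only, not to directory names:

```python
import fnmatch
import os


def list_recursive(root, pattern):
    """Return sorted paths under root whose file name matches the wildcard pattern."""
    matches = []
    for dirpath, _dirnames, filenames in os.walk(root):
        for name in fnmatch.filter(filenames, pattern):
            matches.append(os.path.join(dirpath, name))
    return sorted(matches)
```

For the call above, this would correspond to `list_recursive('/path/to/images', '*channel1.tif')`.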

just a typo

not really an issue but on thefreemanlab.com installation page it says to type "pip install python-thunder" which should be "thunder-python".

Error indexing singleton series

When a selection operation on a Series object yields a singleton, it is currently treated as a scalar / int, and is paired with a singleton index. But when the singleton is itself an array, this will prevent subsequent indexing that ought to be possible. The following reveals the problem:

data = tsc.loadExample('fish-series')
model = RegressionModel.load(np.random.randn(12,240), "linear")
betas = model.fit(data).select('betas')
betas.select(0,10).first()

A fix is to recompute default indices when the singleton is an array, though this requires an extra first().

Error calling astype on singletons

Calling astype on a Data object with singleton entries yields an error. To reproduce, use:

data = tsc.loadExample('fish-series')
data.seriesMean().astype('int').first()

The error is TypeError: astype() takes no keyword arguments. It appears that when calling astype on a numpy.float64 (or other scalar), it doesn't accept the additional arguments that work when calling it on an array. For example,

x = np.array([1.0,2.0,3.0])
x.astype('int', casting='unsafe')

works fine, but this doesn't:

x = np.float64(1.0)
x.astype('int', casting='unsafe')

One potential solution would be to check the length of the index before calling astype, and to only include the additional arguments if it's greater than 1.
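A different approach from the index-length check is to route every value through a zero-dimensional array, so the keyword arguments are always handled by ndarray.astype. This is only a sketch, and `safe_astype` is a hypothetical helper name:

```python
import numpy as np


def safe_astype(value, dtype, casting="unsafe"):
    """Cast arrays and numpy scalars alike, returning a scalar for scalar input."""
    result = np.asarray(value).astype(dtype, casting=casting)
    # a 0-d result came from a scalar input: unwrap it back into a numpy scalar
    return result[()] if result.ndim == 0 else result
```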

corrupted images with big-endian tif data on ec2

It appears that the version of PIL (1.1.6) installed on AWS instances launched by the thunder-ec2 script does not correctly detect big-endian tif data, instead opening them as machine native (little endian).

Plotting the resulting images will likely show a salt-and-pepper noise pattern, but one in which the original image structure remains visible.
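The effect of reading big-endian samples as little-endian can be shown with numpy alone. This is an illustration of the symptom, not of what PIL does internally:

```python
import numpy as np

pixels = np.array([1, 2, 300], dtype=">u2")   # big-endian 16-bit samples
raw = pixels.tobytes()

correct = np.frombuffer(raw, dtype=">u2")
misread = np.frombuffer(raw, dtype="<u2")      # same bytes, wrong byte order
repaired = misread.byteswap()                  # swapping the bytes restores the values

print(correct.tolist(), misread.tolist(), repaired.tolist())
```

Small values become large and vice versa, which is consistent with a noisy image in which the overall structure is still visible.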

Thanks to Tom Sainsbury for originally reporting this issue.

Extra dimension from planes method

Currently, when calling Images.planes on volumetric data and requesting only one plane, the resulting dimensions still have three indices, but the volumes themselves are reduced to two dimensions. For example, when running:

data = tsc.loadExample('fish-images')
data.planes(1,1).dims.count
data.planes(1,1).first()[1].shape

The shape is (76, 87), but the dimensions are (76, 87, 1). This can cause errors in subsequent operations that rely on matching dimensions (e.g. subsample). Images.planes should drop the eliminated axes when constructing new dimensions.
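The intended dimension bookkeeping might look like the sketch below. It assumes that planes(bottom, top) selects an inclusive range along the last axis, which matches planes(1,1) yielding a single plane; `planes_dims` is a hypothetical helper, not the library's code.

```python
def planes_dims(dims, bottom, top):
    """Dimensions after keeping planes bottom..top (inclusive) of the last axis."""
    nplanes = top - bottom + 1
    if nplanes == 1:
        # a single plane: the last axis is eliminated entirely
        return tuple(dims[:-1])
    return tuple(dims[:-1]) + (nplanes,)
```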

Inconsistent output from thunder.Colorize().images()

thunder.Colorize().images() can return NaN values in a colorized matrix unexpectedly and inconsistently. If I run the following code block, there's a probability that img contains NaN values only if img will be plotted via plt.imshow, i.e. if the loop variable n is less than 50. As there are no NaN values in the matrices being colorized, and as the appearance of NaN values in img depends on whether img is plotted, this behavior is unexpected and probably not intended.

int1 and min1 were chosen in order to shift the distribution of mats[0] such that it matched the data where I first observed this unexpected behavior.

import numpy as np
import thunder
from numpy import random
from matplotlib.pyplot import imshow

int1 = 6.2812
min1 = -3.1406
mats = random.rand(3,20,20)
mats[0] = (mats[0] * int1) - min1
for n in range(100):
    cols = thunder.Colorize(toType='hsv')
    img = cols.images(np.array((mats[0], mats[1], mats[2])))
    print(n,img.max(),img.min())

    if n < 50:
        imshow(img)

(0, 1.0, 0.0)
(1, nan, nan)
(2, 0.9995165108534424, 0.0)
(3, 0.9995165108534424, 0.0)
(4, 0.9995165108534424, 0.0)
(5, 0.9995165108534424, 0.0)
(6, 0.9995165108534424, 0.0)
(7, 0.9995165108534424, 0.0)
(8, 0.9995165108534424, 0.0)
(9, 0.9995165108534424, 0.0)
(10, 0.9995165108534424, 0.0)
(11, 0.9995165108534424, 0.0)
(12, 0.9995165108534424, 0.0)
(13, 0.9995165108534424, 0.0)
(14, 0.9995165108534424, 0.0)
(15, 0.9995165108534424, 0.0)
(16, 0.9995165108534424, 0.0)
(17, 0.9995165108534424, 0.0)
(18, 0.9995165108534424, 0.0)
(19, 0.9995165108534424, 0.0)
(20, 0.9995165108534424, 0.0)
(21, 0.9995165108534424, 0.0)
(22, 0.9995165108534424, 0.0)
(23, 0.9995165108534424, 0.0)
(24, 0.9995165108534424, 0.0)
(25, 0.9995165108534424, 0.0)
(26, 0.9995165108534424, 2.1441739946495189e-315)
(27, 0.9995165108534424, 0.0)
(28, 0.9995165108534424, 0.0)
(29, 0.9995165108534424, 0.0)
(30, 0.9995165108534424, 2.109052962725003e-315)
(31, 0.9995165108534424, 2.1027296339127651e-315)
(32, 0.9995165108534424, 0.0)
(33, 0.9995165108534424, 0.0)
(34, 0.9995165108534424, 0.0)
(35, 0.9995165108534424, 0.0)
(36, 0.9995165108534424, 0.0)
(37, 0.9995165108534424, 0.0)
(38, 0.9995165108534424, 0.0)
(39, 0.9995165108534424, 2.1081362140378317e-315)
(40, 0.9995165108534424, 2.1081362140378317e-315)
(41, 0.9995165108534424, 2.101666246541908e-315)
(42, 0.9995165108534424, 9.3872472709836843e-323)
(43, 0.9995165108534424, 0.0)
(44, nan, nan)
(45, 0.9995165108534424, 0.0)
(46, 0.9995165108534424, 0.0)
(47, 0.9995165108534424, 0.0)
(48, 0.9995165108534424, 4.9406564584124654e-324)
(49, 0.9995165108534424, 0.0)
(50, 0.9995165108534424, 2.1096390431567257e-315)
(51, 0.9995165108534424, 2.1096390431567257e-315)
(52, 0.9995165108534424, 2.1096390431567257e-315)
(53, 0.9995165108534424, 2.1096390431567257e-315)
(54, 0.9995165108534424, 2.1096390431567257e-315)
(55, 0.9995165108534424, 2.1096390431567257e-315)
(56, 0.9995165108534424, 2.1096390431567257e-315)
(57, 0.9995165108534424, 2.1096390431567257e-315)
(58, 0.9995165108534424, 2.1096390431567257e-315)
(59, 0.9995165108534424, 2.1096390431567257e-315)
(60, 0.9995165108534424, 2.1096390431567257e-315)
(61, 0.9995165108534424, 2.1096390431567257e-315)
(62, 0.9995165108534424, 2.1096390431567257e-315)
(63, 0.9995165108534424, 2.1096390431567257e-315)
(64, 0.9995165108534424, 2.1096390431567257e-315)
(65, 0.9995165108534424, 2.1096390431567257e-315)
(66, 0.9995165108534424, 2.1096390431567257e-315)
(67, 0.9995165108534424, 2.1096390431567257e-315)
(68, 0.9995165108534424, 2.1096390431567257e-315)
(69, 0.9995165108534424, 2.1096390431567257e-315)
(70, 0.9995165108534424, 2.1096390431567257e-315)
(71, 0.9995165108534424, 2.1096390431567257e-315)
(72, 0.9995165108534424, 2.1096390431567257e-315)
(73, 0.9995165108534424, 2.1096390431567257e-315)
(74, 0.9995165108534424, 2.1096390431567257e-315)
(75, 0.9995165108534424, 2.1096390431567257e-315)
(76, 0.9995165108534424, 2.1096390431567257e-315)
(77, 0.9995165108534424, 2.1096390431567257e-315)
(78, 0.9995165108534424, 2.1096390431567257e-315)
(79, 0.9995165108534424, 2.1096390431567257e-315)
(80, 0.9995165108534424, 2.1096390431567257e-315)
(81, 0.9995165108534424, 2.1096390431567257e-315)
(82, 0.9995165108534424, 2.1096390431567257e-315)
(83, 0.9995165108534424, 2.1096390431567257e-315)
(84, 0.9995165108534424, 2.1096390431567257e-315)
(85, 0.9995165108534424, 2.1096390431567257e-315)
(86, 0.9995165108534424, 2.1096390431567257e-315)
(87, 0.9995165108534424, 2.1096390431567257e-315)
(88, 0.9995165108534424, 2.1096390431567257e-315)
(89, 0.9995165108534424, 2.1096390431567257e-315)
(90, 0.9995165108534424, 2.1096390431567257e-315)
(91, 0.9995165108534424, 2.1096390431567257e-315)
(92, 0.9995165108534424, 2.1096390431567257e-315)
(93, 0.9995165108534424, 2.1096390431567257e-315)
(94, 0.9995165108534424, 2.1096390431567257e-315)
(95, 0.9995165108534424, 2.1096390431567257e-315)
(96, 0.9995165108534424, 2.1096390431567257e-315)
(97, 0.9995165108534424, 2.1096390431567257e-315)
(98, 0.9995165108534424, 2.1096390431567257e-315)
(99, 0.9995165108534424, 2.1096390431567257e-315)

EDIT: added sample output of code snippet

Access keys unavailable

During our EC2 setup we add the user's access keys to /root/ephemeral-hdfs/conf/core-site.xml. It appears that they now must also be added to /root/spark/conf/core-site.xml. It is not immediately clear what change prompted this or when it occurred, possibly the release of Spark 1.2. I have confirmed that setting the keys only in the first conf file fails to authenticate when loading data, and that adding the keys to the second conf file fixes the problem. Will push a patch with the fix shortly.

For reference, this is the error message that appeared when loading any public data set from S3:

Py4JJavaError: An error occurred while calling o29.partitions.
: java.lang.IllegalArgumentException: AWS Access Key ID and Secret Access Key must be specified as the username or password (respectively) of a s3n URL, or by setting the fs.s3n.awsAccessKeyId or fs.s3n.awsSecretAccessKey properties (respectively).
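The fix described above amounts to writing the same two properties into both configuration files. Here is a minimal sketch of that step, assuming each file is a Hadoop-style XML document with a `<configuration>` root. The helper name `set_s3n_keys` is hypothetical and is not part of Thunder's EC2 script; the property names are the ones quoted in the error message.

```python
import xml.etree.ElementTree as ET

# Property names taken from the error message above.
S3N_PROPERTIES = ("fs.s3n.awsAccessKeyId", "fs.s3n.awsSecretAccessKey")


def set_s3n_keys(conf_path, access_key, secret_key):
    """Add or update the two s3n credential properties in one conf file.

    Hypothetical helper: assumes conf_path is an XML file whose root
    element is <configuration> containing <property> children.
    """
    tree = ET.parse(conf_path)
    root = tree.getroot()
    pending = dict(zip(S3N_PROPERTIES, (access_key, secret_key)))

    # Update properties that are already present in the file.
    for prop in root.findall("property"):
        name = prop.findtext("name")
        if name in pending:
            value_el = prop.find("value")
            if value_el is None:
                value_el = ET.SubElement(prop, "value")
            value_el.text = pending.pop(name)

    # Append whichever properties were not present yet.
    for name, value in pending.items():
        prop = ET.SubElement(root, "property")
        ET.SubElement(prop, "name").text = name
        ET.SubElement(prop, "value").text = value

    tree.write(conf_path)
```

Calling it once for /root/ephemeral-hdfs/conf/core-site.xml and once for /root/spark/conf/core-site.xml would keep the two files in step.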

Standardize metadata repr

Currently the output of __repr__ on our various data objects is inconsistent and a little confusing. The order of printed attributes should ideally reflect the underlying class hierarchy, that is, start with the attributes that are common. It would help to also replace the various references to the number of elements with a single attribute, nrecords, defined on the Data class. Here is a proposal for what the listings should look like for the current primary data objects (based on a discussion with @industrial-sloth):

Images
nrecords: 1000
dtype: 'float'
dims: (x,y,z)

Series
nrecords: 1000
dtype: 'float'
dims: (x,y,z)
index: [0, 1, 2, 3, 4, 5, ...]

RowMatrix
nrecords: 1000
dtype: 'float'
dims: (x,y,z)
index: [0, 1, 2, 3, 4, 5, ...]
nrows: 1000
ncols: 20
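One way to get listings like these is to have each class declare its printed attributes and extend the list inherited from its parent, so common attributes always come first. The sketch below is illustrative only: the `_metadata` tuple, the `_format` helper, the constructor signatures, and the placement of `dims` on the base class are assumptions, not Thunder's actual implementation.

```python
class Data(object):
    """Base class: attributes shared by all data objects are listed first."""

    # Names printed by __repr__, in order (assumed convention).
    _metadata = ("nrecords", "dtype", "dims")

    def __init__(self, nrecords, dtype, dims):
        self.nrecords = nrecords
        self.dtype = dtype
        self.dims = dims

    def _format(self, name):
        value = getattr(self, name)
        # Truncate long lists the way the proposed index listing does.
        if isinstance(value, list) and len(value) > 6:
            shown = ", ".join(repr(v) for v in value[:6])
            return "%s: [%s, ...]" % (name, shown)
        return "%s: %r" % (name, value)

    def __repr__(self):
        lines = [type(self).__name__]
        lines.extend(self._format(name) for name in self._metadata)
        return "\n".join(lines)


class Images(Data):
    pass


class Series(Data):
    _metadata = Data._metadata + ("index",)

    def __init__(self, nrecords, dtype, dims, index):
        super(Series, self).__init__(nrecords, dtype, dims)
        self.index = index


class RowMatrix(Series):
    _metadata = Series._metadata + ("nrows", "ncols")

    def __init__(self, nrecords, dtype, dims, index, nrows, ncols):
        super(RowMatrix, self).__init__(nrecords, dtype, dims, index)
        self.nrows = nrows
        self.ncols = ncols


print(RowMatrix(1000, 'float', (2, 3, 4), list(range(20)), 1000, 20))
```

Because each subclass only appends to its parent's tuple, the printed order follows the class hierarchy without any per-class formatting code.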

Persisting index on Series object

Currently, when a 'Series' object is serialized (e.g. for persisting to disk or transfer over a network), the index is discarded. Thus when the 'Series' is deserialized, the index is lost and must be recomputed; this is problematic, especially if the user does not have access to the code that created the index in the first place.

One reasonable solution would be to define an Index class that wraps the current list containing the index and can also handle serialization/deserialization as well as checking that the values in the list are serializable.
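A rough sketch of what such a wrapper might look like, assuming `pickle` as the serialization format. The method names `serialize` and `deserialize` are hypothetical, and nothing here reflects existing Thunder code.

```python
import pickle


class Index(object):
    """Wraps the list of index values and checks that they can be serialized."""

    def __init__(self, values):
        self.values = list(values)
        self._check_serializable()

    def _check_serializable(self):
        # Try each value on its own so the error names the offending entry.
        for value in self.values:
            try:
                pickle.dumps(value)
            except (pickle.PicklingError, TypeError, AttributeError) as err:
                raise ValueError(
                    "index value %r cannot be serialized: %s" % (value, err))

    def serialize(self):
        """Return the index as bytes (pickle is an assumed format)."""
        return pickle.dumps(self.values)

    @classmethod
    def deserialize(cls, data):
        """Rebuild an Index from bytes produced by serialize()."""
        return cls(pickle.loads(data))

    def __len__(self):
        return len(self.values)

    def __getitem__(self, i):
        return self.values[i]


index = Index([0, 1, 2, 3])
restored = Index.deserialize(index.serialize())
print(restored.values)
```

Checking at construction time means a bad index is reported when it is assigned, not later when the Series is saved or shipped to a worker.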

Add sparse version of pack

For Series data with non-contiguous keys (e.g. when representing sparse arrays), packing into a local array fails because the dimensions are not fully spanned. We should add a packSparse method that constructs a local array and fills in its entries with values at locations given by the keys. We should allow dimensions to be an optional argument, in case the bounds of the keys in the data itself do not span the range of the desired array.
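The core of such a method could look roughly like the function below. It is a local NumPy sketch under simplifying assumptions: records are (key, value) pairs already collected from the Series, keys are zero-based integer tuples, and each value is a scalar. The name `pack_sparse` and the `fill` parameter are illustrative, not an existing Thunder API.

```python
import numpy as np


def pack_sparse(records, dims=None, fill=0):
    """Build a local array from (key, value) records with non-contiguous keys.

    records: iterable of (key, value); key is a tuple of zero-based ints.
    dims:    optional output shape; defaults to the bounds of the keys.
    fill:    value used for positions that have no record.
    """
    records = list(records)
    if not records:
        raise ValueError("no records to pack")

    keys = np.array([k for k, _ in records], dtype=int)   # shape (n, ndim)
    values = np.array([v for _, v in records])

    if keys.min() < 0:
        raise ValueError("keys must be non-negative")
    if dims is None:
        # Infer the shape from the largest key along each dimension.
        dims = tuple(keys.max(axis=0) + 1)
    elif np.any(keys >= np.asarray(dims)):
        raise ValueError("a key falls outside the requested dims")

    out = np.full(dims, fill, dtype=values.dtype)
    # One index array per dimension places every value at its key.
    out[tuple(keys.T)] = values
    return out


print(pack_sparse([((0, 0), 1.0), ((2, 1), 5.0)]))
```

Passing `dims` explicitly covers the case mentioned above, where the keys present in the data do not reach the edges of the desired array.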

Consider Imageio for image loading

We have had repeated issues with PIL / pillow, including malfunctioning installations on certain OS X versions, as well as tif-loading behavior that works in pillow but not PIL (see e.g. #82).

@broxtronix suggested we check out https://github.com/imageio/imageio. It looks promising. We should evaluate whether it has sufficient functionality to replace all of our current usages of PIL / pillow.

Missing Thunder imports on EC2 slaves

I am running Thunder (master branch, updated today) on EC2, and I have been encountering a problem wherein my EC2 slave instances are having trouble importing and running Thunder code. I've included one simple example that triggers this behavior below.

Poking around a bit in the thunder/python/thunder/utils/ec2.py script, it looks like Thunder is installed on the master node, but not on the slave nodes. Only '/root/thunder/python/thunder/utils/data/' gets mirrored over to the slave nodes.

I'm guessing that this is because Spark will often pickle and send over any Thunder code on the master python process to the slave processes, but this does not seem to be working correctly here. I had been under the impression that this pickling capability only worked for symbols that had been imported into the master node's python process, but not if the code called by one of those symbols then called an import command of its own. That is, if 'import ' is called from within a pickled function on the slave nodes, it will attempt to find that library on the slave node's local python installation, which would explain why it is failing here since thunder is not installed locally on the slaves!
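The import behavior described here can be reproduced locally with the standard `pickle` module, which stores a module-level function as a reference (module name plus function name), not as code. The sketch below is Python 3 and uses a throwaway module called `only_on_master` as a hypothetical stand-in for thunder; PySpark's own serializer differs in detail, so treat this as an illustration of the failure mode only.

```python
import importlib
import os
import pickle
import shutil
import sys
import tempfile


def demo_pickle_by_reference():
    """Pickle a function where its module exists, unpickle where it does not."""
    tmpdir = tempfile.mkdtemp()
    module_name = "only_on_master"  # hypothetical stand-in for thunder
    try:
        with open(os.path.join(tmpdir, module_name + ".py"), "w") as f:
            f.write("def double(x):\n    return 2 * x\n")

        # "Master": the module is importable, so pickling succeeds.
        sys.path.insert(0, tmpdir)
        importlib.invalidate_caches()
        module = importlib.import_module(module_name)
        payload = pickle.dumps(module.double)

        # "Slave": same payload, but the module is not installed there.
        sys.path.remove(tmpdir)
        del sys.modules[module_name]
        importlib.invalidate_caches()
        try:
            pickle.loads(payload)
        except ImportError as err:
            return payload, err
        return payload, None
    finally:
        shutil.rmtree(tmpdir)


payload, error = demo_pickle_by_reference()
print(b"only_on_master" in payload)  # the payload only names the module
print(type(error).__name__, error)
```

The payload carries the module's name but none of its code, so the receiving process must be able to import the module itself.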

As a workaround for now, I have installed Thunder on the slave nodes using the following commands:

~/spark-ec2/copy-dir /root/thunder
pssh -h /root/spark-ec2/slaves "echo /root/thunder/python >> /usr/lib/python2.6/site-packages/paths.pth"

(The second command ensures that thunder is in the PYTHONPATH for the slaves)

Let me know if I'm missing anything, or if there is any trick to getting the pickling / import to work with the code residing solely on the master node. Thanks!!

Test case

imagepath = 's3n://path/to/some/s3/data'
rdd_vols = tsc.loadImages(imagepath, inputformat='tif-stack').cache()
num_vols = rdd_vols.count() # Force data to be cached

rdd_series = rdd_vols.toSeries().cache()
num_series = rdd_series.count()

This produces the following error:

---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
<ipython-input-16-509ed1013e0d> in <module>()
----> 1 rdd_series = rdd_vols.toSeries().cache()
      2 num_series = rdd_series.count()
      3 print num_series

/root/thunder/python/thunder/rdds/images.pyc in toSeries(self, blockSize, splitsPerDim, groupingDim)
    280         blocksdata = self._scatterToBlocks(blockSize=blockSize, blocksPerDim=splitsPerDim, groupingDim=groupingDim)
    281 
--> 282         return blocksdata.toSeries(seriesDim=0)
    283 
    284     def saveAsBinarySeries(self, outputdirname, blockSize="150M", splitsPerDim=None, groupingDim=None,

/root/thunder/python/thunder/rdds/imageblocks.py in toSeries(self, seriesDim, newdtype, casting)
     24             return ImageBlocks._blockToSeries(blockVal, seriesDim)
     25 
---> 26         blockedrdd = self._groupIntoSeriesBlocks()
     27 
     28         # returns generator of (z, y, x) array data for all z, y, x

/root/thunder/python/thunder/rdds/imageblocks.py in _groupIntoSeriesBlocks(self)
     84         # sort must come after group, b/c group will mess with ordering.
     85         # return self.rdd.groupByKey().sortByKey(ascending=True).mapValues(lambda v: ImageBlockValue.fromPlanarBlocks(v, 0))
---> 86         sortedRdd = self.rdd.groupByKey().sortBy(lambda (k, _): k[::-1])
     87         return sortedRdd.mapValues(lambda v: ImageBlockValue.fromPlanarBlocks(v, 0))
     88 

/root/spark/python/pyspark/rdd.pyc in sortBy(self, keyfunc, ascending, numPartitions)
    631         [('a', 1), ('b', 2), ('1', 3), ('d', 4), ('2', 5)]
    632         """
--> 633         return self.keyBy(keyfunc).sortByKey(ascending, numPartitions).values()
    634 
    635     def glom(self):

/root/spark/python/pyspark/rdd.pyc in sortByKey(self, ascending, numPartitions, keyfunc)
    601         # the key-space into bins such that the bins have roughly the same
    602         # number of (key, value) pairs falling into them
--> 603         rddSize = self.count()
    604         maxSampleSize = numPartitions * 20.0  # constant from Spark's RangePartitioner
    605         fraction = min(maxSampleSize / max(rddSize, 1), 1.0)

/root/spark/python/pyspark/rdd.pyc in count(self)
    845         3
    846         """
--> 847         return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()
    848 
    849     def stats(self):

/root/spark/python/pyspark/rdd.pyc in sum(self)
    836         6.0
    837         """
--> 838         return self.mapPartitions(lambda x: [sum(x)]).reduce(operator.add)
    839 
    840     def count(self):

/root/spark/python/pyspark/rdd.pyc in reduce(self, f)
    757             if acc is not None:
    758                 yield acc
--> 759         vals = self.mapPartitions(func).collect()
    760         return reduce(f, vals)
    761 

/root/spark/python/pyspark/rdd.pyc in collect(self)
    721         """
    722         with _JavaStackTrace(self.context) as st:
--> 723             bytesInJava = self._jrdd.collect().iterator()
    724         return list(self._collect_iterator_through_file(bytesInJava))
    725 

/usr/lib/python2.7/site-packages/py4j/java_gateway.pyc in __call__(self, *args)
    536         answer = self.gateway_client.send_command(command)
    537         return_value = get_return_value(answer, self.gateway_client,
--> 538                 self.target_id, self.name)
    539 
    540         for temp_arg in temp_args:

/usr/lib/python2.7/site-packages/py4j/protocol.pyc in get_return_value(answer, gateway_client, target_id, name)
    298                 raise Py4JJavaError(
    299                     'An error occurred while calling {0}{1}{2}.\n'.
--> 300                     format(target_id, '.', name), value)
    301             else:
    302                 raise Py4JError(

Py4JJavaError: An error occurred while calling o197.collect.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 400 in stage 9.0 failed 4 times, most recent failure: Lost task 400.3 in stage 9.0 (TID 1743, ip-10-65-128-31.ec2.internal): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/root/spark/python/pyspark/worker.py", line 75, in main
    command = pickleSer._read_with_length(infile)
  File "/root/spark/python/pyspark/serializers.py", line 150, in _read_with_length
    return self.loads(obj)
ImportError: No module named thunder.rdds.imageblocks

        org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:124)
        org.apache.spark.api.python.PythonRDD$$anon$1.<init>(PythonRDD.scala:154)
        org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:87)
        org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
        org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
        org.apache.spark.api.python.PairwiseRDD.compute(PythonRDD.scala:265)
        org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
        org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
        org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
        org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
        org.apache.spark.scheduler.Task.run(Task.scala:54)
        org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
        java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        java.lang.Thread.run(Thread.java:745)
Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1185)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1174)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1173)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1173)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:688)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:688)
    at scala.Option.foreach(Option.scala:236)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:688)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1391)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
    at akka.actor.ActorCell.invoke(ActorCell.scala:456)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
    at akka.dispatch.Mailbox.run(Mailbox.scala:219)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

trouble accessing EC2 example data

Hello thunder team,

Apologies if this is a novice user error, but I wanted to look at the example design matrix for regression analysis and, per the response to issue #17, tried:

data, designmatrix = tsc.loadExampleEC2('zebrafish-optomotor-response')

However, I receive an error about access keys for s3n URLs.

Please let me know if there is something I have wrong. I seem to be connecting to EC2 successfully and have tried uninstalling and re-installing thunder.

Thank You!


Py4JJavaError Traceback (most recent call last)
in ()
----> 1 data, designmatrix = tsc.loadExampleEC2('zebrafish-optomotor-response')

/root/thunder/python/thunder/utils/context.pyc in loadExampleEC2(self, dataset)
546 nkeys=3)
547 paramFile = self._sc.textFile("s3n://" + path + "params.json")
--> 548 params = json.loads(paramFile.first())
549 modelFile = asarray(params['trials'])
550 return data, modelFile

/root/spark/python/pyspark/rdd.py in first(self)
1165 2
1166 """
-> 1167 return self.take(1)[0]
1168
1169 def saveAsNewAPIHadoopDataset(self, conf, keyConverter=None, valueConverter=None):

/root/spark/python/pyspark/rdd.py in take(self, num)
1124 """
1125 items = []
-> 1126 totalParts = self._jrdd.partitions().size()
1127 partsScanned = 0
1128

/root/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py in __call__(self, *args)
536 answer = self.gateway_client.send_command(command)
537 return_value = get_return_value(answer, self.gateway_client,
--> 538 self.target_id, self.name)
539
540 for temp_arg in temp_args:

/root/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
298 raise Py4JJavaError(
299 'An error occurred while calling {0}{1}{2}.\n'.
--> 300 format(target_id, '.', name), value)
301 else:
302 raise Py4JError(

Py4JJavaError: An error occurred while calling o29.partitions.
: java.lang.IllegalArgumentException: AWS Access Key ID and Secret Access Key must be specified as the username or password (respectively) of a s3n URL, or by setting the fs.s3n.awsAccessKeyId or fs.s3n.awsSecretAccessKey properties (respectively).
at org.apache.hadoop.fs.s3.S3Credentials.initialize(S3Credentials.java:66)
at org.apache.hadoop.fs.s3native.Jets3tNativeFileSystemStore.initialize(Jets3tNativeFileSystemStore.java:49)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:82)
at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:59)
at org.apache.hadoop.fs.s3native.$Proxy7.initialize(Unknown Source)
at org.apache.hadoop.fs.s3native.NativeS3FileSystem.initialize(NativeS3FileSystem.java:216)
at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:1386)
at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:66)
at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:1404)
at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:254)
at org.apache.hadoop.fs.Path.getFileSystem(Path.java:187)
at org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:176)
at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:208)
at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:179)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:204)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:202)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:202)
at org.apache.spark.rdd.MappedRDD.getPartitions(MappedRDD.scala:28)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:204)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:202)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:202)
at org.apache.spark.api.java.JavaRDDLike$class.partitions(JavaRDDLike.scala:50)
at org.apache.spark.api.java.JavaRDD.partitions(JavaRDD.scala:32)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)
at py4j.Gateway.invoke(Gateway.java:259)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:207)
at java.lang.Thread.run(Thread.java:745)
