blaze / odo
Data Migration for the Blaze Project
Home Page: http://odo.readthedocs.org/
License: BSD 3-Clause "New" or "Revised" License
Migration from blaze, reposting for ease of use (from @chdoig):
As Spark is becoming a popular backend, it's a common use case for people to want to transfer their datasets in DBs to a Spark/HDFS cluster.
It would be nice to have an easy interface for end-users to transfer their tables in DBs to a Cluster.
into(Spark/HDFS, SQL DBs)
A lot of people are talking now about tachyon, maybe worth taking a look:
http://tachyon-project.org/
http://ampcamp.berkeley.edu/big-data-mini-course/tachyon.html
This might be related to @quasiben's work on SparkSQL. Maybe a barrier for people to start using SparkSQL is how they should make that transfer, since:
A SchemaRDD can be created from an existing RDD, Parquet file, a JSON dataset, or by running HiveQL against data stored in Apache Hive.
But I'm not able to find how you make that connection from existing SQL DBs:
http://spark.apache.org/docs/latest/sql-programming-guide.html
cc: @mrocklin
SparkSQL officially speaks Hive. I think we're currently sending it MySQL, and possibly through the wrong method. If we have pyhive installed we should probably use that instead.
Blaze still holds our recipes for PySpark and SparkSQL in the old "just into" style. These should be brought over to into and translated into the append/convert style, as sketched below.
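A rough sketch of what the translated recipe might look like in into's append/convert style (Spark 1.x SchemaRDD API assumed; the registration signature and private ctx._sc access are assumptions, not into's actual recipe):
import pandas as pd
from pyspark.sql import SQLContext
from into import append

@append.register(SQLContext, pd.DataFrame)
def dataframe_to_sparksql(ctx, df, name='df', **kwargs):
    # ship the rows to the cluster, infer a schema, register a temp table
    rdd = ctx._sc.parallelize(df.to_dict('records'))
    srdd = ctx.inferSchema(rdd)
    srdd.registerTempTable(name)
    return srdd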
Putting this issue here, as it's an into conversion, not really a blaze-specific thing.
In [1]: from pyspark import SparkContext, HiveContext, SQLContext
In [2]: sc = SparkContext('local[*]', 'test')
Spark assembly has been built with Hive, including Datanucleus jars on classpath
In [3]: sql = SQLContext(sc)
In [4]: df = pd.DataFrame({'a': [1,2,3,1,2,3], 'b': [4,5,6,45,5,6], 'c': list('abcdef')})
In [5]: df.head(2)
Out[5]:
a b c
0 1 4 a
1 2 5 b
In [6]: into(sql, df, name='df')
Out[6]: MapPartitionsRDD[12] at mapPartitions at SerDeUtil.scala:143
In [7]: discover(sql)
Out[7]: dshape("{}")
Default False, but when True print out progress per chunk (like "on chunk i/n"). Useful when doing a long conversion.
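A sketch of what such a keyword might look like inside a chunked append loop (function name, chunk count argument, and keyword name are all hypothetical):
from toolz import partition_all
from into import append

def append_with_progress(target, rows, n, chunksize=1000, progress=False):
    # n is the expected number of chunks; progress defaults to False
    for i, chunk in enumerate(partition_all(chunksize, rows), 1):
        if progress:
            print('on chunk %d/%d' % (i, n))
        append(target, chunk)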
Maybe I'm missing something, but why does Odo detect and reroute on NotImplementedError instead of NotImplemented? The Python docs advise raising NotImplementedError from abstract methods that are supposed to be overridden by derived classes. NotImplemented is intended for "rich comparisons" and for choosing which object should handle those comparisons. Odo's path choosing seems more analogous to the latter than the former, so maybe backends should return NotImplemented instead of raising NotImplementedError. This might benefit performance slightly too, since raising and catching exceptions probably incurs more overhead than returning and checking constants.
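To illustrate the proposed pattern (a self-contained sketch, not odo's actual dispatch machinery):
def fast_route(data):
    # specialized conversion; signal inability with a constant
    if not isinstance(data, list):
        return NotImplemented      # cheap to return and check...
    return [x * 2 for x in data]

def slow_route(data):
    return [x * 2 for x in data]

def convert(data):
    result = fast_route(data)
    if result is NotImplemented:   # ...compared with raising and catching
        result = slow_route(list(data))
    return result

assert convert((1, 2)) == [2, 4]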
When executing the tests in backends/tests, several tests fail, since an exception is thrown when trying to delete temporary files.
py.test test_hdfstore.py
============================= test session starts =============================
platform win32 -- Python 2.7.8 -- py-1.4.26 -- pytest-2.6.4
collected 12 items

test_hdfstore.py FFFFFFFFFFFF

================================== FAILURES ===================================
________________________________ test_discover ________________________________
    def test_discover():
>       with tmpfile('hdf5') as fn:
            df.to_hdf(fn, '/a/b/data')
test_hdfstore.py:39:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
..\..\..\..\contextlib.py:17: in __enter__
    return self.gen.next()
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
extension = '.hdf5'
    @contextmanager
    def tmpfile(extension=''):
        extension = '.' + extension.lstrip('.')
        handle, filename = tempfile.mkstemp(extension)
>       os.remove(filename)
E       WindowsError: [Error 32] The process cannot access the file because it is being used by another process: 'c:\\users\\acbdef\\appdata\\local\\temp\\tmpq9_zuc.hdf5'
..\..\utils.py:44: WindowsError
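On Windows, mkstemp returns an open OS-level handle, so the file can't be removed while that handle is live. A minimal sketch of a fix for the tmpfile helper (closing the handle before removal; cleanup details assumed):
import os
import tempfile
from contextlib import contextmanager

@contextmanager
def tmpfile(extension=''):
    extension = '.' + extension.lstrip('.')
    handle, filename = tempfile.mkstemp(extension)
    os.close(handle)     # release the OS handle so Windows allows removal
    os.remove(filename)  # hand back a fresh path; the caller creates the file
    try:
        yield filename
    finally:
        if os.path.exists(filename):
            try:
                os.remove(filename)
            except OSError:  # another process may still hold it on Windows
                pass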
It's entirely cosmetic, but the graphviz network should be updated with odo
-- perhaps there is some new functionality which could be merged into the network?
More work to be done with spark and sparksql
I would like to augment the blaze server so that if someone updates an hdfstore, the blaze server can do compute on the updated file.
Here is my plan, I'd like your feedback on it
(maybe we call it livehdfstore?)
The other option is to look for a kwarg in the resource call (live=True?) and dispatch based on that to either the original hdfstore, or the subclass?
The newly added (#26) JSON endpoints don't yet support reading/writing gzip files. In theory this isn't hard to add, but in practice making this work on Windows/Mac/Linux for Python 2/3 can be a bit of a pain. Help welcome.
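One cross-version approach, sketched (gzip.open only grew text mode in Python 3, so Python 2 needs a wrapper; the helper name and file are hypothetical):
import gzip
import io

def open_text(path, mode='rt', encoding='utf-8'):
    # wrap the binary gzip stream so both Python 2 and 3 see text
    if path.endswith('.gz'):
        binary = gzip.open(path, mode.replace('t', 'b'))
        return io.TextIOWrapper(binary, encoding=encoding)
    return io.open(path, mode, encoding=encoding)

with open_text('foo.json.gz') as f:
    data = f.read()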
Objects like HDF5 files, SQL databases, Mongo databases, and dicts of chunks of things might all be convertible. One could also imagine adding a very thin Directory type that expressed a filesystem hierarchy as a nested record type. A natural common interchange format here might include dicts of either chunks(DataFrame) or chunks(ndarray), depending on the underlying dshape. This would also be a good place for chunks(dynd.nd.array).
From a private conversation:
Would be nice if into could move files in S3 to e.g. Redshift:
data = into('redshift://conn_info::table', 's3://my/s3/bucket/*.csv')
blaze 0.7.2, into 0.2.2
So, two issues, both around Windows path handling (os.path.sep) in SQLite URLs.
Valid SQLite URL forms are:
sqlite:///:memory: (or, sqlite://)
sqlite:///relative/path/to/file.db
sqlite:////absolute/path/to/file.db
In [1]: from blaze import Data
In [2]: import sqlite3
In [3]: db = sqlite3.connect('foo.db')
In [6]: import pandas as pd
In [8]: import numpy as np
In [9]: pd.DataFrame(np.random.rand(5,5)).to_sql('example',db)
In [10]: Data('sqlite:///foo.db')
Out[10]:
Data: Engine(sqlite:///foo.db)
DataShape: {
example: var * {
index: ?int32,
0: ?float64,
1: ?float64,
2: ?float64,
3: ?float64,
...
In [11]: Data('sqlite:///C:foo.db')
Out[11]:
Data: Engine(sqlite:///C:foo.db)
DataShape: {
example: var * {
index: ?int32,
0: ?float64,
1: ?float64,
2: ?float64,
3: ?float64,
...
# ok, as the paths aren't escaped properly
In [12]: Data('sqlite:///C:\Users\Jeff Reback\foo.db')
OperationalError: (OperationalError) unable to open database file None None
In [13]: Data('sqlite:///C:\\Users\\Jeff Reback\\foo.db')
Out[13]:
Data: Engine(sqlite:///C:\Users\Jeff Reback\foo.db)
DataShape: {
example: var * {
index: ?int32,
0: ?float64,
1: ?float64,
2: ?float64,
3: ?float64,
...
# shouldn't this work?
In [14]: Data('sqlite:////C:\\Users\\Jeff Reback\\foo.db')
OperationalError: (OperationalError) unable to open database file None None
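For reference, a sketch of building a well-formed SQLite URL from a Windows path (forward slashes sidestep the escaping problem; the path is the one from the example above):
import os

path = r'C:\Users\Jeff Reback\foo.db'
url = 'sqlite:///' + os.path.abspath(path).replace(os.sep, '/')
# -> 'sqlite:///C:/Users/Jeff Reback/foo.db'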
@mrocklin, I was trying to get a new version for python 2.7 in binstar, as instructed by doing:
conda install -c blaze into -f
and I was still getting my previous error:
/Users/cdoig/anaconda/envs/topik/lib/python2.7/site-packages/into/convert.pyc in series_to_array(s, dshape, **kwargs)
66 @convert.register(np.ndarray, pd.Series, cost=0.1)
67 def series_to_array(s, dshape=None, **kwargs):
---> 68 dtype = datashape.to_numpy_dtype(datashape.dshape(dshape))
69 sdtype = s.dtype
70 values = s.values
I then checked binstar and wasn't able to find a new package for 2.7. I was just seeing this package info:
depends [u'datashape', u'multipledispatch', u'networkx', u'numpy 1.9*', u'pandas', u'python 3.4*', u'toolz']
I then discovered that binstar was creating the same name for all versions:
[warning] Distribution osx-64/into-0.2.2-8_g68222bc.tar.bz2 already exists ... removing
Exit BINSTAR_BUILD_RESULT=success
and it was removing the previous built package (with different python and numpy versions):
e.g.:
https://binstar.org/blaze/into/builds/85/9#log
https://binstar.org/blaze/into/builds/85/8#log
and that the one that remained was the latest:
https://binstar.org/blaze/into/builds/85/11, with numpy=1.9 and python=3.4
I've opened a PR #112 to add the python version to the string, like @tswicegood does in conda-env:
https://github.com/conda/conda-env/blob/develop/conda.recipe/meta.yaml
But still haven't figured out how to include the numpy version.
It would be nice to automate the metadata loading process from CSV files on HDFS into the Hive metastore. In principle the CREATE TABLE statement can be handled as we already do in backends/sql_csv.py. Some novel parts remain (e.g. handling Chunks). The metaclass can probably be a no-op.
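A minimal sketch of the statement-generation piece (table name, column types, and HDFS location are all hypothetical):
def create_hive_table(tablename, columns, hdfs_directory):
    # render a HiveQL CREATE EXTERNAL TABLE statement for CSV data on HDFS
    cols = ',\n    '.join('%s %s' % (name, typ) for name, typ in columns)
    return ("CREATE EXTERNAL TABLE %s (\n    %s\n)\n"
            "ROW FORMAT DELIMITED FIELDS TERMINATED BY ','\n"
            "STORED AS TEXTFILE\n"
            "LOCATION '%s'" % (tablename, cols, hdfs_directory))

print(create_hive_table('accounts', [('id', 'BIGINT'), ('name', 'STRING')],
                        '/user/hive/accounts'))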
PyWebHDFS or more generally the WebHDFS web interface seems like a reasonable approach to interacting with a foreign HDFS. I'm running into issues with it now though; it seems that some link gets internally redirected and becomes opaque.
We may want to handle the downloading/HTTP request ourselves so that we can use an HTTP Range GET request (or anything, really, that lets us download just part of a file).
cc @quasiben
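A sketch of grabbing just the head of a file over WebHDFS-style HTTP with a Range header (the URL is hypothetical; WebHDFS's op=OPEN also accepts offset/length parameters):
import requests

url = 'http://namenode:50070/webhdfs/v1/user/hdfs/data.csv?op=OPEN'
resp = requests.get(url, headers={'Range': 'bytes=0-65535'})
resp.raise_for_status()
head = resp.content  # just the first 64 KB, e.g. to discover a CSV's dshape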
This is sort of a gateway issue to handling HDFS data generally. HDFS(CSV) -> pyspark.RDD could be a thing, as well as (if we find a suitable library) HDFS(CSV) -> HDFS(Parquet).
http://odo.readthedocs.org/en/latest/uri.html
should make clear that the root is implicit in the table path when necessary,
e.g.
odo(...,'hdfstore://foo.h5::bar')
and odo(...,'hdfstore://foo.h5::/bar')
are equivalent
Content of json file located in user-temp directory:
[{"amount": 100, "name": "Alice"}, {"amount": 200, "name": "Bob"}]
$ py.test test_json.py
============================= test session starts =============================
platform win32 -- Python 2.7.8 -- py-1.4.26 -- pytest-2.6.4
collected 21 items

test_json.py ...FF........F....F..

================================== FAILURES ===================================
________________________________ test_resource ________________________________
    def test_resource():
>       assert isinstance(resource('jsonlines://foo.json'), JSONLines)
E       assert isinstance(<into.directory._Directory object at 0x0422CEB0>, JSONLines)
E        +  where <into.directory._Directory object at 0x0422CEB0> = resource('jsonlines://foo.json')
test_json.py:55: AssertionError
___________________________ test_resource_guessing ____________________________
    def test_resource_guessing():
        with json_file(dat) as fn:
>           assert isinstance(resource(fn), JSON)
test_json.py:63:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
..\..\regex.py:64: in __call__
    return self.dispatch(s)(s, *args, **kwargs)
..\..\directory.py:57: in resource_directory
    subtype = type(resource(one_uri, **kwargs))
..\..\regex.py:64: in __call__
    return self.dispatch(s)(s, *args, **kwargs)
!!! Recursion detected (same locals & position)
___________________________ test_multiple_jsonlines ___________________________
    def test_multiple_jsonlines():
        try:
            with open('_test_a1.json', 'w') as f:
                json.dump(dat, f)
            with open('_test_a2.json', 'w') as f:
                json.dump(dat, f)
>           r = resource('_test_a*.json')
test_json.py:154:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
..\..\regex.py:64: in __call__
    return self.dispatch(s)(s, *args, **kwargs)
..\..\directory.py:57: in resource_directory
    subtype = type(resource(one_uri, **kwargs))
..\..\regex.py:64: in __call__
    return self.dispatch(s)(s, *args, **kwargs)
..\..\directory.py:57: in resource_directory
    subtype = type(resource(one_uri, **kwargs))
..\..\regex.py:64: in __call__
    return self.dispatch(s)(s, *args, **kwargs)
!!! Recursion detected (same locals & position)
_____________________________ test_resource_gzip ______________________________
    def test_resource_gzip():
>       assert isinstance(resource('foo.json.gz'), (JSON, JSONLines))
E       assert isinstance(<into.directory._Directory object at 0x04364030>, (<class 'into.backends.json.JSON'>, <class 'into.backends.json.JSONLines'>))
E        +  where <into.directory._Directory object at 0x04364030> = resource('foo.json.gz')
test_json.py:206: AssertionError
===================== 4 failed, 17 passed in 2.78 seconds =====================
It would be nice to quickly view the contents of an S3 bucket
>>> discover(resource('s3://blaze_data'))
...
There doesn't seem to be a way to get from an HDFStore to SQL, though you can go from an HDFStore to a Dataframe and a DataFrame to SQL.
This fails:
blz.into('sqlite:///bayarea.db::buildings', 'hdfstore://bayarea.h5::/buildings')
with this error:
NetworkXNoPath: node <class 'pandas.io.pytables.FrameFixed'> not reachable from <class '_abcoll.Iterator'>
But this works fine:
blz.into(pd.DataFrame, 'hdfstore://bayarea.h5::/buildings')
And this works fine:
buildings = store['buildings']
blz.into('sqlite:///bayarea.db::buildings', buildings)
By default we ignore the index. We should probably only do this if it's the trivial ordering 0, 1, 2, ... and treat it as data otherwise.
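A sketch of that check, assuming a pandas DataFrame input (the helper name is hypothetical):
import numpy as np
import pandas as pd

def has_trivial_index(df):
    # True only if the index is the default ordering 0, 1, 2, ...
    return (df.index == np.arange(len(df))).all()

df = pd.DataFrame({'a': [1, 2, 3]})
assert has_trivial_index(df)                      # safe to ignore the index
assert not has_trivial_index(df.set_index('a'))   # treat it as data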
Loading the following data from a csv and writing it to a SQLite table
,operation,name,aircraft,datetime_nearest
151,S16,24_002_A320,A320,2014-05-15 07:46:15.100000
154,S16,24_005_A321,A321,2014-05-15 08:08:23.200000
using
d = Data('file.csv')
into('sqlite:///../db.sqlite3::data_event', d)
results in the error below.
I was using the anaconda version
into 0.1.3 np19py34_0
---------------------------------------------------------------------------
StatementError Traceback (most recent call last)
<ipython-input-25-8bc54b31474a> in <module>()
----> 1 into('sqlite:///../db.sqlite3::data_event', d)
/home/rifr/anaconda3/envs/prj/lib/python3.4/site-packages/multipledispatch/dispatcher.py in __call__(self, *args, **kwargs)
161 self._cache[types] = func
162 try:
--> 163 return func(*args, **kwargs)
164
165 except MDNotImplementedError:
/home/rifr/anaconda3/envs/prj/lib/python3.4/site-packages/blaze/interactive.py in into(a, b, **kwargs)
258 result = compute(b, **kwargs)
259 kwargs['dshape'] = b.dshape
--> 260 return into(a, result, **kwargs)
261
262
/home/rifr/anaconda3/envs/prj/lib/python3.4/site-packages/multipledispatch/dispatcher.py in __call__(self, *args, **kwargs)
161 self._cache[types] = func
162 try:
--> 163 return func(*args, **kwargs)
164
165 except MDNotImplementedError:
/home/rifr/anaconda3/envs/prj/lib/python3.4/site-packages/into/into.py in into_string(uri, b, **kwargs)
72 pass
73 a = resource(uri, **kwargs)
---> 74 return into(a, b, **kwargs)
75
76
/home/rifr/anaconda3/envs/prj/lib/python3.4/site-packages/multipledispatch/dispatcher.py in __call__(self, *args, **kwargs)
161 self._cache[types] = func
162 try:
--> 163 return func(*args, **kwargs)
164
165 except MDNotImplementedError:
/home/rifr/anaconda3/envs/prj/lib/python3.4/site-packages/into/into.py in into_object(a, b, **kwargs)
58 except NotImplementedError:
59 pass
---> 60 return append(a, b, **kwargs)
61
62
/home/rifr/anaconda3/envs/prj/lib/python3.4/site-packages/multipledispatch/dispatcher.py in __call__(self, *args, **kwargs)
161 self._cache[types] = func
162 try:
--> 163 return func(*args, **kwargs)
164
165 except MDNotImplementedError:
/home/rifr/anaconda3/envs/prj/lib/python3.4/site-packages/into/backends/sql.py in append_anything_to_sql_Table(t, o, **kwargs)
234 @append.register(sa.Table, object)
235 def append_anything_to_sql_Table(t, o, **kwargs):
--> 236 return append(t, convert(Iterator, o, **kwargs), **kwargs)
237
238
/home/rifr/anaconda3/envs/prj/lib/python3.4/site-packages/multipledispatch/dispatcher.py in __call__(self, *args, **kwargs)
161 self._cache[types] = func
162 try:
--> 163 return func(*args, **kwargs)
164
165 except MDNotImplementedError:
/home/rifr/anaconda3/envs/prj/lib/python3.4/site-packages/into/backends/sql.py in append_iterator_to_table(t, rows, **kwargs)
227 with engine.connect() as conn:
228 for chunk in partition_all(1000, rows): # TODO: 1000 is hardcoded
--> 229 conn.execute(t.insert(), chunk)
230
231 return t
/home/rifr/anaconda3/envs/prj/lib/python3.4/site-packages/sqlalchemy/engine/base.py in execute(self, object, *multiparams, **params)
727 type(object))
728 else:
--> 729 return meth(self, multiparams, params)
730
731 def _execute_function(self, func, multiparams, params):
/home/rifr/anaconda3/envs/prj/lib/python3.4/site-packages/sqlalchemy/sql/elements.py in _execute_on_connection(self, connection, multiparams, params)
320
321 def _execute_on_connection(self, connection, multiparams, params):
--> 322 return connection._execute_clauseelement(self, multiparams, params)
323
324 def unique_params(self, *optionaldict, **kwargs):
/home/rifr/anaconda3/envs/prj/lib/python3.4/site-packages/sqlalchemy/engine/base.py in _execute_clauseelement(self, elem, multiparams, params)
824 compiled_sql,
825 distilled_params,
--> 826 compiled_sql, distilled_params
827 )
828 if self._has_events or self.engine._has_events:
/home/rifr/anaconda3/envs/prj/lib/python3.4/site-packages/sqlalchemy/engine/base.py in _execute_context(self, dialect, constructor, statement, parameters, *args)
891 self._handle_dbapi_exception(e,
892 util.text_type(statement), parameters,
--> 893 None, None)
894
895 if context.compiled:
/home/rifr/anaconda3/envs/prj/lib/python3.4/site-packages/sqlalchemy/engine/base.py in _handle_dbapi_exception(self, e, statement, parameters, cursor, context)
1157 util.raise_from_cause(
1158 sqlalchemy_exception,
-> 1159 exc_info
1160 )
1161 else:
/home/rifr/anaconda3/envs/prj/lib/python3.4/site-packages/sqlalchemy/util/compat.py in raise_from_cause(exception, exc_info)
186 exc_info = sys.exc_info()
187 exc_type, exc_value, exc_tb = exc_info
--> 188 reraise(type(exception), exception, tb=exc_tb, cause=exc_value)
189 else:
190 exec("def reraise(tp, value, tb=None, cause=None):\n"
/home/rifr/anaconda3/envs/prj/lib/python3.4/site-packages/sqlalchemy/util/compat.py in reraise(tp, value, tb, cause)
179 value.__cause__ = cause
180 if value.__traceback__ is not tb:
--> 181 raise value.with_traceback(tb)
182 raise value
183
/home/rifr/anaconda3/envs/prj/lib/python3.4/site-packages/sqlalchemy/engine/base.py in _execute_context(self, dialect, constructor, statement, parameters, *args)
887 conn = self._revalidate_connection()
888
--> 889 context = constructor(dialect, self, conn, *args)
890 except Exception as e:
891 self._handle_dbapi_exception(e,
/home/rifr/anaconda3/envs/prj/lib/python3.4/site-packages/sqlalchemy/engine/default.py in _init_compiled(cls, dialect, connection, dbapi_connection, compiled, parameters)
571 for key in self.compiled.positiontup:
572 if key in processors:
--> 573 param.append(processors[key](compiled_params[key]))
574 else:
575 param.append(compiled_params[key])
/home/rifr/anaconda3/envs/prj/lib/python3.4/site-packages/sqlalchemy/dialects/sqlite/base.py in process(value)
370 }
371 else:
--> 372 raise TypeError("SQLite DateTime type only accepts Python "
373 "datetime and date objects as input.")
374 return process
StatementError: SQLite DateTime type only accepts Python datetime and date objects as input. (original cause: TypeError: SQLite DateTime type only accepts Python datetime and date objects as input.) 'INSERT INTO data_event (name, operation_id, datetime_nearest, aircraft) VALUES (?, ?, ?, ?)' ({'name': 151, 'aircraft': 'A320', 'datetime_nearest': '24_002_A320', 'operation_id': 'S16'}, {'name': 154, 'aircraft': 'A321', 'datetime_nearest': '24_005_A321', 'operation_id': 'S16'})
On the SQLAlchemy website it is stated that while SQLite doesn't support datetimes natively, SQLAlchemy can take care of this, so I would expect into to work just fine with datetimes.
In [28]: d.dshape
Out[28]:
dshape("""var * {
Unnamed: 0: int64,
operation: string,
name: string,
aircraft: string,
datetime_nearest: datetime
}""")
I'd be ok with not having it for py26
blaze issue migration
original error from @mrocklin
In [1]: !cat data.csv
Name,Daily
A,0.15
B,0.135
C,0.110
In [2]: from blaze import *
In [3]: csv = CSV('data.csv')
In [4]: Table(csv)
Out[4]:
Name Daily
0 A 0.150
1 B 0.135
2 C 0.110
In [5]: sql = SQL('sqlite:///:memory:', 'data', schema=csv.schema)
In [6]: into(sql, csv)
Out[6]: <blaze.data.sql.SQL at 0x7f4a5a82b410>
In [7]: list(sql)
ValueError: parse error converting string "Daily" to float64
This breaks both
pip install into
The behavior of the `>` version specifier has changed in PEP 440. `>` is now an exclusive operator
, meaning that >0.15 does not match >0.15.*. Perhaps you want `>=` instead of `>`?
Could not find a version that satisfies the requirement pandas>0.15 (from into) (from versions: 0.
1, 0.2b0, 0.2b1, 0.2, 0.3.0b0, 0.3.0b2, 0.3.0, 0.4.0, 0.4.1, 0.4.2, 0.4.3, 0.5.0, 0.6.0, 0.6.1, 0.7.
0rc1, 0.7.0, 0.7.1, 0.7.2, 0.7.3, 0.8.0rc1, 0.8.0rc2, 0.8.0, 0.8.1, 0.9.0, 0.9.1, 0.10.0, 0.10.1, 0.
11.0, 0.12.0, 0.13.0, 0.13.1, 0.14.0, 0.14.1, 0.15.0, 0.15.1, 0.15.2)
No distributions matching the version for pandas>0.15 (from into)
python setup.py install
Searching for pandas>0.15
Reading https://pypi.python.org/simple/pandas/
No local packages or download links found for pandas>0.15
error: Could not find suitable distribution for Requirement.parse('pandas>0.15')
(Tested under windows 7, python3.4 x64 with latest pip, setuptools)
Apparently the culprit is commit e44a1ec.
e.g. this is an append:
odo(...,'hdfstore://foo.h5::bar')
and this is not:
odo(...,'hdfstore://foo.h5::bar', mode='w')
The following code is failing for me on Windows, Python 2.7:
>>> from into import into
>>> into(list, 'test.csv')
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "C:\Python27\lib\site-packages\multipledispatch\dispatcher.py", line 163, in __call__
return func(*args, **kwargs)
File "C:\Python27\lib\site-packages\into\into.py", line 79, in into_string_string
r = resource(b, **kwargs)
File "C:\Python27\lib\site-packages\into\regex.py", line 64, in __call__
return self.dispatch(s)(s, *args, **kwargs)
File "C:\Python27\lib\site-packages\into\directory.py", line 57, in resource_directory
subtype = type(resource(one_uri, **kwargs))
File "C:\Python27\lib\site-packages\into\regex.py", line 64, in __call__
return self.dispatch(s)(s, *args, **kwargs)
...
File "C:\Python27\lib\site-packages\into\regex.py", line 64, in __call__
return self.dispatch(s)(s, *args, **kwargs)
File "C:\Python27\lib\site-packages\into\directory.py", line 54, in resource_directory
one_uri = first(glob(uri))
File "C:\Python27\lib\glob.py", line 27, in glob
return list(iglob(pathname))
File "C:\Python27\lib\glob.py", line 38, in iglob
if not has_magic(pathname):
The contents of 'test.csv':
Test,File
1,2
3,4
5,6
I'm pretty new to this system, just saw it on the blogpost. Is 2.7 not supported, have I done something fundamentally wrong, or have I found a bug? I pretty much can't get anything to work beyond simple built-in types:
>>> into(list, {1,2,3,4,5})
[1, 2, 3, 4, 5]
Sorry for opening a new ticket, but I'm not sure how to reopen the old one ...
Most of the tests pass on my Windows, Python 2.7 computer, except for the following.
Connection refused: most probably related to a firewall on my computer ...
_______________________ ERROR collecting test_mongo.py ________________________
test_mongo.py:15: in <module>
    conn = pymongo.MongoClient()
..\..\..\pymongo\mongo_client.py:377: in __init__
    raise ConnectionFailure(str(e))
E   ConnectionFailure: [Errno 10061] No connection could be made because the target machine actively refused it
Test file missing (I used pip install to install the package into my local Python installation). The test passes when airline.sas7bdat is available in the test directory.
________________________ ERROR collecting test_sas.py _________________________
test_sas.py:18: in <module>
    sasfile = SAS7BDAT(test_path)
..\..\..\sas7bdat.py:422: in __init__
    self._file = open(self.path, 'rb')
E   IOError: [Errno 2] No such file or directory: 'c:\\Python27\\Lib\\site-packages\\into\\backends\\tests\\airline.sas7bdat'
'ssh' is not defined in 'except socket.error:':
________________________ ERROR collecting test_ssh.py _________________________
test_ssh.py:25: in <module>
    ssh.close()
E   NameError: name 'ssh' is not defined
========== 137 passed, 5 skipped, 2 xfailed, 3 error in 7.10 seconds ==========
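The test_ssh.py error is a plain scoping bug: the except block references ssh even when the connection was never assigned. A minimal sketch of the usual guard (connect_ssh is a stand-in for the real connection setup):
import socket

def connect_ssh():
    # stand-in for the real connection setup in test_ssh.py
    raise socket.error('no SSH server available')

ssh = None
try:
    ssh = connect_ssh()
except socket.error:
    pass               # leave ssh as None; dependent tests should be skipped
finally:
    if ssh is not None:
        ssh.close()    # only close a connection that was actually made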
They're currently hosted at http://into.readthedocs.org/en/latest/ but we can change the domain.
This should be possible with some CNAME magic in the right place.
Following this notebook, I thought it would be nice to test with my data in a bcolz file. Unfortunately, I ran into a NotImplementedError:
In [1]: from into import into
In [2]: into('data.bcolz', 'receivers.csv')
---------------------------------------------------------------------------
NotImplementedError Traceback (most recent call last)
<ipython-input-2-e781d0c64487> in <module>()
----> 1 into('data.bcolz', 'receivers.csv')
/home/rifr/anaconda3/envs/sonair/lib/python3.4/site-packages/multipledispatch/dispatcher.py in __call__(self, *args, **kwargs)
161 self._cache[types] = func
162 try:
--> 163 return func(*args, **kwargs)
164
165 except MDNotImplementedError:
/home/rifr/anaconda3/envs/sonair/lib/python3.4/site-packages/into-0.2.1-py3.4.egg/into/into.py in into_string_string(a, b, **kwargs)
118 def into_string_string(a, b, **kwargs):
119 r = resource(b, **kwargs)
--> 120 return into(a, r, **kwargs)
121
122
/home/rifr/anaconda3/envs/sonair/lib/python3.4/site-packages/multipledispatch/dispatcher.py in __call__(self, *args, **kwargs)
161 self._cache[types] = func
162 try:
--> 163 return func(*args, **kwargs)
164
165 except MDNotImplementedError:
/home/rifr/anaconda3/envs/sonair/lib/python3.4/site-packages/into-0.2.1-py3.4.egg/into/into.py in into_string(uri, b, **kwargs)
111 resource_ds = ds
112
--> 113 a = resource(uri, dshape=resource_ds, expected_dshape=ds, **kwargs)
114 return into(a, b, dshape=ds, **kwargs)
115
/home/rifr/anaconda3/envs/sonair/lib/python3.4/site-packages/into-0.2.1-py3.4.egg/into/regex.py in __call__(self, s, *args, **kwargs)
62
63 def __call__(self, s, *args, **kwargs):
---> 64 return self.dispatch(s)(s, *args, **kwargs)
65
66 @property
/home/rifr/anaconda3/envs/sonair/lib/python3.4/site-packages/into-0.2.1-py3.4.egg/into/resource.py in resource_all(uri, *args, **kwargs)
98 discover
99 """
--> 100 raise NotImplementedError("Unable to parse uri to data resource: " + uri)
101
102
NotImplementedError: Unable to parse uri to data resource: data.bcolz
I tested with the following small table.
name,runway,takeoff,datetime_nearest_close
S28,28,TRUE,A
S16,16,TRUE,Q
L14,14,FALSE,I
conda list | grep -E 'into|blaze|bcolz'
bcolz v0.7.1_91_g0cebb5a np19py34_1
blaze 0.7.1.post036.gd7cfa0e np19py34_149
into 0.2.1 113_g229f8a7
I also tried with into from master, but no luck.
The Chunks type is really just anything over which we can iterate. It doesn't actually have to be used in a chunking workflow. It might be best to rename it appropriately. Separately, we might want to make an IndexableOf type that supports both iteration and indexing. Whereas IterableOf(a) takes an iterable or a function f :: -> [a], IndexableOf(a) might take either an indexable (like a list) or a function f :: int -> a and a number of elements. IndexableOf might also be extended to something n-dimensional, so possibly f :: [int] -> a and a shape rather than a number of elements.
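A rough sketch of the one-dimensional version (all names here are hypothetical, not part of into):
class IndexableOf(object):
    # wrap either an indexable container or a function f :: int -> a plus a length
    def __init__(self, data, n=None):
        if callable(data):
            self.getter, self.n = data, n
        else:
            self.getter, self.n = data.__getitem__, len(data)

    def __getitem__(self, i):
        return self.getter(i)

    def __len__(self):
        return self.n

    def __iter__(self):
        return (self.getter(i) for i in range(self.n))

rows = IndexableOf(lambda i: {'id': i}, 3)
assert list(rows) == [{'id': 0}, {'id': 1}, {'id': 2}]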
For operations on files with unknown extensions like the following
$ into s3://mybucket/myfile.xyz myfile.xyz
we currently fail because we can't assign a particular node based off of the extension. One solution would be to create a Blob type to handle unknown things like this.
Sometimes we might want to do a computation and then keep the result in the database as another table. To do this Blaze needs an append(sqlalchemy.Table, sqlalchemy.Select) implementation. Presumably there is some way to generate an appropriate INSERT INTO ... statement through sqlalchemy.
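SQLAlchemy's Insert.from_select can generate exactly that statement; a sketch of the implementation (registration signature and engine access details assumed):
import sqlalchemy as sa
from into import append

@append.register(sa.Table, sa.sql.expression.Select)
def append_select_to_table(t, selectable, **kwargs):
    # run INSERT INTO t (...) SELECT ... entirely inside the database
    columns = [c.name for c in selectable.columns]
    with t.bind.connect() as conn:
        conn.execute(t.insert().from_select(columns, selectable))
    return t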
Sometimes fields in tables were created with the AUTOINCREMENT command. into should ignore this column and let sqlalchemy take care of this field when a user inserts data that does not have this field defined.
import os
import sqlite3
import pandas as pd
import numpy as np
from blaze import *
DB = 'test.db'
TABLE = 'position'
try:
os.remove(DB)
except OSError:
pass
conn = sqlite3.connect(DB)
conn.execute(
'''CREATE TABLE {} (
id integer NOT NULL PRIMARY KEY AUTOINCREMENT,
x real NOT NULL,
y real NOT NULL,
z real NOT NULL
)'''.format(TABLE)
)
conn.close()
positions = pd.DataFrame(np.random.randn(10, 3), columns=['x', 'y', 'z'])
table_url = 'sqlite:///' + DB + '::' + TABLE
into(table_url, positions)
Is there any plan to extend the odo data migrator to the SAS and Stata data formats? For the time being, one has to go through pandas.DataFrame simply to convert these files to HDF5.
xref #97
It seems that a call to conn.get_bucket() can't succeed in a reasonable amount of time.
A common use of into would be to import/convert all of the CSV files in a directory. This works fine if there is a prefix to the asterisk, e.g.,
into(pd.DataFrame, 'examp*.csv')
but errors if there is no prefix,
>>> into(pd.DataFrame, '*.csv')
...
IOError: File *.csv does not exist
The into command (#69) provides a command line interface to the into function:
into myfile.csv myfile.json --delimiter ;
Here is a list of possible improvements:
- into import times and latency
- flag placement, e.g. into myfile.csv --delimiter ; myfile.json
- man into, into csv --help, etc...
@cpcloud and @quasiben came up with most of these ideas. Probably I'm forgetting a few. Please treat this comment as wiki space and add appropriately.
Sucking data from a database into memory with Python is slow. The iopro module might be nice for a fast SQL -> np.ndarray route for convert.
@convert.register(np.ndarray, sqlalchemy.Table)
def table_to_array(tbl, **kwargs):
    # turn the table into an ndarray and return it, e.g. via an iopro
    # cursor whose fetchsarray() yields a NumPy structured array directly
    raise NotImplementedError('fast iopro-backed route not written yet')
Converting from SQL (SQLite) to a recarray results in a normal array. Converting from CSV to a recarray works fine.
In [9]: rcv = into(np.recarray, FILE_RECEIVERS)
In [10]: type(rcv)
Out[10]: numpy.core.records.recarray
In [11]: rcv = into(np.recarray, TABLE_RECEIVERS)
In [12]: type(rcv)
Out[12]: numpy.ndarray
Could this be due to cost=0.0 for np.array to np.recarray? Converting first into a DataFrame and then into a recarray works.
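For reference, the two-step route that works (using the names from the session above):
rcv = into(np.recarray, into(pd.DataFrame, TABLE_RECEIVERS))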
We lack direct routes to copy data between remote computers without touching the local system.
into('ssh://A:myfile.csv', 'ssh://A:myfile2.csv') # Should be a remote local copy
into('ssh://A:myfile.csv', 'ssh://B:myfile.csv') # Should result in a remote call to scp
into('hdfs://A:myfile.csv', 'ssh://A:myfile.csv') # Should result in hadoop fs calls
into('ssh://A:myfile.csv', 'hdfs://A:myfile.csv') # Should result in hadoop fs calls
...
When interacting with AWS the inter-node bandwidth is very high. It'd be nice to leverage this instead of pulling down and pushing up from local.
ipdb> convert(pd.Series, set([1,2,3]))
*** AttributeError: 'NoneType' object has no attribute 'measure'
This is because discover isn't defined on sets.
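A sketch of the missing definition (datashape's discover is a multipledispatch dispatcher, so registering on set should suffice; treating a set like a list of its elements is an assumption):
from datashape import discover

@discover.register(set)
def discover_set(s, **kwargs):
    # reuse the existing list rules; ordering within a set is arbitrary
    return discover(list(s))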
In 0.2.1, importing multiple CSVs, at least one of which has null values, results in errors.
Error:
Exception: Integer column has NA values
Example:
examp1.csv:
a,b,c
1,2,3
4,5,6
7,8,9
examp2.csv:
a,b,c
1,2,3
4,,6
7,8,9
examp.py
into(pd.DataFrame, 'examp*.csv')
Error:
ValueError: cannot convert float NaN to integer
Example:
examp1.csv:
a,b,c
1,2,3
4,,6
7,8,9
examp2.csv:
a,b,c
1,,2
3,4,5
6,7,8
examp.py
into(pd.DataFrame, 'examp*.csv')
In both of these cases, importing the CSVs one at a time works fine.
When we interact with Python data we use full heuristic mode. This does things like say that discover('1.0') == float64. That is appropriate when we guess the datashape of data in text files like CSV. It isn't appropriate when we grab data from Mongo, which is already somewhat typed; in those cases a string is a string and we shouldn't do any guesswork. Perhaps the dual meaning of discover should be made more explicit.
Redshift has an stl_load_errors table that we should take advantage of.
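For example, after a failed COPY one can pull the most recent load errors (a sketch; the connection string is hypothetical, the stl_load_errors columns are Redshift's):
import sqlalchemy as sa

engine = sa.create_engine('redshift+psycopg2://user:pass@cluster:5439/db')
query = """SELECT starttime, filename, line_number, colname, err_reason
           FROM stl_load_errors
           ORDER BY starttime DESC
           LIMIT 10"""
for row in engine.execute(query):
    print(row)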
Looks like release 3.0 and master think the version is 0.2.0
See: https://github.com/ContinuumIO/odo/blob/master/odo/__init__.py#L67