omniduct's Introduction

Omniduct


omniduct provides uniform interfaces for connecting to and extracting data from a wide variety of (potentially remote) data stores (including HDFS, Hive, Presto, MySQL, etc).

It provides:

  • A generic plugin-based programmatic API to access data in a consistent manner across different services (see supported protocols).
  • A framework for lazily connecting to data sources and maintaining these connections during the entire lifetime of the relevant Python session.
  • Automatic port forwarding of remote services over SSH where connections cannot be made directly.
  • Convenient IPython magic functions for interfacing with data providers from within IPython and Jupyter Notebook sessions.
  • Utility classes and methods to assist in maintaining registries of useful services.
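As a quick illustration of the programmatic API (a minimal sketch; the host, port and query here are placeholders rather than a recommended configuration):

from omniduct.duct import Duct

# Construct a client for a given protocol; the connection itself is made lazily.
presto = Duct.for_protocol('presto')(host='localhost', port=8080)

# The first query (or an explicit .connect()) establishes the connection,
# setting up SSH port forwarding first if a remote is configured.
df = presto.query('SELECT 1')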

omniduct's People

Contributors

danfrankj, delirious-lettuce, gloutsch, gthomas-slack, harsham4026, kination, ljharb, matthewwardrop, naoyak, s3bw


omniduct's Issues

Some issues for cassandra protocol

Hello.
I'm trying to update the protocols to support Cassandra. While working on this, I have a question.

from cassandra.cluster import Cluster

cluster = Cluster(['127.0.0.1'])
session = cluster.connect('testks')

Cassandra requires a list of IPs for the nodes that make up the cluster, but Duct currently receives a string value for host. What would be a good way to adapt this?
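One option (just a sketch on my side, not current omniduct behaviour) would be to keep host as a string and split a comma-separated value into the contact-point list Cassandra expects:

from cassandra.cluster import Cluster

# Hypothetical: interpret a comma-separated host string as multiple contact points.
host = '127.0.0.1,127.0.0.2,127.0.0.3'
contact_points = [h.strip() for h in host.split(',')]

cluster = Cluster(contact_points)
session = cluster.connect('testks')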

Thanks.

Hive dataframe_to_table failing on non-remote execution

When executing HiveServer2Client.dataframe_to_table() with non-SSH execution:

RuntimeError: 18/09/22 00:45:46 WARN conf.HiveConf: DEPRECATED: Configuration property hive.metastore.local no longer has any effect. Make sure to provide a valid value for hive.metastore.uris if you are connecting to a remote metastore.

Logging initialized using configuration in jar:file:/mnt/var/opt/CDH-5.3.3-1.cdh5.3.3.p2826.3087/jars/hive-common-0.13.1-cdh5.3.3.jar!/hive-log4j.properties
FAILED: ParseException line 3:29 character '\' not supported here
line 3:30 rule Identifier failed predicate: {allowQuotedId()}?
line 3:36 character '\' not supported here
line 3:37 rule Identifier failed predicate: {allowQuotedId()}?
line 3:39 character '\' not supported here
line 3:40 rule Identifier failed predicate: {allowQuotedId()}?
line 3:56 character '\' not supported here
line 3:58 character '<EOF>' not supported here

Connecting to presto through https

I'm trying to connect to a Presto server that is reachable only via SSL. How is this achieved with omniduct? I note that PyHive has support for HTTPS, but I didn't see anything in the documentation that lets me specify the server is available over HTTPS only.
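For reference, this is roughly how HTTPS is specified when using PyHive directly (a sketch of PyHive usage; I have not found an equivalent omniduct option, hence the question):

from pyhive import presto

# PyHive's Presto connection accepts a protocol argument ('http' or 'https').
conn = presto.connect(
    host='presto.example.com',  # placeholder host
    port=443,
    username='my_user',         # placeholder credentials
    protocol='https',
)
cursor = conn.cursor()
cursor.execute('SELECT 1')
print(cursor.fetchall())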

Advice appreciated.

How about setting up some example code?

Hello.
How about adding some example files, or setting up examples in the README?
I'm trying to add a driver for another DB and make it fit with the default Duct, but I couldn't get it working because I'm not sure my test code is right.

I think a full API guide will take some time, so in the meantime even a sample that someone is currently using would be a great help.
Thanks.

tests?

This project needs more tests. I cannot figure out how to connect to a filesystem or database and load data.

Add on test case

Hello.
Do you have a plan for updating the test cases? If so, I'd like to know how you are going to write them and attach them to the CI process (currently Travis).
Also, how do you write a test case that needs access to a database? As far as I know, the database package has to be installed to use it in a test case, which means it would also need to be added to the Travis script. Am I right?
I'm not very experienced with this, so if you have any experience with it, please let me know.

Thanks.

DatabaseClient stream() and stream_to_file()

Current behaviour:

from omniduct.duct import Duct
duct = Duct.for_protocol(protocol='sqlalchemy')(...)

query = 'SELECT * FROM ...'

# 1
duct.stream(query, format='csv', batch=2)

# 2
duct.stream_to_file(query, '.../data.csv', batch=2)

# 3
duct.stream_to_file(query, '.../data.csv')

1: Batched stream() to memory repeatedly writes the column names with each batch.

2: Thus, when wrapped by stream_to_file(), the column names are written to the file repeatedly for each batch.

Eg:

State,City
California,San Francisco
Oregon,Portland
State,City
Texas,Houston
California,Los Angeles

3: When batch=None, stream() (and thus stream_to_file()) does not write column names at all, so the output data file will not contain a header row.

Eg:

California,San Francisco
Oregon,Portland
Texas,Houston
California,Los Angeles

In my opinion, the desired behaviour should be:

  • When streaming to csv file, the column names should be written once, as a header.
  • When streaming to memory, the generator should return only row data (no column names), like a cursor would.

What do you think about this? I can open a PR to get this done.
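A rough sketch of the behaviour I have in mind (plain Python, not omniduct's current implementation):

import csv

def stream_rows_to_file(columns, row_batches, path):
    # Write the header exactly once, then append each batch of rows.
    with open(path, 'w', newline='') as f:
        writer = csv.writer(f)
        writer.writerow(columns)
        for batch in row_batches:  # each batch is an iterable of row tuples
            writer.writerows(batch)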

Thanks.

NoSQL database support

Currently we only support SQL databases, but it may be interesting to think about adding a generic NoSQL database backend, which can be subclassed for specific databases like MongoDB or DynamoDB. We will probably think about this post-1.0.0.
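Very roughly, the kind of interface such a backend might expose (names are purely illustrative; nothing like this exists in omniduct yet):

from abc import ABC, abstractmethod

class NoSQLClient(ABC):
    # A generic key/document-store interface that concrete clients
    # (e.g. MongoDB, DynamoDB) would subclass.

    @abstractmethod
    def _connect(self):
        ...

    @abstractmethod
    def get(self, key):
        ...

    @abstractmethod
    def put(self, key, value):
        ...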

@djKooks This would make your use cases feasible too.

More examples

Hello.
I'm working on a protocol and trying to add it as an example. Currently there are only a few examples in the documentation (and only for Presto), so it seems good to add more.

Also, which way would be best to add them to the project?

  • In the documentation
  • Creating a new examples directory and adding them as Python files
  • Creating a new examples directory and adding them as Jupyter notebook files

What do you think?

Thanks.

Nonstandard YAML config format results in crash

In Omniduct 1.x, a YAML format Duct config with inconsistent depths can cause Duct.__init__ to crash.

Format is something like this:

data_environments:
    env1:
        databases:
            presto:
                protocol: presto
                host: localhost
                port: 9999
                catalog: silver
                schema: default
            hive:
                protocol: hiveserver2
                host: localhost
                port: 3333
                schema: default
                driver: pyhive
        metastore:
            host: localhost
            port: 5555

     env2:
        databases:
            presto:
                protocol: presto
                host: localhost
                port: 6666
                catalog: gold
                schema: default
            hive:
                protocol: hiveserver2
                host: localhost
                port: 2222
                cache: local
                schema: default
                driver: pyhive
        metastore:
            host: localhost
            port: 8888

default_data_environment: env1

Error:

/efs/home/naoya_kanai/repos/omniduct/omniduct/registry.py in __init__(self, config)
     56         if config:
     57             print('init: ', config)
---> 58             self.register_from_config(config)
     59 
     60     def __repr__(self):

/efs/home/naoya_kanai/repos/omniduct/omniduct/registry.py in register_from_config(self, config)
    256             names = duct_config.pop('name')
    257             print(names)
--> 258             protocol = duct_config.pop('protocol')
    259             register_magics = duct_config.pop('register_magics', True)
    260             try:

KeyError: 'protocol'

If table exists, then append and replace do not work when adding a dataframe to a table

In the SQLAlchemy client, when I try to replace or append a pandas DataFrame to a table that already exists, I get an error that the table already exists.

When I append I get this error:
ProgrammingError: (psycopg2.errors.DuplicateTable)

When I replace I get this error:
ProgrammingError: (psycopg2.errors.DuplicateTable)

The code is similar to the following (both DataFrames have the same format and the same columns):

dataframe_to_table(df, table="my_table")
dataframe_to_table(df2, table="my_table", if_exists="replace")   # or if_exists="append"

The only workaround I have found is to query the table, concatenate the result with the new DataFrame, drop the table, and then recreate it using dataframe_to_table.
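Roughly what that workaround looks like (a sketch only, assuming the client exposes query() and execute() as in the other examples in this project):

import pandas as pd

# Rebuild the table manually instead of relying on if_exists.
existing = duct.query('SELECT * FROM my_table')
combined = pd.concat([existing, df2], ignore_index=True)
duct.execute('DROP TABLE my_table')
duct.dataframe_to_table(combined, table='my_table')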

Is this method broken?

Plan for documentation

Is there a plan for documentation? It is a bit complicated to get started clearly without it.

Handling connection failover

@matthewwardrop are there plans to support connection failover?

For example, I have two Postgres instances, one of which is a replica. When the connection to the replica fails, I'd like to connect to the other instance. How would you recommend handling this case?

The option I see is for the library, after failing to connect on one host:port, to attempt the connection on a different host:port combination.

This would also mean thinking about how the connection YAML is structured.
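As a rough illustration of the behaviour I mean (a sketch only; it assumes Duct.for_protocol('postgresql') constructs the client, which I have not verified):

from omniduct.duct import Duct

def connect_with_failover(candidates, protocol='postgresql'):
    # Try each (host, port) in order and return the first client that connects.
    for host, port in candidates:
        client = Duct.for_protocol(protocol)(host=host, port=port)
        try:
            client.connect()
            return client
        except Exception:
            continue
    raise RuntimeError('Could not connect to any candidate host.')

# e.g. connect_with_failover([('replica.example.com', 5432), ('primary.example.com', 5432)])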

Some case in connecting postgresql

Hello.
This is a bug (I'm not sure whether it is) that I hit while working on a test case connecting to PostgreSQL. I followed the sample in the README, so I'm not sure whether I have set it up correctly.

from omniduct.registry import DuctRegistry

duct_registry = DuctRegistry()
sql_alchemy_client = duct_registry.new(names='ps_duct', protocol='postgresql', host='localhost', port=5432)
sql_alchemy_client.query('CREATE DATABASE ps_duct_local')

This raises sqlalchemy.exc.OperationalError: (psycopg2.OperationalError) FATAL: role "_mysql" does not exist, which seemed a bit odd, so I dug into the database URI.

When connecting via SQLAlchemy, the URI passed to create_engine shows up as postgresql://_mysql@localhost:5432/. It looks like it needs account info for access.
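I suspect the fix is to pass the account info explicitly, something like this (untested; it assumes the client forwards a username argument):

sql_alchemy_client = duct_registry.new(
    names='ps_duct', protocol='postgresql',
    host='localhost', port=5432,
    username='postgres',  # placeholder role
)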

Am I doing something wrong?
Thanks.

Access problem in hiveserver2

Hello,
It seems my HiveServer2 client sample is not working well. Could you take a look?

from omniduct.databases.hiveserver2 import HiveServer2Client
vb_host = '192.168.56.1'
vb_port = 22

hsc = HiveServer2Client(host=vb_host, port=vb_port, driver='impyla')
hsc.is_connected()  # returns False

Am I doing something wrong?
If you have a simple example, please share it.

Thanks.

ssh connect error

from omniduct import Duct
DB_HOST = "data03"
remote = Duct.for_protocol('ssh')(host=DB_HOST, port=22)
remote.connect()

SSHClient: Connecting: Connecting to data03:22.
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/data/venv/lib/python3.6/site-packages/omniduct/remotes/base.py", line 159, in connect
    Duct.connect(self)
  File "/data/venv/lib/python3.6/site-packages/decorator.py", line 232, in fun
    return caller(func, *(extras + args), **kw)
  File "/data/venv/lib/python3.6/site-packages/omniduct/utils/debug.py", line 275, in logging_scope
    raise_with_traceback(e)
  File "/data/venv/lib/python3.6/site-packages/future/utils/__init__.py", line 446, in raise_with_traceback
    raise exc.with_traceback(traceback)
  File "/data/venv/lib/python3.6/site-packages/omniduct/utils/debug.py", line 271, in logging_scope
    f = func(*args, **kwargs)
  File "/data/venv/lib/python3.6/site-packages/omniduct/duct.py", line 469, in connect
    raise_with_traceback(e)
  File "/data/venv/lib/python3.6/site-packages/future/utils/__init__.py", line 446, in raise_with_traceback
    raise exc.with_traceback(traceback)
  File "/data/venv/lib/python3.6/site-packages/omniduct/duct.py", line 466, in connect
    self._connect()
  File "/data/venv/lib/python3.6/site-packages/omniduct/remotes/ssh.py", line 162, in _connect
    '{}\n\n Please report this!'.format(cmd)
AssertionError: Unexpected failure to establish a connection with the remote host with command:
ssh work@data03 -MT -S /home/work/.ssh/omniduct/work@data03 -o ControlPersist=yes -o StrictHostKeyChecking=no -o NoHostAuthenticationForLocalhost=yes -o ServerAliveInterval=60 -o ServerAliveCountMax=2 'exit'

Please report this!

However, an SSH connection using the CLI works.

Hive DataFrame columns and push errors

HiveServer2Client has been acting weirdly since 0.9.0 and I can't seem to track down what the root causes are. Not sure if this is a config error. Any ideas @matthewwardrop ?

  1. Table name appended to columns when querying SELECT * FROM...
    When querying with SELECT * FROM <table>, the returned DataFrame has the table name joined to each column name with a '.' (e.g. table.column).
    (screenshot of the returned DataFrame omitted)

  2. HiveServer2Client.dataframe_to_table() not working

---------------------------------------------------------------------------
RuntimeError                              Traceback (most recent call last)
<ipython-input-8-8116e3eff828> in <module>()
----> 1 ap.hive.dataframe_to_table(df1, table='naoya.some_temp_table')

<decorator-gen-155> in dataframe_to_table(self, df, table, if_exists, **kwargs)

~/miniconda/envs/airpydev/lib/python3.6/site-packages/omniduct/duct.py in wrapped(f, *args, **kw)
     68         @decorator.decorator
     69         def wrapped(f, *args, **kw):
---> 70             return f(*args, **kw)
     71 
     72         mro = inspect.getmro(cls)

<decorator-gen-147> in dataframe_to_table(self, df, table, if_exists, **kwargs)

~/miniconda/envs/airpydev/lib/python3.6/site-packages/omniduct/utils/debug.py in logging_scope(func, *args, **kwargs)
    270         except Exception as e:
    271             success = False
--> 272             raise_with_traceback(e)
    273         finally:
    274             logger._scope_exit(success)

~/.local/lib/python3.6/site-packages/future/utils/__init__.py in raise_with_traceback(exc, traceback)
    417         if traceback == Ellipsis:
    418             _, _, traceback = sys.exc_info()
--> 419         raise exc.with_traceback(traceback)
    420 
    421 else:

~/miniconda/envs/airpydev/lib/python3.6/site-packages/omniduct/utils/debug.py in logging_scope(func, *args, **kwargs)
    266         success = True
    267         try:
--> 268             f = func(*args, **kwargs)
    269             return f
    270         except Exception as e:

~/miniconda/envs/airpydev/lib/python3.6/site-packages/omniduct/databases/base.py in dataframe_to_table(self, df, table, if_exists, **kwargs)
    599         """
    600         assert if_exists in {'fail', 'replace', 'append'}
--> 601         self.connect()._dataframe_to_table(df, self._parse_namespaces(table), if_exists=if_exists, **kwargs)
    602 
    603     # Table properties

~/miniconda/envs/airpydev/lib/python3.6/site-packages/omniduct/databases/hiveserver2.py in _dataframe_to_table(self, df, table, if_exists, schema, use_hive_cli, partition, sep, table_props, dtype_overrides, **kwargs)
    356             proc = self._run_in_hivecli(stmts)
    357             if proc.returncode != 0:
--> 358                 raise RuntimeError(proc.stderr.decode('utf-8'))
    359         finally:
    360             # Clean up files

RuntimeError: /bin/sh: naoya: command not found
/bin/sh: some_temp_table: command not found
/bin/sh: naoya: command not found
/bin/sh: some_temp_table: command not found
18/09/13 22:46:25 WARN conf.HiveConf: DEPRECATED: Configuration property hive.metastore.local no longer has any effect. Make sure to provide a valid value for hive.metastore.uris if you are connecting to a remote metastore.

Logging initialized using configuration in jar:file:/mnt/var/opt/CDH-5.3.3-1.cdh5.3.3.p2826.3087/jars/hive-common-0.13.1-cdh5.3.3.jar!/hive-log4j.properties
NoViableAltException(17@[184:1: tableName : (db= identifier DOT tab= identifier -> ^( TOK_TABNAME $db $tab) |tab= identifier -> ^( TOK_TABNAME $tab) );])
	at org.antlr.runtime.DFA.noViableAlt(DFA.java:158)
	at org.antlr.runtime.DFA.predict(DFA.java:144)
	at org.apache.hadoop.hive.ql.parse.HiveParser_FromClauseParser.tableName(HiveParser_FromClauseParser.java:4927)
	at org.apache.hadoop.hive.ql.parse.HiveParser.tableName(HiveParser.java:40229)
	at org.apache.hadoop.hive.ql.parse.HiveParser.createTableStatement(HiveParser.java:4408)
	at org.apache.hadoop.hive.ql.parse.HiveParser.ddlStatement(HiveParser.java:2138)
	at org.apache.hadoop.hive.ql.parse.HiveParser.execStatement(HiveParser.java:1392)
	at org.apache.hadoop.hive.ql.parse.HiveParser.statement(HiveParser.java:1030)
	at org.apache.hadoop.hive.ql.parse.ParseDriver.parse(ParseDriver.java:199)
	at org.apache.hadoop.hive.ql.parse.ParseDriver.parse(ParseDriver.java:166)
	at org.apache.hadoop.hive.ql.Driver.compile(Driver.java:417)
	at org.apache.hadoop.hive.ql.Driver.compile(Driver.java:335)
	at org.apache.hadoop.hive.ql.Driver.compileInternal(Driver.java:1026)
	at org.apache.hadoop.hive.ql.Driver.runInternal(Driver.java:1091)
	at org.apache.hadoop.hive.ql.Driver.run(Driver.java:962)
	at org.apache.hadoop.hive.ql.Driver.run(Driver.java:952)
	at org.apache.hadoop.hive.cli.CliDriver.processLocalCmd(CliDriver.java:269)
	at org.apache.hadoop.hive.cli.CliDriver.processCmd(CliDriver.java:221)
	at org.apache.hadoop.hive.cli.CliDriver.processLine(CliDriver.java:431)
	at org.apache.hadoop.hive.cli.CliDriver.processLine(CliDriver.java:367)
	at org.apache.hadoop.hive.cli.CliDriver.executeDriver(CliDriver.java:750)
	at org.apache.hadoop.hive.cli.CliDriver.run(CliDriver.java:694)
	at org.apache.hadoop.hive.cli.CliDriver.main(CliDriver.java:633)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:483)
	at org.apache.hadoop.util.RunJar.main(RunJar.java:212)
FAILED: ParseException line 2:47 cannot recognize input near 'naoya_kanai' '.' '.' in table name

Plan to support DB in cloud service?

Hello. This looks like a project with a good concept. Nice to see it.
Just one question about the roadmap: is there a plan to add connectors for databases on cloud services, such as DynamoDB?

Thanks.
