connector-x's Introduction

Load data from databases to dataframes, the fastest way.

ConnectorX enables you to load data from databases into Python in the fastest and most memory-efficient way.

What you need is one line of code:

import connectorx as cx

cx.read_sql("postgresql://username:password@server:port/database", "SELECT * FROM lineitem")

Optionally, you can accelerate the data loading using parallelism by specifying a partition column.

import connectorx as cx

cx.read_sql("postgresql://username:password@server:port/database", "SELECT * FROM lineitem", partition_on="l_orderkey", partition_num=10)

The function partitions the query by evenly splitting the value range of the specified column into the requested number of partitions. ConnectorX assigns one thread to each partition to load and write data in parallel. Currently, we support partitioning on numerical columns (which must not contain NULL) for SPJA queries.
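For intuition, here is a minimal sketch of what the even split amounts to; the actual partitioning logic lives in ConnectorX's Rust core and may differ in details such as boundary handling:

# Illustrative only: evenly split the [min_val, max_val] range of the
# partition column into partition_num half-open buckets.
def split_range(min_val: int, max_val: int, partition_num: int):
    size = (max_val - min_val + 1) // partition_num
    bounds, lower = [], min_val
    for i in range(partition_num):
        upper = max_val + 1 if i == partition_num - 1 else lower + size
        bounds.append((lower, upper))  # rows with lower <= col < upper go to partition i
        lower = upper
    return bounds

# e.g. split_range(1, 6000000, 10) yields 10 roughly equal l_orderkey ranges,
# one per worker thread.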

Installation

pip install connectorx

Check out here to see how to build the Python wheel from source.

Performance

We compared different solutions in Python that provide the read_sql function, by loading a 10x TPC-H lineitem table (8.6GB) from Postgres into a DataFrame with 4 cores of parallelism.

Time chart, lower is better.

time chart

Memory consumption chart, lower is better.

memory chart

In conclusion, ConnectorX uses up to 3x less memory and 21x less time (3x less memory and 13x less time compared with Pandas). More details here.

How does ConnectorX achieve lightning speed while keeping the memory footprint low?

We observe that existing solutions more or less copy the data multiple times while downloading it. Additionally, implementing a data-intensive application in Python adds extra overhead.

ConnectorX is written in Rust and follows the "zero-copy" principle. This allows it to make full use of the CPU by being cache- and branch-predictor-friendly. Moreover, the architecture of ConnectorX ensures the data is copied exactly once, directly from the source to the destination.

How does ConnectorX download the data?

Upon receiving the query, e.g. SELECT * FROM lineitem, ConnectorX will first issue a LIMIT 1 query SELECT * FROM lineitem LIMIT 1 to get the schema of the result set.

Then, if partition_on is specified, ConnectorX will issue SELECT MIN($partition_on), MAX($partition_on) FROM (SELECT * FROM lineitem) to get the range of the partition column. After that, the original query is split into partitions based on the min/max information, e.g. SELECT * FROM (SELECT * FROM lineitem) WHERE $partition_on > 0 AND $partition_on < 10000. ConnectorX will then run a count query to get the partition size (e.g. SELECT COUNT(*) FROM (SELECT * FROM lineitem) WHERE $partition_on > 0 AND $partition_on < 10000). If partition_on is not specified, the count query will be SELECT COUNT(*) FROM (SELECT * FROM lineitem).

Finally, ConnectorX will use the schema info as well as the count info to allocate memory and download data by executing the queries normally.

Once downloading begins, one thread is spawned for each partition so that the data is downloaded in parallel at the partition level. Each thread issues the query for its partition to the database and then writes the returned data to the destination row-wise or column-wise (depending on the database) in a streaming fashion.
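Putting the steps together, the queries issued for the earlier partitioned example would look roughly like the sketch below; the exact SQL that ConnectorX generates can differ by database, and the min/max values shown are made up for illustration:

# Illustrative sketch of the query sequence for
# read_sql(conn, "SELECT * FROM lineitem", partition_on="l_orderkey", partition_num=2)
schema_query = "SELECT * FROM lineitem LIMIT 1"  # step 1: infer the result schema
range_query = "SELECT MIN(l_orderkey), MAX(l_orderkey) FROM (SELECT * FROM lineitem)"

# Suppose the range query returns MIN=1 and MAX=6000000; the query is split in two:
partition_queries = [
    "SELECT * FROM (SELECT * FROM lineitem) WHERE l_orderkey >= 1 AND l_orderkey < 3000001",
    "SELECT * FROM (SELECT * FROM lineitem) WHERE l_orderkey >= 3000001 AND l_orderkey <= 6000000",
]

# One COUNT(*) per partition sizes the pre-allocated destination, then each
# partition query runs on its own thread and streams rows into it.
count_queries = [f"SELECT COUNT(*) FROM ({q})" for q in partition_queries]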

Supported Sources & Destinations

Example connection string, supported protocols and data types for each data source can be found here.

For more planned data sources, please check out our discussion.

Sources

  • Postgres
  • MySQL
  • MariaDB (through MySQL protocol)
  • SQLite
  • Redshift (through Postgres protocol)
  • ClickHouse (through MySQL protocol)
  • SQL Server
  • Azure SQL Database (through MSSQL protocol)
  • Oracle
  • BigQuery
  • Trino
  • ODBC (WIP)
  • ...

Destinations

  • Pandas
  • PyArrow
  • Modin (through Pandas)
  • Dask (through Pandas)
  • Polars (through PyArrow)
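As a usage sketch, the destination is selected through read_sql's return_type argument; the string values below mirror the list above, but check the documentation for the exact values supported by your version:

import connectorx as cx

conn = "postgresql://username:password@server:port/database"
query = "SELECT * FROM lineitem"

df_pandas = cx.read_sql(conn, query)                        # default: Pandas DataFrame
tbl_arrow = cx.read_sql(conn, query, return_type="arrow")   # PyArrow Table
df_polars = cx.read_sql(conn, query, return_type="polars")  # Polars DataFrame (via PyArrow)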

Documentation

Doc: https://sfu-db.github.io/connector-x/intro.html
Rust docs: stable, nightly

Next Plan

Check out our discussion to participate in deciding our next plan!

Historical Benchmark Results

https://sfu-db.github.io/connector-x/dev/bench/

Developer's Guide

Please see Developer's Guide for information about developing ConnectorX.

Support

You are always welcome to:

  1. Ask questions and propose new ideas in our GitHub discussion.
  2. Ask questions on Stack Overflow. Make sure to tag them with #connectorx.

Organizations and Projects using ConnectorX

To add your project/organization here, reply to our post here.

Citing ConnectorX

If you use ConnectorX, please consider citing the following paper:

Xiaoying Wang, Weiyuan Wu, Jinze Wu, Yizhou Chen, Nick Zrymiak, Changbo Qu, Lampros Flokas, George Chow, Jiannan Wang, Tianzheng Wang, Eugene Wu, Qingqing Zhou. ConnectorX: Accelerating Data Loading From Databases to Dataframes. VLDB 2022.

BibTeX entry:

@article{connectorx2022,
  author    = {Xiaoying Wang and Weiyuan Wu and Jinze Wu and Yizhou Chen and Nick Zrymiak and Changbo Qu and Lampros Flokas and George Chow and Jiannan Wang and Tianzheng Wang and Eugene Wu and Qingqing Zhou},
  title     = {ConnectorX: Accelerating Data Loading From Databases to Dataframes},
  journal   = {Proc. {VLDB} Endow.},
  volume    = {15},
  number    = {11},
  pages     = {2994--3003},
  year      = {2022},
  url       = {https://www.vldb.org/pvldb/vol15/p2994-wang.pdf},
}

Contributors

wangxiaoying
Xiaoying Wang
dovahcrow
Weiyuan Wu
Wukkkinz-0725
Null
Yizhou150
Yizhou
zen-xu
ZhengYu, Xu
wseaton
Will Eaton
AnatolyBuga
Anatoly Bugakov
Jordan-M-Young
Jordan M. Young
domnikl
Dominik Liebler
auyer
Rafael Passos
jinzew
Null
gruuya
Marko Grujic
alswang18
Alec Wang
lBilali
Lulzim Bilali
ritchie46
Ritchie Vink
houqp
QP Hou
wKollendorf
Null
glennpierce
Glenn Pierce
jorgecarleitao
Jorge Leitao
quambene
Null
CBQu
CbQu
tschm
Thomas Schmelzer
maxb2
Matthew Anderson
therealhieu
Hieu Minh Nguyen
FerriLuli
FerriLuli
alexander-beedie
Alexander Beedie
zzzdong
Null
venkashank
Null
surister
Ivan
phanindra-ramesh
Null
messense
Messense
kotval
Kotval
albcunha
Null
rursprung
Ralph Ursprung
MatsMoll
Mats Eikeland Mollestad
marianoguerra
Mariano Guerra
kevinheavey
Kevin Heavey
kayhoogland
Kay Hoogland
deepsourcebot
DeepSource Bot
AndrewJackson2020
Andrew Jackson
Cabbagec
Brandon
Amar1729
Amar Paul
aljazerzen
Aljaž Mur Eržen


connector-x's Issues

Feature gate dependencies

With the new arrow support, I think connector-x has value as a Rust crate as well. However, as a crate owner writing a library, I would not want to depend on connector-x because it has a lot of dependencies (increasing compile times) that I don't want to compile.

I think it would be really valuable if the dependencies could be opt-in and activated with feature gates.

Support Array types

Hi,

I tried to use ConnectorX to load a dataframe from a PostgreSQL database with the following code:

import connectorx as cx

tableName   = "semantic_search"
dataFrame = cx.read_sql('postgresql://postgres:postgres@localhost:5432/embeddings_sts_tf', "select * from " + tableName)

but got the following error:

thread '<unnamed>' panicked at 'not implemented: _float8', connectorx/src/sources/postgres/typesystem.rs:78:22
note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace
Traceback (most recent call last):
  File "/home/matthieu/Code/Python/postgreSQL_creation.py", line 142, in <module>
    dataFrame = cx.read_sql('postgresql://{user}:{pw}@{host}:5432/{db}'.format(host=conn_info["host"], db=conn_info["database"], user=conn_info["user"], pw=conn_info["password"]), "select * from " + tableName)
  File "/home/matthieu/anaconda3/envs/sts-transformers-gpu-fresh/lib/python3.8/site-packages/connectorx/__init__.py", line 99, in read_sql
    result = _read_sql(
pyo3_runtime.PanicException: not implemented: _float8

Thanks for helping!

MySQL source parsing NULL value error

import polars as pl
import pandas as pd
from sqlalchemy import create_engine
import pyarrow
# 
print(pl.__version__)
# 0.8.20
print(pd.__version__)
# 1.3.0
pip list | grep connector*
# connectorx                    0.2.0
pyarrow.__version__
# '4.0.1'

# pandas first
sql = '''select ORDER_ID from tables     '''
engine = create_engine('mysql+pymysql://root:***@*.*.*.*:*')
df = pd.read_sql_query(sql, engine)
df.dtypes
# ORDER_ID                      int64

# polars second
conn = "mysql://root:***@*.*.*.*:*"
pdf = pl.read_sql(sql, conn)
---------------------------------------------------------------------------
PanicException                            Traceback (most recent call last)
<timed exec> in <module>

~/miniconda3/envs/test/lib/python3.8/site-packages/polars/io.py in read_sql(sql, connection_uri, partition_on, partition_range, partition_num)
    556     """
    557     if _WITH_CX:
--> 558         tbl = cx.read_sql(
    559             conn=connection_uri,
    560             query=sql,

~/miniconda3/envs/test/lib/python3.8/site-packages/connectorx/__init__.py in read_sql(conn, query, return_type, protocol, partition_on, partition_range, partition_num)
    126             raise ValueError("You need to install pyarrow first")
    127 
--> 128         result = _read_sql(
    129             conn,
    130             "arrow",

PanicException: Could not retrieve i64 from Value


Gen figures

Baselines: Dask, Modin (Dask), Pandas
Parameters: # partitions, network bandwidth (200Mbps, 10Gbps), pandas with chunk,
Metrics: time, peak mem

Improve pandas dataframe allocation

Currently it takes 30s to allocate the TPC-H lineitem x10 table. For comparison, Rust requires 10us (maybe an unfair comparison) and pure NumPy takes 4 seconds.

produce borrowed types

Currently, Produce<T> does not carry a lifetime. This means it can only produce owned types but not borrowed types. Producing borrowed types is important to the zero-copy goal.

We are currently fine even with zero-copy string because Postgres uses Bytes for the internal buffer. Cloning a Bytes just means cloning the Arc under it. However, this might not be true for arbitrary data sources.

I had an initial attempt under the borrowed-produce branch, but some work still needs to be done to make it compile.

Employee Department MYSQL Creation Issue

We were given the task shown in the attachment below -

Employee Issue 1

We wrote the query below, which is not working, i.e. it does not fetch the desired result -

With New Dept_id as (select  x.dpt_code from department x where x.dpt_code not in 
(select e.dpt_id from emplyee e join
employee e where e.dep_id=d.dpt_code))
With New Dept_name as (select  x.dpt_name from department x )
SELECT employee.emp_id, employee.emp_name, employee.hire_date, employee.jon_name, employee.dept_id, New Dept_id, New Dept_name from employee JOIN department on (employee.dep_id=department.dpt_code) JOIN 

Cache 0.1

Summary

Simple cache support for query result in order to speed up loading the same query multiple times. Extensible to more sophisticated design and implementation.

Design-level Explanation Actions

  • Investigate existing cache / materialized view solutions for external storage
  • Decide the scope of the first version:
    • what to cache (admission and eviction)
    • how to use the cache
    • how to maintain the cache
  • Decide the storage engine to store local cache in the first version (sqlite)
  • Design the implementation of cache logic

Design-level Explanation

Scope of Cache 0.1

  • What to cache
    • Manually add query result to cache
    • Do not remove cached data (do not consider limited disk space)
  • How to use the cache
    • Load the entire cache if and only if the entire query matches exactly (without doing any filtering or manipulation)
  • How to maintain the cache
    • Manually force refreshing the cache when issuing query
    • Refresh the entire cache (do not consider incremental refresh)

API Design

User Interface

def read_sql(..., enable_cache=True, force_query=False)
  • enable_cache(string or bool, optional(default True)) - Whether or not to cache the result data. If False is set, do not cache the result. If True is set, cache the result to /tmp with the connection and sql as the name. If a string is set, cache the result to the corresponding path.
  • force_query(bool, optional(default False)) - Whether or not to force downloading the data from the database regardless of whether a cache exists. If True, also update the local cache.
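A hypothetical usage sketch of this proposed API; enable_cache and force_query are part of this design and not the released read_sql signature:

import connectorx as cx

conn = "postgresql://username:password@server:port/database"
query = "SELECT * FROM lineitem"

# First call: download from the database and cache the result under /tmp.
df = cx.read_sql(conn, query, enable_cache=True)

# Later calls with the exact same query would be served from the cache.
df = cx.read_sql(conn, query, enable_cache=True)

# Force a fresh download and refresh the cached copy.
df = cx.read_sql(conn, query, enable_cache=True, force_query=True)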

Logical workflow

workflow

Cache Module Implementation

pub trait Cache {
    fn init(conn: &str) -> Result<()>; // init cache source, init metadata if not exists
    fn query_match(query: &str) -> Result<(Vec<String>, Vec<String>)>; // look up metadata, split query into probe query and remainder query, and partition each query
    fn post_execute(dests: Vec<Box<dyn Destination>>) -> Result<Box<dyn Destination>>; // produce final result
}

overview

Implementation-level Explanation

Left: current implementation, Right: implementation supporting cache

implementation

Either cache_queries or db_queries will be empty in the first version, which only supports exact matches.

Rationale and Alternatives

  • Able to extend to more cache backends, data formats and policies in the future.
  • Easy to incorporate into the current workflow of ConnectorX.
  • Able to make use of the current loading and writing mechanisms in ConnectorX.

Prior Art

Some related works:

Future Possibilities

  • Support different external storages for cache (e.g. Redis)
  • Support using cache on partial match queries
    • partial attribute
    • partial predicate
    • partial table (in join)
  • Admission and eviction policy given limited space budget
  • Incrementally refresh the cache

Implementation-level Actions

  • Add cache source on decided storage
  • Add cache destination on decided storage
  • Support multiple source to multiple destination logic (source partition -> destination partition combinator)
  • read_sql API support
  • Add tests
  • Add documentation

Additional Tasks

  • This task is put into the correct pipeline (Development Backlog or In Progress).
  • The label of this task is set correctly.
  • The issue is assigned to the correct person.
  • The issue is linked to the related Epic.
  • The documentation is changed accordingly.
  • Tests are added accordingly.

Update Connection Pooling Crate

It would be nice if we could get logs from rust over the wire for debug purposes, preferably configurable from the Python client.

I have a supposedly postgres-compatible source that fails due to

RuntimeError: Cannot get metadata for the queries, last error: Some(Error { kind: UnexpectedMessage, cause: None })

In the meantime, any tips for debugging this?

uuid object

For now we convert UUID values to string objects in pandas; make them uuid objects instead (to align with pandas).
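Until that lands, a possible workaround sketch for mapping the returned strings back to uuid.UUID objects on the Python side; the table and column names are hypothetical:

import uuid
import connectorx as cx

conn = "postgresql://username:password@server:port/database"
df = cx.read_sql(conn, "SELECT id, payload FROM events")  # 'id' is a UUID column, returned as str
df["id"] = df["id"].map(uuid.UUID)                        # convert strings back to uuid.UUID objects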

Postgres SSL Support

Currently no TLS implementation is supported, so connecting like this fails if the DB mandates an SSL/TLS connection:

df = cx.read_sql(f"postgresql://{username}:{password}@host:35432/schema?sslmode=require", 'select 1;', return_type='pandas')

Error:

RuntimeError: timed out waiting for connection: error performing TLS handshake: no TLS implementation configured

Is this a planned feature? It seems to be supported upstream in rust-postgres: https://docs.rs/postgres-native-tls/0.5.0/postgres_native_tls/

The lack of SSL support (even with sslmode=require) makes it difficult to use connector-x in enterprise-y environments.

RuntimeError: TypeError: Argument 'placement' has incorrect type

File "D:\BOT\PostgreSQLConnection\venv\lib\site-packages\connectorx_init_.py", line 98, in read_sql
partition_query=partition_query,
RuntimeError: TypeError: Argument 'placement' has incorrect type (expected pandas.libs.internals.BlockPlacement, got list)
OR
File "D:\BOT\PostgreSQLConnection\venv\lib\site-packages\connectorx_init
.py", line 98, in read_sql
partition_query=partition_query,
RuntimeError: TypeError: Argument 'placement' has incorrect type (expected pandas._libs.internals.BlockPlacement, got int)

Document the internal mechanism

  1. cx will issue a min/max query to get the range of the partition column
  2. cx will generate queries for each partition
  3. cx will issue a count query for each partition
  4. cx will allocate the memory for the destination (e.g. pandas)
  5. cx will run queries and download data in parallel
  6. cx will return the data to the user

dump of the original plan

read_sql(): read data from database to pandas/dask dataframe through a sql query

Introduction

Databases are one of the most common data sources that data scientists fetch data from. However, the process of loading data from a database and converting it into dataframes for further analysis is usually heavyweight. The read_sql function aims to speed up this process through the following features:

  • Query partition: split a big query into a bunch of small queries so that procedures like query execution, data transfer and format conversion can run in parallel, and merge the results of the small queries at the end
  • Result cache: offer persistence of the fetched data, so there is no need to repeatedly download data in situations like a notebook restart or applying different tasks to the same dataset
  • Fast data conversion: speed up the csv-to-pandas process using 1. parallelism and 2. writing directly into a pre-allocated pandas dataframe memory buffer

User API

read_sql(sql, conn, cache=True, force_download=False, par_column=None, par_min=None, par_max=None, par_num=None, dask=False)

Parameters

  • sql(string) - The sql query for fetching the data
  • conn(string) - Connection string uri (e.g. postgresql://username:password@host:port/dbname)
  • cache(string or bool, optional(default True)) - Whether or not to cache the result data. If False is set, do not cache the result. If True is set, cache the result to /tmp with connection and sql as name. If a string is set, cache the result to the corresponding path
  • force_download(bool, optional(default False)) - Whether or not to force downloading the data from the database regardless of whether a cache exists
  • par_column(string, optional(default None)) - Name of the column used to partition the query (must be an integer column). If None is set, do not partition
  • par_min(int, optional(default None)) - The minimum value to be requested from the partition column col. If None is set, do not do partition
  • par_max(int, optional(default None)) - The maximum value to be requested from the partition column col. If None is set, do not do partition
  • par_num(int, optional(default None)) - Number of queries to split. If None is set, do not do partition
  • dask(bool, optional(default False)) - Whether to return Dask dataframe instead of Pandas dataframe

Result

Pandas/Dask DataFrame
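A hypothetical call using this originally planned signature (note that it differs from the current read_sql API shown earlier; argument order and names follow the plan above):

# Partitioned fetch into a Dask dataframe, per the original design.
df = read_sql(
    "SELECT * FROM lineitem",
    "postgresql://username:password@host:port/dbname",
    cache=True,
    force_download=False,
    par_column="l_orderkey",
    par_min=1,
    par_max=6000000,
    par_num=10,
    dask=True,
)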

Related Works

Plan

  • Target use case: Fetch data from PostgreSQL to pandas dataframe
  • Tasks (expected time savings calculated on TPC-H scale=10, lineitem table (60M rows), 10 workers, 158s in total):
    • Implement parallel read_csv in Rust arrow - contribute code to arrow (expected time saving 12% [158s->138s])
    • Read directly into pandas memory from the DB, without needing to convert arrow to pandas (expected time saving 52% [158s->75s])
    • Implement a cache on the client side for reloading the same data
      • Finish the functionality, do not consider incremental update
      • Research on how to incremental update
    • Partition the query and connect to the DB in parallel (naively partitioning into 10 queries, compared with 1 query, saves 67% of the time [490s -> 158s])
      • Finish the functionality, do not consider how to partition
      • Research on how to do the partition
