
pyairbyte's Introduction

PyAirbyte

PyAirbyte brings the power of Airbyte to every Python developer. PyAirbyte provides a set of utilities to use Airbyte connectors in Python.


Getting Started

Watch this Getting Started Loom video or run one of our Quickstart tutorials below to see how you can use PyAirbyte in your Python code.

Secrets Management

PyAirbyte can auto-import secrets from the following sources:

  1. Environment variables.
  2. Variables defined in a local .env ("Dotenv") file.
  3. Google Colab secrets.
  4. Manual entry via getpass.

Note: You can also build your own secret manager by subclassing the CustomSecretManager implementation. For more information, see the airbyte.secrets.CustomSecretManager class definition.

Retrieving Secrets

import airbyte as ab

source = ab.get_source("source-github")
source.set_config(
    {
        "credentials": {
            "personal_access_token": ab.get_secret("GITHUB_PERSONAL_ACCESS_TOKEN"),
        }
    }
)

By default, PyAirbyte will search all available secrets sources. The get_secret() function also accepts an optional sources argument of specific source names (SecretSourceEnum) and/or secret manager objects to check.

By default, PyAirbyte will prompt the user for any requested secrets that are not provided via other secret managers. You can disable this prompt by passing allow_prompt=False to get_secret().
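For example (assuming SecretSourceEnum is importable from the airbyte.secrets module referenced below; the member names shown are assumptions based on the sources listed above):

```python
import airbyte as ab
from airbyte.secrets import SecretSourceEnum

# Only check environment variables and the local .env file, and never prompt:
token = ab.get_secret(
    "GITHUB_PERSONAL_ACCESS_TOKEN",
    sources=[SecretSourceEnum.ENV, SecretSourceEnum.DOTENV],  # assumed member names
    allow_prompt=False,
)
```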

For more information, see the airbyte.secrets module.

Secrets Auto-Discovery

If you have a secret matching an expected name, PyAirbyte will automatically use it. For example, if you have a secret named GITHUB_PERSONAL_ACCESS_TOKEN, PyAirbyte will automatically use it when configuring the GitHub source.

The naming convention for secrets is {CONNECTOR_NAME}_{PROPERTY_NAME}, for instance SNOWFLAKE_PASSWORD and BIGQUERY_CREDENTIALS_PATH.

PyAirbyte will also auto-discover secrets for interop with hosted Airbyte: AIRBYTE_CLOUD_API_URL, AIRBYTE_CLOUD_API_KEY, etc.
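To illustrate the convention, here is a small standalone helper (my own sketch, not part of PyAirbyte) that derives the expected secret name from a connector name and a config property name, then looks it up in the environment:

```python
import os


def expected_secret_name(connector_name: str, property_name: str) -> str:
    """E.g. ("source-github", "personal_access_token") -> "GITHUB_PERSONAL_ACCESS_TOKEN"."""
    short_name = connector_name.removeprefix("source-").removeprefix("destination-")
    return f"{short_name}_{property_name}".upper().replace("-", "_")


def auto_discover_secret(connector_name: str, property_name: str) -> str | None:
    """Return the matching environment variable, if one is set."""
    return os.environ.get(expected_secret_name(connector_name, property_name))
```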

Contributing

To learn how you can contribute to PyAirbyte, please see our PyAirbyte Contributors Guide.

Frequently asked Questions

1. Does PyAirbyte replace Airbyte? No.

2. What is the PyAirbyte cache? Is it a destination? Yes, you can think of it as a built-in destination implementation, but we avoid the word "destination" in our docs to prevent confusion with our list of certified destinations.

3. Does PyAirbyte work with data orchestration frameworks like Airflow, Dagster, and Snowpark? Yes, it should. Please give it a try and report any problems you see. Also, drop us a note if it works for you!

4. Can I use PyAirbyte to develop or test when developing Airbyte sources? Yes, you can, but only for Python-based sources.

5. Can I develop traditional ETL pipelines with PyAirbyte? Yes. Just pick the cache type matching the destination - like SnowflakeCache for landing data in Snowflake (see the example after this list).

6. Can PyAirbyte import a connector from a local directory that has Python project files, or does it have to be pip installed? Yes, PyAirbyte can use any local install that has a CLI - and will automatically find connectors by name if they are on PATH.
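For example, a minimal sketch of FAQ #5, landing data in Snowflake (the SnowflakeCache constructor arguments shown are assumptions based on typical Snowflake connection settings):

```python
import airbyte as ab
from airbyte.caches import SnowflakeCache

cache = SnowflakeCache(
    account="my-account",  # assumed parameter names
    username="my-user",
    password=ab.get_secret("SNOWFLAKE_PASSWORD"),
    database="MY_DB",
    warehouse="MY_WH",
    role="MY_ROLE",
)

source = ab.get_source("source-faker", config={"count": 1_000}, install_if_missing=True)
source.select_all_streams()
result = source.read(cache=cache)  # records land as tables in Snowflake
```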

Changelog and Release Notes

For a version history and list of all changes, please see our GitHub Releases page.

pyairbyte's People

Contributors

4sushi, aaronsteers, alafanechere, bindipankhudi, colesnodgrass, natikgadzhi, sebastienn15, tinomerl


pyairbyte's Issues

Making it easier to create a config

Problem

When setting up a source, there is currently no easy way to check what a config for a connector should look like. You have to use validate_config (which raises an exception when given no config), use the private method _get_spec, or look at the spec file in the connector folder in the Airbyte repository, since the current Airbyte docs don't include the spec in JSON format. For first-time users, or people not familiar with the Airbyte connectors, this is a hurdle when testing it out.

Proposed Change

Add a new method to the source class that returns the spec for the source and prints it to the terminal, or alternatively writes it to a file.
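A rough sketch of what this could look like as a standalone helper (the function name is hypothetical, it relies on the private _get_spec method mentioned above, and the connectionSpecification attribute name is assumed from the Airbyte protocol models):

```python
import json


def print_config_spec(source, output_file: str | None = None) -> None:
    """Print the connector's config spec as JSON, or write it to a file instead."""
    spec = source._get_spec()  # private method mentioned above
    spec_json = json.dumps(spec.connectionSpecification, indent=2)  # assumed attribute name
    if output_file:
        with open(output_file, "w") as f:
            f.write(spec_json)
    else:
        print(spec_json)
```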

Auto-locate and inject matching secrets as needed

Originally tracked in #1

Secrets management Phase 2. Auto-detect and inject

In this phase, presumably we'd be able to auto-inject secrets if they match a specific naming convention that aligns with the source name and setting name.

Tech Spec here:

https://docs.google.com/document/d/1rqoRuGRDvSYX8TmtxvpJv-XJgzfN46fLYtuMymz-QE8/edit#heading=h.c4ztxwwo0ign

Re: Priority

We can probably postpone/deprioritize this one for a little while. It will be most valuable when we want the same secret value to be applicable from both local and hosted execution models.

โ˜ Cloud Interop: Basic `run_sync()` interop with Airbyte Cloud/OSS/Enterprise

This would add the following interfaces for interacting with a hosted Airbyte REST API (Cloud/OSS/Enterprise):

  1. get_cloud_connection() - Get a handle for a remotely-hosted connection.
  2. get_latest_read_result() - Get the latest result from a cloud connection sync. Should have properties like success (true/false), executed_at, completed_at, etc.
  3. run_sync() - Remotely execute a sync operation.

Pseudocode mockup:

Locally Executed:

github_source = get_source(
    "source-github",
    # ...
)
sql_cache = SnowflakeCache(
    # ...
)

# Sync data to the cache
read_result = github_source.read(
    cache=sql_cache,
)

# Get the repos dataset
repos_data = read_result.cache.streams["repos"]

# Iterate through records
for repo in repos_data:
    print(repo["name"])

Executed via Hosted Airbyte:

# Get the Cloud connection object
conn = get_cloud_connection(
    workspace_id="...",
    connection_id="...",
    api_key="...",
)

# Get the latest run result
read_result = conn.get_latest_read_result()

# We can check basic properties of the sync:
if not read_result.success:
    # We can invoke a Cloud sync from Python:
    read_result = conn.run_sync()

# After the follow-on feature ships, you'd also
# be able to read from remote destinations.
# https://github.com/airbytehq/PyAirbyte/issues/110

# Get the repos dataset
repos_data = read_result.cache.streams["repos"]

# Iterate through records
for repo in repos_data:
    print(repo["name"])

Refactor progress indicator and make optional

Currently the progress indicator in airbyte-lib has several problems:

  • Managed globally and either takes over the whole terminal view all the time or is disabled completely
  • Incompatible with features like prompting the user for input
  • No way for user to control whether it's shown or not

To fix these issues, the feature should be refactored:

  • Manage state via a local instance
  • Enable for the duration of the sync, then disable again for maximum compatibility
  • Allow enabling/disabling via an argument on read() or an environment variable, with a smart default based on the detected environment (e.g., disabled in CI) - see the sketch below
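A rough sketch of how the smart default could be resolved (the helper and environment variable names here are hypothetical, not existing airbyte-lib options):

```python
import os
import sys


def progress_enabled(explicit: bool | None = None) -> bool:
    """Resolve whether to show the progress indicator: explicit argument > env var > environment."""
    if explicit is not None:
        return explicit
    env_override = os.environ.get("AIRBYTE_SHOW_PROGRESS")  # hypothetical variable name
    if env_override is not None:
        return env_override.lower() in ("1", "true", "yes")
    # Smart default: disable in CI and in non-interactive shells.
    return sys.stdout.isatty() and not os.environ.get("CI")
```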

๐Ÿ› Bug: Cache table missing expected columns

Found this issue when reading from source-google-analytics-data-api. The error is seen in all streams I tried for this source.

Here's an example error trace for the pages stream:

/usr/local/lib/python3.10/dist-packages/airbyte_lib/caches/base.py in _ensure_compatible_table_schema(self, stream_name, raise_on_error)
    411         if missing_columns:
    412             if raise_on_error:
--> 413                 raise exc.AirbyteLibCacheTableValidationError(
    414                     violation="Cache table is missing expected columns.",
    415                     context={

AirbyteLibCacheTableValidationError: AirbyteLibCacheTableValidationError: Cache table validation failed.
    Violation: 'Cache table is missing expected columns.'
    Missing Columns: {'pagePathPlusQueryString', 'screenPageViews', 'bounceRate', 'hostName'}

The interesting thing is that all cache files do seem to have those columns:

(Screenshot attached: 2024-02-06 at 19:38:32)

👉 Colab for reference

๐Ÿ› Bug: Unexpected failure when `select_streams()` is called before `set_config()`

Ideally, these should be runnable in either order.

Traceback log:

Traceback (most recent call last):
  File "/Users/ajsteers/Source/aj-dataops-personal/get_data.py", line 35, in <module>
    main()
  File "/Users/ajsteers/Source/aj-dataops-personal/get_data.py", line 31, in main
    read_result = get_github_data(cache=cache)
  File "/Users/ajsteers/Source/aj-dataops-personal/get_data.py", line 17, in get_github_data
    source_github.select_streams(["pull_requests", "issues"])
  File "/Users/ajsteers/Source/aj-dataops-personal/.venv/lib/python3.10/site-packages/airbyte/source.py", line 123, in select_streams
    available_streams = self.get_available_streams()
  File "/Users/ajsteers/Source/aj-dataops-personal/.venv/lib/python3.10/site-packages/airbyte/source.py", line 198, in get_available_streams
    return [s.name for s in self.discovered_catalog.streams]
  File "/Users/ajsteers/Source/aj-dataops-personal/.venv/lib/python3.10/site-packages/airbyte/source.py", line 251, in discovered_catalog
    self._discovered_catalog = self._discover()
  File "/Users/ajsteers/Source/aj-dataops-personal/.venv/lib/python3.10/site-packages/airbyte/source.py", line 179, in _discover
    with as_temp_files([self._config]) as [config_file]:
  File "/Users/ajsteers/Source/aj-dataops-personal/.venv/lib/python3.10/site-packages/airbyte/source.py", line 165, in _config
    raise exc.AirbyteConnectorConfigurationMissingError(
airbyte.exceptions.AirbyteConnectorConfigurationMissingError: AirbyteConnectorConfigurationMissingError: Connector is missing configuration.

It is true that for many sources we cannot validate that the requested streams exist until after valid credentials are given. However, a better solution might be to print a warning if stream names can't be validated, rather than raising a hard failure.

Bug: Unexpected error: "`pyarrow.lib.ArrowNotImplementedError`: Cannot write struct type '<field>' with no child field to Parquet."

Resolved by: #60

The following is most likely a side effect of introducing empty tables, related to #30

PyAirbyte % poetry run python examples/run_github.py        

Connection check succeeded for `source-github`.
Started `source-github` read operation at 10:29:46...
Traceback (most recent call last):
  File "/Users/bindipankhudi/PyAirbyte/examples/run_github.py", line 29, in <module>
    result = source.read(cache=ab.new_local_cache("github"))
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/bindipankhudi/PyAirbyte/airbyte/source.py", line 567, in read
    cache.process_airbyte_messages(
  File "/Users/bindipankhudi/PyAirbyte/airbyte/_processors.py", line 219, in process_airbyte_messages
    self._process_batch(stream_name, record_batch)
  File "/Users/bindipankhudi/PyAirbyte/airbyte/_processors.py", line 238, in _process_batch
    batch_handle = self._write_batch(
                   ^^^^^^^^^^^^^^^^^^
  File "/Users/bindipankhudi/PyAirbyte/airbyte/caches/base.py", line 519, in _write_batch
    return self.file_writer.write_batch(stream_name, batch_id, record_batch)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/bindipankhudi/PyAirbyte/airbyte/_file_writers/base.py", line 79, in write_batch
    return self._write_batch(stream_name, batch_id, record_batch)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/bindipankhudi/PyAirbyte/airbyte/_file_writers/parquet.py", line 86, in _write_batch
    with parquet.ParquetWriter(output_file_path, schema=record_batch.schema) as writer:
         ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/bindipankhudi/PyAirbyte/.venv/lib/python3.11/site-packages/pyarrow/parquet/core.py", line 1016, in __init__
    self.writer = _parquet.ParquetWriter(
                  ^^^^^^^^^^^^^^^^^^^^^^^
  File "pyarrow/_parquet.pyx", line 1869, in pyarrow._parquet.ParquetWriter.__cinit__
  File "pyarrow/error.pxi", line 154, in pyarrow.lib.pyarrow_internal_check_status
  File "pyarrow/error.pxi", line 91, in pyarrow.lib.check_status
pyarrow.lib.ArrowNotImplementedError: Cannot write struct type 'payload' with no child field to Parquet. Consider adding a dummy child field.

Docs: Remove 'Central DB' from diagram

The diagram shows 'Central DB', which isn't really part of the model. Everything is represented by the SQLCache node, which can either be local or remote.


Proposal: Add native `len()` support for the return of `get_records()` (`LazyDataset`)

We started to implement this but then reverted it, because the len() implementation was exhausting the iterator, leaving the records no longer iterable after running len() on the result.

A workaround is to wrap the result in len(list(...)): the records are converted to a list and then counted.
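For illustration, the workaround in use (a sketch assuming a locally installed source-faker connector):

```python
import airbyte as ab

source = ab.get_source("source-faker", config={"count": 100}, install_if_missing=True)

records = list(source.get_records("users"))  # streams all records exactly once
print(len(records))                          # count without exhausting a live iterator
for record in records:                       # the list remains iterable afterwards
    print(record)
```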

Design challenge:

There's an argument that we should not directly implement len() over get_records() because it will carry the full cost of streaming all records, while discarding all records. If the user then wants to actually view the records, they will pay the cost of extracting records twice - and the results (including counts) could change between calls.

Logging this issue so that we can discuss and iterate on a path forward. If we don't support it directly, perhaps we can provide better guidance to users via a clearer error message.

Note:

The len() call is directly supported on CachedDataset objects, and there's no similar performance issue there: we have already optimized that codepath to use count() in SQL to provide len() results efficiently.

๐Ÿ› Bug: `NoSuchTableError` is raised if user tries to access a stream that did not receive records

Currently we only run 'create table' when and if record batches are being processed for that stream. Since we know which stream names are selected, and since we have the schema declared in the catalog for those streams, we could optionally also create empty destination tables and initialize empty CachedDatasets for those streams.

This would be helpful because a user does not expect to get an error when checking for the results of a specific stream. For example, the "deployments" stream of the GitHub source is very likely to be empty. A user iterating through the streams they selected will get a confusing failure rather than simply seeing an empty dataset.

Example error from Github (internal slack thread here):

sqlalchemy.exc.NoSuchTableError: deployments

๐Ÿ› Bug: Cannot install connectors on Databricks/Spark

When I try to install airbyte and airbyte-source-linkedin-ads, I get the following error.

INFO: pip is looking at multiple versions of <Python from Requires-Python> to determine which version is compatible with other requirements. This could take a while.
INFO: pip is looking at multiple versions of airbyte to determine which version is compatible with other requirements. This could take a while.
ERROR: Cannot install airbyte-source-linkedin-ads==0.7.0 and airbyte==0.7.2 because these package versions have conflicting dependencies.

The conflict is caused by:
    airbyte 0.7.2 depends on airbyte-cdk<0.59.0 and >=0.58.3
    airbyte-source-linkedin-ads 0.7.0 depends on airbyte-cdk==0.63.2

To fix this you could try to:
1. loosen the range of package versions you've specified
2. remove package versions to allow pip attempt to solve the dependency conflict

ERROR: ResolutionImpossible: for help visit https://pip.pypa.io/en/latest/topics/dependency-resolution/#dealing-with-dependency-conflicts

I install in Databricks using the command %pip install airbyte==0.7.2 airbyte-source-linkedin-ads==0.7.0

When I do the same on a local machine, linkedin-ads is installed in a new venv, which does not work in Databricks.

Add multi-source support for caches

We have logged this issue to add support for data from multiple sources to be saved within the same cache.

Our implementation might already support this, since our internal caches and streams tables are (in theory) able to support data from multiple source names.

Before investing in dev side, we should probably try to prioritize some tests to confirm whether this is working or not. As things stand, this is relatively low priority.

Support config write-back

Via an Airbyte control message, the running connector can issue an update of its config object: https://docs.airbyte.com/understanding-airbyte/airbyte-protocol#airbytecontrolmessage

This is important for cases like single-use authentication tokens - some APIs only accept an authentication token once and return a new token in the response which has to be used the next time, invalidating the old token.

This message type is currently not honored by airbyte-lib - the message is silently dropped. Ideally, it's possible to handle this situation gracefully:

  • Automatically patch the config dictionary airbyte-lib works with
  • Signal to the user the config got changed so they can propagate the change to the appropriate place (as part of the sync result instance)
  • For airbyte-lib-validate-source: Update the config json file with the updated config

The last step is important to make it possible to run proper check and read commands as part of the CI steps - currently this isn't possible, as integration test secrets are at risk of being invalidated by the test being run: airbytehq/airbyte#34044
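As a rough illustration of what honoring this could look like (a sketch, not airbyte-lib code; it operates on raw protocol messages parsed as dicts, with field names taken from the Airbyte protocol document linked above):

```python
import json


def apply_config_updates(message_lines, config: dict) -> bool:
    """Patch `config` in place from CONNECTOR_CONFIG control messages; return True if changed."""
    changed = False
    for line in message_lines:
        message = json.loads(line)
        if message.get("type") != "CONTROL":
            continue
        control = message.get("control", {})
        if control.get("type") == "CONNECTOR_CONFIG":
            config.update(control["connectorConfig"]["config"])
            changed = True  # caller should surface this, e.g. on the sync result
    return changed
```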

Align column and table normalization with Dv2 destinations

To allow easy migration from airbyte-lib to hosted Airbyte, the logic to generate table and column names, as well as column types, needs to be identical to the logic used in Java destinations.

This might already be the case in a lot of situations, but we need to make sure edge cases are handled properly, like weird characters in column and table names.



From @aaronsteers

Reference Pages Available

These pages can be used to confirm the Dv2-compatible table schema requirements:

Quotes:

From Notion:

Notes on name transformations and per-destination rules

As a rule, we try to keep schema, table, and column names as close as possible to the source, but there are sometimes exceptions:

Bug: Type conversion failure for `events` stream of `source-klaviyo` when run in PyAirbyte

Reported in slack.

Could not convert 'false' with type str: tried to convert to boolean

I checked the events schema for Klaviyo and it doesn't seem to have any boolean-typed fields, which is odd.

Here's the full traceback:

>>> cache = ab.get_default_cache()
>>> source.select_streams(["events"])
>>> result = source.read(cache=cache)
Started `source-klaviyo` read operation at 12:56:42...
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/Users/gsb-eng/prog/envs/andromeda/lib/python3.11/site-packages/airbyte/source.py", line 567, in read
    cache.process_airbyte_messages(
  File "/Users/gsb-eng/prog/envs/andromeda/lib/python3.11/site-packages/airbyte/_processors.py", line 218, in process_airbyte_messages
    record_batch = pa.Table.from_pylist(stream_batch)
                   ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "pyarrow/table.pxi", line 1876, in pyarrow.lib._Tabular.from_pylist
  File "pyarrow/table.pxi", line 5327, in pyarrow.lib._from_pylist
  File "pyarrow/table.pxi", line 3969, in pyarrow.lib.Table.from_arrays
  File "pyarrow/table.pxi", line 1448, in pyarrow.lib._sanitize_arrays
  File "pyarrow/table.pxi", line 1429, in pyarrow.lib._schema_from_arrays
  File "pyarrow/array.pxi", line 344, in pyarrow.lib.array
  File "pyarrow/array.pxi", line 42, in pyarrow.lib._sequence_to_array
  File "pyarrow/error.pxi", line 154, in pyarrow.lib.pyarrow_internal_check_status
  File "pyarrow/error.pxi", line 91, in pyarrow.lib.check_status
pyarrow.lib.ArrowInvalid: Could not convert 'false' with type str: tried to convert to boolean

๐Ÿ› Bug: columns that match SQL reserved words cause failure during insert

When the final table is created in SQL caches, field names matching SQL keywords cause the query to break.

For example when reading from the pokeapi connector, the following query is issued:

CREATE TABLE main.pokemon (
  id VARCHAR,
  name VARCHAR,
  base_experience VARCHAR,
  height VARCHAR,
  is_default VARCHAR,
  order VARCHAR,
  weight VARCHAR,
  abilities VARCHAR,
  forms VARCHAR,
  game_indices VARCHAR,
  held_items VARCHAR,
  location_area_encounters VARCHAR,
  moves VARCHAR,
  sprites VARCHAR,
  species VARCHAR,
  stats VARCHAR,
  types VARCHAR,
  past_types VARCHAR
)

Since `order` is also a SQL reserved word, the query fails.

This is a problem throughout the SQLCacheBase class which uses string concatenation to interface with the SQL database.

As a stopgap solution, the individual places in the code can be patched by escaping field names one by one, to avoid having to refactor the whole class (see the sketch below).

However, mid-term the refactor to SQLAlchemy should be performed, also to avoid security issues via SQL injection.
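For illustration, a possible stopgap using SQLAlchemy's identifier preparer to escape reserved words such as order before building DDL strings (an in-memory SQLite engine is used here purely for illustration; PyAirbyte's actual code paths may differ):

```python
import sqlalchemy

engine = sqlalchemy.create_engine("sqlite://")  # illustrative engine
preparer = engine.dialect.identifier_preparer

column_names = ["id", "name", "base_experience", "order", "weight"]
quoted = [preparer.quote(name) for name in column_names]  # `order` comes back quoted

columns_sql = ", ".join(f"{col} VARCHAR" for col in quoted)
ddl = f"CREATE TABLE main.pokemon ({columns_sql})"
print(ddl)  # the reserved word is escaped, so the statement no longer breaks
```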

Let sources define their own default document rendering processes

First mockup, which was cut from the initial launch of:

Here is a rough outline of how we imagine this would work:


To inform how to render a specific stream's records as documents, this implementation proposes that
sources define a `document_rendering` annotation in their JSON schema. This property would contain
instructions for how to render records as documents, such as which properties to render as content,
which properties to render as metadata, and which properties to render as annotations.

Assuming a stream like GitHub Issues, the `document_rendering` annotation might look like this:
```json
{
    "airbyte_document_rendering": {
        "title_property": "title",
        "content_properties": ["body"],
        "frontmatter_properties": ["url", "author"],
        "metadata_properties": ["id", "created_at", "updated_at", "url"]
    }
}
```

AIRBYTE_DOCUMENT_RENDERING = "airbyte_document_rendering"
TITLE_PROPERTY = "title_property"
CONTENT_PROPS = "content_properties"
METADATA_PROPERTIES = "metadata_properties"


class DocumentAutoRenderer(BaseModel):
    """Automatically render a stream's records as documents.

    This class is a convenience class for automatically rendering a stream's records as documents.
    It is a thin wrapper around the `DocumentRenderer` class, and is intended to be used when the
    source does not provide a `document_rendering` annotation in its JSON schema."""

    def __init__(self, stream_metadata: ConfiguredAirbyteStream) -> None:
        """Create a new DocumentAutoRenderer."""

        render_instructions: dict | None = None
        title_prop: str | None = None
        content_props: list[str] = []
        metadata_props: list[str] = []

        if AIRBYTE_DOCUMENT_RENDERING in stream_metadata.stream.json_schema:
            render_instructions = stream_metadata.stream.json_schema[AIRBYTE_DOCUMENT_RENDERING]
            if TITLE_PROPERTY in render_instructions:
                title_prop: str | None = render_instructions[TITLE_PROPERTY] or None
            if CONTENT_PROPS in render_instructions:
                content_props: list[str] = render_instructions[CONTENT_PROPS]
            if METADATA_PROPERTIES in render_instructions:
                metadata_props: list[str] = render_instructions[METADATA_PROPERTIES]

        cursor_prop: str | None = None
        if stream_metadata.cursor_field:
            cursor_prop = ".".join(stream_metadata.cursor_field)

        super().__init__(
            title_property=title_prop,
            cursor_property=cursor_prop,
            content_properties=[
                key
                for key, value in stream_metadata.stream.json_schema.get("properties", {}).items()
                if value.get("type") == "string"
            ],
            metadata_properties=[
                key
                for key, value in stream_metadata.stream.json_schema.get("properties", {}).items()
                if value.get("type") != "string"
            ],
            primary_key_properties=stream_metadata.primary_key,
        )

Clean up docs and make PyAirbyte docs discoverable

The PyAirbyte docs on docs.airbyte.io are currently not discoverable. We should use the work started in the below PR to make those nav elements visible. We should also update all code examples and API Reference docs links, and we should make sure all references to AirbyteLib are replaced with PyAirbyte.

We don't want to do this until the public beta announcement. The PR to do so is here:

Snowflake is slow

Relative to other cache types, Snowflake is pretty slow right now. It appears the main issue is that there are lots of calls which have a round-trip time of 0.8s-1.5s. These add up to runtimes close to 30-40 seconds to do a simple load.

In the context of a 10-20 minute data sync, these slowdowns may not be noticeable, but they are very noticeable in our integration tests.

For now, I've already moved all Snowflake integration tests into their own integration test file, separating them from the Postgres and DuckDB tests which are much faster.

I'm also attaching a perf trace log from viztracer, for discussion and future reference.

This test ran in ~10 seconds. You can see several calls to the database taking in the 0.8-1.9s range:


result_test_faker_read_to_snowflake_170789807552213.json.zip

๐Ÿ› Bug: SQLAlchemy error `AttributeError: id` and `KeyError: 'id'`

Found this bug.

KeyError                                  Traceback (most recent call last)
/usr/local/lib/python3.10/dist-packages/sqlalchemy/sql/base.py in __getattr__(self, key)
   1222         try:
-> 1223             return self._index[key]
   1224         except KeyError as err:

KeyError: 'id'

The above exception was the direct cause of the following exception:

AttributeError                            Traceback (most recent call last)
7 frames
/usr/local/lib/python3.10/dist-packages/sqlalchemy/util/compat.py in raise_(***failed resolving arguments***)
    209 
    210         try:
--> 211             raise exception
    212         finally:
    213             # credit to

AttributeError: id

The Coin API demo workbook can be used to demo this.

https://colab.research.google.com/drive/1rK2bfTU42c99rjMQGYcq2pAVpwxoV1mt#scrollTo=v9R1SWHoGS7g

DX improvement: require selection of streams before `read()`

It's been observed in beta testing that it is not a great first-time experience if 30+ or 50+ streams are auto-selected - especially for sources like GitHub, where the user probably wants one of 3-5 streams but will get dozens. What makes this a more significant issue is when those streams they don't want or care about cause performance delays and prevent the first sync from completing successfully.

While we're at it, I think we should also rename set_streams() to select_streams(). We should also add a select_all_streams() method, for cases where the user does actually want all streams.
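For illustration, roughly what the renamed API would look like in use (the GitHub config keys shown are illustrative):

```python
import airbyte as ab

source = ab.get_source(
    "source-github",
    config={
        "repositories": ["airbytehq/PyAirbyte"],
        "credentials": {"personal_access_token": ab.get_secret("GITHUB_PERSONAL_ACCESS_TOKEN")},
    },
)
source.select_streams(["issues", "pull_requests"])  # just the streams you actually need
# source.select_all_streams()                       # or explicitly opt in to everything
result = source.read(cache=ab.new_local_cache())
```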

Clean up public interface

Currently, the source and cache classes expose lots of internals in their constructors and methods because they need to communicate with each other.

However this should be an implementation detail that's not exposed to the user:

classDiagram
    RecordProcessor <|-- SQLCacheBase
    SQLCacheBase <|-- DuckDBCacheBase
    DuckDBCacheBase <|-- DuckDBCache
    SQLCacheBase <|-- SnowflakeCache
    SQLCacheBase <|-- PostgresCache
    class Source{
        __init__(executor: Executor)
        read(cache: SQLCacheBase)
    }
    class SQLCacheBase{
        register_source(source: Source)
        get_state()
    }
    Source --> SQLCacheBase

Things like executor, register_source, and get_state should not be visible to the user, but they are also not protected/private methods, as they are required for the source/cache interaction.

To fix this, let's hide these things for the end user and only expose clean interfaces:

  • Things that are only used within a class can be made protected (like discovered_catalog, configured_catalog, executor)
  • Don't expose the actual Source class, but only an interface which doesn't know about the constructor (the actual source class becomes _Source). The user should use the factory function to create the source.
  • Introduce a new class Cache which has an internal_instance: SQLCacheBase property that is not documented, via the @private annotation in the docstring (https://pdoc.dev/docs/pdoc.html#control-what-is-documented) - this way it's hidden from the reference docs
  • The Cache class delegates all public methods to its instance. When communicating with a cache (calling register_source or get_state), the _Source class directly accesses the internal_instance to call these methods

This way the user can only "see" the intended public interface while the components can still communicate with each other. All implementation details of caches are hidden away as well.
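A minimal sketch of the delegation idea (register_source and get_state come from the diagram above; the read_stream method is a hypothetical stand-in for the public surface):

```python
class SQLCacheBase:
    """Internal implementation, including source/cache plumbing."""

    def register_source(self, source) -> None:
        ...  # needed by the (internal) _Source class, not by end users

    def get_state(self) -> list[dict]:
        ...  # needed by the (internal) _Source class, not by end users

    def read_stream(self, stream_name: str) -> list[dict]:
        ...  # example of a genuinely user-facing capability


class Cache:
    """Public facade; `internal_instance` would be marked @private in the docstring."""

    def __init__(self, internal_instance: SQLCacheBase) -> None:
        self.internal_instance = internal_instance

    def read_stream(self, stream_name: str) -> list[dict]:
        # Delegate only the intended public methods to the internal instance.
        return self.internal_instance.read_stream(stream_name)
```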

Bug: IPython rendering used instead of `rich` when running from a Python script at the terminal.

New rendering bug when running from terminal:

AJ-Steers-MacBook-Pro---GGHWM7QWPJ:airbyte-lib-private-beta ajsteers$ poetry run python examples/run_faker_to_motherduck.py 
Connection check succeeded for `source-faker`.
Started `source-faker` read operation at 09:06:36...
<IPython.core.display.Markdown object>
<IPython.core.display.Markdown object>
<IPython.core.display.Markdown object>
<IPython.core.display.Markdown object>
<IPython.core.display.Markdown object>
<IPython.core.display.Markdown object>
<IPython.core.display.Markdown object>
<IPython.core.display.Markdown object>
<IPython.core.display.Markdown object>
<IPython.core.display.Markdown object>
<IPython.core.display.Markdown object>
<IPython.core.display.Markdown object>
<IPython.core.display.Markdown object>
<IPython.core.display.Markdown object>
<IPython.core.display.Markdown object>
<IPython.core.display.Markdown object>
Completed `source-faker` read operation at 09:06:48.
Stream products: 100 records
Stream users: 10000 records
Stream purchases: 10000 records

Originally discovered here: #43 (comment)

Performance: Explore alternative to fetching all schemas in base.py

Since we already know the schema name, we could potentially check whether it exists rather than fetching all schemas and then comparing:

def _get_schemas_list(
        self,
        database_name: str | None = None,
    ) -> list[str]:
        """Return a list of all tables in the database."""
        inspector: Inspector = sqlalchemy.inspect(self.get_sql_engine())
        database_name = database_name or self.database_name
        found_schemas = inspector.get_schema_names()
        return [
            found_schema.split(".")[-1].strip('"')
            for found_schema in found_schemas
            if "." not in found_schema
            or (found_schema.split(".")[0].lower().strip('"') == database_name.lower())
        ]
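For example, one possible alternative, assuming SQLAlchemy 2.0's Inspector.has_schema() is available:

```python
import sqlalchemy
from sqlalchemy.engine import Engine


def schema_exists(engine: Engine, schema_name: str) -> bool:
    """Check for a single schema directly instead of enumerating all schemas."""
    inspector = sqlalchemy.inspect(engine)
    return inspector.has_schema(schema_name)
```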
