
verified-sources's Introduction

data load tool (dlt) — verified sources repository


This repository contains verified dlt sources and demo pipelines for each source that you can use as a starting point for your project.

How to add a verified source to your dlt project

dlt offers an init command that clones any source from this repository, together with an example pipeline, into your project and sets up the credentials and Python dependencies. Please follow the step-by-step instructions in our docs.

How to contact us and get help

Join our Slack by following the invitation link

  • If you have added a source and something does not work, post in the technical-help channel
  • If you want to contribute sources, customization or a fix, post in the dlt-contributors channel

Reporting a source bug

Follow this link: bug report template

Requesting a source implementation

Follow this link: source request template

How to contribute

Find step-by-step instructions as well as troubleshooting help in CONTRIBUTING.md.

Building blocks

Find a list of the building blocks of dlt with links to our official documentation in BUILDING-BLOCKS.md.

verified-sources's People

Contributors

adrianbr, ahoffix, aksestok, astrakhantsevaaa, burnash, cmpadden, cybermaxs, dat-a-man, deanja, ethanve, glebzhidkov, ilyafaer, jorritsandbrink, mebusila, mermi, nacho0605, redicane, rudolfix, sauravganguli, sbakiu, sehnem, sh-rp, steinitzu, sultaniman, tydunn, violetm, willi-mueller, yuchangz, z3z1ma, zem360


verified-sources's Issues

Firebase community pipeline

What is a community pipeline

A "community pipeline" is a track to quickly share a working pipeline for a particular data source. Such a pipeline will be distributed with dlt init in the community pipelines section. Before accepting it for distribution, we'll review your code and make sure it passes all required checks.

Please tell us about your pipeline below, then open a PR and link this issue in it.

Quick pipeline info

  • Name of the pipeline: Firebase
  • What is the data source: link
  • I tried following destinations with it: duck

Why it got built?

  • I run it for fun
  • I run it on my local laptop
  • I run it in production

How much of your involvement can we expect?

We want to optimize the time you spent on this contribution. To successfully merge the PR we may require some fixes in the submitted code:

  • I'm ok with implementing the code review fixes

Things below are 100% optional but are helping us to make your pipeline a verified one - with tests and running in our CI

  • I can share my test data or a test account with dltHub
  • I'm ok with writing a short doc

Additional context

Anything we should know about your pipeline

Notion verified pipeline

Planned Pipeline Info

  • name: notion
  • category and description: CRM, loads the database data from Notion API

You can fill the data below when writing spec.

Use Cases

  • Retrieve and load CRM data for sales lead analysis
  • Extracting project management data: Import project tasks, assignees, due dates, and progress

Sources / Resources / Endpoints

  • notion_databases(database_ids: Optional[List[Dict[str, str]]] = None) - receives a list of dictionaries each containing a database id and a name to use as a table in a destination.

Test account / test data

  • A notion account with an integration set up is needed.

Implementation tasks

Below you have a proposal for implementation tasks

  • implement all the described sources, resources and endpoints
  • make sure that the Use Cases can be easily executed by the pipeline's user by providing demonstrations of all the use cases in the [pipeline_name]_pipeline.py
  • all the use cases are tested
  • if there are any customizations you must test them, including unit tests for custom code

Add SQL database pipeline using SQLAlchemy

I'm working on the SQL database pipeline.

  • DLT source accepting a database URL and optionally a schema name or list of tables

  • Support a wide variety of database engines (by using SQLAlchemy)

  • Incremental loading support.
    Possible to do this with a user supplied cursor column. The query can then be structured as e.g.
    SELECT * FROM table WHERE cursor_column > 'last_dlt_state_value' ORDER BY cursor_column

    If the table has an auto-increment primary key it may be possible to use it as cursor transparently.

  • Support all SQL data types.
    Generally handled by SQLAlchemy, but some types are not supported by dlt.
    I found DATE with BQ destination not working, and INTERVAL not supported at all.

  • Dependencies in poetry and requirements.txt. Separate requirements-*.txt for common database drivers could be handy.

  • Create test source database. Should be enough to test with Postgres. Populate with mock data of all possible types.

  • Add test cases to run the pipeline and validate data is loaded
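
The incremental-loading item above can be sketched as follows. This is only an illustration under assumptions: it uses the standard-library sqlite3 module instead of SQLAlchemy so that it runs without extra dependencies, and the function name fetch_new_rows, the events table and the way state is passed around are all hypothetical, not the pipeline's actual code.

```python
import sqlite3
from typing import Any, List, Optional, Tuple


def fetch_new_rows(
    conn: sqlite3.Connection,
    table: str,
    cursor_column: str,
    last_value: Optional[Any] = None,
) -> Tuple[List[tuple], Optional[Any]]:
    """Return rows with cursor_column > last_value and the new cursor value."""
    # Identifiers cannot be bound as query parameters, so validate them first.
    for name in (table, cursor_column):
        if not name.isidentifier():
            raise ValueError(f"unsafe identifier: {name!r}")
    query = f"SELECT * FROM {table}"
    params: tuple = ()
    if last_value is not None:
        # Only rows newer than the stored cursor value are requested.
        query += f" WHERE {cursor_column} > ?"
        params = (last_value,)
    query += f" ORDER BY {cursor_column}"
    cur = conn.execute(query, params)
    columns = [d[0] for d in cur.description]
    rows = cur.fetchall()
    if rows:
        # Rows are ordered, so the last row carries the highest cursor value.
        last_value = rows[-1][columns.index(cursor_column)]
    return rows, last_value


# Hypothetical demo table with an auto-increment primary key used as cursor.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE events (id INTEGER PRIMARY KEY, payload TEXT)")
conn.executemany("INSERT INTO events (payload) VALUES (?)", [("a",), ("b",)])

first_rows, state = fetch_new_rows(conn, "events", "id")
conn.execute("INSERT INTO events (payload) VALUES ('c')")
new_rows, state = fetch_new_rows(conn, "events", "id", state)
```

The second call only returns the row inserted after the first run, which is the behavior the cursor column is meant to provide.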

setup basic CI process

Once dlt-hub/dlt#141 is merged, the following will be done:

    • The secrets.toml and config.toml will be reorganized and documented in README
    • The github CI will be configured with the required secrets. Sharing of secrets for sources and destinations will be documented
    • PRs to master will be required to pass CI:
    • workflow 1: lint (make lint)
    • --workflow 2: run all tests against all pipelines (make tests)--

Possible problem:
CI jobs triggered from forks do not have access to the repo secrets. I'll figure out how to deal with that. They probably must be present in the fork.

Algolia HN Search community pipeline

What is a community pipeline

A "community pipeline" is a track to quickly share a working pipeline for a particular data source. Such a pipeline will be distributed with dlt init in the community pipelines section. Before accepting it for distribution, we'll review your code and make sure it passes all required checks.

Please tell us about your pipeline below, then open a PR and link this issue in it.

Quick pipeline info

  • Name of the pipeline: Algolia HN Search
  • What is the data source: Algolia HN Search API
  • I tried following destinations with it: DuckDB

Why it got built?

  • I run it for fun
  • I run it on my local laptop
  • I run it in production

How much of your involvement can we expect?

We want to optimize the time you spent on this contribution. To successfully merge the PR we may require some fixes in the submitted code:

  • I'm ok with implementing the code review fixes

Things below are 100% optional but are helping us to make your pipeline a verified one - with tests and running in our CI

  • I can share my test data or a test account with dltHub
  • I'm ok with writing a short doc

Additional context

Anything we should know about your pipeline

review and improvements to google sheets pipeline

Motivation
We look for a perfect balance between simplicity and feature richness in all our pipelines. That's because we want our users to be able to hack and customize them. Hacking means that the code of the pipeline itself will be changed to cover specific needs of particular user. We want to support that and make it easy - by writing clean and hackable code and proposing customizations as optional filters, transformations, settings etc.

Current status
We have two versions of the google sheets pipeline:

The problem with the first is that it is probably too simple: does not support datetime detection for example
The problem with second one is that it got too complicated to be hacked (IMHO)

Task

  • Review the code of both pipelines and propose either a new implementation or a set of improvements to the existing ones
  • Write your proposal as an issue here
  • You can use any library you want
  • Special wish: we want to support OAUTH authentication in a way that we can reuse in other pipelines (will be ported to python-dlt core at some point)
  • Do it like you will implement it :)

Pipedrive custom fields extracting the field enums instead of the actual values

Pipeline name
pipedrive

Describe the bug
For some of the pipedrive deals fields, we are receiving an enum of the value rather than the actual value. Not sure why it happens for some of the fields having dropdown selector with multiple values.
For example Lead Source Channel field - in pipedrive it has text values but the data extraction gives their enums.

  • name of the dlt source causing the problem: pipedrive deals (possibly also fields in some other endpoints). These are mostly the custom fields

  • which destination you are loading into - bigquery

To Reproduce
Steps to reproduce the behavior: run pipedrive pipeline and check the fields for their values. Some of the errored fields and their corresponding hash strings values are mentioned below:

  • type_of_contract - 5a329a696fd92748ef43d16be56d20aee7c1992c
  • lead_source_channelx - f64a73bbe706f17d7b66c6f0a317711870a6c846
  • billing_cycle - c73beea5abc26ffb6fca1b51d2a4abc4eed6fe36
  • if_company_cancellation_period - ed104988783561f80a0b338ad5d96d6381bc82a8
  • if_company_contract_period - 647752a4c2786eb30c8487451b41eb9f620cc151
  • if_member_cancellation_period - d8c170be11c1d1757113b63847194d3002f3d261
  • if_member_contract_period - 74517be67da1d5295c80333ba294816285f92bf9
  • minimum_contract_length_monthsx - d39dfba978c0cbedaf8871bef32a481fc21f3472
  • lead_source - 3a98d9a73575c8cd313034ccba0039f155062958
  • lead_type - 12baefc8ae36b5bcd6c4748c0b1fb29648ddfef9

Expected behavior
The field should extract the actual value instead of the value enums. Ref. screenshot, the field lead_source_channelx should have values like AM, Fair/Event, Inbound, Outbound, Referral, etc instead of the values like 13, 15, 16, 17, etc.
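
One way to get the expected behavior is to translate option ids into labels using the field definitions. The sketch below assumes that each field definition (as returned by Pipedrive's field endpoints such as dealFields) carries a key and, for dropdown fields, an options list of id/label pairs. The helper names are hypothetical, and the id-to-label pairing in the sample data is illustrative only, since the issue does not state which id belongs to which label.

```python
from typing import Any, Dict, List


def build_option_lookup(field_defs: List[Dict[str, Any]]) -> Dict[str, Dict[str, str]]:
    """Map field key -> {option id (as string) -> label} for fields with options."""
    lookup: Dict[str, Dict[str, str]] = {}
    for field in field_defs:
        options = field.get("options") or []
        if options:
            lookup[field["key"]] = {str(o["id"]): o["label"] for o in options}
    return lookup


def resolve_enums(record: Dict[str, Any], lookup: Dict[str, Dict[str, str]]) -> Dict[str, Any]:
    """Return a copy of record with option ids replaced by their labels."""
    resolved = dict(record)
    for key, labels in lookup.items():
        value = resolved.get(key)
        if value is None:
            continue
        # Multi-select values are assumed to be comma-separated option ids.
        ids = [part.strip() for part in str(value).split(",")]
        # Unknown ids are kept as-is instead of being dropped.
        resolved[key] = ",".join(labels.get(i, i) for i in ids)
    return resolved


# Illustrative field definition; the id/label pairing is made up.
field_defs = [
    {
        "key": "f64a73bbe706f17d7b66c6f0a317711870a6c846",
        "name": "Lead Source Channel",
        "options": [{"id": 13, "label": "AM"}, {"id": 15, "label": "Fair/Event"}],
    },
    {"key": "title", "name": "Title"},
]
lookup = build_option_lookup(field_defs)
deal = {"title": "Big deal", "f64a73bbe706f17d7b66c6f0a317711870a6c846": 15}
resolved_deal = resolve_enums(deal, lookup)
```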

Stack traces and other evidence
Please refer to the attached screenshots
Screenshot 2023-05-04 at 13 21 07
Screenshot 2023-05-04 at 13 21 42

Running environment

  • Cloud Composer
  • dlt Version - 0.2.6a1

edamam pipeline

What is a community pipeline

A "community pipeline" is a track to quickly share a working pipeline for a particular data source. Such a pipeline will be distributed with dlt init in the community pipelines section. Before accepting it for distribution, we'll review your code and make sure it passes all required checks.

Please tell us about your pipeline below, then open a PR and link this issue in it.

Quick pipeline info

  • Name of the pipeline: edamam
  • What is the data source: edamam website link
  • I tried following destinations with it: duckdb

Why it got built?

  • I run it for fun
  • I run it on my local laptop
  • I run it in production

How much of your involvement can we expect?

We want to optimize the time you spent on this contribution. To successfully merge the PR we may require some fixes in the submitted code:

  • I'm ok with implementing the code review fixes

Things below are 100% optional but are helping us to make your pipeline a verified one - with tests and running in our CI

  • I can share my test data or a test account with dltHub
  • I'm ok with writing a short doc

Additional context

Anything we should know about your pipeline

Jira verified pipeline

Planned Pipeline Info

I want to create a jira pipeline

You can fill the data below when writing spec.

Use Cases

  • I want to reflect my issues and projects and users in a db for reporting. I want to understand how quickly issues transition in those dimensions.
  • I want to query jira and get back the data as the JQL offers it in the web interface. I want to replace this in the db. I would use this for custom analyses.

Sources / Resources / Endpoints

We want 2 sources:

  • Jira - entities, data
  • jira search - can custom query for issues etc.

Source 1: Jira search:

  • One resource, that allows sending a query to get the stateful data and replace it at destination

Source 2: Jira
Resources:

issues - stateful, must be merged

  • dependent: issue_comments - incrementable per issue but not sure it's a good idea to store last val
  • dependent: issue_transitions - same as above
  • descriptive statuses: If not verbose in parent object, also get those
  • resolutions
  • worklogs

projects

  • If the details about projects are not verbose, get details too:
  • project_categories
  • project_types
  • versions

users

  • roles

Customization

None

Test account / test data

Implementation tasks

Below you have a proposal for implementation tasks

  • implement all the described sources, resources and endpoints
  • make sure that the Use Cases can be easily executed by the pipeline's user by providing demonstrations of all the use cases in the [pipeline_name]_pipeline.py
  • all the use cases are tested
  • if there are any customizations you must test them, including unit tests for custom code

setup CI: run the `dlt init` with all the pipelines

This CI workflow will run dlt init for all the pipelines in the repo to check if they are compatible with the command

    • dlt init runs without errors
    • the secrets are correct and, when filled from CI, the example script for the pipeline runs and loads data

Sheets getting skipped in pipelines/pipelines/google_sheets/google_sheets.py

In the code below, when sheet_range_name is of the form "'Sheet 1'!A1:Z1000", then the line sheet_range_name = sheet_range_name.split("!")[0] leaves the single quotes in the string "'Sheet 1'", which results in a key error and the table is skipped

    values = api_calls.get_data_batch(service=service, spreadsheet_id=spreadsheet_id, range_names=range_names)
    for i in range(len(values)):
        # get range name and metadata for sheet
        sheet_range_name = values[i]["range"]
        if sheet_range_name in metadata_dict:
            sheet_meta_batch = metadata_dict[sheet_range_name]
            # check if this is a named range and change the name so the range table can be saved with its proper name
            if sheet_meta_batch["name"]:
                sheet_range_name = sheet_meta_batch["name"]
        else:
            # if range doesn't exist as a key for metadata then it means the key is just the sheet name and google sheets api response just filled the range or that the sheet is skipped because
            # it was empty
            sheet_range_name = sheet_range_name.split("!")[0]
            try:
                sheet_meta_batch = metadata_dict[sheet_range_name]
            except KeyError:
                # sheet is not there because it was popped from metadata due to being empty
                logging.warning(f"Skipping data for empty range: {sheet_range_name}")
                continue
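
A possible fix is to extract the sheet name with a small helper that also removes the A1-notation quoting. The sketch below is an assumption about how this could look, not the pipeline's code: the helper name sheet_name_from_range is hypothetical, and it assumes the keys of metadata_dict are unquoted sheet names, as the issue implies. In A1 notation, sheet names with spaces or special characters are wrapped in single quotes and an embedded single quote is doubled.

```python
def sheet_name_from_range(range_name: str) -> str:
    """Return the bare sheet name from an A1-notation range string."""
    if range_name.startswith("'"):
        # Quoted name: it ends at the last single quote, because the
        # cell-range part after "!" never contains quotes.
        end = range_name.rfind("'")
        if end > 0:
            # Undo the doubling of single quotes inside the name.
            return range_name[1:end].replace("''", "'")
    # Unquoted name: everything before the first "!" (or the whole string).
    return range_name.split("!")[0]
```

In the loop above, the line sheet_range_name = sheet_range_name.split("!")[0] could then become sheet_range_name = sheet_name_from_range(sheet_range_name), so that "'Sheet 1'!A1:Z1000" resolves to the key "Sheet 1".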

review and improvements to sql_database

@steinitzu thanks for the high quality work and your effort with testing! this looks amazing and I think at some point we'll move it to dlt-core sources...

here's my review (once #16 is merged)

  1. please use ConnectionStringCredentials as a type for database_url
@dlt.source
def sql_database(
    database_url: str = dlt.secrets.value,

why?
users can pass a connection string (like now), a dict with parsed string (I can also support passing sqlalchemy engine directly - I do it for duckdb for eg.). on top of that dlt is able to reflect ConnectionStringCredentials (which is a dataclass) and generate secrets files and deployment configs automatically. I cannot do that for string

from dlt.common.configuration.specs import ConnectionStringCredentials
engine = create_engine(database_url.to_native_representation())

see also here: https://github.com/dlt-hub/dlt/blob/0.2.0a20/docs/examples/sources/sql_query.py#L59

  1. You use batching in SQLAlchemy :thumbsup:. Please do the same with dlt: you can actually return a list of rows in yield in table_rows. See here:
    https://github.com/dlt-hub/dlt/blob/0.2.0a20/docs/examples/sources/jsonl.py#L10

  2. I'd restructure the code a little.

  • there's @dlt.source for a database. this is good!
  1. I'd add a standalone resource that you can use to query particular tables with a table specific settings (ie. write disposition, the cursor/uniq keys etc.)
    example: https://github.com/dlt-hub/dlt/blob/0.2.0a20/docs/examples/sources/sql_query.py#L55 (also shows how to modify resource name/write disposition within @dlt.resource function)

both source and resource may use table_rows as generator.

note that you can pass a list of resources/sources/generators to dlt.run method so people can pick their own tables.

  1. in
@dlt.source
def sql_database(
    database_url: ConnectionStringCredentials = dlt.secrets.value,
    schema: Optional[str] = dlt.config.value,
    table_names: Optional[List[str]] = dlt.config.value,
    write_disposition: str = 'replace',
) -> List[DltResource]:

write_disposition: str = 'replace' is not used at all. any plans for it?

  1. I love the last value. I think this should be part of standard library (so you can decorate your resources with similar parameters). Here's my proposal how to structure it better
  1. metadata.reflect(bind=engine)
    you should not reflect the whole database if table_names are provided. the code will be uglier... but reflection on certain databases costs.

  2. code structure: I'd put the _make_query and cursor related code in a helper module. then the main module is easier to hack
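
The batching point above (yielding a list of rows instead of single rows) could look roughly like this. It is a sketch under assumptions: it uses the standard-library sqlite3 module rather than SQLAlchemy so it runs on its own, and the signature of table_rows, the items table and the batch size are hypothetical.

```python
import sqlite3
from typing import Any, Dict, Iterator, List


def table_rows(
    conn: sqlite3.Connection, query: str, batch_size: int = 1000
) -> Iterator[List[Dict[str, Any]]]:
    """Yield query results as lists of row dicts, batch_size rows at a time."""
    cur = conn.execute(query)
    columns = [d[0] for d in cur.description]
    while True:
        batch = cur.fetchmany(batch_size)
        if not batch:
            break
        # One yield per batch: the consumer receives a list, not a single row.
        yield [dict(zip(columns, row)) for row in batch]


# Hypothetical demo data.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE items (id INTEGER PRIMARY KEY, name TEXT)")
conn.executemany("INSERT INTO items (name) VALUES (?)", [(f"item{i}",) for i in range(5)])

batches = list(table_rows(conn, "SELECT id, name FROM items ORDER BY id", batch_size=2))
```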


tests:

  1. please keep your fixtures with your tests - as long as we need to share them with other pipelines
  2. use unique names for your schemas. test may be run in parallel on shared resources.
  3. we need test cases for incremental load, for cases when you reflect tables or pass them directly etc.
  4. we somehow need to test more sources. we can reuse destination credentials (so we have duckdb, redshift, bigquery). I can set up MySQL in AWS
    I'm not saying we should do it now but please prepare your test so we can provide a list of connection strings to test against.

Mux community pipeline

What is a community pipeline

A "community pipeline" is a track to quickly share a working pipeline for a particular data source. Such a pipeline will be distributed with dlt init in the community pipelines section. Before accepting it for distribution, we'll review your code and make sure it passes all required checks.

Please tell us about your pipeline below, then open a PR and link this issue in it.

Quick pipeline info

Why it got built?

  • I run it for fun
  • I run it on my local laptop
  • I run it in production

How much of your involvement can we expect?

We want to optimize the time you spent on this contribution. To successfully merge the PR we may require some fixes in the submitted code:

  • I'm ok with implementing the code review fixes

Things below are 100% optional but are helping us to make your pipeline a verified one - with tests and running in our CI

  • I can share my test data or a test account with dltHub
  • I'm ok with writing a short doc

Additional context

Anything we should know about your pipeline

require that pipelines contain `requirements.txt` with the `dlt` dependency

Background
When pipeline code is upgraded, it often requires a more recent dlt version than the one that the user has. Right now declaring the dlt dependency in the pipeline requirements.txt is not mandatory. See dlt-hub/dlt#241 for more

Requirements

    • improve the linter stage in pipelines repo to require that dlt dependency is added. the dependency should contain no extras. it should contain a version range
    • most of the pipelines do not have dlt dependency in their requirements. add those in pipelines repo
    • add this information to README
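
The linter rule described above could be checked with a small script like the one below. This is a sketch under assumptions: the function name, the error messages and the interpretation of "version range" (at least one of the operators >=, <=, ~=, > or <, so a bare or ==-pinned dependency does not count) are not taken from the repository.

```python
import re
from typing import List

# Matches a requirement whose package name is exactly "dlt".
_DLT_LINE = re.compile(r"^dlt(?![\w.-])\s*(\[[^\]]*\])?\s*(.*)$", re.IGNORECASE)
_RANGE_OPERATORS = (">=", "<=", "~=", ">", "<")


def check_dlt_requirement(requirements_text: str) -> List[str]:
    """Return a list of problems with the dlt dependency (empty if fine)."""
    errors: List[str] = []
    found = False
    for raw in requirements_text.splitlines():
        line = raw.split("#", 1)[0].strip()  # drop comments
        match = _DLT_LINE.match(line)
        if not match:
            continue
        found = True
        extras = match.group(1)
        # Ignore environment markers after ";" when looking at the specifier.
        spec = match.group(2).split(";", 1)[0].strip()
        if extras:
            errors.append("dlt dependency must not declare extras")
        if not any(op in spec for op in _RANGE_OPERATORS):
            errors.append("dlt dependency must declare a version range")
    if not found:
        errors.append("dlt dependency is missing")
    return errors
```

For example, check_dlt_requirement("dlt>=0.2.5,<0.3.0") returns an empty list, while a file that only lists dlt[duckdb] reports both the extras and the missing range.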

google play community pipeline

What is a community pipeline

A "community pipeline" is a track to quickly share a working pipeline for a particular data source. Such a pipeline will be distributed with dlt init in the community pipelines section. Before accepting it for distribution, we'll review your code and make sure it passes all required checks.

Please tell us about your pipeline below, then open a PR and link this issue in it.

Quick pipeline info

  • Name of the pipeline: google_play
  • What is the data source: google play reviews and rating links
  • I tried following destinations with it: duckdb

Why it got built?

  • I run it for fun
  • I run it on my local laptop
  • I run it in production

How much of your involvement can we expect?

We want to optimize the time you spent on this contribution. To successfully merge the PR we may require some fixes in the submitted code:

  • I'm ok with implementing the code review fixes

Things below are 100% optional but are helping us to make your pipeline a verified one - with tests and running in our CI

  • I can share my test data or a test account with dltHub
  • I'm ok with writing a short doc

Additional context

Anything we should know about your pipeline

youtube data pipeline

What is a community pipeline

A "community pipeline" is a track to quickly share a working pipeline for a particular data source. Such a pipeline will be distributed with dlt init in the community pipelines section. Before accepting it for distribution, we'll review your code and make sure it passes all required checks.

YouTube Analytics is a tool provided by YouTube that allows content creators to track the performance of their videos and channel. It provides various metrics such as views, watch time, engagement, and demographics, that can help creators better understand their audience and improve their content strategy.

Quick pipeline info

  • Name of the pipeline: youtube_analytic
  • What is the data source: link
  • I tried following destinations with it: duckdb

Why it got built?

  • I run it for fun
  • I run it on my local laptop
  • I run it in production

How much of your involvement can we expect?

We want to optimize the time you spent on this contribution. To successfully merge the PR we may require some fixes in the submitted code:

  • I'm ok with implementing the code review fixes

Things below are 100% optional but are helping us to make your pipeline a verified one - with tests and running in our CI

  • I can share my test data or a test account with dltHub
  • I'm ok with writing a short doc

Additional context

Anything we should know about your pipeline

setup CI: run all tests for past `python-dlt` versions

This CI workflow will take all the python-dlt versions, starting with the one present in pyproject.toml and ending at the newest version on PyPI, and run all the tests on local destinations (currently duckdb) to make sure we are forward-compatible

Make pipedrive incremental with merge write disposition

Pipedrive supports incremental loading via recents endpoint. The endpoint returns newly changed entities since a given date. From testing, it seems they are distinct.

In order to update the data at the destination, we should

  1. on first run, full load the data from regular endpoints
  2. on subsequent run, get "recents" data and "merge" it into the regular tables on entity ID. The data from recents should be unique on entity ID, but I would double check/enforce it. For the endpoints that recents do not support, we continue doing "replace"
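
The "double check/enforce" step and the merge on entity ID can be sketched in plain Python as below. This only illustrates the logic, it is not how dlt performs the merge at the destination. The function names are hypothetical, and the update_time field used to pick the newest version of an entity is an assumption about the payload.

```python
from typing import Any, Dict, Iterable, List


def dedupe_latest(
    records: Iterable[Dict[str, Any]], key: str = "id", updated: str = "update_time"
) -> List[Dict[str, Any]]:
    """Keep one record per key, preferring the most recently updated one."""
    latest: Dict[Any, Dict[str, Any]] = {}
    for record in records:
        current = latest.get(record[key])
        if current is None or record[updated] >= current[updated]:
            latest[record[key]] = record
    return list(latest.values())


def merge_on_id(
    existing: Dict[Any, Dict[str, Any]], changes: Iterable[Dict[str, Any]], key: str = "id"
) -> Dict[Any, Dict[str, Any]]:
    """Upsert changed records into the existing table, keyed on entity ID."""
    merged = dict(existing)
    for record in dedupe_latest(changes, key):
        merged[record[key]] = record
    return merged


# First run: full load. Subsequent run: "recents" with a duplicate entity.
deals = {
    1: {"id": 1, "title": "old", "update_time": "2023-05-01 10:00:00"},
    2: {"id": 2, "title": "keep", "update_time": "2023-05-01 10:00:00"},
}
recents = [
    {"id": 1, "title": "newer", "update_time": "2023-05-02 09:00:00"},
    {"id": 1, "title": "newest", "update_time": "2023-05-03 09:00:00"},
    {"id": 3, "title": "added", "update_time": "2023-05-03 11:00:00"},
]
merged_deals = merge_on_id(deals, recents)
```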

Pipedrive pipeline

This is an updated version of the pipedrive pipeline built by Viktor.

I fixed and added Martin's requirements

  • deal flow endpoint
  • deal participants endpoint

once uploaded, it will be shared back to the workshop participants who can further give feedback or improve

[RFC] Standardizing docstrings in pipelines repository

Problem

The pipelines repository contains various docstring formats. This could make it difficult for users to understand and maintain the code. The lack of uniformity across the codebase could also hinder collaboration and create an inconsistent experience for contributors.

Proposed Solution

  • Standardize docstring format across the pipelines repository.
  • Choose a widely used, well-documented format.
  • Update existing code and enforce the format for future contributions.

Benefits of Standardization

  • Improved readability and maintainability.
  • Easier collaboration and code reviews.

Existing Docstring Formats

  1. Sphinx Autodoc

    • Pros:
      • Widely used and well-documented.
      • Adopted by Python codebase.
      • Works seamlessly with Sphinx.
    • Cons:
      • Can be more verbose and complex than other formats.
  2. Google Style

    • Pros:
      • Less verbose than Sphinx.
      • Adopted by big projects such as PyTorch and TensorFlow.
    • Cons:
      • Might be less detailed than Sphinx.
  3. Numpydoc

    • Pros:
      • Less verbose than Sphinx.
      • Adopted by major data analysis libraries: NumPy, scikit-learn, Pandas.
    • Cons:
      • Might be less detailed than Sphinx.

Key difference between the last two: Google Style uses indentation while Numpydoc uses underlines to separate sections.
See also: https://www.sphinx-doc.org/en/master/usage/extensions/napoleon.html
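
To make the comparison concrete, here is the same hypothetical function documented in each of the three styles. The functions themselves are made up for illustration; only the docstring layout matters.

```python
from typing import List


def scale_sphinx(values: List[float], factor: float = 2.0) -> List[float]:
    """Multiply every value by a factor.

    :param values: Numbers to scale.
    :param factor: Multiplier applied to each value.
    :returns: A new list with the scaled values.
    """
    return [v * factor for v in values]


def scale_google(values: List[float], factor: float = 2.0) -> List[float]:
    """Multiply every value by a factor.

    Args:
        values: Numbers to scale.
        factor: Multiplier applied to each value.

    Returns:
        A new list with the scaled values.
    """
    return [v * factor for v in values]


def scale_numpy(values: List[float], factor: float = 2.0) -> List[float]:
    """Multiply every value by a factor.

    Parameters
    ----------
    values : list of float
        Numbers to scale.
    factor : float, optional
        Multiplier applied to each value.

    Returns
    -------
    list of float
        A new list with the scaled values.
    """
    return [v * factor for v in values]
```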

Next Steps

  • We'd love to hear what you think about the different docstring formats.
  • After we settle on a format, we'll update the codebase to follow the chosen style.
  • We'll also add some guidelines to the contributing docs to make sure everyone's on the same page for future contributions.

Please share your thoughts on this proposal. Feedback on the proposed solution and preferences for the docstring format are welcome.

setup CI: run only tests from modified pipelines

Improve all the workflows so only tests for pipelines modified in current PR are run.

Motivation

  1. My biggest worry is that our API Credentials and accounts will get locked due to excessive CI traffic.
  2. People should be able to merge their changes even if other pipelines are (temporarily) out of order.

Tasks

    • find the affected files by comparing current branch to master
    • identify pipelines and run tests only for them
    • apply to all workflows
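
The first two tasks could be sketched as below. The layout is an assumption: pipeline code is taken to live under pipelines/<name>/ and its tests under tests/<name>/, and the function names are hypothetical. changed_files shells out to git and is only defined here, not called.

```python
import subprocess
from pathlib import PurePosixPath
from typing import Iterable, List, Set, Tuple


def changed_files(base: str = "master") -> List[str]:
    """List files changed on the current branch relative to the base branch."""
    result = subprocess.run(
        ["git", "diff", "--name-only", f"{base}...HEAD"],
        check=True,
        capture_output=True,
        text=True,
    )
    return [line for line in result.stdout.splitlines() if line]


def affected_pipelines(
    files: Iterable[str], roots: Tuple[str, ...] = ("pipelines", "tests")
) -> Set[str]:
    """Return pipeline names whose code or tests were touched."""
    names: Set[str] = set()
    for file in files:
        parts = PurePosixPath(file).parts
        # Require root/<name>/<something> so files directly under a root
        # (for example shared test utilities) do not count as a pipeline.
        if len(parts) >= 3 and parts[0] in roots:
            names.add(parts[1])
    return names


# In CI this could be: affected_pipelines(changed_files("master"))
example = affected_pipelines(
    [
        "pipelines/google_sheets/google_sheets.py",
        "tests/pipedrive/test_pipedrive.py",
        "README.md",
        "tests/utils.py",
    ]
)
```

The resulting names could then be turned into test paths, for example by passing tests/<name> for each affected pipeline to the test runner.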

Google Sheet pipeline

Objective
The objective of this task is to build a pipeline for google sheets with functionality similar to the existing singer tap (https://github.com/singer-io/tap-google-sheets). Most of the requirements below make our pipeline more user friendly than a tap.

In principle, the pipeline's dlt.source method, for a given spreadsheet_id or just a url to a spreadsheet (copied from the browser), should find all tabs containing data, convert them to separate resources and then yield them. Look at the very simple example here (https://github.com/dlt-hub/dlt/blob/devel/docs/examples/google_sheets.py)
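
Accepting either a spreadsheet id or a browser url could be handled with a small helper like this one. It is a sketch: the function name is hypothetical, and it relies on browser urls having the form https://docs.google.com/spreadsheets/d/<id>/edit.

```python
import re

# The id is the path segment that follows "/spreadsheets/d/".
_SHEET_URL = re.compile(r"/spreadsheets/d/([a-zA-Z0-9_-]+)")


def get_spreadsheet_id(url_or_id: str) -> str:
    """Return the spreadsheet id from a browser url, or the input if it is already an id."""
    match = _SHEET_URL.search(url_or_id)
    if match:
        return match.group(1)
    return url_or_id.strip()
```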

Requirements

  • the dlt.source function should accept a spreadsheet_id/url, a list of tab names (optional, if not present you should take all tabs), and other configuration, see below
  • users should be able to pass credentials the dlt way. see the example of how google credentials are passed (https://github.com/dlt-hub/dlt/blob/devel/docs/examples/google_sheets.py).
  • use replace as write disposition for all tabs (we may do incremental loads using the state, but later)
  • figure out how the singer tap handles headers row. do they autodetect or always use first row as field names? As we yield json documents we need names of the fields so in case there are no headers we must create field names ourselves. let's talk on slack
  • you must be able to recognize data types of the fields. how singer tap does that? is there metadata support in google api? dlt is able to infer the data types correctly but let's try to do it better
  • dates must be handled correctly. we need a clever way to recognize that given field is a date. how singer tap does that?
  • name your pipeline google_sheets
  • remember to add required dependencies (1) with poetry to the right group (2) to requirements.txt. I think you'll need google-api-python-client to handle google
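
For the date requirement above, one option is to read date cells as serial numbers and convert them explicitly. The sketch assumes the values were requested in unformatted form with serial-number date rendering, and that the cell is already known to hold a date (how to detect that is left open, as in the issue). Google Sheets serial numbers count days from 1899-12-30, with the fractional part giving the time of day. The function name is hypothetical.

```python
from datetime import datetime, timedelta

# Day zero of the spreadsheet serial-number scale.
SHEETS_EPOCH = datetime(1899, 12, 30)


def serial_to_datetime(serial: float) -> datetime:
    """Convert a spreadsheet serial number (days since the epoch) to a datetime."""
    return SHEETS_EPOCH + timedelta(days=serial)
```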

Testing

  • create a few spreadsheets for testing: with single tab, many tabs, with all possible data types etc.
  • create "problematic" datasheets for example with a tab where values are missing or where we have irregular data, charts etc. to see what happens
  • write tests that load the data and check that it is loaded as expected
  • we'll have more tests once we start working

How to start
I'd start with creating a few test cases, try to run singer tap with them to see the results and then extend the example google sheets

Places community pipeline

What is a community pipeline

A "community pipeline" is a track to quickly share a working pipeline for a particular data source. Such a pipeline will be distributed with dlt init in the community pipelines section. Before accepting it for distribution, we'll review your code and make sure it passes all required checks.

Please tell us about your pipeline below, then open a PR and link this issue in it.

Quick pipeline info

Why it got built?

  • I run it for fun
  • I run it on my local laptop
  • I run it in production

How much of your involvement can we expect?

We want to optimize the time you spent on this contribution. To successfully merge the PR we may require some fixes in the submitted code:

  • I'm ok with implementing the code review fixes

Things below are 100% optional but are helping us to make your pipeline a verified one - with tests and running in our CI

  • I can share my test data or a test account with dltHub
  • I'm ok with writing a short doc

Additional context

Use Case Dashboard: https://lookerstudio.google.com/u/0/reporting/1cd67fec-6189-434b-8598-65e3fdb921a9/page/spuPD

Google Ads basic pipeline

Planned Pipeline Info

Please provide info below when opening the issue to give us a heads-up on what is planned.

  • name google_ads
  • category and description of the pipeline: Ad network

Use Cases

Use case 1: Custom query source

  1. User can pass one or multiple queries + associated table names to a function to create dynamically multiple resources. It should be demonstrated for the user how to pass more than 1 query.
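
Use case 1 could be sketched in plain Python as a factory that turns a mapping of table names to queries into one row generator per table. Everything here is an assumption for illustration: the function names, the run_query callable (which stands in for the real Google Ads client call) and the sample queries are hypothetical, and the real source would wrap these generators as dlt resources.

```python
from typing import Any, Callable, Dict, Iterable, Iterator

Row = Dict[str, Any]


def make_query_resources(
    queries: Dict[str, str], run_query: Callable[[str], Iterable[Row]]
) -> Dict[str, Callable[[], Iterator[Row]]]:
    """Create one lazily evaluated row generator per (table name, query) pair."""

    def make(query: str) -> Callable[[], Iterator[Row]]:
        # A separate closure per query avoids the late-binding pitfall
        # of defining the generator directly inside the loop.
        def rows() -> Iterator[Row]:
            yield from run_query(query)

        return rows

    return {table: make(query) for table, query in queries.items()}


def fake_run_query(query: str) -> Iterable[Row]:
    """Stand-in for the API call; it just echoes the query it received."""
    return [{"query": query}]


# Passing more than one query: each entry becomes its own table.
resources = make_query_resources(
    {
        "campaign_names": "SELECT campaign.id, campaign.name FROM campaign",
        "ad_group_names": "SELECT ad_group.id, ad_group.name FROM ad_group",
    },
    fake_run_query,
)
campaign_rows = list(resources["campaign_names"]())
```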

Use case 2: Pre-selected tables source
User can select to consume this source and they will get a preconfigured set of resources/queries/reports.

Dimension tables: these seem to be tables with descriptive data that should be considered stateful dimensions. We should load them with the replace write disposition.

  • accounts
  • ad_group_ads
  • ad_group_ad_labels
  • ad_groups
  • ad_group_labels
  • campaign_labels
  • click_view
  • keyword
  • geographic

Report tables: these always contain a date. We should load them incrementally with append, using that date.

  • campaigns
  • account_performance_report
  • ad_group_ad_report
  • display_keyword_report
  • display_topics_report
  • shopping_performance_report
  • user_location_report

Sources / Resources / Endpoints

We use the reporting api
https://developers.google.com/google-ads/api/docs/reporting/overview

We want 2 sources

  1. google_ads_query: A configurable source where the user has to pass their queries
  2. google_ads: A preconfigured source where they get the data listed above (17 queries)

Customization

As far as we can tell there is no special customisation code needed, if it turns out that there is, please consult dlt team how to approach it.

Test account / test data

to be filled

Implementation tasks

Below you have a proposal for implementation tasks

  • implement all the described sources, resources and endpoints
  • make sure that the Use Cases can be easily executed by the pipeline's user by providing demonstrations of all the use cases in the [pipeline_name]_pipeline.py
  • all the use cases are tested
  • if there are any customizations you must test them, including unit tests for custom code

improve standard tests

Write a test helper asserting a happy path for each pipeline

  1. pipeline is run against all destinations with a full_refresh
  2. pipeline loads correctly
  3. for each table in the schema there's data in the destination that conforms to the schema

shopify pipeline

Planned Pipeline Info

Please provide the info below when opening the issue to give us a heads-up on what is planned.

  • name of the pipeline to be used by dlt init and to be placed in pipelines folder: [e.g. pipedrive]
  • category and description of the pipeline: [e.g. CRM, loads the relevant data from pipedrive api]

You can fill the data below when writing spec.

Use Cases

Please provide descriptions up to 3 most important use cases that users of this pipeline do. Those use cases will be:

  • implemented
  • reviewed
  • demonstrated in pipeline script
  • documented
    The use case descriptions are aimed not only at developers but also at people creating test accounts and doing demos.

Sources / Resources / Endpoints

Define pipeline interface to the user in terms of sources and resources.

  • enumerate all the sources with information on which endpoints the data comes from
  • ideally provide the config arguments to each source (ie. start dates, report ranges etc.)
  • you can use pseudocode to show how you intend to use the source
  • provide the default write disposition for resources in the source (all append, all replace?)
  • in the sources identify the incremental and merge resources and specify them in a reasonable way (ideally by giving the cursor columns - what is the last value really?, primary keys and merge keys).

Customization

Enumerate everything that goes beyond standard dlt building blocks. Suggest the implementation

  • use of state
  • in the code or as additional transform, filter or map function
  • ask dlt team for help if it looks like complex software task

Test account / test data

  • specify what data you expect in the test dataset. otherwise refer to use cases
  • specify what kind of account access you need, include tool's name, required plan or features needed

Implementation tasks

Below you have a proposal for implementation tasks

  • implement all the described sources, resources and endpoints
  • make sure that the Use Cases can be easily executed by the pipeline's user by providing demonstrations of all the use cases in the [pipeline_name]_pipeline.py
  • all the use cases are tested
  • if there are any customizations you must test them, including unit tests for custom code

add verified pipelines marker and ban the marker in forks

Background
“verified pipeline” - we have a spec for it and it runs as specified. It has tests, so we can prove on CI that it still runs. It also has a verified dataset and docs.

dlt init shows them in green or flagged with “verified”

“community pipeline” - contributed by a community member with a "fast track" merge. We do not have it in our CI. We just verify that there is no malicious code and it is good enough for our contrib repo.

dlt init shows them with a warning

A verified pipeline contains a verified.py Python module that can be imported from the pipeline folder. For now this file will be empty.

Info on how dlt init uses that: dlt-hub/dlt#293

Tasks

    • Add verified.py to all pipelines' folders that have working tests and running CI. (as per definition)
    • Add a linter job that gets activated only on pull requests from forks that add/modify verified.py file (in any folder)
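
A minimal sketch of the check such a linter job could run, assuming the job already knows the list of changed paths and whether the pull request comes from a fork. Both function names are hypothetical, and how CI supplies these inputs is left open.

```python
from pathlib import PurePosixPath
from typing import Iterable, List

MARKER_NAME = "verified.py"


def touched_markers(changed_files: Iterable[str]) -> List[str]:
    """Return the changed paths whose file name is the verified marker."""
    return [
        path for path in changed_files
        if PurePosixPath(path).name == MARKER_NAME
    ]


def check_fork_pr(changed_files: Iterable[str], from_fork: bool) -> List[str]:
    """Return offending paths; an empty list means the check passes."""
    if not from_fork:
        # Pull requests from the main repository may edit the marker.
        return []
    return touched_markers(changed_files)
```

The job would fail whenever `check_fork_pr` returns a non-empty list. Matching on the file name rather than a fixed path covers the "in any folder" requirement.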

Workable verified pipeline

Planned Pipeline Info

  • name of the pipeline: workable
  • category and description of the pipeline: loads the relevant data from Workable API

Use Cases

  • load data from any endpoint with replace mode, except endpoint "candidates" - we can load it incrementally
  • load data from dependent endpoints with @dlt.transformer
  • load filtered data by date

Sources / Resources / Endpoints

Sources:

workable_source
Retrieves data from the Workable API for the specified endpoints.
For almost all endpoints, Workable API responses do not provide the key "updated_at", so in most cases we are forced to load the data in 'replace' mode. 'Candidates' is the only endpoint that has the key 'updated_at', which means that we can update its data incrementally in 'merge' mode.
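
To make the 'merge' case concrete, here is a plain-Python sketch of cursor-based loading on "updated_at". It is not dlt's incremental implementation. The record shape, the "id" key and the helper names are assumptions, and the timestamps are assumed to be ISO 8601 strings in one time zone so that they compare correctly as strings.

```python
from typing import Dict, Iterable, List, Optional, Tuple

Record = Dict[str, str]


def new_or_updated(
    records: Iterable[Record], last_value: Optional[str]
) -> Tuple[List[Record], Optional[str]]:
    """Keep records changed after last_value and return the new cursor."""
    fresh = [
        r for r in records
        if last_value is None or r["updated_at"] > last_value
    ]
    cursor = max([r["updated_at"] for r in fresh], default=last_value)
    return fresh, cursor


def merge_by_id(stored: Dict[str, Record], fresh: Iterable[Record]) -> None:
    """Upsert records keyed on "id", which is what a merge load amounts to."""
    for record in fresh:
        stored[record["id"]] = record


stored: Dict[str, Record] = {}
first_run = [
    {"id": "c1", "updated_at": "2023-05-01T10:00:00Z", "stage": "applied"},
    {"id": "c2", "updated_at": "2023-05-02T10:00:00Z", "stage": "applied"},
]
fresh, cursor = new_or_updated(first_run, None)
merge_by_id(stored, fresh)

# Second run: c1 changed, c2 did not, so only c1 passes the cursor filter.
second_run = [
    {"id": "c2", "updated_at": "2023-05-02T10:00:00Z", "stage": "applied"},
    {"id": "c1", "updated_at": "2023-05-03T09:00:00Z", "stage": "offer"},
]
fresh, cursor = new_or_updated(second_run, cursor)
merge_by_id(stored, fresh)
```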

Resources:

  • "members",
  • "recruiters",
  • "stages",
  • "requisitions",
  • "jobs",
  • "custom_attributes",
  • "candidates"

Resources that depend on another resource are implemented as transformers, so they can re-use the original resource data without re-downloading:

resources with write_disposition='merge'

  • "candidates_activities"
  • "candidates_offer"

resources with write_disposition='replace'

  • "jobs_activities"
  • "jobs_application_form"
  • "jobs_questions"
  • "jobs_stages"
  • "jobs_custom_attributes"
  • "jobs_members"
  • "jobs_recruiters"

Test account / test data

The Workable test account provides test data.

Add examples to sql pipeline

As usage documentation, we need some examples in the SQL pipeline that show people how to configure the pipeline.

Existing example: how to load an entire database.
Usual use case:

  • load some tables with "replace"
  • load non-stateful tables with "append"
  • load some tables with "merge"

So we would like to show the following pipeline examples:

  1. "load all tables": a case where we have a list of tables. The user configures incremental loading for some tables and leaves the rest to load as "replace".
  2. "load selected tables": same as before, but the user must specify the "replace" tables as well, and anything else defaults to not being loaded.

We should demonstrate two methods:

  1. with sql_database where we reflect the tables and selectively apply hints as specified above and load
  2. with sql_table standalone resource where user knows the name of the tables and configures each one then loads together.
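
The difference between "load all tables" and "load selected tables" comes down to the default applied to tables without explicit configuration. The sketch below shows only that decision in plain Python. `plan_dispositions` is a hypothetical helper and the table names are illustrative; applying the resulting plan to reflected tables or standalone table resources is left out.

```python
from typing import Dict, Iterable, Optional


def plan_dispositions(
    reflected_tables: Iterable[str],
    overrides: Dict[str, str],
    default: Optional[str],
) -> Dict[str, str]:
    """Return {table: write_disposition}; tables resolving to None are skipped."""
    plan: Dict[str, str] = {}
    for table in reflected_tables:
        disposition = overrides.get(table, default)
        if disposition is not None:
            plan[table] = disposition
    return plan


tables = ["family", "clan", "genome"]

# "load all tables": unconfigured tables fall back to "replace".
load_all = plan_dispositions(tables, {"family": "merge"}, default="replace")

# "load selected tables": unconfigured tables are not loaded at all.
load_selected = plan_dispositions(
    tables, {"family": "merge", "clan": "replace"}, default=None
)
```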

We may actually use one of the public MySQL databases as a demo source, i.e. the one used in the test test_load_mysql_data_load, and

# reflect a database
credentials = ConnectionStringCredentials("mysql+pymysql://[email protected]:4497/Rfam")
database = sql_database(credentials)

where we also demonstrate how to pass credentials explicitly...

Airtable

Quick pipeline info

Why it got built?

  • I run it for fun
  • I run it on my local laptop
  • I run it in production

How much of your involvement can we expect?

We want to optimize the time you spend on this contribution. To successfully merge the PR we may require some fixes in the submitted code:

  • I'm ok with implementing the code review fixes

The items below are 100% optional but help us make your pipeline a verified one, with tests and running in our CI

  • I can share my test data or a test account with dltHub
  • I'm ok with writing a short doc

Additional context

Anything we should know about your pipeline
Use case: collect information about workshop attendees and analyze it compared to other events, e.g.

  • how many events did they attend?
  • how often do more experienced people attend the workshop than less experienced people?

fix typing in google sheet pipeline and module names/configs

The Google Sheets pipeline does not type check. It was fixed to work with Python 3.8 in https://github.com/dlt-hub/pipelines/pull/16
When this is merged (or by branching from it), please fix the following:

  1. add Google Sheets to the mypy checks by placing py.typed properly; see the new README
  2. switch to Python 3.8
  3. make lint must pass, including the type check
  4. your example pipeline has a wrong file name and has no main; see the chess or any other pipeline example
  5. import logging -> import the dlt logger, not the root logger! from dlt.common import logger works like a standard logger
  6. move the sample xls to tests, or remove it altogether if we do not need it
  7. allow the spreadsheet id to be passed via config, same for ranges. Then you can remove
# constants
SPREADSHEET_ID = ""
SPREADSHEET_URL = ""

also from the tests and just provide proper configuration for our test case. By the way, the new test case is now here:
https://docs.google.com/spreadsheets/d/1HhWHjqouQnnCIZAFa2rL6vT91YRN8aIhts22SUUR580/edit#gid=0
  8. clean up the imports a little. I've changed the relevant internal import to relative (to make CI work). Please change the rest to the order:
python libs - libraries/dependencies - dlt - your local imports
I'm not enforcing that with a linter.

salesforce pipeline

Planned Pipeline Info

Please provide the info below when opening the issue to give us a heads-up on what is planned.

  • name of the pipeline to be used by dlt init and to be placed in pipelines folder: [e.g. pipedrive]
  • category and description of the pipeline: [e.g. CRM, loads the relevant data from pipedrive api]

You can fill the data below when writing spec.

Use Cases

Please provide descriptions up to 3 most important use cases that users of this pipeline do. Those use cases will be:

  • implemented
  • reviewed
  • demonstrated in pipeline script
  • documented
    The use case descriptions are aimed not only at developers but also at people creating test accounts and doing demos.

Sources / Resources / Endpoints

Define pipeline interface to the user in terms of sources and resources.

  • enumerate all the sources with information on which endpoints the data comes from
  • ideally provide the config arguments to each source (ie. start dates, report ranges etc.)
  • you can use pseudocode to show how you intend to use the source
  • provide the default write disposition for resources in the source (all append, all replace?)
  • in the sources identify the incremental and merge resources and specify them in a reasonable way (ideally by giving the cursor columns - what is the last value really?, primary keys and merge keys).

Customization

Enumerate everything that goes beyond standard dlt building blocks. Suggest the implementation

  • use of state
  • in the code or as additional transform, filter or map function
  • ask dlt team for help if it looks like complex software task

Test account / test data

  • specify what data you expect in the test dataset. otherwise refer to use cases
  • specify what kind of account access you need, include tool's name, required plan or features needed

Implementation tasks

Below you have a proposal for implementation tasks

  • implement all the described sources, resources and endpoints
  • make sure that the Use Cases can be easily executed by the pipeline's user by providing demonstrations of all the use cases in the [pipeline_name]_pipeline.py
  • all the use cases are tested
  • if there are any customizations you must test them, including unit tests for custom code

Add pipedrive customization - Field rename

  • Implement custom field rename for pipedrive.
  • The fields should be renamed on the initial deployment, and the rename “recipe” should be persisted in state and re-used on subsequent runs (so any later field renames do not change the schema). New custom fields should also be added to the state with their first-encountered name, and this renaming would be reused on subsequent runs. To rename fields afterwards, the user would do it manually or drop the state.
  • The auto-rename should be optional and it should be easy to toggle on/off for the end user.
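
The persisted "recipe" can be pictured as a mapping from a custom field key to the name it had when first seen. The sketch below uses a plain dict as a stand-in for pipeline state. `rename_fields`, the field key "a1b2c3" and the record shape are all hypothetical.

```python
from typing import Dict


def rename_fields(
    record: Dict[str, object],
    current_names: Dict[str, str],
    state: Dict[str, str],
    enabled: bool = True,
) -> Dict[str, object]:
    """Rename custom field keys using the names frozen in state on first sight."""
    if not enabled:
        # Toggle off: records pass through with their original keys.
        return dict(record)
    renamed: Dict[str, object] = {}
    for key, value in record.items():
        if key in current_names:
            # setdefault stores the first-encountered name and reuses it later.
            name = state.setdefault(key, current_names[key])
            renamed[name] = value
        else:
            renamed[key] = value
    return renamed


state: Dict[str, str] = {}
record = {"id": 1, "a1b2c3": "EU"}

# First run: the custom field is currently called "region".
first = rename_fields(record, {"a1b2c3": "region"}, state)

# Later run: the field was renamed upstream, but the stored name still wins.
second = rename_fields(record, {"a1b2c3": "sales_region"}, state)
```

Dropping the `state` dict is what resets the names, which matches the manual reset path described above.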

Fields in zendesk pipelines

Pipeline name
zendesk
Describe the bug
In a Zendesk source such as tickets:

  1. Some column values carry a prefix like __dc.cc_--_cm_-_. This kind of prefix needs to be removed. For example, the column contact_reason contains the value "__dc.cc_--_cm_-_i_am_unable_to_check_in__", which should read Check in Management::I am unable to check in.
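
A rough sketch of the string clean-up alone, assuming the prefix is known in advance and is passed in. It recovers only the trailing words. The category part of the desired value ("Check in Management") cannot be derived from the key and would need a separate lookup, which is not shown. `clean_dynamic_content` is a hypothetical name.

```python
def clean_dynamic_content(value: str, prefix: str = "__dc.cc_--_cm_-_") -> str:
    """Strip a placeholder prefix and turn the remaining key into words."""
    if not value.startswith(prefix):
        # Values without the prefix are returned untouched.
        return value
    key = value[len(prefix):].strip("_")
    return key.replace("_", " ")


cleaned = clean_dynamic_content("__dc.cc_--_cm_-_i_am_unable_to_check_in__")
```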

To Reproduce
Steps to reproduce the behavior:

Expected behavior
I would like to remove the prefix
Stack traces and other evidence
Screenshot 2023-05-11 at 15 17 05
Running environment

  • Cloud Composer
  • dlt Version - 0.2.6a1

FB Ads basic pipeline

Planned Pipeline Info

Please provide the info below when opening the issue to give us a heads-up on what is planned.

  • name of the pipeline to be used by dlt init and to be placed in pipelines folder: facebook_ads
  • category and description of the pipeline: [e.g. CRM, loads the relevant data from pipedrive api]

You can fill the data below when writing spec.

Use Cases

Read my ad performance. Every day I should know how much money I spent and what happened to it (views, clicks, etc.). So I expect that I can either send a query or get all the data somehow.

I expect data at any granularity, which means down to the finest one: creative. But my data engineer might say that keyword costs too much and that we should use adgroup or campaign. I expect data to be loaded daily. I expect to have the performance metrics and the dimensions describing them (creative, adgroup, campaign, and any available details about them, such as the targeted segment).

Sources / Resources / Endpoints

We will have 2 sources: 1 for custom query, 1 preconfigured
Source1:
facebook_ads

Reports:
Here you get reports on campaign/adgroup/ad level.
Create resources for each of those levels, selecting all possible fields.
Since these are reports with dates, increment on date with append based on last value.
https://developers.facebook.com/docs/marketing-api/insights/
https://developers.facebook.com/docs/instagram/ads-api/guides/get-ad-insights/

Dimensions:
For the granularity levels above, you also need to get the dimension tables.
You can get them from the facebook_business.adobjects.adaccount library:
https://github.com/facebook/facebook-python-business-sdk/blob/main/facebook_business/adobjects/adaccount.py
I could not find great docs, but examples exist online and in other frameworks' pipelines.
The write disposition for dimension tables is replace:

  • campaign
  • ad group
  • ad

Source2:
facebook_ads_query
It should have dynamically definable resources (queries, their load disposition, table names).
It should use ad insights; the Python wrapper has a method that accepts field names.
The user should be able to pass a custom query (fields, breakdowns, anything customisable).
They should be able to define how to increment it: demonstrate one example of a "date"-based append increment and one "replace" example.
They should be able to pass multiple queries with multiple table names and loading configurations. Show how in the example.

Customisations

None

Test account / test data

pending

Implementation tasks

Below you have a proposal for implementation tasks

  • implement all the described sources, resources and endpoints
  • make sure that the Use Cases can be easily executed by the pipeline's user by providing demonstrations of all the use cases in the [pipeline_name]_pipeline.py
  • all the use cases are tested
  • if there are any customizations you must test them, including unit tests for custom code

create test account for ASANA and add test

We need a test account for Asana pipeline.

  • create a test account and add credentials to 1password
  • add the required test data by checking resources in the asana pipeline
  • create a standard test that loads all the data. check if the data is present. you can use github tests as a reference

use e-mail auth as default in Zendesk

Currently dlt init creates OAuth credentials as the default in secrets.toml. It is the most complicated method. We need a simpler method as the default. Let's use email + password.

How to change: dlt init will use the first item in the union of credentials, so move email + password to the first position in the sources.
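
The ordering point can be illustrated with the standard typing module: the members of a Union keep their declaration order, so whichever credentials type is listed first is the one a "take the first item" rule picks. The two dataclasses and their fields are hypothetical stand-ins, not the Zendesk source's actual credential classes.

```python
from dataclasses import dataclass
from typing import Union, get_args


@dataclass
class EmailCredentials:  # hypothetical name and fields
    email: str = ""
    password: str = ""


@dataclass
class OAuthCredentials:  # hypothetical name and fields
    oauth_token: str = ""


# Listing the email variant first makes it the first union member.
Credentials = Union[EmailCredentials, OAuthCredentials]

default_type = get_args(Credentials)[0]
```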

Stripe verified pipeline

Description

Stripe is an online payment company that offers software and APIs for processing payments and business management.

This pipeline uses the Stripe API and dlt to load data such as Customer, Subscription, and Event to the database and to calculate the MRR and churn rate.

  • name of the pipeline is "stripe_analytics"
  • category and description of the pipeline: loads the relevant data from Stripe API and calculates some metrics.

Use Cases

  1. The pipeline collects data automatically through the Stripe API and stores it in a database (DuckDB, Postgres, Redshift, BigQuery). You will be able to store information about customers, all their purchases, and subscriptions, as well as information about your products, etc.

You can load data from any endpoint: load_data(endpoints=("Customer", "Product")).

You can load data in incremental mode: load_incremental_endpoints(endpoints=("Event", "Invoice")).

  2. With the pipeline you can calculate the most important metrics (MRR and subscription churn rate) and store them in a database as a resource. Use load_data_and_get_metrics().

Sources / Resources / Endpoints

We have two sources: stripe_source and incremental_stripe_source.

  1. stripe_source:
    Retrieves data from the Stripe API for the specified endpoints. For almost all endpoints, Stripe API responses do not provide the key "updated", so in most cases we are forced to load the data in 'replace' mode. This source is suitable for all types of endpoints, including 'Events', 'Invoice', etc., but these endpoints can also be loaded in incremental mode (see the source incremental_stripe_source).

  2. incremental_stripe_source:
    As the Stripe API does not include the "updated" key in its responses, we are only able to perform incremental downloads from endpoints where all objects are uneditable. This source yields the resources with incremental loading based on "append" mode. You will load only the newest data, without duplicates and without downloading a huge amount of data each time.

To calculate the metrics in load_data_and_get_metrics(), we need endpoints Subscription and Event. All endpoints become resources automatically.

Resource Subscription:

  • With Subscriptions, customers make recurring payments for access to a product. Subscriptions require you to retain more information about your customers than one-time purchases do because you need to charge customers in the future.
  • Updated endpoint. Object states can change over time; for example, customers can unsubscribe, so the status of the subscription changes from active to canceled.
  • To get a list of subscriptions from the Stripe API we use the list method. It returns a maximum of 100 elements. To get all the elements we iterate over the pages using the key "has_more".
  • By default, the list method returns subscriptions that have not been canceled. Passing status all returns subscriptions of all statuses. In the current pipeline we set status all for subscriptions.
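
The "has_more" loop can be sketched against a fake page function, so no Stripe account or client library is needed. `iterate_all` and `fake_list_page` are hypothetical names. The sketch assumes cursor-style paging in which the id of the last item on a page is passed back to request the next one, and it uses a page size of 2 to keep the example small.

```python
from typing import Any, Callable, Dict, Iterator, List, Optional

Page = Dict[str, Any]


def iterate_all(
    list_page: Callable[[Optional[str]], Page]
) -> Iterator[Dict[str, Any]]:
    """Yield every item, following pages while the response reports has_more."""
    starting_after: Optional[str] = None
    while True:
        page = list_page(starting_after)
        items: List[Dict[str, Any]] = page["data"]
        yield from items
        if not page["has_more"] or not items:
            break
        # The last id on this page becomes the cursor for the next request.
        starting_after = items[-1]["id"]


ITEMS = [{"id": f"sub_{i}"} for i in range(5)]


def fake_list_page(starting_after: Optional[str], limit: int = 2) -> Page:
    # Stand-in for the real list call (hypothetical).
    start = 0
    if starting_after is not None:
        ids = [item["id"] for item in ITEMS]
        start = ids.index(starting_after) + 1
    chunk = ITEMS[start:start + limit]
    return {"data": chunk, "has_more": start + limit < len(ITEMS)}


all_ids = [item["id"] for item in iterate_all(fake_list_page)]
```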

Resource Event:

  • Events are a way of letting you know when something interesting happens in your account. When an interesting event occurs, Stripe creates a new Event object. For example, when a charge succeeds, Stripe creates a charge.succeeded event, and when an invoice payment attempt fails, Stripe creates an invoice.payment_failed event. Note that many API requests may cause multiple events to be created. For example, if you create a new subscription for a customer, you will receive both a customer.subscription.created event and a charge.succeeded event.
  • Incremental endpoint. Objects do not change over time. If a new event occurs, it is simply added to Event. For example, if the customer cancels the subscription, a new event appears with the type "customer.subscription.deleted".
  • To get a list of events from the Stripe API we use the list method, which lists events going back up to 30 days.

Resource Metrics:

  • Stripe Billing Analytics Dashboard provides a summarized view of your account, which provides a lot of useful information like MRR, churn, and so on. Unfortunately, there is not an API for the Stripe Billing Analytics Dashboard. So, we can’t get the dashboard out to Python directly. But Stripe does provide some guidance on how they calculate those metrics.
  • Metrics are saved to the database as a resource each time you run the function metrics_resource (from stripe_analytics import metrics_resource).
  • You can add your own metrics in this resource (see Customisation).

Metrics calculation

You can find some metrics implementation in the pipelines/stripe_analytics/metrics.py script.

MRR

Monthly Recurring Revenue (MRR) can be thought of as the total amount of monthly revenue you can reliably expect to receive on a recurring basis. It is one of the most important metrics for a SaaS business to track, as it provides a forward-looking measure of growth and predicted revenue. You can calculate the approximate MRR by summing the monthly-normalized amounts of all subscriptions from which payment is being collected at that time.

  • First, we need to get all the subscription data. Here we set status="all" to get all subscription data, including canceled subscriptions. We could also omit this argument to get only the “active” and “past_due” statuses, since we don’t use the “canceled” status in this calculation.
  • Then we get the information about the subscription plan, i.e., amount, and interval (yearly or monthly plan).
  • If there is a discount on someone’s plan, we get the information about the discount.
  • Next, we calculate the normalized monthly plan amount by normalizing the yearly amount and also applying the discount.
  • Finally, MRR is calculated as the sum of the normalized monthly plan amount for those who have an “active” or “past_due” status.

This calculation is based on the Stripe article, and a blog post 1 and a blog post 2.
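
The steps above can be sketched in a few lines. This is not the code in metrics.py. The flat subscription dicts are a simplifying assumption, with "amount" in the smallest currency unit, "interval" being "month" or "year", and an optional "percent_off" discount. Both function names are hypothetical.

```python
from typing import Dict, Iterable

Subscription = Dict[str, object]


def normalized_monthly_amount(sub: Subscription) -> float:
    """Convert a plan amount to a monthly figure and apply any discount."""
    amount = float(sub["amount"])
    if sub["interval"] == "year":
        # Spread a yearly plan evenly over twelve months.
        amount /= 12
    percent_off = float(sub.get("percent_off") or 0)
    return amount * (1 - percent_off / 100)


def monthly_recurring_revenue(subscriptions: Iterable[Subscription]) -> float:
    """Sum normalized amounts over subscriptions that are still being billed."""
    return sum(
        normalized_monthly_amount(sub)
        for sub in subscriptions
        if sub["status"] in ("active", "past_due")
    )


subscriptions = [
    {"status": "active", "amount": 1000, "interval": "month"},
    {"status": "past_due", "amount": 12000, "interval": "year", "percent_off": 50},
    {"status": "canceled", "amount": 2000, "interval": "month"},
]
mrr = monthly_recurring_revenue(subscriptions)
```

Here the yearly plan contributes 12000 / 12 = 1000 per month, halved by the discount to 500. The canceled subscription is ignored, which gives an MRR of 1500.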

Churn rate

The churn rate is the number of subscribers who churned in the past 30 days, divided by the number of active subscribers as of 30 days ago plus any new subscribers in those 30 days.

  • First, we calculate the number of churned subscribers in the past 30 days. We can either use the event data or we can use the subscription data and see who canceled the subscription in the past 30 days.
  • Second, we calculate the number of active or past-due subscriptions.
  • Then we can calculate the churn rate based on these two numbers.

This calculation is based on the descriptions from this article.
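
Written as a function, under the reading that new subscribers are added to the denominator. That reading is an assumption about the definition above, and `churn_rate` is a hypothetical name rather than the function in metrics.py.

```python
def churn_rate(churned: int, active_30_days_ago: int, new_subscribers: int) -> float:
    """Churned subscribers divided by (active 30 days ago + new in the window)."""
    denominator = active_30_days_ago + new_subscribers
    if denominator == 0:
        # No subscribers in the window, so there is nothing to churn from.
        return 0.0
    return churned / denominator


rate = churn_rate(churned=5, active_30_days_ago=90, new_subscribers=10)
```

With 5 churned subscribers, 90 active subscribers 30 days ago and 10 new ones, the rate is 5 / 100 = 0.05.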

Test account / test data

Stripe's test account doesn't provide any test data, therefore, you need to accumulate your own product data, or generate test data.

Customisations

There are a few additional features. With this pipeline you can also:

  • Add other endpoints.
  • Calculate other metrics based on data.

Custom endpoints

If you want to add other endpoints, just enumerate them in a tuple and pass it to the function.
For example, to load data for the endpoints Price and Coupon, run the function stripe_source() with the argument endpoints=("Price", "Coupon"):

    pipeline = dlt.pipeline(
        pipeline_name="stripe_analytics",
        destination="duckdb",
        dataset_name="stripe_data",
    )
    source = stripe_source(endpoints=("Price", "Coupon"))
    load_info = pipeline.run(source)
    print(load_info)

You can find the full list of Stripe API endpoints here.

Most popular endpoints:

  • Subscription
  • Account
  • Coupon
  • Customer
  • PaymentMethod
  • Product
  • Price

Possible Incremental endpoints:

  • Event
  • BalanceTransaction
  • Invoice

port all pipelines to sessions with retry and ban certain imports

After upgrading to 0.2.0a27

  1. please replace requests with the retry session from dlt-hub/dlt#147 in all pipelines in the repository
  2. remove re-retry where it was used just for http re-retry
  3. use tenacity instead of re-retry where it cannot be removed (I do not think we have such a case)
  4. please pay attention to the chess pipeline, which does requests in parallel (@defer). Will our session work there?
  5. ban the imports using the flake extension we use in the dlt repo

The following imports should be banned:

  1. requests
  2. datetime and bare pendulum. (take it from dlt)
  3. port code to pendulum if necessary
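
To show what such a ban checks for, here is a small standalone checker built on the standard ast module. It is not the flake extension mentioned above, only an illustration of the rule. The banned set is taken from the list above, and the two dlt import paths in the sample string are assumed to be the allowed replacements.

```python
import ast
from typing import List, Set

BANNED_MODULES: Set[str] = {"requests", "datetime", "pendulum"}


def find_banned_imports(source: str, banned: Set[str] = BANNED_MODULES) -> List[str]:
    """Return 'line: module' entries for every banned import in source."""
    hits: List[str] = []
    for node in ast.walk(ast.parse(source)):
        if isinstance(node, ast.Import):
            modules = [alias.name for alias in node.names]
        elif isinstance(node, ast.ImportFrom):
            modules = [node.module or ""]
        else:
            continue
        for module in modules:
            # Compare the top-level package, so "requests.adapters" is caught too.
            if module.split(".")[0] in banned:
                hits.append(f"{node.lineno}: {module}")
    return hits


sample = (
    "import requests\n"
    "from datetime import datetime\n"
    "from dlt.common import pendulum\n"
    "from dlt.sources.helpers import requests as dlt_requests\n"
)
violations = find_banned_imports(sample)
```

Only the first two lines of the sample are flagged. The last two import through dlt, so their top-level package is not in the banned set.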

Matomo basic pipeline

Description

Matomo basic pipeline

Use Cases

  • Request tracking event data that was tracked with Matomo. I should have this data in a table that can be incremented on any schedule so I can keep it near live. I should also be able to optionally get the stateful user and update (merge) this data with events.
  • Request aggregated report data on data tracked with Matomo. I expect this to always be aggregated on date and incremented on date, so I can see the data up to the end of yesterday or today. It is loaded incrementally in a nightly load.
  • Request a user-defined report. I expect that it has a date column on which it is loaded incrementally in a nightly load.

Sources / Resources / Endpoints

We want two sources with the following resources.

We want the sources to be split by schedule:
matomo_reports: contains Reporting and Custom reporting
matomo_events: contains events and dependent users

Reporting resource

Raw data "events"

  • Raw data: https://developer.matomo.org/api-reference/reporting-api#Live we want this "raw data" api too - this should allow to export raw data from matomo
    Resource:
  • Live.getLastVisitsDetails: Named "events" - This should be incremented as above. Always start from all time (which is a last 10d rss) and increment from last load.
  • Live.getVisitorProfile. named "visitors" This should be a dependent resource on events and request the profile for those users that showed up in the event increment. The visitors profile should be merged with each increment.

Custom reports

We want to get custom reports too, as a method alternative to the first reports resource.

  • CustomReports.getCustomReport (idSite, period, date, idCustomReport, segment = '', expanded = '', flat = '', idSubtable = '', columns = '') [ No example available ]
  • I am not sure if we can get the report name automatically for the table name - if not provide the user with an option
  • we can always request and increment on date so please use the same increment method as above.

Customization

I believe there should be no customisation used for this pipeline.
If it looks like we should use something, please notify the dlt team.

Test account / test data

Use the public demo
https://demo.matomo.cloud/index.php?module=CoreHome&action=index&idSite=1&period=day&date=yesterday#?period=day&date=yesterday&category=Dashboard_Dashboard&subcategory=1

Implementation tasks

Below you have a proposal for implementation tasks

  • implement all the described sources, resources and endpoints
  • make sure that the Use Cases can be easily executed by the pipeline's user by providing demonstrations of all the use cases in the [pipeline_name]_pipeline.py
  • all the use cases are tested
  • if there are any customizations you must test them, including unit tests for custom code

Sql pipeline mysql issue

I tried to run the sql pipeline with a local mysql db, and it seems there's an issue around session/connection object usage.

trace:

(python-dlt-verified-pipelines-py3.9) bash-3.2$ python3 sql_database_pipeline.py
Traceback (most recent call last):
File "/Users/adrian/Library/Caches/pypoetry/virtualenvs/python-dlt-verified-pipelines-7y5Gudle-py3.9/lib/python3.9/site-packages/dlt/pipeline/pipeline.py", line 316, in extract
self._extract_source(storage, source, max_parallel_items, workers)
File "/Users/adrian/Library/Caches/pypoetry/virtualenvs/python-dlt-verified-pipelines-7y5Gudle-py3.9/lib/python3.9/site-packages/dlt/pipeline/pipeline.py", line 743, in _extract_source
extract_id = self._iterate_source(storage, source, pipeline_schema, max_parallel_items, workers)
File "/Users/adrian/Library/Caches/pypoetry/virtualenvs/python-dlt-verified-pipelines-7y5Gudle-py3.9/lib/python3.9/site-packages/dlt/pipeline/pipeline.py", line 758, in _iterate_source
extractor = extract(extract_id, source, storage, max_parallel_items=max_parallel_items, workers=workers)
File "/Users/adrian/Library/Caches/pypoetry/virtualenvs/python-dlt-verified-pipelines-7y5Gudle-py3.9/lib/python3.9/site-packages/dlt/extract/extract.py", line 81, in extract
for pipe_item in PipeIterator.from_pipes(source.resources.selected_pipes, max_parallel_items=max_parallel_items, workers=workers, futures_poll_interval=futures_poll_interval):
File "/Users/adrian/Library/Caches/pypoetry/virtualenvs/python-dlt-verified-pipelines-7y5Gudle-py3.9/lib/python3.9/site-packages/dlt/extract/pipe.py", line 422, in __next__
pipe_item = self._get_source_item()
File "/Users/adrian/Library/Caches/pypoetry/virtualenvs/python-dlt-verified-pipelines-7y5Gudle-py3.9/lib/python3.9/site-packages/dlt/extract/pipe.py", line 570, in _get_source_item
item = next(gen)
File "/Users/adrian/PycharmProjects/ll/python-dlt-verified-pipelines/pipelines/sql_database/source.py", line 19, in table_rows
with conn.execute(table.select()) as result:
AttributeError: __enter__

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
File "/Users/adrian/PycharmProjects/ll/python-dlt-verified-pipelines/pipelines/sql_database_pipeline.py", line 24, in <module>
info = pipeline.run(data)
File "/Users/adrian/Library/Caches/pypoetry/virtualenvs/python-dlt-verified-pipelines-7y5Gudle-py3.9/lib/python3.9/site-packages/dlt/pipeline/pipeline.py", line 100, in _wrap
step_info = f(self, *args, **kwargs)
File "/Users/adrian/Library/Caches/pypoetry/virtualenvs/python-dlt-verified-pipelines-7y5Gudle-py3.9/lib/python3.9/site-packages/dlt/pipeline/pipeline.py", line 131, in _wrap
return f(self, *args, **kwargs)
File "/Users/adrian/Library/Caches/pypoetry/virtualenvs/python-dlt-verified-pipelines-7y5Gudle-py3.9/lib/python3.9/site-packages/dlt/pipeline/pipeline.py", line 491, in run
self.extract(data, table_name=table_name, write_disposition=write_disposition, columns=columns, schema=schema)
File "/Users/adrian/Library/Caches/pypoetry/virtualenvs/python-dlt-verified-pipelines-7y5Gudle-py3.9/lib/python3.9/site-packages/dlt/pipeline/pipeline.py", line 100, in _wrap
step_info = f(self, *args, **kwargs)
File "/Users/adrian/Library/Caches/pypoetry/virtualenvs/python-dlt-verified-pipelines-7y5Gudle-py3.9/lib/python3.9/site-packages/dlt/pipeline/pipeline.py", line 74, in _wrap
rv = f(self, *args, **kwargs)
File "/Users/adrian/Library/Caches/pypoetry/virtualenvs/python-dlt-verified-pipelines-7y5Gudle-py3.9/lib/python3.9/site-packages/dlt/pipeline/pipeline.py", line 60, in _wrap
return f(self, *args, **kwargs)
File "/Users/adrian/Library/Caches/pypoetry/virtualenvs/python-dlt-verified-pipelines-7y5Gudle-py3.9/lib/python3.9/site-packages/dlt/pipeline/pipeline.py", line 131, in _wrap
return f(self, *args, **kwargs)
File "/Users/adrian/Library/Caches/pypoetry/virtualenvs/python-dlt-verified-pipelines-7y5Gudle-py3.9/lib/python3.9/site-packages/dlt/pipeline/pipeline.py", line 325, in extract
raise PipelineStepFailed(self, "extract", exc, runner.LAST_RUN_METRICS, ExtractInfo()) from exc
dlt.pipeline.exceptions.PipelineStepFailed: Pipeline execution failed at stage extract with exception:

<class 'AttributeError'> __enter__
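
On Python 3.9, `AttributeError: __enter__` raised from a `with` statement means the object after `with` does not implement the context manager protocol. The trace does not show why the result of `conn.execute(...)` lacks it in this environment. One generic workaround, assuming the result object at least has a `close()` method, is `contextlib.closing`. The `LegacyResult` class below is a stand-in used only to demonstrate the pattern; it is not SQLAlchemy code.

```python
from contextlib import closing
from typing import Dict, Iterable, Iterator, List


class LegacyResult:
    """Stand-in for a result object that has close() but no __enter__/__exit__."""

    def __init__(self, rows: Iterable[Dict[str, int]]) -> None:
        self._rows: List[Dict[str, int]] = list(rows)
        self.closed = False

    def __iter__(self) -> Iterator[Dict[str, int]]:
        return iter(self._rows)

    def close(self) -> None:
        self.closed = True


def table_rows(result: LegacyResult) -> Iterator[Dict[str, int]]:
    # closing() supplies the context manager protocol and calls close() on exit.
    with closing(result) as rows:
        yield from rows


result = LegacyResult([{"id": 1}, {"id": 2}])
rows = list(table_rows(result))
```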
