
prefect-databricks's Introduction

Integrate Databricks jobs into your dataflow with prefect-databricks


PyPI

Visit the full docs here to see additional examples and the API reference.

The prefect-databricks collection makes it easy to coordinate Databricks jobs with other tools in your data stack using Prefect. Check out the examples below to get started!

Getting Started

Integrate with Prefect flows

Using Prefect with Databricks allows you to define and orchestrate complex data workflows that take advantage of the scalability and performance of Databricks.

This can be especially useful for data-intensive tasks such as ETL (extract, transform, load) pipelines, machine learning training and inference, and real-time data processing.

Below is an example of how you can incorporate Databricks notebooks within your Prefect flows.

Be sure to install prefect-databricks and save a credentials block to run the examples below!

If you don't have an existing notebook ready on Databricks, you can copy the following and name it example.ipynb. This notebook accepts a name parameter from the flow and simply prints a message.

name = dbutils.widgets.get("name")
message = f"Don't worry {name}, I got your request! Welcome to prefect-databricks!"
print(message)

Here, the flow launches a new cluster to run example.ipynb and waits for the completion of the notebook run. Replace the placeholders and run.

from prefect import flow
from prefect_databricks import DatabricksCredentials
from prefect_databricks.flows import jobs_runs_submit_and_wait_for_completion
from prefect_databricks.models.jobs import (
    AutoScale,
    AwsAttributes,
    JobTaskSettings,
    NotebookTask,
    NewCluster,
)


@flow
def jobs_runs_submit_flow(block_name: str, notebook_path: str, **base_parameters):
    databricks_credentials = DatabricksCredentials.load(block_name)

    # specify new cluster settings
    aws_attributes = AwsAttributes(
        availability="SPOT",
        zone_id="us-west-2a",
        ebs_volume_type="GENERAL_PURPOSE_SSD",
        ebs_volume_count=3,
        ebs_volume_size=100,
    )
    auto_scale = AutoScale(min_workers=1, max_workers=2)
    new_cluster = NewCluster(
        aws_attributes=aws_attributes,
        autoscale=auto_scale,
        node_type_id="m4.large",
        spark_version="10.4.x-scala2.12",
        spark_conf={"spark.speculation": True},
    )

    # specify notebook to use and parameters to pass
    notebook_task = NotebookTask(
        notebook_path=notebook_path,
        base_parameters=base_parameters,
    )

    # compile job task settings
    job_task_settings = JobTaskSettings(
        new_cluster=new_cluster,
        notebook_task=notebook_task,
        task_key="prefect-task"
    )

    run = jobs_runs_submit_and_wait_for_completion(
        databricks_credentials=databricks_credentials,
        run_name="prefect-job",
        tasks=[job_task_settings]
    )

    return run


jobs_runs_submit_flow(
    block_name="BLOCK-NAME-PLACEHOLDER",
    notebook_path="/Users/<EMAIL_ADDRESS_PLACEHOLDER>/example.ipynb",
    name="Marvin"
)

Upon execution, the notebook run should output:

Don't worry Marvin, I got your request! Welcome to prefect-databricks!

!!! info "Input dictionaries in the place of models"

Instead of using the built-in models, you may also input a valid dictionary.

For example, the following are equivalent:

```python
auto_scale=AutoScale(min_workers=1, max_workers=2)
```

```python
auto_scale={"min_workers": 1, "max_workers": 2}
```

If you have an existing Databricks job, you can run it using jobs_runs_submit_by_id_and_wait_for_completion:

from prefect import flow

from prefect_databricks import DatabricksCredentials
from prefect_databricks.flows import (
    jobs_runs_submit_by_id_and_wait_for_completion,
)


@flow
def existing_job_submit(databricks_credentials_block_name: str, job_id):
    databricks_credentials = DatabricksCredentials.load(databricks_credentials_block_name)

    run = jobs_runs_submit_by_id_and_wait_for_completion(
        databricks_credentials=databricks_credentials, job_id=job_id
    )

    return run

existing_job_submit(databricks_credentials_block_name="db-creds", job_id="YOUR-JOB-ID")

Resources

For more tips on how to use tasks and flows in a Collection, check out Using Collections!

Note, the tasks within this collection were created by a code generator using the service's OpenAPI spec.

The service's REST API documentation can be found here.

Installation

Install prefect-databricks with pip:

pip install prefect-databricks

Requires an installation of Python 3.7+.

We recommend using a Python virtual environment manager such as pipenv, conda or virtualenv.

These tasks are designed to work with Prefect 2. For more information about how to use Prefect, please refer to the Prefect documentation.

Saving Credentials to Block

To use the load method on Blocks, you must already have a block document saved through code or saved through the UI.

Below is a walkthrough on saving block documents through code; simply create a short script, replacing the placeholders.

  1. Head over to Databricks.
  2. Login to your Databricks account and select a workspace.
  3. On the top right side of the nav bar, click on your account name -> User Settings.
  4. Click Access tokens -> Generate new token -> Generate and copy the token.
  5. Note down your Databricks instance from the browser URL, formatted like https://<DATABRICKS-INSTANCE>.cloud.databricks.com/
  6. Create a short script, replacing the placeholders.
from prefect_databricks import DatabricksCredentials

credentials = DatabricksCredentials(
    databricks_instance="DATABRICKS-INSTANCE-PLACEHOLDER",
    token="TOKEN-PLACEHOLDER"
)

credentials.save("BLOCK_NAME-PLACEHOLDER")

Congrats! You can now easily load the saved block, which holds your credentials:

from prefect_databricks import DatabricksCredentials

DatabricksCredentials.load("BLOCK_NAME-PLACEHOLDER")

!!! info "Registering blocks"

Register blocks in this module to
[view and edit them](https://orion-docs.prefect.io/ui/blocks/)
on Prefect Cloud:

```bash
prefect block register -m prefect_databricks
```

Feedback

If you encounter any bugs while using prefect-databricks, feel free to open an issue in the prefect-databricks repository.

If you have any questions or issues while using prefect-databricks, you can find help in either the Prefect Discourse forum or the Prefect Slack community.

Feel free to star or watch prefect-databricks for updates too!

Contributing

If you'd like to help contribute to fix an issue or add a feature to prefect-databricks, please propose changes through a pull request from a fork of the repository.

Here are the steps:

  1. Fork the repository
  2. Clone the forked repository
  3. Install the repository and its dependencies:
     pip install -e ".[dev]"
  4. Make desired changes
  5. Add tests
  6. Insert an entry to CHANGELOG.md
  7. Install pre-commit to perform quality checks prior to commit:
     pre-commit install
  8. git commit, git push, and create a pull request

prefect-databricks's People

Contributors

ahuang11, avishniakov, bada-s, chrisguidry, dependabot[bot], desertaxle, discdiver, edmondop, micklaw, prefect-collection-synchronizer[bot], serinamarie, urimandujano, zerodarkzone, zzstoatzz

prefect-databricks's Issues

`NewCluster` has `node_type_id` as mandatory

Hi,

In the current implementation of NewCluster, it is not possible to pass only instance_pool_id, because node_type_id is a mandatory field. If we pass both, cluster creation will fail on the Databricks side, since instance_pool_id and node_type_id are mutually exclusive settings for a cluster.

I can contribute the fix myself if I'm given the necessary permissions (I'm just not sure how contributing works for prefect-databricks and whether it is allowed by the maintainers).

Errors

>>> NewCluster(instance_pool_id="ABC", spark_version="10.4.x-scala2.12")
---------------------------------------------------------------------------
ValidationError                           Traceback (most recent call last)
Cell In [37], line 1
----> 1 NewCluster(instance_pool_id="ABC", spark_version="10.4.x-scala2.12")

File ~/Projects/GitHub/Libs/mls-mlops/.venv/lib/python3.8/site-packages/pydantic/main.py:342, in pydantic.main.BaseModel.__init__()

ValidationError: 1 validation error for NewCluster
node_type_id
  field required (type=value_error.missing)
>>> NewCluster(node_type_id="m5d.xlarge", instance_pool_id="ABC", spark_version="10.4.x-scala2.12")
Malformed Request from Databricks

Installing prefect-databricks with poetry yields installation of every version of botocore

Not sure if this is on purpose, but installing this package with poetry results in every version of botocore being installed.

Command: poetry add prefect-databricks

Related:

Workaround:

  1. Install in virtual environment using pip: poetry run pip install prefect-databricks
  2. Then add as a dependency: poetry add prefect-databricks

This goes nearly instantly. The long-term solution is probably a version pin for something related to boto3/moto, etc.

Running jobs_runs_submit_and_wait_for_completion without run_name

If run_name defaults to None, it will result in:

prefect.exceptions.ParameterTypeError: Flow run received invalid parameters:
 - run_name: none is not an allowed value

in jobs_runs_submit_and_wait_for_completion
    jobs_runs_state, jobs_runs_metadata = await jobs_runs_wait_for_completion(
ValueError: too many values to unpack (expected 2)

Because Prefect validates the input, but here the run_name arg isn't optional so it crashes:
https://github.com/PrefectHQ/prefect-databricks/blob/main/prefect_databricks/flows.py#L399-L400

The type annotations in the base flow need to be updated as well.
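
Until run_name is made truly optional, one workaround (an assumption on my part, not an official fix) is to always pass an explicit run_name, mirroring the getting-started example:

```python
# Workaround sketch: supply run_name explicitly so the flow's parameter
# validation never receives None.
run = jobs_runs_submit_and_wait_for_completion(
    databricks_credentials=databricks_credentials,
    run_name="my-databricks-run",  # any non-empty string avoids the validation error
    tasks=[job_task_settings],
)
```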

Originally reported from:
https://prefect-community.slack.com/archives/CL09KU1K7/p1664168414041899

Retrieving the output of runs with multiple tasks is not supported (jobs_runs_submit_by_id)

When using jobs_runs_submit_by_id_and_wait_for_completion with a databricks job with multiple tasks, we get the following error:

Finished in state Failed('Flow run encountered an exception. Traceback (most recent call last):\n File "/usr/local/lib/python3.9/site-packages/prefect_databricks/rest.py", line 149, in _unpack_contents\n response.raise_for_status()\n File "/usr/local/lib/python3.9/site-packages/httpx/_models.py", line 749, in raise_for_status\n raise HTTPStatusError(message, request=request, response=self)\nhttpx.HTTPStatusError: Client error \'400 Bad Request\' for url \'https://{removed}.cloud.databricks.com/api/2.1/jobs/runs/get-output?run_id=4816808\'\nFor more information check: https://httpstatuses.com/400\n\nThe above exception was the direct cause of the following exception:\n\nhttpx.HTTPStatusError: A job run with multiple tasks was provided.JSON response: {\'error_code\': \'INVALID_PARAMETER_VALUE\', \'message\': \'Retrieving the output of runs with multiple tasks is not supported. Please retrieve the output of each individual task run instead.\'}\n')

All the tasks in the job, and the job itself, complete successfully, but the flow stops at this point.

Expectation / Proposal

The flow should be marked as completed if all tasks complete successfully, and as failed if any of the tasks fail.

Error when running a multitask job

Expectation / Proposal

The jobs_runs_submit_by_id_and_wait_for_completion flow fails when running a multi-task Databricks job.
I think it has to do with some changes in the Databricks Jobs API 2.1.

Traceback / Example

jobs_runs_submit_by_id_and_wait_for_completion
    task_run_output = await task_run_output_future.result()
  File "/usr/local/lib/python3.9/site-packages/prefect/_internal/concurrency/calls.py", line 292, in aresult
    return await asyncio.wrap_future(self.future)
  File "/usr/local/lib/python3.9/site-packages/prefect/_internal/concurrency/calls.py", line 346, in _run_async
    result = await coro
  File "/usr/local/lib/python3.9/site-packages/prefect/futures.py", line 237, in _result
    return await final_state.result(raise_on_failure=raise_on_failure, fetch=True)
  File "/usr/local/lib/python3.9/site-packages/prefect/states.py", line 91, in _get_state_result
    raise await get_state_exception(state)
  File "/usr/local/lib/python3.9/site-packages/prefect/engine.py", line 1655, in orchestrate_task_run
    result = await call.aresult()
  File "/usr/local/lib/python3.9/site-packages/prefect/_internal/concurrency/calls.py", line 292, in aresult
    return await asyncio.wrap_future(self.future)
  File "/usr/local/lib/python3.9/site-packages/prefect/_internal/concurrency/calls.py", line 346, in _run_async
    result = await coro
  File "/usr/local/lib/python3.9/site-packages/prefect_databricks/jobs.py", line 1238, in jobs_runs_get_output
    contents = _unpack_contents(response, responses)
  File "/usr/local/lib/python3.9/site-packages/prefect_databricks/rest.py", line 157, in _unpack_contents
    raise httpx.HTTPStatusError(
httpx.HTTPStatusError: A job run with multiple tasks was provided.JSON response: {'error_code': 'INVALID_PARAMETER_VALUE', 'message': 'Retrieving the output of runs with multiple tasks is not supported. Please retrieve the output of each individual task run instead.'}
10:27:05 PM
prefect.flow_runs
Finished in state Failed("Flow run encountered an exception. HTTPStatusError: A job run with multiple tasks was provided.JSON response: {'error_code': 'INVALID_PARAMETER_VALUE', 'message': 'Retrieving the output of runs with multiple tasks is not supported. Please retrieve the output of each individual task run instead.'}")

`ClusterAttributes` gone from `models.jobs`

In 0.1.2, models.jobs is generated from jobs.yaml, whereas it was previously generated from jobs-2.1-aws.yaml. The current main branch doesn't have a file named jobs.yaml, but still has the earlier one: jobs-2.1-aws.yaml.

Issue

Our code base relied on ClusterAttributes, which is still present in the schema (see the ClusterAttributes link).
Was this change of generation schema intended, and where can the new schema file be found?

I observed that DockerImage is also gone from models.jobs.

Background

We use a specific setting from it: docker_image. If this change was intended, how should it be passed now?

Handling job creation when using jobs_runs_submit_by_id_and_wait_for_completion

At the moment, when using jobs_runs_submit_by_id_and_wait_for_completion, the details of the Databricks job are not available in the parent flow or task until the job completes and they are returned. However, for a long-running job one might want to store the Databricks job run URL as an artifact, for example:

https://github.com/PrefectHQ/prefect-databricks/blob/main/prefect_databricks/flows.py#L462
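
One possible shape for this (purely a sketch under assumptions: it triggers the run with the lower-level jobs_run_now and jobs_runs_get tasks and uses Prefect's create_link_artifact, rather than the existing flow; names and return shapes are based on this report):

```python
from prefect import flow
from prefect.artifacts import create_link_artifact
from prefect_databricks import DatabricksCredentials
from prefect_databricks.jobs import jobs_run_now, jobs_runs_get


@flow
def run_existing_job_with_link(block_name: str, job_id: int):
    credentials = DatabricksCredentials.load(block_name)

    # Trigger the existing job and fetch its run details immediately,
    # so the run page URL is available long before the job finishes.
    run = jobs_run_now(databricks_credentials=credentials, job_id=job_id)
    details = jobs_runs_get(
        run_id=run["run_id"], databricks_credentials=credentials
    )
    create_link_artifact(
        key="databricks-run",
        link=details["run_page_url"],
        description="Databricks run page for this flow run",
    )
    # ...then poll for completion, as jobs_runs_submit_by_id_and_wait_for_completion does
```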

Expectation / Proposal

Traceback / Example

jobs_runs_submit_and_wait_for_completion raises an error on import

I have a flow which wraps the call for jobs_runs_submit_and_wait_for_completion:

@flow
def my_flow(databricks_credentials, job_task_settings):
    """Oversimplified"""
    task_run = jobs_runs_submit_and_wait_for_completion(
        databricks_credentials=databricks_credentials,
        tasks=[job_task_settings]
    )

    return task_run

But I receive the following error:

    raise SkipField(f'Callable {field.name} was excluded from schema since JSON schema has no equivalent type.')
E   pydantic.v1.schema.SkipField: Callable job_submission_handler was excluded from schema since JSON schema has no equivalent type.

Full stack trace:

 During handling of the above exception, another exception occurred:
test_run_notebook.py:6: in <module>
    from ...
../../../.../tasks/databricks/run_notebook.py:10: in <module>
    from prefect_databricks.flows import jobs_runs_submit_and_wait_for_completion
PATH/lib/python3.9/site-packages/prefect_databricks/flows.py:61: in <module>
    async def jobs_runs_submit_and_wait_for_completion(
PATH/lib/python3.9/site-packages/prefect/flows.py:1382: in flow
    Flow(
PATH/lib/python3.9/site-packages/prefect/context.py:186: in __register_init__
    original_init(__self__, *args, **kwargs)
PATH/lib/python3.9/site-packages/prefect/flows.py:307: in __init__
    self.parameters = parameter_schema(self.fn)
PATH/lib/python3.9/site-packages/prefect/utilities/callables.py:340: in parameter_schema
    create_schema(
PATH/lib/python3.9/site-packages/prefect/utilities/callables.py:296: in create_v1_schema
    return model.schema(by_alias=True)
PATH/lib/python3.9/site-packages/pydantic/v1/main.py:664: in schema
    s = model_schema(cls, by_alias=by_alias, ref_template=ref_template)
PATH/lib/python3.9/site-packages/pydantic/v1/schema.py:188: in model_schema
    m_schema, m_definitions, nested_models = model_process_schema(
PATH/lib/python3.9/site-packages/pydantic/v1/schema.py:582: in model_process_schema
    m_schema, m_definitions, nested_models = model_type_schema(
PATH/lib/python3.9/site-packages/pydantic/v1/schema.py:632: in model_type_schema
    warnings.warn(skip.message, UserWarning)
E   UserWarning: Callable job_submission_handler was excluded from schema since JSON schema has no equivalent type.

I am using poetry for dependency management.

My pyproject.toml: (oversimplified)

python = ">=3.8,<4"
prefect = ">=2.14.14"
pydantic = ">=2"
prefect-databricks = ">=0.2.3"

Python version 3.9.6

How to use docker_image for new jobs

I need to use the docker_image property of ClusterAttributes, yet it is not clear how to use this class, since there is no reference to it in the code other than its definition. Either the implementation or the documentation is missing.

Could anyone clarify how to use it?
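
Since the flows also accept plain dictionaries in place of the generated models (see the "Input dictionaries in the place of models" note above), one possible workaround, assuming the standard Databricks Jobs 2.1 shape for docker_image, is to pass the cluster spec as a dict:

```python
# Hypothetical sketch: supply new_cluster as a plain dict so fields without a
# generated model, such as docker_image, can still be included in the request.
new_cluster = {
    "spark_version": "10.4.x-scala2.12",
    "node_type_id": "m5d.large",
    "num_workers": 1,
    "docker_image": {
        "url": "my-registry.example.com/my-image:latest",  # placeholder image reference
        "basic_auth": {  # only needed for private registries
            "username": "USERNAME-PLACEHOLDER",
            "password": "TOKEN-PLACEHOLDER",
        },
    },
}
```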

DatabricksCredentials doesn't work as a Parameter for Prefect 2 Deployments

It seems there is a problem with specifying a Deployment that has a DatabricksCredentials instance as one of its parameters.

I am creating the deployment using the Python API:

Deployment.build_from_flow(
    flow=my_flow,
    ...
    parameters={
        ...
        "databricks_credentials": DatabricksCredentials("block_name"),
    }
)

Running the deployment fails with a parameter validation error:

Validation of flow parameters failed with error: 
Traceback (most recent call last):
  File "/root/.cache/pypoetry/virtualenvs/ozawa-prefect-8Nt7Egqj-py3.9/lib/python3.9/site-packages/prefect/engine.py", line 423, in retrieve_flow_then_begin_flow_run
    parameters = flow.validate_parameters(flow_run.parameters)
  File "/root/.cache/pypoetry/virtualenvs/ozawa-prefect-8Nt7Egqj-py3.9/lib/python3.9/site-packages/prefect/flows.py", line 394, in validate_parameters
    raise ParameterTypeError.from_validation_error(exc) from None
prefect.exceptions.ParameterTypeError: Flow run received invalid parameters:
 - databricks_credentials: field required
 - databricks_credentials: field required

And to be clear, the error is misleading. databricks_credentials is the name of the parameter of type DatabricksCredentials, and I did provide a value. It's just that it seems not to get recognized or accepted for some reason.

If I manually run the flow from my laptop -- i.e. bypassing the deployment mechanism -- it works fine.

As a work-around for deployments, I am specifying the credentials block name, and then loading the credentials into an instance of DatabricksCredentials as part of the flow, as opposed to passing it in as an input parameter.
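
A minimal sketch of that work-around (my own wording of it, with a hypothetical flow name):

```python
from prefect import flow
from prefect_databricks import DatabricksCredentials


@flow
def my_flow(databricks_credentials_block_name: str):
    # The deployment only passes a plain string; the credentials block is
    # loaded inside the flow instead of being passed as a parameter.
    databricks_credentials = DatabricksCredentials.load(databricks_credentials_block_name)
    ...
```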

Prefect version: 2.10.20
Prefect Databricks version: 0.1.6

job_submission_handler no longer supported by recent versions of Prefect (2.14?)

The job_submission_handler introduced with #89 is no longer working with recent versions of Prefect due to:

/usr/local/lib/python3.9/site-packages/prefect/utilities/callables.py:296: UserWarning: Callable 
job_submission_handler was excluded from schema since JSON schema has no equivalent type.

This is likely because job_submission_handler is Optional[Callable]. It was working with Prefect 2.12.x. The error first appeared after upgrading my Prefect Server to 2.14.10. It seems that now Prefect flow's parameters must be JSON-encodable, and Callable is not.

Expectation / Proposal

Is this a regression introduced by Prefect in a recent release, or is it the desired behaviour?

Traceback / Example

Tutorial error # 2

The tutorial here incorrectly states that the notebook syntax is

"/Users/<EMAIL_ADDRESS_PLACEHOLDER>/example.ipynb",

but it only works with "/Users/<EMAIL_ADDRESS_PLACEHOLDER>/example".

Prefect Flow/Databricks API premature failure

Issue

The jobs_runs_submit_and_wait_for_completion flow calls the Databricks API at set intervals to check whether the job is still running. Our team has run into several cases where Prefect cannot connect to the API (it returns a 5xx error) and fails the entire flow. However, when we log into our Databricks instance, we see that the job is still running. This has happened 4 out of 7 times in the last week for a daily scheduled flow and seems to only happen on job runs longer than an hour.

Proposal/Solution

One idea would be to add a retry parameter so that the flow only fails after two consecutive API failures; a rough sketch of this idea follows. Also open to any other solutions/ideas.
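
A rough sketch of what that consecutive-failure tolerance could look like, written against the jobs_runs_get task from this collection (an illustration only; the polling interval, state handling, and threshold are assumptions):

```python
import asyncio

import httpx
from prefect_databricks.jobs import jobs_runs_get


async def poll_run_tolerantly(
    run_id, databricks_credentials, max_consecutive_failures=2, poll_interval=30
):
    """Poll a Databricks run, failing only after N consecutive API errors."""
    consecutive_failures = 0
    while True:
        try:
            run = await jobs_runs_get.fn(
                run_id=run_id, databricks_credentials=databricks_credentials
            )
            consecutive_failures = 0  # reset on any successful poll
        except httpx.HTTPStatusError:
            consecutive_failures += 1
            if consecutive_failures >= max_consecutive_failures:
                raise  # give up only after repeated failures
            await asyncio.sleep(poll_interval)
            continue
        state = run["state"]
        if state.get("result_state") or state["life_cycle_state"] == "INTERNAL_ERROR":
            return run  # run reached a terminal state
        await asyncio.sleep(poll_interval)
```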

`DatabricksJobInternalError` provides no details on what the internal error is

When a Databricks job run triggered using jobs_runs_submit_by_id_and_wait_for_completion raises an internal error, a DatabricksJobInternalError is raised with a generic error message. There is no easy way to get the internal error message captured by the databricks job run output from the DatabricksJobInternalError.

Here is an example DatabricksJobInternalError error message for a single-task Databricks job. Notice that it does not include the necessary information, such as run id, to pass to subsequent prefect-databricks tasks to retrieve the internal error message.

prefect_databricks.flows.DatabricksJobInternalError: Databricks Jobs Runs Submit ID 562802111131369 encountered an internal error: Task test_job failed. This caused all downstream tasks to get skipped..

Proposal

When raising the DatabricksJobInternalError, add a "cause" exception with the error message captured by the Databricks run. The error message can be retrieved using the existing jobs_runs_get_output task.

error_message = jobs_runs_get_output(run_id)["error"]  # Or something similar
raise DatabricksJobInternalError(
  f"Databricks Jobs Runs Submit ID "
  f"{job_id} "
  f"encountered an internal error: {jobs_runs_state_message}.",
) from Exception(error_message)

Add collection sync workflow using cruft

Add cruft to repo to allow synchronization of this collection with the original template.
Cruft can be added by running cruft link. Note that a starting commit will need to be specified.
Using the commit of the prefect-collection-template closest to the generation date of this repo
is a good default.

Databricks flow that runs an existing job and waits for it to return

This is more of a feature request...

The utility functions of this repo allow you to create, run, and wait for the result of a custom Databricks job (the definition of which you pass in to the function), but there isn't a simple way to run and wait for the result of an existing Databricks job by job ID. This would be very helpful.

jobs_runs_wait_for_completion doesn't retry correctly

Expectation / Proposal

I may be misunderstanding something, but I have a custom flow that calls jobs_runs_wait_for_completion under the hood. I am calling my custom flow with additional options like this:

custom_flow.with_options(retries=1)

However, when the underlying Databricks job fails, I see it acknowledged by Prefect but I don't see the Databricks job ever actually rerun.

Am I misunderstanding something basic about how to use jobs_runs_wait_for_completion in combination with with_options(retries=) to get automatic retries of the called Databricks job?

Traceback / Example

17:37:34.392 | INFO    | Flow run 'amorphous-sawfly' - Received non-final state 'AwaitingRetry' when proposing final state 'Failed' and will attempt to run again...
17:37:36.726 | INFO    | Flow run 'encouraging-bustard' - Finished in state Completed()
17:37:36.860 | ERROR   | Flow run 'amorphous-sawfly' - Encountered exception during execution:
Traceback (most recent call last):
  File "/Users/nchammas/Library/Caches/pypoetry/virtualenvs/ozawa-prefect-LtscNPhu-py3.9/lib/python3.9/site-packages/prefect/engine.py", line 833, in orchestrate_flow_run
    result = await flow_call.aresult()
  File "/Users/nchammas/Library/Caches/pypoetry/virtualenvs/ozawa-prefect-LtscNPhu-py3.9/lib/python3.9/site-packages/prefect/_internal/concurrency/calls.py", line 292, in aresult
    return await asyncio.wrap_future(self.future)
  File "/Users/nchammas/Library/Caches/pypoetry/virtualenvs/ozawa-prefect-LtscNPhu-py3.9/lib/python3.9/site-packages/prefect/_internal/concurrency/calls.py", line 316, in _run_sync
    result = self.fn(*self.args, **self.kwargs)
  File "/Users/nchammas/dev/spx/ozawa/prefect2/ozawa_prefect/flows/databricks/__init__.py", line 66, in run_job_or_fail
    raise Exception(
Exception: {'result_state': 'FAILED', 'run_page_url': 'https://...cloud.databricks.com/?o=xxx#job/126796463608738/run/28379095', 'notebook_output': {}, 'error': 'ArithmeticException: / by zero'}
17:37:37.000 | ERROR   | Flow run 'amorphous-sawfly' - Finished in state Failed("Flow run encountered an exception. Exception: {'result_state': 'FAILED', 'run_page_url': 'https://...cloud.databricks.com/?o=xxx#job/126796463608738/run/28379095', 'notebook_output': {}, 'error': 'ArithmeticException: / by zero'}")

ACL not serializable for `jobs_runs_submit_and_wait_for_completion`

Hello,

I was trying to set proper ACL for RunSubmit using jobs_runs_submit_and_wait_for_completion as follows:

import os
from asyncio import run

from prefect_databricks import DatabricksCredentials
from prefect_databricks.flows import jobs_runs_submit_and_wait_for_completion
from prefect_databricks.models.jobs import (
    AccessControlList,
    AccessControlRequestForGroup,
    AccessControlRequestForUser,
    CanManage,
    CanView,
    NewCluster,
    RunSubmitTaskSettings,
    SparkPythonTask,
)

owners = ["[email protected]"]
acl = [
    AccessControlRequestForUser(
        permission_level=CanManage.canmanage,
        user_name=user,
    )
    for user in owners
]
acl.append(
    AccessControlRequestForGroup(group_name="users", permission_level=CanView.canview)
)

run(
    jobs_runs_submit_and_wait_for_completion(
        databricks_credentials=DatabricksCredentials(
            databricks_instance=os.environ["DATABRICKS_INSTANCE"],
            token=os.environ["DATABRICKS_TOKEN"],
        ),
        tasks=[
            RunSubmitTaskSettings(
                new_cluster=NewCluster(
                    spark_version="10.4.x-scala2.12",
                    num_workers=1,
                    node_type_id="m5d.large",
                ),
                spark_python_task=SparkPythonTask(python_file="s3://foo/bar.py"),
                task_key="foo",
            )
        ],
        access_control_list=acl,
    )
)

Running it results in the following:

12:51:21.752 | INFO    | prefect.engine - Created flow run 'space-otter' for flow 'Submit jobs runs and wait for completion'
12:51:22.371 | INFO    | Flow run 'space-otter' - Created task run 'jobs_runs_submit-c00eee75-0' for task 'jobs_runs_submit'
12:51:22.372 | INFO    | Flow run 'space-otter' - Submitted task run 'jobs_runs_submit-c00eee75-0' for execution.
12:51:22.547 | ERROR   | Task run 'jobs_runs_submit-c00eee75-0' - Encountered exception during execution:
Traceback (most recent call last):
  File "/***/.venv/lib/python3.8/site-packages/prefect/engine.py", line 1215, in orchestrate_task_run
    result = await task.fn(*args, **kwargs)
  File "/***/.venv/lib/python3.8/site-packages/prefect_databricks/jobs.py", line 1634, in jobs_runs_submit
    response = await execute_endpoint.fn(
  File "/***/.venv/lib/python3.8/site-packages/prefect_databricks/rest.py", line 132, in execute_endpoint
    response = await getattr(client, http_method)(
  File "/***/.venv/lib/python3.8/site-packages/httpx/_client.py", line 1848, in post
    return await self.request(
  File "/***/.venv/lib/python3.8/site-packages/httpx/_client.py", line 1520, in request
    request = self.build_request(
  File "/***/.venv/lib/python3.8/site-packages/httpx/_client.py", line 360, in build_request
    return Request(
  File "/***/.venv/lib/python3.8/site-packages/httpx/_models.py", line 339, in __init__
    headers, stream = encode_request(
  File "/***/.venv/lib/python3.8/site-packages/httpx/_content.py", line 211, in encode_request
    return encode_json(json)
  File "/***/.venv/lib/python3.8/site-packages/httpx/_content.py", line 174, in encode_json
    body = json_dumps(json).encode("utf-8")
  File "/usr/local/Cellar/[email protected]/3.8.15/Frameworks/Python.framework/Versions/3.8/lib/python3.8/json/__init__.py", line 231, in dumps
    return _default_encoder.encode(obj)
  File "/usr/local/Cellar/[email protected]/3.8.15/Frameworks/Python.framework/Versions/3.8/lib/python3.8/json/encoder.py", line 199, in encode
    chunks = self.iterencode(o, _one_shot=True)
  File "/usr/local/Cellar/[email protected]/3.8.15/Frameworks/Python.framework/Versions/3.8/lib/python3.8/json/encoder.py", line 257, in iterencode
    return _iterencode(o, 0)
  File "/usr/local/Cellar/[email protected]/3.8.15/Frameworks/Python.framework/Versions/3.8/lib/python3.8/json/encoder.py", line 179, in default
    raise TypeError(f'Object of type {o.__class__.__name__} '
TypeError: Object of type CanManage is not JSON serializable
12:51:22.610 | ERROR   | Task run 'jobs_runs_submit-c00eee75-0' - Finished in state Failed('Task run encountered an exception: TypeError: Object of type CanManage is not JSON serializable\n')
12:51:22.610 | ERROR   | Flow run 'space-otter' - Encountered exception during execution:
Traceback (most recent call last):
  File "/***/.venv/lib/python3.8/site-packages/prefect/engine.py", line 603, in orchestrate_flow_run
    result = await flow_call()
  File "/***/.venv/lib/python3.8/site-packages/prefect_databricks/flows.py", line 265, in jobs_runs_submit_and_wait_for_completion
    multi_task_jobs_runs = await multi_task_jobs_runs_future.result()
  File "/***/.venv/lib/python3.8/site-packages/prefect/futures.py", line 237, in _result
    return await final_state.result(raise_on_failure=raise_on_failure, fetch=True)
  File "/***/.venv/lib/python3.8/site-packages/prefect/states.py", line 74, in _get_state_result
    raise await get_state_exception(state)
  File "/***/.venv/lib/python3.8/site-packages/prefect/engine.py", line 1215, in orchestrate_task_run
    result = await task.fn(*args, **kwargs)
  File "/***/.venv/lib/python3.8/site-packages/prefect_databricks/jobs.py", line 1634, in jobs_runs_submit
    response = await execute_endpoint.fn(
  File "/***/.venv/lib/python3.8/site-packages/prefect_databricks/rest.py", line 132, in execute_endpoint
    response = await getattr(client, http_method)(
  File "/***/.venv/lib/python3.8/site-packages/httpx/_client.py", line 1848, in post
    return await self.request(
  File "/***/.venv/lib/python3.8/site-packages/httpx/_client.py", line 1520, in request
    request = self.build_request(
  File "/***/.venv/lib/python3.8/site-packages/httpx/_client.py", line 360, in build_request
    return Request(
  File "/***/.venv/lib/python3.8/site-packages/httpx/_models.py", line 339, in __init__
    headers, stream = encode_request(
  File "/***/.venv/lib/python3.8/site-packages/httpx/_content.py", line 211, in encode_request
    return encode_json(json)
  File "/***/.venv/lib/python3.8/site-packages/httpx/_content.py", line 174, in encode_json
    body = json_dumps(json).encode("utf-8")
  File "/usr/local/Cellar/[email protected]/3.8.15/Frameworks/Python.framework/Versions/3.8/lib/python3.8/json/__init__.py", line 231, in dumps
    return _default_encoder.encode(obj)
  File "/usr/local/Cellar/[email protected]/3.8.15/Frameworks/Python.framework/Versions/3.8/lib/python3.8/json/encoder.py", line 199, in encode
    chunks = self.iterencode(o, _one_shot=True)
  File "/usr/local/Cellar/[email protected]/3.8.15/Frameworks/Python.framework/Versions/3.8/lib/python3.8/json/encoder.py", line 257, in iterencode
    return _iterencode(o, 0)
  File "/usr/local/Cellar/[email protected]/3.8.15/Frameworks/Python.framework/Versions/3.8/lib/python3.8/json/encoder.py", line 179, in default
    raise TypeError(f'Object of type {o.__class__.__name__} '
TypeError: Object of type CanManage is not JSON serializable
12:51:22.670 | ERROR   | Flow run 'space-otter' - Finished in state Failed('Flow run encountered an exception. TypeError: Object of type CanManage is not JSON serializable\n')
Traceback (most recent call last):
  File "tmp/issue.py", line 25, in <module>
    run(
  File "/usr/local/Cellar/[email protected]/3.8.15/Frameworks/Python.framework/Versions/3.8/lib/python3.8/asyncio/runners.py", line 44, in run
    return loop.run_until_complete(main)
  File "/usr/local/Cellar/[email protected]/3.8.15/Frameworks/Python.framework/Versions/3.8/lib/python3.8/asyncio/base_events.py", line 616, in run_until_complete
    return future.result()
  File "/***/.venv/lib/python3.8/site-packages/prefect/client/utilities.py", line 47, in with_injected_client
    return await fn(*args, **kwargs)
  File "/***/.venv/lib/python3.8/site-packages/prefect/engine.py", line 237, in create_then_begin_flow_run
    return await state.result(fetch=True)
  File "/***/.venv/lib/python3.8/site-packages/prefect/states.py", line 74, in _get_state_result
    raise await get_state_exception(state)
  File "/***/.venv/lib/python3.8/site-packages/prefect/engine.py", line 603, in orchestrate_flow_run
    result = await flow_call()
  File "/***/.venv/lib/python3.8/site-packages/prefect_databricks/flows.py", line 265, in jobs_runs_submit_and_wait_for_completion
    multi_task_jobs_runs = await multi_task_jobs_runs_future.result()
  File "/***/.venv/lib/python3.8/site-packages/prefect/futures.py", line 237, in _result
    return await final_state.result(raise_on_failure=raise_on_failure, fetch=True)
  File "/***/.venv/lib/python3.8/site-packages/prefect/states.py", line 74, in _get_state_result
    raise await get_state_exception(state)
  File "/***/.venv/lib/python3.8/site-packages/prefect/engine.py", line 1215, in orchestrate_task_run
    result = await task.fn(*args, **kwargs)
  File "/***/.venv/lib/python3.8/site-packages/prefect_databricks/jobs.py", line 1634, in jobs_runs_submit
    response = await execute_endpoint.fn(
  File "/***/.venv/lib/python3.8/site-packages/prefect_databricks/rest.py", line 132, in execute_endpoint
    response = await getattr(client, http_method)(
  File "/***/.venv/lib/python3.8/site-packages/httpx/_client.py", line 1848, in post
    return await self.request(
  File "/***/.venv/lib/python3.8/site-packages/httpx/_client.py", line 1520, in request
    request = self.build_request(
  File "/***/.venv/lib/python3.8/site-packages/httpx/_client.py", line 360, in build_request
    return Request(
  File "/***/.venv/lib/python3.8/site-packages/httpx/_models.py", line 339, in __init__
    headers, stream = encode_request(
  File "/***/.venv/lib/python3.8/site-packages/httpx/_content.py", line 211, in encode_request
    return encode_json(json)
  File "/***/.venv/lib/python3.8/site-packages/httpx/_content.py", line 174, in encode_json
    body = json_dumps(json).encode("utf-8")
  File "/usr/local/Cellar/[email protected]/3.8.15/Frameworks/Python.framework/Versions/3.8/lib/python3.8/json/__init__.py", line 231, in dumps
    return _default_encoder.encode(obj)
  File "/usr/local/Cellar/[email protected]/3.8.15/Frameworks/Python.framework/Versions/3.8/lib/python3.8/json/encoder.py", line 199, in encode
    chunks = self.iterencode(o, _one_shot=True)
  File "/usr/local/Cellar/[email protected]/3.8.15/Frameworks/Python.framework/Versions/3.8/lib/python3.8/json/encoder.py", line 257, in iterencode
    return _iterencode(o, 0)
  File "/usr/local/Cellar/[email protected]/3.8.15/Frameworks/Python.framework/Versions/3.8/lib/python3.8/json/encoder.py", line 179, in default
    raise TypeError(f'Object of type {o.__class__.__name__} '
TypeError: Object of type CanManage is not JSON serializable

Once I remove the ACL parameter, the request goes to Databricks normally.

I tried to solve the serialization issue by modifying the permission level objects like this:

class IsOwner(str, Enum):
    """
    Permission that represents ownership of the job.
    """

    isowner = "IS_OWNER"

After that, the request with the ACL reaches Databricks, but it results in httpx.HTTPStatusError: The request was malformed. See JSON response for error details.JSON response: {'error_code': 'BAD_REQUEST', 'message': 'Principal name not defined'}

On top of that I tried different definitions of ACLs, like:

acl = [
    AccessControlRequest.parse_obj(
        AccessControlRequestForUser(
            permission_level=CanManage.canmanage,
            user_name=user,
        )
    )
    for user in owners
]
acl.append(
    AccessControlRequest.parse_obj(
        AccessControlRequestForGroup(
            group_name="users", permission_level=CanView.canview
        )
    )
)

I also tried keeping only users as viewers; same error.

Any advice would be helpful, since this is a critical feature: the job is normally executed on behalf of an automation user.

`socket.gaierror: [Errno -2] Name or service not known` error

Summary

I went through the getting started example here but I receive a socket.gaierror: [Errno -2] Name or service not known error.

from prefect import flow
from prefect_databricks import DatabricksCredentials
from prefect_databricks.jobs import jobs_list


@flow
def example_execute_endpoint_flow():
    databricks_credentials = DatabricksCredentials.load("databricks")
    jobs = jobs_list(
        databricks_credentials,
        limit=5
    )
    return jobs

example_execute_endpoint_flow()

I checked that it's not a problem with the credentials by:

  1. Using the Databricks API endpoint directly with the same instance and token, which worked.
  2. Checking whether it's due to the blocks by using something like
DatabricksCredentials(databricks_instance=MY_INSTANCE, token=MY_TOKEN)

This resulted in the same error.

Error

08:49:44.624 | INFO    | prefect.engine - Created flow run 'spry-malkoha' for flow 'example-execute-endpoint-flow'
08:49:45.145 | INFO    | Flow run 'spry-malkoha' - Created task run 'jobs_list-8bffe913-0' for task 'jobs_list'
08:49:45.149 | INFO    | Flow run 'spry-malkoha' - Executing 'jobs_list-8bffe913-0' immediately...
08:49:45.378 | ERROR   | Task run 'jobs_list-8bffe913-0' - Encountered exception during execution:
Traceback (most recent call last):
  File "/usr/local/lib/python3.8/dist-packages/httpcore/_exceptions.py", line 8, in map_exceptions
    yield
  File "/usr/local/lib/python3.8/dist-packages/httpcore/backends/asyncio.py", line 109, in connect_tcp
    stream: anyio.abc.ByteStream = await anyio.connect_tcp(
  File "/usr/local/lib/python3.8/dist-packages/anyio/_core/_sockets.py", line 189, in connect_tcp
    gai_res = await getaddrinfo(
  File "/usr/local/lib/python3.8/dist-packages/anyio/_core/_sockets.py", line 496, in getaddrinfo
    gai_res = await get_asynclib().getaddrinfo(
  File "/usr/local/lib/python3.8/dist-packages/anyio/_backends/_asyncio.py", line 1754, in getaddrinfo
    result = await get_running_loop().getaddrinfo(
  File "/usr/lib/python3.8/asyncio/base_events.py", line 825, in getaddrinfo
    return await self.run_in_executor(
  File "/usr/lib/python3.8/concurrent/futures/thread.py", line 57, in run
    result = self.fn(*self.args, **self.kwargs)
  File "/usr/lib/python3.8/socket.py", line 918, in getaddrinfo
    for res in _socket.getaddrinfo(host, port, family, type, proto, flags):
socket.gaierror: [Errno -2] Name or service not known

During handling of the above exception, another exception occurred:

I can add the full stack trace but it is quite long.

Versions

prefect             2.3.1
prefect-databricks  0.1.1

Running async code with .fn() syntax

Trying to run the jobs_runs_get function using the ".fn" syntax:

    run_details = jobs_runs_get.fn(
        run_id = run_id,
        databricks_credentials = databricks_credentials,
        include_history = True
    )

The reason for doing this is that I don't want this jobs_runs_get call to be recorded in the UI as a genuine task run (it's just polling an API to get results, and ends up making the UI hard to read).

I would expect to be able to run this task using ".fn" as above: the code would still run and would still poll the Databricks endpoint, but I wouldn't get a "task run" record in the UI.

Instead when I call this function I get an error:

  (Prefect2) C:\Users\jboorn\code\Prefect2\prefect2-databricks>python test.py
  11:11:06.502 | INFO    | prefect.engine - Created flow run 'elastic-dodo' for flow 'run_test'
  11:11:07.368 | INFO    | Flow run 'elastic-dodo' - Created task run 'jobs_run_now-511d20f2-0' for task 'jobs_run_now'
  11:11:07.370 | INFO    | Flow run 'elastic-dodo' - Executing 'jobs_run_now-511d20f2-0' immediately...
  11:11:08.262 | INFO    | Task run 'jobs_run_now-511d20f2-0' - Finished in state Completed()
  11:11:08.263 | ERROR   | Flow run 'elastic-dodo' - Encountered exception during execution:
  Traceback (most recent call last):
    File "C:\Users\jboorn\code\Prefect2\lib\site-packages\prefect\engine.py", line 636, in orchestrate_flow_run
      result = await run_sync(flow_call)
    File "C:\Users\jboorn\code\Prefect2\lib\site-packages\prefect\utilities\asyncutils.py", line 154, in run_sync_in_interruptible_worker_thread
      async with anyio.create_task_group() as tg:
    File "C:\Users\jboorn\code\Prefect2\lib\site-packages\anyio\_backends\_asyncio.py", line 662, in __aexit__
      raise exceptions[0]
    File "C:\Users\jboorn\code\Prefect2\lib\site-packages\anyio\to_thread.py", line 31, in run_sync
      return await get_asynclib().run_sync_in_worker_thread(
    File "C:\Users\jboorn\code\Prefect2\lib\site-packages\anyio\_backends\_asyncio.py", line 937, in run_sync_in_worker_thread
      return await future
    File "C:\Users\jboorn\code\Prefect2\lib\site-packages\anyio\_backends\_asyncio.py", line 867, in run
      result = context.run(func, *args)
    File "C:\Users\jboorn\code\Prefect2\lib\site-packages\prefect\utilities\asyncutils.py", line 135, in capture_worker_thread_and_result
      result = __fn(*args, **kwargs)
    File "C:\Users\jboorn\code\Prefect2\prefect2-databricks\test.py", line 8, in run_test
      run_databricks_job(607897, 1800)
    File "C:\Users\jboorn\code\Prefect2\prefect2-databricks\databricks.py", line 25, in run_databricks_job
      lifecycle_state = run_details['state']['life_cycle_state']
  TypeError: 'coroutine' object is not subscriptable
  11:11:08.427 | ERROR   | Flow run 'elastic-dodo' - Finished in state Failed("Flow run encountered an exception. TypeError: 'coroutine' object is not subscriptable\n")
  Traceback (most recent call last):
    File "C:\Users\jboorn\code\Prefect2\prefect2-databricks\test.py", line 11, in <module>
      run_test()
    File "C:\Users\jboorn\code\Prefect2\lib\site-packages\prefect\flows.py", line 448, in __call__
      return enter_flow_run_engine_from_flow_call(
    File "C:\Users\jboorn\code\Prefect2\lib\site-packages\prefect\engine.py", line 164, in enter_flow_run_engine_from_flow_call
      return anyio.run(begin_run)
    File "C:\Users\jboorn\code\Prefect2\lib\site-packages\anyio\_core\_eventloop.py", line 70, in run
      return asynclib.run(func, *args, **backend_options)
    File "C:\Users\jboorn\code\Prefect2\lib\site-packages\anyio\_backends\_asyncio.py", line 292, in run
      return native_run(wrapper(), debug=debug)
    File "C:\Python310\lib\asyncio\runners.py", line 44, in run
      return loop.run_until_complete(main)
    File "C:\Python310\lib\asyncio\base_events.py", line 641, in run_until_complete
      return future.result()
    File "C:\Users\jboorn\code\Prefect2\lib\site-packages\anyio\_backends\_asyncio.py", line 287, in wrapper
      return await func(*args)
    File "C:\Users\jboorn\code\Prefect2\lib\site-packages\prefect\client\utilities.py", line 47, in with_injected_client
      return await fn(*args, **kwargs)
    File "C:\Users\jboorn\code\Prefect2\lib\site-packages\prefect\engine.py", line 244, in create_then_begin_flow_run
      return await state.result(fetch=True)
    File "C:\Users\jboorn\code\Prefect2\lib\site-packages\prefect\states.py", line 89, in _get_state_result
      raise await get_state_exception(state)
    File "C:\Users\jboorn\code\Prefect2\lib\site-packages\prefect\engine.py", line 636, in orchestrate_flow_run
      result = await run_sync(flow_call)
    File "C:\Users\jboorn\code\Prefect2\lib\site-packages\prefect\utilities\asyncutils.py", line 154, in run_sync_in_interruptible_worker_thread
      async with anyio.create_task_group() as tg:
    File "C:\Users\jboorn\code\Prefect2\lib\site-packages\anyio\_backends\_asyncio.py", line 662, in __aexit__
      raise exceptions[0]
    File "C:\Users\jboorn\code\Prefect2\lib\site-packages\anyio\to_thread.py", line 31, in run_sync
      return await get_asynclib().run_sync_in_worker_thread(
    File "C:\Users\jboorn\code\Prefect2\lib\site-packages\anyio\_backends\_asyncio.py", line 937, in run_sync_in_worker_thread
      return await future
    File "C:\Users\jboorn\code\Prefect2\lib\site-packages\anyio\_backends\_asyncio.py", line 867, in run
      result = context.run(func, *args)
    File "C:\Users\jboorn\code\Prefect2\lib\site-packages\prefect\utilities\asyncutils.py", line 135, in capture_worker_thread_and_result
      result = __fn(*args, **kwargs)
    File "C:\Users\jboorn\code\Prefect2\prefect2-databricks\test.py", line 8, in run_test
      run_databricks_job(607897, 1800)
    File "C:\Users\jboorn\code\Prefect2\prefect2-databricks\databricks.py", line 25, in run_databricks_job
      lifecycle_state = run_details['state']['life_cycle_state']
  TypeError: 'coroutine' object is not subscriptable
  
  (Prefect2) C:\Users\jboorn\code\Prefect2\prefect2-databricks>python test.py
  C:\Users\jboorn\code\Prefect2\prefect2-databricks\test.py:11: RuntimeWarning: coroutine 'create_then_begin_flow_run' was never awaited
    run_test()
  RuntimeWarning: Enable tracemalloc to get the object allocation traceback

I think the issue has to do with the async nature of the call: the outer "wrapper" code isn't async, but the inner code (jobs_runs_get) is.
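
If that is the case, one way to make the .fn() call work (an assumption based on the traceback rather than a confirmed answer) is to treat jobs_runs_get as the async function it is and drive the coroutine to completion before subscripting the result:

```python
import asyncio

from prefect_databricks.jobs import jobs_runs_get


async def get_run_details(run_id, databricks_credentials):
    # .fn() exposes the underlying async function, so its result must be awaited.
    return await jobs_runs_get.fn(
        run_id=run_id,
        databricks_credentials=databricks_credentials,
        include_history=True,
    )


# From purely synchronous code (outside any running event loop), something like:
# run_details = asyncio.run(get_run_details(run_id, databricks_credentials))
# lifecycle_state = run_details["state"]["life_cycle_state"]
```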

Tutorial Error

When running the Databricks tutorial, the credentials page is ambiguous about the exact format of the Databricks credentials.

Expectation / Proposal

This line makes it look like the correct instance value is only the part before .cloud...:

Note down your Databricks instance from the browser URL, formatted like https://<DATABRICKSINSTANCE>.cloud.databricks.com/

However, the example script only runs with the full URL as the instance:
xxx.cloud.databricks.com.
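
In other words (based on my reading of this report, not the official docs), the credentials block should be created with the full host:

```python
from prefect_databricks import DatabricksCredentials

credentials = DatabricksCredentials(
    databricks_instance="xxx.cloud.databricks.com",  # full host, not just the "xxx" prefix
    token="TOKEN-PLACEHOLDER",
)
```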

A minor issue, but I hope to save some time for people running the example for the first time.

Package version: prefect-databricks==0.1.6
