
airflow-client-python's Introduction

Apache Airflow Python Client

Overview

To facilitate management, Apache Airflow supports a range of REST API endpoints across its objects. This section provides an overview of the API design, methods, and supported use cases.

Most of the endpoints accept JSON as input and return JSON responses. This means that you must usually add the following headers to your request:

Content-type: application/json
Accept: application/json
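
For example (a sketch; the host and credentials are placeholders):

curl -X GET 'https://example.com/api/v1/pools' \
-H 'Content-Type: application/json' \
-H 'Accept: application/json' \
--user "username:password"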

Resources

The term resource refers to a single type of object in the Airflow metadata. An API is broken up by its endpoint's corresponding resource. The name of a resource is typically plural and expressed in camelCase. Example: dagRuns.

Resource names are used as part of endpoint URLs, as well as in API parameters and responses.

CRUD Operations

The platform supports Create, Read, Update, and Delete operations on most resources. You can review the standards for these operations and their standard parameters below.

Some endpoints have special behavior as exceptions.

Create

To create a resource, you typically submit an HTTP POST request with the resource's required metadata in the request body. The response returns a 201 Created response code upon success with the resource's metadata, including its internal id, in the response body.
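
For example, a pool can be created like this (a sketch in the style of the curl example further below; the pool name and slot count are illustrative):

curl -X POST 'https://example.com/api/v1/pools' \
-H 'Content-Type: application/json' \
--user "username:password" \
-d '{
    "name": "my_pool",
    "slots": 4
}'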

Read

The HTTP GET request can be used to read a resource or to list a number of resources.

A resource's id can be submitted in the request parameters to read a specific resource. The response usually returns a 200 OK response code upon success, with the resource's metadata in the response body.

If a GET request does not include a specific resource id, it is treated as a list request. The response usually returns a 200 OK response code upon success, with an object containing a list of resources' metadata in the response body.

When reading resources, some common query parameters are usually available, e.g.:

v1/connections?limit=25&offset=25

Query Parameter | Type    | Description
limit           | integer | Maximum number of objects to fetch. Usually 25 by default.
offset          | integer | Offset after which to start returning objects. For use with the limit query parameter.

Update

Updating a resource requires the resource id, and is typically done using an HTTP PATCH request, with the fields to modify in the request body. The response usually returns a 200 OK response code upon success, with information about the modified resource in the response body.

Delete

Deleting a resource requires the resource id and is typically executed via an HTTP DELETE request. The response usually returns a 204 No Content response code upon success.
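
For example, deleting the pool created above (same placeholders):

curl -X DELETE 'https://example.com/api/v1/pools/my_pool' \
--user "username:password"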

Conventions

  • Resource names are plural and expressed in camelCase.

  • Names are consistent between URL parameter name and field name.

  • Field names are in snake_case.

{
    "name": "string",
    "slots": 0,
    "occupied_slots": 0,
    "used_slots": 0,
    "queued_slots": 0,
    "open_slots": 0
}

Update Mask

Update mask is available as a query parameter in patch endpoints. It is used to notify the API which fields you want to update. Using update_mask makes it easier to update objects by helping the server know which fields to update in an object instead of updating all fields. The update request ignores any fields that aren't specified in the field mask, leaving them with their current values.

Example:

import json

import requests

# URLs are shown relative to the API base for brevity
resource = requests.get("/resource/my-id").json()
resource["my_field"] = "new-value"
requests.patch("/resource/my-id?update_mask=my_field", data=json.dumps(resource))

Versioning and Endpoint Lifecycle

  • API versioning is not synchronized with specific releases of Apache Airflow.
  • APIs are designed to be backward compatible.
  • Any changes to the API will first go through a deprecation phase.

Trying the API

You can use a third-party client, such as curl, HTTPie, Postman, or the Insomnia REST client, to test the Apache Airflow API.

Note that you will need to pass credentials data.

For example, here is how to pause a DAG with curl when basic authentication is used:

curl -X PATCH 'https://example.com/api/v1/dags/{dag_id}?update_mask=is_paused' \
-H 'Content-Type: application/json' \
--user "username:password" \
-d '{
    "is_paused": true
}'

Using a graphical tool such as Postman or Insomnia, you can import the API specification directly:

  1. Download the API specification by clicking the Download button at the top of this document.
  2. Import the JSON specification in the graphical tool of your choice.
  • In Postman, you can click the import button at the top
  • With Insomnia, you can just drag-and-drop the file on the UI

Note that with Postman, you can also generate code snippets by selecting a request and clicking on the Code button.

Enabling CORS

Cross-origin resource sharing (CORS) is a browser security feature that restricts HTTP requests that are initiated from scripts running in the browser.

For details on enabling/configuring CORS, see Enabling CORS.

Authentication

To be able to meet the requirements of many organizations, Airflow supports many authentication methods, and it is even possible to add your own method.

If you want to check which auth backend is currently set, you can use the airflow config get-value api auth_backends command, as in the example below.

$ airflow config get-value api auth_backends
airflow.api.auth.backend.basic_auth

The default is to deny all requests.

For details on configuring the authentication, see API Authorization.

Errors

We follow the error response format proposed in RFC 7807 also known as Problem Details for HTTP APIs. As with our normal API responses, your client must be prepared to gracefully handle additional members of the response.
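
For example, an error response body looks like this (an instance of the problem format quoted elsewhere in this document):

{
  "detail": "Property is read-only - 'state'",
  "status": 400,
  "title": "Bad Request",
  "type": "https://airflow.apache.org/docs/apache-airflow/2.4.1/stable-rest-api-ref.html#section/Errors/BadRequest"
}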

Unauthenticated

This indicates that the request has not been applied because it lacks valid authentication credentials for the target resource. Please check that you have valid credentials.

PermissionDenied

This response means that the server understood the request but refuses to authorize it because the client lacks sufficient rights to the resource. It happens when you do not have the necessary permission to execute the action you performed. You need to obtain the appropriate permissions in order to resolve this error.

BadRequest

This response means that the server cannot or will not process the request due to something that is perceived to be a client error (e.g., malformed request syntax, invalid request message framing, or deceptive request routing). To resolve this, please ensure that your syntax is correct.

NotFound

This client error response indicates that the server cannot find the requested resource.

MethodNotAllowed

Indicates that the request method is known by the server but is not supported by the target resource.

NotAcceptable

The target resource does not have a current representation that would be acceptable to the user agent, according to the proactive negotiation header fields received in the request, and the server is unwilling to supply a default representation.

AlreadyExists

The request could not be completed due to a conflict with the current state of the target resource, e.g. the resource it tries to create already exists.

Unknown

This means that the server encountered an unexpected condition that prevented it from fulfilling the request.

This Python package is automatically generated by the OpenAPI Generator project:

  • API version: 2.8.0
  • Package version: 2.8.0
  • Build package: org.openapitools.codegen.languages.PythonClientCodegen

For more information, please visit https://airflow.apache.org

Requirements

Python >=3.8

Installation & Usage

pip install

You can install the client using standard Python installation tools. It is hosted on PyPI under the apache-airflow-client package ID, so the easiest way to get the latest version is to run:

pip install apache-airflow-client

You can also install it directly from the GitHub repository:

pip install git+https://github.com/apache/airflow-client-python.git

Import check

Then import the package:

import airflow_client.client

Getting Started

Please follow the installation procedure and then run the following:

from pprint import pprint

from airflow_client import client
from airflow_client.client.api import config_api

# Defining the host is optional and defaults to /api/v1
# See configuration.py for a list of all supported configuration parameters.
configuration = client.Configuration(host="/api/v1")

# The client must configure the authentication and authorization parameters
# in accordance with the API server security policy.
# Examples for each auth method are provided below, use the example that
# satisfies your auth use case.

# Configure HTTP basic authorization: Basic
configuration = client.Configuration(username="YOUR_USERNAME", password="YOUR_PASSWORD")


# Enter a context with an instance of the API client
with client.ApiClient(configuration) as api_client:
    # Create an instance of the API class
    api_instance = config_api.ConfigApi(api_client)

    try:
        # Get current configuration
        api_response = api_instance.get_config()
        pprint(api_response)
    except client.ApiException as e:
        print("Exception when calling ConfigApi->get_config: %s\n" % e)

Documentation for API Endpoints

All URIs are relative to /api/v1

Class Method HTTP request Description
ConfigApi get_config GET /config Get current configuration
ConnectionApi delete_connection DELETE /connections/{connection_id} Delete a connection
ConnectionApi get_connection GET /connections/{connection_id} Get a connection
ConnectionApi get_connections GET /connections List connections
ConnectionApi patch_connection PATCH /connections/{connection_id} Update a connection
ConnectionApi post_connection POST /connections Create a connection
ConnectionApi test_connection POST /connections/test Test a connection
DAGApi delete_dag DELETE /dags/{dag_id} Delete a DAG
DAGApi get_dag GET /dags/{dag_id} Get basic information about a DAG
DAGApi get_dag_details GET /dags/{dag_id}/details Get a simplified representation of DAG
DAGApi get_dag_source GET /dagSources/{file_token} Get a source code
DAGApi get_dags GET /dags List DAGs
DAGApi get_task GET /dags/{dag_id}/tasks/{task_id} Get simplified representation of a task
DAGApi get_tasks GET /dags/{dag_id}/tasks Get tasks for DAG
DAGApi patch_dag PATCH /dags/{dag_id} Update a DAG
DAGApi patch_dags PATCH /dags Update DAGs
DAGApi post_clear_task_instances POST /dags/{dag_id}/clearTaskInstances Clear a set of task instances
DAGApi post_set_task_instances_state POST /dags/{dag_id}/updateTaskInstancesState Set a state of task instances
DAGRunApi clear_dag_run POST /dags/{dag_id}/dagRuns/{dag_run_id}/clear Clear a DAG run
DAGRunApi delete_dag_run DELETE /dags/{dag_id}/dagRuns/{dag_run_id} Delete a DAG run
DAGRunApi get_dag_run GET /dags/{dag_id}/dagRuns/{dag_run_id} Get a DAG run
DAGRunApi get_dag_runs GET /dags/{dag_id}/dagRuns List DAG runs
DAGRunApi get_dag_runs_batch POST /dags/~/dagRuns/list List DAG runs (batch)
DAGRunApi get_upstream_dataset_events GET /dags/{dag_id}/dagRuns/{dag_run_id}/upstreamDatasetEvents Get dataset events for a DAG run
DAGRunApi post_dag_run POST /dags/{dag_id}/dagRuns Trigger a new DAG run
DAGRunApi set_dag_run_note PATCH /dags/{dag_id}/dagRuns/{dag_run_id}/setNote Update the DagRun note.
DAGRunApi update_dag_run_state PATCH /dags/{dag_id}/dagRuns/{dag_run_id} Modify a DAG run
DagWarningApi get_dag_warnings GET /dagWarnings List dag warnings
DatasetApi get_dataset GET /datasets/{uri} Get a dataset
DatasetApi get_dataset_events GET /datasets/events Get dataset events
DatasetApi get_datasets GET /datasets List datasets
DatasetApi get_upstream_dataset_events GET /dags/{dag_id}/dagRuns/{dag_run_id}/upstreamDatasetEvents Get dataset events for a DAG run
EventLogApi get_event_log GET /eventLogs/{event_log_id} Get a log entry
EventLogApi get_event_logs GET /eventLogs List log entries
ImportErrorApi get_import_error GET /importErrors/{import_error_id} Get an import error
ImportErrorApi get_import_errors GET /importErrors List import errors
MonitoringApi get_health GET /health Get instance status
MonitoringApi get_version GET /version Get version information
PermissionApi get_permissions GET /permissions List permissions
PluginApi get_plugins GET /plugins Get a list of loaded plugins
PoolApi delete_pool DELETE /pools/{pool_name} Delete a pool
PoolApi get_pool GET /pools/{pool_name} Get a pool
PoolApi get_pools GET /pools List pools
PoolApi patch_pool PATCH /pools/{pool_name} Update a pool
PoolApi post_pool POST /pools Create a pool
ProviderApi get_providers GET /providers List providers
RoleApi delete_role DELETE /roles/{role_name} Delete a role
RoleApi get_role GET /roles/{role_name} Get a role
RoleApi get_roles GET /roles List roles
RoleApi patch_role PATCH /roles/{role_name} Update a role
RoleApi post_role POST /roles Create a role
TaskInstanceApi get_extra_links GET /dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/links List extra links
TaskInstanceApi get_log GET /dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/logs/{task_try_number} Get logs
TaskInstanceApi get_mapped_task_instance GET /dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/{map_index} Get a mapped task instance
TaskInstanceApi get_mapped_task_instances GET /dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/listMapped List mapped task instances
TaskInstanceApi get_task_instance GET /dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id} Get a task instance
TaskInstanceApi get_task_instances GET /dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances List task instances
TaskInstanceApi get_task_instances_batch POST /dags/~/dagRuns/~/taskInstances/list List task instances (batch)
TaskInstanceApi patch_mapped_task_instance PATCH /dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/{map_index} Updates the state of a mapped task instance
TaskInstanceApi patch_task_instance PATCH /dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id} Updates the state of a task instance
TaskInstanceApi set_mapped_task_instance_note PATCH /dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/{map_index}/setNote Update the TaskInstance note.
TaskInstanceApi set_task_instance_note PATCH /dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/setNote Update the TaskInstance note.
UserApi delete_user DELETE /users/{username} Delete a user
UserApi get_user GET /users/{username} Get a user
UserApi get_users GET /users List users
UserApi patch_user PATCH /users/{username} Update a user
UserApi post_user POST /users Create a user
VariableApi delete_variable DELETE /variables/{variable_key} Delete a variable
VariableApi get_variable GET /variables/{variable_key} Get a variable
VariableApi get_variables GET /variables List variables
VariableApi patch_variable PATCH /variables/{variable_key} Update a variable
VariableApi post_variables POST /variables Create a variable
XComApi get_xcom_entries GET /dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/xcomEntries List XCom entries
XComApi get_xcom_entry GET /dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/xcomEntries/{xcom_key} Get an XCom entry

Documentation For Models

Documentation For Authorization

By default, the generated client supports three authentication schemes:

  • Basic
  • GoogleOpenID
  • Kerberos

However, you can generate the client and documentation with your own schemes by adding them to the security section of the OpenAPI specification. You can do this with the Breeze CLI by adding the --security-schemes option to the breeze release-management prepare-python-client command.

Basic "smoke" tests

You can run basic smoke tests to check if the client is working properly - we have a simple test script that uses the API to run the tests. To do that, you need to:

  • install the apache-airflow-client package as described above
  • install the rich Python package
  • download the test_python_client.py file
  • make sure you have a test Airflow installation running. Do not experiment with your production deployment
  • configure your Airflow webserver to enable basic authentication. In the [api] section of your airflow.cfg, set:
[api]
auth_backend = airflow.api.auth.backend.session,airflow.api.auth.backend.basic_auth

You can also set it with an environment variable: export AIRFLOW__API__AUTH_BACKENDS=airflow.api.auth.backend.session,airflow.api.auth.backend.basic_auth

  • configure your Airflow webserver to load the example DAGs. In the [core] section of your airflow.cfg, set:
[core]
load_examples = True

You can also set it with an environment variable: export AIRFLOW__CORE__LOAD_EXAMPLES=True

  • optionally expose the configuration (note: this is a dangerous setting). The script will happily run with the default setting, but if you want to see the configuration, you need to expose it. In the [webserver] section of your airflow.cfg, set:
[webserver]
expose_config = True

You can also set it with an environment variable: export AIRFLOW__WEBSERVER__EXPOSE_CONFIG=True

  • Configure your host/ip/user/password in the test_python_client.py file
import airflow_client

# Configure HTTP basic authorization: Basic
configuration = airflow_client.client.Configuration(
    host="http://localhost:8080/api/v1", username="admin", password="admin"
)
  • Run the scheduler (or the standalone DAG file processor, if that is what you have set up) for a few parsing loops (you can pass the --num-runs parameter to it or keep it running in the background). The script relies on the example DAGs being serialized to the DB, and this only happens when the scheduler runs with core/load_examples set to True.

  • Run the webserver, reachable at the host/port the test script targets. Make sure it has had enough time to initialize.

Run python test_python_client.py and you should see colored output showing the connection attempts and their status.

Notes for Large OpenAPI documents

If the OpenAPI document is large, imports in client.apis and client.models may fail with a RecursionError indicating the maximum recursion limit has been exceeded. In that case, there are a couple of solutions:

Solution 1: Use specific imports for apis and models like:

  • from airflow_client.client.api.default_api import DefaultApi
  • from airflow_client.client.model.pet import Pet

Solution 2: Before importing the package, adjust the maximum recursion limit as shown below:

import sys

sys.setrecursionlimit(1500)
import airflow_client.client
from airflow_client.client.apis import *
from airflow_client.client.models import *

Authors

[email protected]

airflow-client-python's People

Contributors

ashb, csm10495, davidkatz-il, eladkal, ephraimbuddy, hterik, jedcunningham, kaxil, mik-laj, msumit, o-nikolas, pankajkoti, pierrejeambrun, potiuk


airflow-client-python's Issues

"post_variables" method is overwriting an existing variable with the same variable_id.

I'm creating a variable via airflow-client-python. If the variable already exists in a local Airflow instance, it overwrites the existing one. The behavior is different when I'm creating a connection with the same conn_id - it throws an exception saying Connection already exist. ID: <conn_id>.

Here is the code I use:

import getpass
import json
import socket

import airflow_client.client
import etl.utils.templates.variable as variable
from airflow_client import client
from airflow_client.client.api import variable_api
from airflow_client.client.model.variable import Variable

local_user = input("Enter Local Airflow Admin username: ")
local_pass = getpass.getpass("Enter Local Airflow Admin password: ")
local_host = socket.gethostbyname(socket.gethostname())
configuration = airflow_client.client.Configuration(
            host=f"{local_host}:8080/api/v1",
            username=local_user,
            password=local_pass)


with client.ApiClient(configuration) as api_client:
    variable_key = "NewVar"
    var_api_instance = variable_api.VariableApi(api_client)
    new_variable = Variable(key=variable_key,
                            value=variable.VARIABLE_VALUE)

    try:
        # Rewrites existing new_variable if it exists
        var_api_instance.post_variables(new_variable)
    except client.ApiException as e:
        print("Exception when calling VariableApi->post_variables: %s\n" % e)

Should we change the default behavior of airflow-client-python when trying to create a variable that already exists?

Pass verify_ssl as parameter

In the client configuration I would like to set verify_ssl to False, but the value is hard-coded.

Could you change it to be a parameter?

Thanks!

get_dag_details api is broken

Client version : 2.1.0

Code :

    api_instance = dag_api.DAGApi(api_client)
    dag_id = "dag_id_example"
    try:
        # Get a simplified representation of DAG
        api_response = api_instance.get_dag_details(dag_id)
        pprint(api_response)
    except client.ApiException as e:
        print("Exception when calling DAGApi->get_dag_details: %s\n" % e)

Error:

ApiValueError: Invalid inputs given to generate an instance of 'DAGDetailAllOf'. The input data was invalid for the allOf schema 'DAGDetailAllOf' in the composed schema 'DAGDetail'. Error=Invalid type for variable 'dag_run_timeout'. Required value type is TimeDelta and passed type was NoneType at ['received_data']['dag_run_timeout']

Airflow Python Client 2.6.0rc2 fails with read-only dag_id attribute on post_dag_run

When running post_dag_run API calls, the Python client (when used with Airflow 2.6.0) fails with:

Exception when calling DAGRunAPI->post_dag_run: `dag_id` is a read-only attribute. Use `from_openapi_data` to instantiate class with read only attributes.

Apparently because it expects a non-null execution timeout.

How to reproduce:

  1. breeze start-airflow --db-reset --use-airflow-version 2.6.0 --load-example-dags --load-default-connections

  2. In the terminal, install 2.6.0rc2 of the Python client: pip install apache-airflow-client==2.6.0.rc2

  3. Ctrl-C webserver

  4. Change configuration in ~/airflow/airflow.cfg (with vim for example)

  • set [webserver] -> expose_config = True
  • enable basic authentication by adding ,airflow.api.auth.backend.basic_auth to [api] -> auth_backends
  5. Run airflow webserver again

  6. Copy the following script to the container (for example to the files folder, to be able to copy it from the host) and name it test_python_client.py:

import airflow_client.client
from pprint import pprint
from airflow_client.client.api import config_api, dag_api, dag_run_api
from airflow_client.client.model.dag_run import DAGRun

configuration = airflow_client.client.Configuration(
    host="http://localhost:8080/api/v1",
    username='admin',
    password='admin'
)

dag_id = "example_bash_operator"

# Enter a context with an instance of the API client
with airflow_client.client.ApiClient(configuration) as api_client:
    # Get current configuration
    conf_api_instance = config_api.ConfigApi(api_client)
    try:
        api_response = conf_api_instance.get_config()
        pprint(api_response)
    except airflow_client.client.OpenApiException as e:
        print("Exception when calling ConfigApi->get_config: %s\n" % e)


    # Get dag list
    dag_api_instance = dag_api.DAGApi(api_client)
    try:
        api_response = dag_api_instance.get_dags()
        pprint(api_response)
    except airflow_client.client.OpenApiException as e:
        print("Exception when calling DagAPI->get_dags: %s\n" % e)

    print("Caling get tasks")
    # Get tasks for a DAG (TODO: issue#20)
    try:
        api_response = dag_api_instance.get_tasks(dag_id)
        pprint(api_response)
    except airflow_client.client.exceptions.OpenApiException as e:
        print("Exception when calling DagAPI->get_tasks: %s\n" % e)


    print("Caling post dag run")
    # Trigger a dag run (TODO: issue#21)
    dag_run_api_instance = dag_run_api.DAGRunApi(api_client)
    try:
        # Create a DAGRun object
        dag_run = DAGRun(
            dag_run_id='some_test_run',
            dag_id=dag_id,
            external_trigger=True,
        )
        api_response = dag_run_api_instance.post_dag_run(dag_id, dag_run)
        pprint(api_response)
    except airflow_client.client.exceptions.OpenApiException as e:
        print("Exception when calling DAGRunAPI->post_dag_run: %s\n" % e)

  7. Run the script

Result: post_dag_run will fail with this error without even attempting to call the API.

Calling post dag run
Exception when calling DAGRunAPI->post_dag_run: `dag_id` is a read-only attribute. Use `from_openapi_data` to instantiate class with read only attributes.

[TaskInstanceApi.get_task_instances] airflow_client.client.exceptions.ApiTypeError: Invalid type for variable 'trigger'

TaskInstanceApi.get_task_instances fails with

airflow_client.client.exceptions.ApiTypeError: Invalid type for variable 'trigger'. Required value type is Trigger and passed type was NoneType at ['received_data']['task_instances'][0]['trigger']

Code:

def test_task_instances(dag_id: str, dag_run_id: str):
    api_instance = TaskInstanceApi(airflow_api_client)
    task_instances = api_instance.get_task_instances(dag_id=dag_id, dag_run_id=dag_run_id)
    print(task_instances)

test_task_instances('dag-1-1692167919', 'dag-run-1-1692167924')

I'm successfully able to call other API methods so I believe it is not an issue of my local Airflow setup. What am I doing wrong?

Full trace:

Traceback (most recent call last):
  File "/Users/anov/miniconda3/envs/env_py_3-10/lib/python3.10/site-packages/airflow_client/client/model_utils.py", line 1802, in get_allof_instances
    allof_instance = allof_class._from_openapi_data(**model_args, **constant_args)
  File "/Users/anov/miniconda3/envs/env_py_3-10/lib/python3.10/site-packages/airflow_client/client/model_utils.py", line 63, in wrapped_init
    return fn(_self, *args, **kwargs)
  File "/Users/anov/miniconda3/envs/env_py_3-10/lib/python3.10/site-packages/airflow_client/client/model/task_instance_collection_all_of.py", line 196, in _from_openapi_data
    setattr(self, var_name, var_value)
  File "/Users/anov/miniconda3/envs/env_py_3-10/lib/python3.10/site-packages/airflow_client/client/model_utils.py", line 202, in __setattr__
    self[attr] = value
  File "/Users/anov/miniconda3/envs/env_py_3-10/lib/python3.10/site-packages/airflow_client/client/model_utils.py", line 527, in __setitem__
    self.set_attribute(name, value)
  File "/Users/anov/miniconda3/envs/env_py_3-10/lib/python3.10/site-packages/airflow_client/client/model_utils.py", line 174, in set_attribute
    value = validate_and_convert_types(
  File "/Users/anov/miniconda3/envs/env_py_3-10/lib/python3.10/site-packages/airflow_client/client/model_utils.py", line 1637, in validate_and_convert_types
    input_value[index] = validate_and_convert_types(
  File "/Users/anov/miniconda3/envs/env_py_3-10/lib/python3.10/site-packages/airflow_client/client/model_utils.py", line 1587, in validate_and_convert_types
    converted_instance = attempt_convert_item(
  File "/Users/anov/miniconda3/envs/env_py_3-10/lib/python3.10/site-packages/airflow_client/client/model_utils.py", line 1480, in attempt_convert_item
    raise conversion_exc
  File "/Users/anov/miniconda3/envs/env_py_3-10/lib/python3.10/site-packages/airflow_client/client/model_utils.py", line 1471, in attempt_convert_item
    return deserialize_model(input_value, valid_class,
  File "/Users/anov/miniconda3/envs/env_py_3-10/lib/python3.10/site-packages/airflow_client/client/model_utils.py", line 1391, in deserialize_model
    return model_class._new_from_openapi_data(**kw_args)
  File "/Users/anov/miniconda3/envs/env_py_3-10/lib/python3.10/site-packages/airflow_client/client/model_utils.py", line 63, in wrapped_init
    return fn(_self, *args, **kwargs)
  File "/Users/anov/miniconda3/envs/env_py_3-10/lib/python3.10/site-packages/airflow_client/client/model_utils.py", line 387, in _new_from_openapi_data
    return cls._from_openapi_data(*args, **kwargs)
  File "/Users/anov/miniconda3/envs/env_py_3-10/lib/python3.10/site-packages/airflow_client/client/model_utils.py", line 63, in wrapped_init
    return fn(_self, *args, **kwargs)
  File "/Users/anov/miniconda3/envs/env_py_3-10/lib/python3.10/site-packages/airflow_client/client/model/task_instance.py", line 277, in _from_openapi_data
    setattr(self, var_name, var_value)
  File "/Users/anov/miniconda3/envs/env_py_3-10/lib/python3.10/site-packages/airflow_client/client/model_utils.py", line 202, in __setattr__
    self[attr] = value
  File "/Users/anov/miniconda3/envs/env_py_3-10/lib/python3.10/site-packages/airflow_client/client/model_utils.py", line 527, in __setitem__
    self.set_attribute(name, value)
  File "/Users/anov/miniconda3/envs/env_py_3-10/lib/python3.10/site-packages/airflow_client/client/model_utils.py", line 174, in set_attribute
    value = validate_and_convert_types(
  File "/Users/anov/miniconda3/envs/env_py_3-10/lib/python3.10/site-packages/airflow_client/client/model_utils.py", line 1587, in validate_and_convert_types
    converted_instance = attempt_convert_item(
  File "/Users/anov/miniconda3/envs/env_py_3-10/lib/python3.10/site-packages/airflow_client/client/model_utils.py", line 1466, in attempt_convert_item
    raise get_type_error(input_value, path_to_item, valid_classes,
airflow_client.client.exceptions.ApiTypeError: Invalid type for variable 'trigger'. Required value type is Trigger and passed type was NoneType at ['received_data']['task_instances'][0]['trigger']

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/Users/anov/IdeaProjects/svoe/svoe_airflow/test_airflow_api.py", line 103, in <module>
    test_task_instances('dag-1-1692167919', 'dag-run-1-1692167924')
  File "/Users/anov/IdeaProjects/svoe/svoe_airflow/test_airflow_api.py", line 91, in test_task_instances
    task_instances = api_instance.get_task_instances(dag_id=dag_id, dag_run_id=dag_run_id)
  File "/Users/anov/miniconda3/envs/env_py_3-10/lib/python3.10/site-packages/airflow_client/client/api/task_instance_api.py", line 1541, in get_task_instances
    return self.get_task_instances_endpoint.call_with_http_info(**kwargs)
  File "/Users/anov/miniconda3/envs/env_py_3-10/lib/python3.10/site-packages/airflow_client/client/api_client.py", line 868, in call_with_http_info
    return self.api_client.call_api(
  File "/Users/anov/miniconda3/envs/env_py_3-10/lib/python3.10/site-packages/airflow_client/client/api_client.py", line 427, in call_api
    return self.__call_api(resource_path, method,
  File "/Users/anov/miniconda3/envs/env_py_3-10/lib/python3.10/site-packages/airflow_client/client/api_client.py", line 242, in __call_api
    return_data = self.deserialize(
  File "/Users/anov/miniconda3/envs/env_py_3-10/lib/python3.10/site-packages/airflow_client/client/api_client.py", line 343, in deserialize
    deserialized_data = validate_and_convert_types(
  File "/Users/anov/miniconda3/envs/env_py_3-10/lib/python3.10/site-packages/airflow_client/client/model_utils.py", line 1587, in validate_and_convert_types
    converted_instance = attempt_convert_item(
  File "/Users/anov/miniconda3/envs/env_py_3-10/lib/python3.10/site-packages/airflow_client/client/model_utils.py", line 1480, in attempt_convert_item
    raise conversion_exc
  File "/Users/anov/miniconda3/envs/env_py_3-10/lib/python3.10/site-packages/airflow_client/client/model_utils.py", line 1471, in attempt_convert_item
    return deserialize_model(input_value, valid_class,
  File "/Users/anov/miniconda3/envs/env_py_3-10/lib/python3.10/site-packages/airflow_client/client/model_utils.py", line 1391, in deserialize_model
    return model_class._new_from_openapi_data(**kw_args)
  File "/Users/anov/miniconda3/envs/env_py_3-10/lib/python3.10/site-packages/airflow_client/client/model_utils.py", line 63, in wrapped_init
    return fn(_self, *args, **kwargs)
  File "/Users/anov/miniconda3/envs/env_py_3-10/lib/python3.10/site-packages/airflow_client/client/model_utils.py", line 387, in _new_from_openapi_data
    return cls._from_openapi_data(*args, **kwargs)
  File "/Users/anov/miniconda3/envs/env_py_3-10/lib/python3.10/site-packages/airflow_client/client/model_utils.py", line 63, in wrapped_init
    return fn(_self, *args, **kwargs)
  File "/Users/anov/miniconda3/envs/env_py_3-10/lib/python3.10/site-packages/airflow_client/client/model/task_instance_collection.py", line 201, in _from_openapi_data
    composed_info = validate_get_composed_info(
  File "/Users/anov/miniconda3/envs/env_py_3-10/lib/python3.10/site-packages/airflow_client/client/model_utils.py", line 2010, in validate_get_composed_info
    allof_instances = get_allof_instances(self, model_args, constant_args)
  File "/Users/anov/miniconda3/envs/env_py_3-10/lib/python3.10/site-packages/airflow_client/client/model_utils.py", line 1807, in get_allof_instances
    raise ApiValueError(
airflow_client.client.exceptions.ApiValueError: Invalid inputs given to generate an instance of 'TaskInstanceCollectionAllOf'. The input data was invalid for the allOf schema 'TaskInstanceCollectionAllOf' in the composed schema 'TaskInstanceCollection'. Error=Invalid type for variable 'trigger'. Required value type is Trigger and passed type was NoneType at ['received_data']['task_instances'][0]['trigger']

Bug: `DAGApi.get_dag_details` fails if `DAG Run Timeout` is `None`

Observed Behavior:

DAGApi.get_dag_details fails if DAG Run Timeout is None.

To recreate:

First, create a DAG and set "DAG Run Timeout" to None.
Next, try to access the DAG Details using the API:

    api = DAGApi(airflow_client.client)
    dag = api.get_dag_details(dag_id)

Exception:

airflow_client.client.exceptions.ApiTypeError: Invalid type for variable 'dag_run_timeout'. Required value type is TimeDelta and passed type was NoneType at ['received_data']['dag_run_timeout']

Proposed fix:

The DAG schema at dagrun_timeout should support NoneType, as this is what is supported in the Airflow API documentation.

trigger dag api is broken

Client version: 2.1.0

Code:

    dag_id = "example_bash_operator"
    dag_run_api_instance = dag_run_api.DAGRunApi(api_client)
    try:
        # Create a DAGRun object
        dag_run = DAGRun(
            dag_run_id='some_test_run',
            dag_id=dag_id,
            external_trigger=True,
        )
        api_response = dag_run_api_instance.post_dag_run(dag_id, dag_run)
        pprint(api_response)
    except airflow_client.client.exceptions.OpenApiException as e:
        print("Exception when calling DAGRunAPI->post_dag_run: %s\n" % e)

Error:

HTTP response body: {
  "detail": "Property is read-only - 'dag_id'",
  "status": 400,
  "title": "Bad Request",
  "type": "http://apache-airflow-docs.s3-website.eu-central-1.amazonaws.com/docs/apache-airflow/latest/stable-rest-api-ref.html#section/Errors/BadRequest"
}

Are the issues still being monitored?

It seems that the code on the master branch has not been updated for 5 months, and issues 20, 21, 27, 28 and 29 all remain open, even though OpenAPI released 5.2.0 in August and even 5.3.0 recently.

User confirmation needed

I am trying to connect to a remote Airflow server.
While using the client I get an ApiTypeError because, instead of one of the arguments, HTML code is passed to the function, containing a "User confirmation needed" string.
Here is my code:

import os.path
from dotenv import load_dotenv
import airflow_client.client
from airflow_client.client.api import dag_api


dotenv_path = os.path.join(os.path.dirname(__file__), 'conn_info.env')
if os.path.exists(dotenv_path):
    load_dotenv(dotenv_path)

AIRFLOW_CONN_INFO = {
    'host': os.getenv('AIRFLOW_HOST'),
    'username': os.getenv('airflow_login'),
    'password': os.getenv('airflow_password')
}

config = airflow_client.client.Configuration(
    host=AIRFLOW_CONN_INFO['host'],
    username=AIRFLOW_CONN_INFO['username'],
    password=AIRFLOW_CONN_INFO['password']
)

with airflow_client.client.ApiClient(config) as api_client:
    api_instance = dag_api.DAGApi(api_client)
    api_response = api_instance.get_dags()
    print(api_response)

Help please

How do I create users with the Admin role?

I'm trying to create users using airflow-client-python.
I can create users without defining roles; the user automatically gets the "Public" role.

# Imports
from airflow_client import client
from airflow_client.client.api import user_api
from pprint import pprint
from airflow_client.client.model.user import User
from airflow_client.client.model.user_collection_item_roles import UserCollectionItemRoles

# Configure
configuration = client.Configuration(
    host='http://localhost:8080/api/v1',
    username='airflow',
    password='airflow',
)

# Set User
user = User(
    first_name='Michel',
    last_name='Metran',
    username='mmetran',
    email='[email protected]',
    password='1111',
)

# Enter a context with an instance of the API client
with client.ApiClient(configuration) as api:
    # Create an instance of the API class
    api_instance = user_api.UserApi(api)
    try:
        # Create a user
        api_response = api_instance.post_user(user)
        pprint(api_response)
        
    except client.ApiException as e:
        print(f'Exception when calling UserApi:\n{e}')

The output will be...

{'active': True,
 'changed_on': '2023-08-03T23:17:12.002701',
 'created_on': '2023-08-03T23:17:12.002684',
 'email': '[email protected]',
 'fail_login_count': None,
 'first_name': 'Michel',
 'last_login': None,
 'last_name': 'Metran',
 'login_count': None,
 'roles': [{'name': 'Public'}],
 'username': 'mmetran'}

So, I tried using the same "type" as in the output (a list) to define another role, but a list does not work.

How do I create a user with a different role definition? I've tried several things and nothing works.

user = User(
    first_name='Michel',
    last_name='Metran',
    username='mmetran',
    email='[email protected]',
    password='1111',
    roles=([{'name': 'Public'}]),    # Error=Invalid type for variable '0'. Required value type is UserCollectionItemRoles and passed type was dict at ['roles'][0]
    #roles='Admin' # Error=Invalid type for variable 'roles'. Required value type is list and passed type was str at ['roles']
    #roles=['Public'], # Error=Invalid type for variable '0'. Required value type is UserCollectionItemRoles and passed type was str at ['roles'][0]
    #roles=UserCollectionItemRoles(), # Error=Invalid type for variable 'roles'. Required value type is list and passed type was UserCollectionItemRoles at ['roles']
)

API client configuration lacks support for passing verify_ssl for OAuth-based token authentication

Issue:
I am using the Airflow client with our own authentication based on Azure AD; the API is throwing a Forbidden error as shown below.

I think SSL certificate verification is causing the issue. It would be great to have verify_ssl=true/false as a configuration parameter.

Github Code

https://github.com/apache/airflow-client-python/blob/8bf55dbc64d72ea318027c49ceda8d59490494c2/airflow_client/client/configuration.py#L118C3-L127C20

Suggestion for workaround

Can someone recommend a workaround to resolve my issue?
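
One possible workaround for the sample below (a sketch, assuming the generated Configuration exposes the standard OpenAPI Generator verify_ssl attribute, which can be overridden after construction):

import airflow_client.client

configuration = airflow_client.client.Configuration(
    host="http://localhost/api/v1",
    access_token="Bearer xxxxxxxxxxxx",
)
# Override the hard-coded default on the configuration instance
configuration.verify_ssl = False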

Sample Code

import airflow_client.client
from pprint import pprint
from airflow_client.client.api import config_api


# The server uses a custom auth backend: auth_backend = airflow.customauth

configuration = airflow_client.client.Configuration(
    host="http://localhost/api/v1",
    access_token='Bearer xxxxxxxxxxxx' 
)


# Enter a context with an instance of the API client
with airflow_client.client.ApiClient(configuration) as api_client:
    # Create an instance of the API class
    api_instance = config_api.ConfigApi(api_client)

    try:
        # Get current configuration
        api_response = api_instance.get_config()
        pprint(api_response)
    except airflow_client.client.ApiException as e:
        print("Exception when calling ConfigApi->get_config: %s\n" % e)    

Error:

Exception when calling VariableApi->get_variables: (403)
Reason: Forbidden
HTTP response headers: HTTPHeaderDict({'Date': 'Fri, 22 Mar 2024 13:29:05 GMT', 'Content-Type': 'application/problem+json', 'Content-Length': '185', 'Connection': 'keep-alive', 'x-robots-tag':
'noindex, nofollow', 'Strict-Transport-Security': 'max-age=15724800; includeSubDomains'})
HTTP response body: {
"detail": null,
"status": 403,
"title": "Forbidden",
"type": "https://airflow.apache.org/docs/apache-airflow/2.8.1/stable-rest-api-ref.html#section/Errors/PermissionDenied"
}

Pools API broken using airflow 2.1.2

Calling pool_client.get_pools() I get this error:

Invalid inputs given to generate an instance of 'PoolCollectionAllOf'. The input data was invalid for the allOf schema 'PoolCollectionAllOf' in the composed schema 'PoolCollection'. Error=Pool has no attribute 'running_slots' at ['received_data']['pools'][0]['running_slots']

Here is the response debug

DEBUG:airflow_client.client.rest: response body: b'{\n  "pools": [\n    {\n      "name": "default_pool",\n      "occupied_slots": 1,\n      "open_slots": 47,\n      "queued_slots": 0,\n      "running_slots": 1,\n      "slots": 48\n    },\n   ],\n  "total_entries": 1\n}\n'

Get Dags call is not working.

Response from an API call via Postman to /dags?limit=1:

{
    "dag_id": "Disaster_Recovery",
    "description": null,
    "file_token": "Ii91c3IvbG9jYWwvYWlyZmxvdy9kYWdzL2NvbmNvcmRfZGFnc19kci5weSI.fWukEYmKdWxEz4D_qEy-F2BqCZw",
    "fileloc": "/usr/local/airflow/dags/concord_dags_dr.py",
    "is_active": true,
    "is_paused": false,
    "is_subdag": false,
    "owners": [
        "Airflow"
    ],
    "root_dag_id": null,
    "schedule_interval": {
        "__type": "CronExpression",
        "value": "*/5 * * * *"
    },
    "tags": []
}

Using the API client to get the same information leads to an error:

        with airflow_client.client.ApiClient(self.configuration) as api_client:
            # Create an instance of the API class
            api_instance = dag_api.DAGApi(api_client)
            try:
                api_response = api_instance.get_dags(limit=1)
                pprint(api_response)
airflow_client.client.exceptions.ApiValueError: Invalid inputs given to generate an instance of 'DAGCollectionAllOf'. The input data was invalid for the allOf schema 'DAGCollectionAllOf' in the composed schema 'DAGCollection'. Error=DAG has no attribute 'is_active' at ['received_data']['dags'][0]['is_active']

[openapi] wrong use of $ref and nullable

[description]
file: airflow/airflow/api_connexion/openapi/v1.yaml

As seen in the file above, the following patterns appear in many places:

# line 2661
dag_run_timeout:
              nullable: true
              $ref: '#/components/schemas/TimeDelta'

# line 2498
sla_miss:
          $ref: '#/components/schemas/SLAMiss'
          nullable: true

Obviously the expectation is to be able to make a variable nullable while using $ref.

But the generated code is not as expected.
https://github.com/apache/airflow-client-python/blob/master/airflow_client/client/model/dag_detail.py

# line 132
@cached_property
def openapi_types():
    """
    This must be a method because a model may have properties that are
    of type self, this must run after the class is loaded

    Returns
        openapi_types (dict): The key is attribute name
            and the value is attribute type.
    """
    lazy_import()
    return {
        ...
        'dag_run_timeout': (TimeDelta,),  # noqa: E501
        ...
    }

[expected]

return {
    ...
    'dag_run_timeout': (TimeDelta, none_type,),  # noqa: E501
    ...
}

[Suggest a fix]

Per the openapi spec, properties adjacent to refs are ignored:
https://github.com/OAI/OpenAPI-Specification/blob/main/versions/3.1.0.md#reference-object
This object cannot be extended with additional properties and any properties added SHALL be ignored. except for summary and description.

The right schema could be:

dag_run_timeout:
 oneOf:
  - type: null
  - $ref: '#/components/schemas/TimeDelta'

The generated code will be like:

return {
    ...
    'dag_run_timeout': (bool, date, datetime, dict, float, int, list, str, none_type,),  # noqa: E501
    ...
}

Of course, some bugs due to nullable can be fixed, but the readability of the generated model will be worse. I am still working on that.

[related]
#20
#27

Trouble to get logs using get_logs function

I am trying to fetch the logs of all DAGs.

import airflow_client.client
from airflow_client.client.api import task_instance_api

configuration = airflow_client.client.Configuration(
    host="http://localhost:8080/api/v1", username="********", password="********"
)

with airflow_client.client.ApiClient(configuration) as api_client:
    # The API instance must be created first (this line was missing):
    task_api_instance = task_instance_api.TaskInstanceApi(api_client)

    # dag_id, dag_run_id, task_id and task_try_number are defined elsewhere
    get_task_api_response = task_api_instance.get_log(dag_id, dag_run_id, task_id, task_try_number)

    print(get_task_api_response)

Using the above code snippet I want to access the logs of a particular DAG,

but in return I am getting the following response:

{'content': '[(\'\', "*** Log file does not exist: ' '/opt/airflow/logs/dag_id=dataset_produces_1/run_id=scheduled__2024-04-20T00:00:00+00:00/task_id=producing_task_1/attempt=1.log\\n*** ' 'Fetching from: ' 'http://:8793/log/dag_id=dataset_produces_1/run_id=scheduled__2024-04-20T00:00:00+00:00/task_id=producing_task_1/attempt=1.log\\n*** ' 'Failed to fetch log file from worker. Request URL is missing an ' '\'http://\' or \'https://\' protocol.\\n")]', 'continuation_token': 'eyJlbmRfb2ZfbG9nIjp0cnVlfQ.vXFGNvOjpNDb4rW6n4IQQ2F649M'}

Is there any way, or any solution, that I could use?

The logical date parameter for DAGRun is incorrect

Exception when calling DAGRunAPI->post_dag_run: (400)
Reason: BAD REQUEST
HTTP response headers: HTTPHeaderDict({'Server': 'gunicorn', 'Date': 'Wed, 25 Oct 2023 15:29:05 GMT', 'Connection': 'close', 'Content-Type': 'application/problem+json', 'Content-Length': '245', 'Access-Control-Allow-Methods': 'POST, GET, OPTIONS, DELETE, PATCH', 'X-Robots-Tag': 'noindex, nofollow'})
HTTP response body: {
  "detail": "'2023-10-25T23:29:05.630191' is not a 'date-time' - 'logical_date'",
  "status": 400,
  "title": "Bad Request",
  "type": "https://airflow.apache.org/docs/apache-airflow/2.7.2/stable-rest-api-ref.html#section/Errors/BadRequest"
}

Not possible to trigger dag run

According to the documentation, the below code should trigger a DAG run:

api_response = api_instance.post_dag_run(
    dag_id="dag_name",
    dag_run=DAGRun(..., state=DagState("queued")),
)

instead it raises the API error:

raise ApiException(http_resp=r)
airflow_client.client.exceptions.ApiException: (400)
Reason: BAD REQUEST
HTTP response headers: HTTPHeaderDict({'Date': 'Thu, 15 Dec 2022 13:18:16 GMT', 'Content-Type': 'application/problem+json', 'Content-Length': '210', 'Connection': 'keep-alive', 'X-Robots-Tag': 'noindex, nofollow', 'Strict-Transport-Security': 'max-age=15724800; includeSubDomains'})
HTTP response body: {
  "detail": "Property is read-only - 'state'",
  "status": 400,
  "title": "Bad Request",
  "type": "https://airflow.apache.org/docs/apache-airflow/2.4.1/stable-rest-api-ref.html#section/Errors/BadRequest"
}
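
For reference, a sketch of a call that avoids the read-only field, based on the error above (assumption: omitting state lets the server queue the run itself; the run id is illustrative):

api_response = api_instance.post_dag_run(
    dag_id="dag_name",
    dag_run=DAGRun(dag_run_id="manual_run_1"),
)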

SLAMiss not allowing null value

Example:

>>> ti = TaskInstanceApi(api_client)
>>> ti.get_task_instances('tutorial', 'manual__2022-10-31T20:32:58.139267+00:00')
...
ApiValueError: Invalid inputs given to generate an instance of 'TaskInstanceCollectionAllOf'. The input data was invalid for the allOf schema 'TaskInstanceCollectionAllOf' in the composed schema 'TaskInstanceCollection'. Error=Invalid type for variable 'sla_miss'. Required value type is SLAMiss and passed type was NoneType at ['received_data']['task_instances'][0]['sla_miss']

In the raw json data:

...
"sla_miss": null,
...

This is using apache-airflow-client 2.3.0 and airflow 2.3.2

issue of get_task_instance

An exception is raised when I use the get_task_instance method to get information about a task of a DAG without an SLA.
The exception information is attached in the traceback below.

airflow_client.client.exceptions.ApiValueError: Unsupported type: <class 'object'> for any API requests

>>> from airflow_client.client.api import dag_api
>>> api_instance = dag_api.DAGApi(api_client)
>>> api_instance.get_dags()
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/Users/dimberman/.virtualenvs/pytest-bdd/lib/python3.7/site-packages/airflow_client/client/api_client.py", line 789, in __call__
    return self.callable(self, *args, **kwargs)
  File "/Users/dimberman/.virtualenvs/pytest-bdd/lib/python3.7/site-packages/airflow_client/client/api/dag_api.py", line 486, in __get_dags
    return self.call_with_http_info(**kwargs)
  File "/Users/dimberman/.virtualenvs/pytest-bdd/lib/python3.7/site-packages/airflow_client/client/api_client.py", line 867, in call_with_http_info
    collection_formats=params['collection_format'])
  File "/Users/dimberman/.virtualenvs/pytest-bdd/lib/python3.7/site-packages/airflow_client/client/api_client.py", line 432, in call_api
    _check_type)
  File "/Users/dimberman/.virtualenvs/pytest-bdd/lib/python3.7/site-packages/airflow_client/client/api_client.py", line 244, in __call_api
    _check_type
  File "/Users/dimberman/.virtualenvs/pytest-bdd/lib/python3.7/site-packages/airflow_client/client/api_client.py", line 348, in deserialize
    configuration=self.configuration
  File "/Users/dimberman/.virtualenvs/pytest-bdd/lib/python3.7/site-packages/airflow_client/client/model_utils.py", line 1420, in validate_and_convert_types
    check_type=_check_type
  File "/Users/dimberman/.virtualenvs/pytest-bdd/lib/python3.7/site-packages/airflow_client/client/model_utils.py", line 1287, in attempt_convert_item
    valid_classes_ordered = order_response_types(valid_classes)
  File "/Users/dimberman/.virtualenvs/pytest-bdd/lib/python3.7/site-packages/airflow_client/client/model_utils.py", line 920, in order_response_types
    key=lambda class_or_instance: index_getter(class_or_instance)
  File "/Users/dimberman/.virtualenvs/pytest-bdd/lib/python3.7/site-packages/airflow_client/client/model_utils.py", line 920, in <lambda>
    key=lambda class_or_instance: index_getter(class_or_instance)
  File "/Users/dimberman/.virtualenvs/pytest-bdd/lib/python3.7/site-packages/airflow_client/client/model_utils.py", line 916, in index_getter
    raise ApiValueError("Unsupported type: %s" % class_or_instance)
airflow_client.client.exceptions.ApiValueError: Unsupported type: <class 'object'>

I think the new version of OpenAPI might have broken the client. I get these from following most of the examples except config. @mik-laj any idea what might cause this?

Handle pagination for client code

My view on API client libraries is that pagination should be handled by them, or at least there should be the option for it, so that client code does not have to concern itself with repeated calls to deal with pagination. Realizing this is auto-generated code, is this something of interest? If so, I can put together something more concrete.
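
For reference, a minimal sketch of the manual paging that client code currently has to do (host, credentials and page size are illustrative; it assumes the list responses expose dags and total_entries as in the REST API):

from airflow_client import client
from airflow_client.client.api import dag_api

configuration = client.Configuration(
    host="http://localhost:8080/api/v1", username="admin", password="admin"
)

with client.ApiClient(configuration) as api_client:
    dag_api_instance = dag_api.DAGApi(api_client)
    all_dags = []
    offset, limit = 0, 100
    while True:
        # Fetch one page of DAGs
        page = dag_api_instance.get_dags(limit=limit, offset=offset)
        all_dags.extend(page["dags"])
        # Stop once every entry has been fetched
        if offset + limit >= page["total_entries"]:
            break
        offset += limit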

get_tasks api is broken

Client version: 2.1.0

Code:

    dag_id = "example_bash_operator"

    try:
        api_response = dag_api_instance.get_tasks(dag_id)
        pprint(api_response)
    except airflow_client.client.exceptions.OpenApiException as e:
        print("Exception when calling DagAPI->get_tasks: %s\n" % e)

Error:

Exception when calling DagAPI->get_tasks: Invalid type for variable 'execution_timeout'. Required value type is TimeDelta and passed type was NoneType at ['received_data']['tasks'][0]['execution_timeout']

Airflow Python Client 2.6.0rc1 fails with invalid type variable `execution_timeout` on get_tasks

When running get_tasks API calls, the Python client (when used with Airflow 2.6.0) fails with:

Calling get tasks
Exception when calling DagAPI->get_tasks: Invalid type for variable 'execution_timeout'. Required value type is TimeDelta and passed type was NoneType at ['received_data']['tasks'][0]['execution_timeout']

Apparently because it expects a non-null execution timeout.

How to reproduce:

  1. breeze start-airflow --db-reset --use-airflow-version 2.6.0 --load-example-dags --load-default-connections

  2. In the terminal, install 2.6.0rc1 of the Python client: pip install apache-airflow-client==2.6.0.rc1

  3. Ctrl-C webserver

  4. Change configuration in ~/airflow/airflow.cfg (with vim for example)

  • set [webserver] -> expose_config = True
  • enable basic authentication by adding ,airflow.api.auth.backend.basic_auth to [api] -> auth_backends
  5. Copy the following script to the container (for example to the files folder, to be able to copy it from the host) and name it test_python_client.py:
import airflow_client.client
from pprint import pprint
from airflow_client.client.api import config_api, dag_api, dag_run_api
from airflow_client.client.model.dag_run import DAGRun

configuration = airflow_client.client.Configuration(
    host="http://localhost:8080/api/v1",
    username='admin',
    password='admin'
)

dag_id = "example_bash_operator"

# Enter a context with an instance of the API client
with airflow_client.client.ApiClient(configuration) as api_client:
    # Get current configuration
    conf_api_instance = config_api.ConfigApi(api_client)
    try:
        api_response = conf_api_instance.get_config()
        pprint(api_response)
    except airflow_client.client.OpenApiException as e:
        print("Exception when calling ConfigApi->get_config: %s\n" % e)


    # Get dag list
    dag_api_instance = dag_api.DAGApi(api_client)
    try:
        api_response = dag_api_instance.get_dags()
        pprint(api_response)
    except airflow_client.client.OpenApiException as e:
        print("Exception when calling DagAPI->get_dags: %s\n" % e)

    print("Caling get tasks")
    # Get tasks for a DAG (TODO: issue#20)
    try:
        api_response = dag_api_instance.get_tasks(dag_id)
        pprint(api_response)
    except airflow_client.client.exceptions.OpenApiException as e:
        print("Exception when calling DagAPI->get_tasks: %s\n" % e)


    print("Caling post dag run")
    # Trigger a dag run (TODO: issue#21)
    dag_run_api_instance = dag_run_api.DAGRunApi(api_client)
    try:
        # Create a DAGRun object
        dag_run = DAGRun(
            dag_run_id='some_test_run',
            dag_id=dag_id,
            external_trigger=True,
        )
        api_response = dag_run_api_instance.post_dag_run(dag_id, dag_run)
        pprint(api_response)
    except airflow_client.client.exceptions.OpenApiException as e:
        print("Exception when calling DAGRunAPI->post_dag_run: %s\n" % e)

  6. Run the script

Result: get_tasks will fail with this error (even though the API returned a correct response with a 200 status code):

Calling get tasks
Exception when calling DagAPI->get_tasks: Invalid type for variable 'execution_timeout'. Required value type is TimeDelta and passed type was NoneType at ['received_data']['tasks'][0]['execution_timeout']

Trouble to access reason behind Failed DAGs LOG

Hello, currently I am working on a task where I have to generate logs of successful and failed DAGs.

With the help of the following code I am able to get DAG details for successful DAGs: DAG_ids, start_date, end_date, state, and all the
necessary things for a DAG. The only part I am stuck at is getting the reason why a DAG failed.
I have used the following list of functions to help me reach here:

  • get_dag_runs
  • get_dag_runs_batch
  • get_dag_run
  • get_task_instances

If there are any solutions, please let me know.

How to use cookies?

Using curl I can do the following:

$ curl -b ./cookies.txt https://url/api/v1/dag

How can I authorize using cookies file with airflow-client-python?

Thank you.

I think what I am asking for is how to add custom additional HTTP headers to the request.
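
A minimal sketch of one way to do this, assuming the generated ApiClient exposes the standard OpenAPI Generator cookie constructor parameter (the cookie value is a placeholder):

from airflow_client import client
from airflow_client.client.api import dag_api

configuration = client.Configuration(host="https://url/api/v1")

# The cookie string is sent with every request made by this client
with client.ApiClient(configuration, cookie="session=PLACEHOLDER") as api_client:
    api_instance = dag_api.DAGApi(api_client)
    print(api_instance.get_dags())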

Basic authentication stopped working in 2.5.0

Hello,

Version 2.5.0 of the client library stopped generating the Authorization header when sending HTTP requests to Airflow.
I did some debugging of the code and I can see that, e.g. for ConfigApi, code that previously looked like this:

class ConfigApi(object):
    """NOTE: This class is auto generated by OpenAPI Generator
    Ref: https://openapi-generator.tech

    Do not edit the class manually.
    """

    def __init__(self, api_client=None):
        if api_client is None:
            api_client = ApiClient()
        self.api_client = api_client
        self.get_config_endpoint = _Endpoint(
            settings={
                'response_type': (Config,),
                'auth': [
                    'Basic',
                    'Kerberos'
                ],

now looks like this

class ConfigApi(object):
    """NOTE: This class is auto generated by OpenAPI Generator
    Ref: https://openapi-generator.tech

    Do not edit the class manually.
    """

    def __init__(self, api_client=None):
        if api_client is None:
            api_client = ApiClient()
        self.api_client = api_client
        self.get_config_endpoint = _Endpoint(
            settings={
                'response_type': (Config,),
                'auth': [],

Are we doing something wrong?
