
airflow-pentaho-plugin's Introduction

Pentaho Airflow plugin


This plugin runs Jobs and Transformations through Carte servers. It allows you to orchestrate a massive number of transformations and jobs, taking care of the dependencies between them, even across different instances. This is done using CarteJobOperator and CarteTransOperator.

It also runs Pan (transformations) and Kitchen (Jobs) in local mode, both from a repository and from local XML files. For this approach, use KitchenOperator and PanOperator.

Requirements

  1. An Apache Airflow deployment.
  2. One or more working PDI CE installations.
  3. A Carte server for the Carte operators.

Setup

The same setup process must be performed on the webserver, the scheduler and the workers that run these tasks. If you want to deploy dedicated workers for this kind of task, see Queues in the Airflow Concepts section.

Pip package

First of all, install the package with pip.

pip install airflow-pentaho-plugin

Airflow connection

Then a new connection needs to be added to Airflow Connections. To do this, go to the Airflow web UI and click Admin -> Connections in the top menu, then click the Create tab.

Use the HTTP connection type. Enter the Conn Id (this plugin uses pdi_default by default) and the username and password for your Pentaho Repository.

At the bottom of the form, fill in the Extra field with pentaho_home (the path where your pdi-ce installation is located) and rep (the repository name for this connection), as a JSON-formatted string like the following.

{
    "pentaho_home": "/opt/pentaho",
    "rep": "Default"
}
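
If you prefer to manage connections in code, here is a minimal sketch for creating this connection programmatically (assuming the Airflow 2.x API; all values are placeholders).

# Hedged sketch: create the pdi_default connection from Python (Airflow 2.x assumed).
import json

from airflow import settings
from airflow.models import Connection

conn = Connection(
    conn_id="pdi_default",      # the Conn Id this plugin looks up by default
    conn_type="http",
    login="repo_user",          # placeholder Pentaho Repository username
    password="repo_password",   # placeholder Pentaho Repository password
    extra=json.dumps({"pentaho_home": "/opt/pentaho", "rep": "Default"}))

session = settings.Session()
session.add(conn)
session.commit()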

Carte

In order to use CarteJobOperator, the connection must be set up differently. Fill in host (including http:// or https://) and port with the Carte hostname and port, username and password with the PDI repository credentials, and extra as follows.

{
    "rep": "Default",
    "carte_username": "cluster",
    "carte_password": "cluster"
}
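
The same approach works for the Carte connection; a minimal sketch (Airflow 2.x assumed, placeholder values), to be added to the session as in the previous sketch.

# Hedged sketch: a Carte-style connection, with host and port pointing at the Carte server.
import json

from airflow.models import Connection

carte_conn = Connection(
    conn_id="pdi_default",
    conn_type="http",
    host="http://carte.example.com",   # placeholder Carte hostname, scheme included
    port=8081,                         # placeholder Carte port
    login="repo_user",                 # placeholder PDI repository username
    password="repo_password",          # placeholder PDI repository password
    extra=json.dumps({
        "rep": "Default",
        "carte_username": "cluster",
        "carte_password": "cluster"}))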

Usage

CarteJobOperator

CarteJobOperator is responsible for running jobs on remote slave servers. Here is an example of CarteJobOperator usage.

# For versions before 2.0
# from airflow.operators.airflow_pentaho import CarteJobOperator

from airflow_pentaho.operators.carte import CarteJobOperator

# ... #

# Define the task using the CarteJobOperator
avg_spent = CarteJobOperator(
    conn_id='pdi_default',
    task_id="average_spent",
    job="/home/bi/average_spent",
    params={"date": "{{ ds }}"},  # Date in yyyy-mm-dd format
    dag=dag)

# ... #

some_task >> avg_spent >> another_task

KitchenOperator

The Kitchen operator is responsible for running Jobs locally. Let's suppose that we have a Job saved at /home/bi/average_spent in our repository, with the argument date as an input parameter. Let's define the task using the KitchenOperator.

# For versions before 2.0
# from airflow.operators.airflow_pentaho import KitchenOperator

from airflow_pentaho.operators.kettle import KitchenOperator

# ... #

# Define the task using the KitchenOperator
avg_spent = KitchenOperator(
    conn_id='pdi_default',
    queue="pdi",
    task_id="average_spent",
    directory="/home/bi",
    job="average_spent",
    params={"date": "{{ ds }}"},  # Date in yyyy-mm-dd format
    dag=dag)

# ... #

some_task >> avg_spent >> another_task

CarteTransOperator

CarteTransOperator is responsible for running transformations on remote slave servers. Here is an example of CarteTransOperator usage.

# For versions before 2.0
# from airflow.operators.airflow_pentaho import CarteTransOperator

from airflow_pentaho.operators.carte import CarteTransOperator

# ... #

# Define the task using the CarteTransOperator
enrich_customers = CarteTransOperator(
    conn_id='pdi_default',
    task_id="enrich_customer_data",
    trans="/home/bi/enrich_customer_data",
    params={"date": "{{ ds }}"},  # Date in yyyy-mm-dd format
    dag=dag)

# ... #

some_task >> enrich_customers >> another_task

PanOperator

The Pan operator is responsible for running transformations locally. Let's suppose that we have one saved at /home/bi/clean_somedata. Let's define the task using the PanOperator. In this case, the transformation receives a parameter that determines the file to be cleaned.

# For versions before 2.0
# from airflow.operators.airflow_pentaho import PanOperator

from airflow_pentaho.operators.kettle import PanOperator

# ... #

# Define the task using the PanOperator
clean_input = PanOperator(
    conn_id='pdi_default',
    queue="pdi",
    task_id="cleanup",
    directory="/home/bi",
    trans="clean_somedata",
    params={"file": "/tmp/input_data/{{ ds }}/sells.csv"},
    dag=dag)

# ... #

some_task >> clean_input >> another_task

For more information, please see sample_dags/pdi_flow.py

airflow-pentaho-plugin's People

Contributors

adolnik, dependabot[bot], piffall

airflow-pentaho-plugin's Issues

Log Level Problem

Hello,

The CarteJobOperator's "level" argument unfortunately does not work. While searching through the code, I found a bug: in the PentahoCarteHook's PentahoCarteClient class, the level parameter is hard-coded to "Debug". Can you look into it, please?

Thank you.
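
For reference, this is how the argument is meant to be passed from a DAG; a minimal sketch, assuming the operator accepts a level argument with the usual PDI log levels (Basic, Detailed, Debug, ...) and the argument names used in the README examples above:

# Hedged sketch: requesting a specific PDI log level from CarteJobOperator.
from datetime import datetime

from airflow import DAG
from airflow_pentaho.operators.carte import CarteJobOperator

with DAG(dag_id="pdi_log_level_example",
         start_date=datetime(2023, 1, 1),
         schedule_interval=None) as dag:

    avg_spent = CarteJobOperator(
        conn_id="pdi_default",
        task_id="average_spent",
        job="/home/bi/average_spent",
        level="Basic")   # reportedly ignored: the hook hard-codes "Debug"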

Return code 0 PanOperator for failed transformations and Xcom

Hello,

In some cases Pentaho's spoon.sh (used by pan.sh) returns EXIT_CODE 0 even though an inner transformation has failed.
E.g. create a trans.ktr with a failing sub-transformation in it. In such a case we cannot determine that it has failed except by looking into the logs.
Sometimes it is acceptable for a sub-transformation, or a transformation inside a Job, to be in a failed state, so it is OK for spoon.sh to return 0 because the overall Job or transformation is successful.
In such cases we go line by line and detect whether there is an error line, like this:

contains_error = False
for line in iter(self.sub_process.stdout.readline, b''):
    line = line.decode('utf-8').rstrip()
    if not contains_error:
        if re.search(r'-\s+ERROR\s+\(version', line):
            contains_error = True
    self.log.info(line)
self.sub_process.wait()

And then put it into XCom so that Airflow tasks are able to detect such errors.


if self.xcom_push_flag:
    if not contains_error:
        return line
    else:
        return 'Untracked Error'

Could we add something like that (perhaps an extra return key) to airflow-pentaho-plugin for tracking such errors?
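
As a self-contained illustration of the detection idea (not the plugin's actual implementation), the proposed regex can be applied to captured output lines like this:

# Hedged sketch: detect a PDI error line in captured Pan/Kitchen output,
# using the pattern proposed above (illustration only).
import re

ERROR_PATTERN = re.compile(r'-\s+ERROR\s+\(version')

def contains_pdi_error(output_lines):
    """Return True if any output line matches the PDI error pattern."""
    return any(ERROR_PATTERN.search(line) for line in output_lines)

sample = [
    "2021/01/01 00:00:00 - trans - Start of transformation",
    "2021/01/01 00:00:01 - step.0 - ERROR (version 9.1.0.0-324) : Something failed",
]
print(contains_pdi_error(sample))  # True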

Parameters format in command line

Hello.

Using airflow-pentaho-plugin version 1.0.4, I see that parameters are not passed to the transformation because they look like "-param1=param_value". But as described in the Pentaho 9.1 docs, they should look like "-param:param1=param_value".

So I changed the PanOperator class like this:

def execute(self, context):  # pylint: disable=unused-argument
    conn = self._get_pentaho_client()

    arguments = {
        'dir': self.dir,
        'trans': self.trans,
        'level': self.level,
        'logfile': self.logfile,
        'safemode': 'true' if self.safemode else 'false',
        'maxloglines': str(self.maxloglines),
        'maxlogtimeout': str(self.maxlogtimeout)
    }
    # arguments.update(self.params)
    if self.file:
        arguments.update({'file': self.file})
        arguments.update({'norep': 'true'})

    self.command_line = conn.build_command('pan', arguments, self.params)
    output = self._run_command()

    if self.xcom_push_flag:
        return output

And the PentahoClient class:

        def build_command(self, command, arguments, params):
            line = [self._build_tool_command(command),
                    self._build_connection_arguments()]
            for k, val in arguments.items():
                line.append(self._build_argument(k, val))
            for k, val in params.items():
                line.append(self._build_argument(f'param:{k}', val))

            command_line = ' '.join(line)
            return command_line

And everything started working.
My task definition in the DAG:

    trans1 = PanOperator(
        dag=dag,
        task_id='trans1',
        xcom_push=True,
        file='/home/airflow/repo/upload.ktr',
        trans="",
        params={"version_id": "102"})

My environment: Kubuntu 20.10, Python 3.8.6, apache airflow 2.0.1, pdi 9.1.0.0-324.
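
As a standalone sketch of the formatting difference described above (not the plugin's code): named parameters are rendered as -param:NAME=VALUE, while ordinary tool arguments keep the -NAME=VALUE form.

# Hedged sketch: build Pan/Kitchen-style command-line fragments for arguments vs. parameters.
def build_cli_fragments(arguments, params):
    fragments = [f"-{key}={value}" for key, value in arguments.items()]
    fragments += [f"-param:{key}={value}" for key, value in params.items()]
    return " ".join(fragments)

print(build_cli_fragments({"trans": "upload", "level": "Basic"},
                          {"version_id": "102"}))
# -trans=upload -level=Basic -param:version_id=102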

CarteTransOperator.expand_kwargs gathers incorrect log by map index

Scenario:

I have a transformation that should be run several times according to the previous step's results. I am using the expand_kwargs feature to do this.
Simplified example:
t1 = CarteTransOperator.partial(
    task_id='tst_trans_params',
    pdi_conn_id='pentaho',
    trans='/public/test/tr_tst_input_params'
).expand_kwargs([{"params": {"PARAM": 1}}, {"params": {"PARAM": 2}}])
The transformation tr_tst_input_params just outputs the input parameter to the log.

Result:

Everything runs fine, but the log for both map indexes is gathered from the run with the last parameter value only.

On the Pentaho side: (screenshots omitted)

Expected result:

The log for each map index should be valid.

Environment

Airflow 2.5.1 + Python 3.10 + Pentaho server CE 8.0

CarteTransOperator works incorrectly when a transformation fails

Scenario (simplified)

  1. We have a table in the database with constraints:
    CREATE TABLE tst_constraints( id int PRIMARY KEY, val text )

  2. We have a simple transformation that inserts data into this table (screenshot omitted).

  3. Call the transformation in Airflow:
    trans = CarteTransOperator(
        task_id='tst_trans_fail',
        pdi_conn_id='pentaho',
        trans='/public/test/tr_tst_failed_tab_constraint')

Result

  1. If the table is empty (no constraint violations), everything works as expected.
  2. If the table already contains the inserted data, I have found two possible scenarios (the second is more frequent):
    2.1. The task finishes with a SUCCESS flag (screenshots omitted).

    2.2. The task freezes, although the transformation has already failed (screenshot omitted).

Expected result

CarteTransOperator works properly

Additions

After the discussion in Issue #29:
If we wrap the transformation in a job, it works fine. But I want to know whether it is possible to solve this issue without doing that.

Environment

Airflow 2.5.1 + Python 3.10 + Pentaho server CE 8.0

airflow-pentaho-plugin installation error

Hi, I'm trying to install airflow-pentaho-plugin and I'm getting the following error. I also tried using the python setup.py install command and the error remains the same. Could you please help me unblock this installation process?
Environment - Python3.9
Pip version - 21.0.1
Command - sudo pip install airflow-pentaho-plugin --proxy=http://proxy:port
Error - ERROR: Could not find a version that satisfies the requirement setuptools-git-version

I've also installed from requires.txt, and I see the following message when I rerun:
Requirement already satisfied: xmltodict>=0.10.0 in /usr/lib/python3.9/site-packages (from -r requires.txt (line 1)) (0.12.0)

Could you please let me know what I am missing here?

Thanks,
Pavan

CarteTransOperator.params does not work with Jinja templates

The goal is to pass Airflow variables and/or an XCom value into a Pentaho transformation.
For example: '{{ ds }}' or 'ti.xcom_pull("previous_task_id")'.
My DAG task:
trans = CarteTransOperator(
    dag=dag,
    task_id='test_trans_params',
    pdi_conn_id='pentaho_etl',
    subpath='/pentaho',
    trans='/public/test/tr_tst_input_params',
    params={'PARAM': '{{ ds }}'})
The transformation is simple: it writes the input parameter to the log (screenshot omitted).

The result: (screenshot omitted)

Expected behaviour: input_param = 2023-03-03

If I put a constant or a Python function call instead of the Jinja template, then it works fine.
For example:
params={'PARAM': str(datetime.today())}
returns the expected value (screenshot omitted).

Can you help with this issue?
Airflow version: 2.5.1

ModuleNotFoundError: No module named 'packaging'

Hi guys,

I think I found the right place to report a problem.
I'm facing this error when starting the Airflow webserver:
ERROR - Failed to import plugin airflow_pentaho
Traceback (most recent call last):
  File "/usr/local/lib/python3.8/dist-packages/airflow/plugins_manager.py", line 210, in load_entrypoint_plugins
    plugin_class = entry_point.load()
  File "/usr/local/lib/python3.8/dist-packages/importlib_metadata/__init__.py", line 105, in load
    module = import_module(match.group('module'))
  File "/usr/lib/python3.8/importlib/__init__.py", line 127, in import_module
    return _bootstrap._gcd_import(name[level:], package, level)
  File "<frozen importlib._bootstrap>", line 1014, in _gcd_import
  File "<frozen importlib._bootstrap>", line 991, in _find_and_load
  File "<frozen importlib._bootstrap>", line 975, in _find_and_load_unlocked
  File "<frozen importlib._bootstrap>", line 671, in _load_unlocked
  File "<frozen importlib._bootstrap_external>", line 783, in exec_module
  File "<frozen importlib._bootstrap>", line 219, in _call_with_frames_removed
  File "/usr/local/lib/python3.8/dist-packages/airflow_pentaho/plugin.py", line 19, in <module>
    from airflow_pentaho.hooks.carte import PentahoCarteHook
  File "/usr/local/lib/python3.8/dist-packages/airflow_pentaho/hooks/carte.py", line 20, in <module>
    from packaging import version
ModuleNotFoundError: No module named 'packaging'

My environment is:
Ubuntu 20.04
OpenJDK 8
PDI 9.1
Airflow 2.1.0 & 2.0.2
Python 3.8.5
airflow-pentaho-plugin 1.0.8

Cheers,

Connection refused with the right host

Hi folks!
I'm trying to use the pentaho library with CarteTransOperator. My problem is that I created the Airflow connection with all the params, but the connection is always refused with the message:
HTTPConnectionPool(host='localhost', port=8081): Max retries exceeded with url: /kettle/executeTrans/?user=&pass=&rep=repo&trans=test&level=Basic (Caused by NewConnectionError('<urllib3.connection.HTTPConnection object at 0x7f13c76d0c50>: Failed to establish a new connection: [Errno 111] Connection refused',).
Am I doing something wrong? I'm following the documentation.

Error running a transformation

Error running a transformation. Can anyone help?

[2022-11-09, 16:59:28 -03] {base.py:71} INFO - Using connection ID 'pdi_default' for task execution.
[2022-11-09, 16:59:28 -03] {carte.py:175} INFO - Executing /home/rmendonc/data-integration/transformacoes/LOOP.ktr
[2022-11-09, 16:59:28 -03] {taskinstance.py:1851} ERROR - Task failed with exception
Traceback (most recent call last):
File "/home/airflow/.local/lib/python3.7/site-packages/airflow_pentaho/operators/carte.py", line 183, in execute
status = status_trans_rs['transstatus']
KeyError: 'transstatus'
[2022-11-09, 16:59:28 -03] {taskinstance.py:1406} INFO - Marking task as FAILED. dag_id=pdi_flow2, task_id=loop, execution_date=20221108T003000, start_date=20221109T195927, end_date=20221109T195928
[2022-11-09, 16:59:28 -03] {standard_task_runner.py:105} ERROR - Failed to execute job 256 for task loop ('transstatus'; 5457)
[2022-11-09, 16:59:28 -03] {local_task_job.py:164} INFO - Task exited with return code 1
[2022-11-09, 16:59:28 -03] {local_task_job.py:273} INFO - 0 downstream tasks scheduled from follow-on schedule check

Job status is always Success

Hi,
I've just started using this plugin. I have created a transformation on the Pentaho Data Integration server and a connection from Airflow to PDI. I'm using PanOperator and KitchenOperator to trigger Pentaho transformations and jobs respectively. There is a dependency in the DAG like this: Transformation >> Job. Even when the transformation fails, its status is always green on the graph and the Job also gets triggered. I was expecting the failure to be reported and the downstream task not to run. Any suggestions on what I am missing or doing incorrectly? My DAG is given below:

from datetime import timedelta
from airflow import DAG
from airflow.utils.dates import days_ago
from airflow_pentaho.operators.KitchenOperator import KitchenOperator
from airflow_pentaho.operators.PanOperator import PanOperator
from airflow_pentaho.operators.CarteJobOperator import CarteJobOperator
from airflow_pentaho.operators.CarteTransOperator import CarteTransOperator

DAG_NAME = "pdi_example_2"
DEFAULT_ARGS = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': days_ago(2),
    'email': ['[email protected]'],
    'retries': 3,
    'retry_delay': timedelta(minutes=10),
    'email_on_failure': False,
    'email_on_retry': False
}

with DAG(dag_id=DAG_NAME,
         default_args=DEFAULT_ARGS,
         dagrun_timeout=timedelta(hours=2),
         schedule_interval='30 0 * * *') as dag:

    trans = PanOperator(
        queue="pdi_2",
        task_id="pdi_example_2",
        directory={},
        file="/path/sample.ktr",
        trans={},
        params={},
        dag=dag)

    job = KitchenOperator(
        queue="pdi_3",
        task_id="average_spent",
        directory={},
        job={},
        file="/path/sample.kjb",
        params={},  # Date in yyyy-mm-dd format
        dag=dag)

    trans >> job

Proposal. Subpath option

In my environment, Pentaho is not the only app on the server, so the URL to the server is more complex: http://<server_address>:<port>/subpath
I have implemented the possibility of adding a subpath on my side, but it would be nice if this option were implemented in the main release.

Airflow 1.10 and this plugin

Hi!

I (for now) have to use the old Airflow 1.10. I'm getting an error that I have found little information about, even in the source code.

[2022-09-27 18:20:26,623] {taskinstance.py:1150} ERROR - __init__() missing 1 required positional argument: 'source'
Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 984, in _run_raw_task
    result = task_copy.execute(context=context)
  File "/usr/local/lib/python3.8/site-packages/airflow_pentaho/operators/carte.py", line 97, in execute
    conn = self._get_pentaho_carte_client()
  File "/usr/local/lib/python3.8/site-packages/airflow_pentaho/operators/carte.py", line 90, in _get_pentaho_carte_client
    return PentahoCarteHook(conn_id=self.pdi_conn_id,
  File "/usr/local/lib/python3.8/site-packages/airflow_pentaho/hooks/carte.py", line 167, in __init__
    super().__init__()
TypeError: __init__() missing 1 required positional argument: 'source'

The thing is, the DAG works on my own machine (Airflow 2.0 and a self-hosted Carte), but not on the server, where it is 1.10.
The Airflow connections are all set up with the extra parameters.

Is the plugin no longer compatible with older Airflow?

The DAG:

work_params = {
    "pentaho_db_user": pentaho_dwh_db_user,
    "pentaho_dwh_db_pass": pentaho_dwh_db_pass,
    "pentaho_ods_db_pass": pentaho_ods_db_pass,
    "pentaho_dwh_host": db_dwh_conn_params.get("db_ip"),
    "pentaho_dwh_port": db_dwh_conn_params.get("db_port"),
    "pentaho_dwh_db_name": db_dwh_conn_params.get("db_name"),
    "pentaho_ods_host": db_ods_conn_params.get("db_ip"),
    "pentaho_ods_port": db_ods_conn_params.get("db_port"),
    "pentaho_ods_db_name": db_ods_conn_params.get("db_name"),
    "pentaho_job_path": pentaho_job_path,
    "insert_sql_path": insert_sql_path
}

DAG_NAME = "eelis_pdi_dwh_f_lkoht"
DEFAULT_ARGS = {
    'owner': 'me',
}

with DAG(dag_id=DAG_NAME,
         default_args=DEFAULT_ARGS,
         catchup=False,
         start_date=days_ago(1),
         schedule_interval='15 3 * * *') as dag:

    # Define the task using the CarteJobOperator
    insert_into_lkoht = CarteJobOperator(
        task_id="job_testing1",
        job=pentaho_job_path,
        params=work_params,
        dag=dag)

    # ... #

    insert_into_lkoht

In Airflow I can see that all the parameters etc. are there and passed; they are not empty, and the Pentaho job path is found as well.
The pip-installed version is the newest, 1.0.10, or should be (there are some traces of older source code).

Amazon Managed Workflows for Apache Airflow - Custom Plugin Issue

Hello,

We want to use the airflow-pentaho plugin with Amazon Managed Workflows for Apache Airflow, but when we loaded the plugin as a custom plugin into the environment, the following error appeared in the UI. The only available version of Airflow on AWS right now is 1.10.12.

Is there a possibility to use this plugin on AWS, either with a requirements file or a custom plugin file?

"Broken DAG: [/usr/local/airflow/dags/dag_test.py] No module named 'airflow_pentaho.hooks'; 'airflow_pentaho' is not a package"

Thanks in advance.

Erinc

usage of this plugin

Hello,

Thank you for your plugin.
I have been trying to use it and have a question.
I am wondering how you proceed with Airflow when you need to run a job/transformation to reload historical data. I have seen that it is possible to trigger the DAG manually, providing a "Configuration JSON". Is that what you use?

Thank you.

Erwan
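
One hedged way to support such manual reloads is to read the run's Configuration JSON from dag_run.conf inside the templated params, falling back to the scheduled date; a minimal sketch (operator arguments as in the README above; the "date" key is a placeholder):

# Hedged sketch: let a manually triggered run override the date via dag_run.conf.
from datetime import datetime

from airflow import DAG
from airflow_pentaho.operators.carte import CarteJobOperator

with DAG(dag_id="pdi_reload_history_example",
         start_date=datetime(2023, 1, 1),
         schedule_interval="@daily") as dag:

    reload_history = CarteJobOperator(
        conn_id="pdi_default",
        task_id="reload_history",
        job="/home/bi/average_spent",
        # Uses the "date" key from the trigger's Configuration JSON if present,
        # otherwise the scheduled date.
        params={"date": "{{ dag_run.conf.get('date', ds) }}"})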

cannot get transformation result status Carte

Hello!
Pentaho 9.4 + Carte
When using CarteTransOperator, it cannot get the execution status:
airflow.exceptions.AirflowException: Unexpected server response: {"webresult": {"result": "ERROR", "message": "The specified transformation [trans.ktr] could not be found", "id": null}}

It's because the ".ktr" extension is passed along with the transformation name. If ".ktr" is removed in _get_trans_name (in carte.py), it works.

That is not the case with CarteJobOperator, which works fine.

Unable to find repository: pentaho_server

Hello everyone, I'm facing a problem when trying to use the airflow_pentaho.operators.CarteTransOperator.

I am getting this error even though I have created the repository; CarteTransOperator just can't find my jobs. This is the log of my DAG:

ERROR Unexpected error executing the transformation: org.pentaho.di.core.exception.KettleException: Unable to find repository: pentaho_server
    org.pentaho.di.www.ExecuteTransServlet.openRepository(ExecuteTransServlet.java:410)
    org.pentaho.di.www.ExecuteTransServlet.doGet(ExecuteTransServlet.java:261)
    javax.servlet.http.HttpServlet.service(HttpServlet.java:687)
    javax.servlet.http.HttpServlet.service(HttpServlet.java:790)
    org.eclipse.jetty.servlet.ServletHolder.handle(ServletHolder.java:873)
    org.eclipse.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:542)
    org.eclipse.jetty.server.handler.ScopedHandler.nextHandle(ScopedHandler.java:255)
    org.eclipse.jetty.server.session.SessionHandler.doHandle(SessionHandler.java:1700)
    org.eclipse.jetty.server.handler.ScopedHandler.nextHandle(ScopedHandler.java:255)
    org.eclipse.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:1345)
    org.eclipse.jetty.server.handler.ScopedHandler.nextScope(ScopedHandler.java:203)
    org.eclipse.jetty.servlet.ServletHandler.doScope(ServletHandler.java:480)
    org.eclipse.jetty.server.session.SessionHandler.doScope(SessionHandler.java:1667)
    org.eclipse.jetty.server.handler.ScopedHandler.nextScope(ScopedHandler.java:201)
    org.eclipse.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:1247)
    org.eclipse.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:144)
    org.eclipse.jetty.server.handler.ContextHandlerCollection.handle(ContextHandlerCollection.java:220)
    org.eclipse.jetty.server.handler.HandlerList.handle(HandlerList.java:61)
    org.eclipse.jetty.security.SecurityHandler.handle(SecurityHandler.java:513)
    org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:132)
    org.eclipse.jetty.server.Server.handle(Server.java:505)
    org.eclipse.jetty.server.HttpChannel.handle(HttpChannel.java:370)
    org.eclipse.jetty.server.HttpConnection.onFillable(HttpConnection.java:267)
    org.eclipse.jetty.io.AbstractConnection$ReadCallback.succeeded(AbstractConnection.java:305)
    org.eclipse.jetty.io.FillInterest.fillable(FillInterest.java:103)
    org.eclipse.jetty.io.ChannelEndPoint$2.run(ChannelEndPoint.java:117)
    org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.runTask(EatWhatYouKill.java:333)
    org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.doProduce(EatWhatYouKill.java:310)
    org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.tryProduce(EatWhatYouKill.java:168)
    org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.run(EatWhatYouKill.java:126)
    org.eclipse.jetty.util.thread.ReservedThreadExecutor$ReservedThread.run(ReservedThreadExecutor.java:366)
    org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:698)
    org.eclipse.jetty.util.thread.QueuedThreadPool$Runner.run(QueuedThreadPool.java:804)
    java.lang.Thread.run(Thread.java:750)

This is my connection: (screenshot omitted)

This is my task using the CarteTransOperator:
    job1 = CarteTransOperator(
        task_id="job1",
        trans="/public/home/my_asewome_ktr",
        params=None,
        pdi_conn_id="pdi_pentaho_server",
        level='Basic',
        dag=dag) 

The same happens when I try to run a job through the CarteJobOperator.

issues with "params"

We have just upgraded from Airflow v1.10.11 to Airflow v2.2.4.
We are getting the following error in Airflow. This DAG was working correctly before the migration.

Broken DAG: [/opt/airflow/dags/dag1.py] Traceback (most recent call last):
  File "/usr/local/lib/python3.8/dist-packages/airflow/serialization/serialized_objects.py", line 578, in serialize_operator
    serialize_op['params'] = cls._serialize_params_dict(op.params)
  File "/usr/local/lib/python3.8/dist-packages/airflow/serialization/serialized_objects.py", line 451, in _serialize_params_dict
    if f'{v.__module__}.{v.__class__.__name__}' == 'airflow.models.param.Param':
AttributeError: 'str' object has no attribute '__module__'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/lib/python3.8/dist-packages/airflow/serialization/serialized_objects.py", line 939, in to_dict
    json_dict = {"__version": cls.SERIALIZER_VERSION, "dag": cls.serialize_dag(var)}
  File "/usr/local/lib/python3.8/dist-packages/airflow/serialization/serialized_objects.py", line 851, in serialize_dag
    raise SerializationError(f'Failed to serialize DAG {dag.dag_id!r}: {e}')
airflow.exceptions.SerializationError: Failed to serialize DAG 'dag1': 'str' object has no attribute '__module__'

The DAG looks like this:

# DAG creation
with DAG (
    dag_id="dag1",
    default_args=DEFAULT_ARGS,
    max_active_runs=1,
    description='dag1',
    schedule_interval=WORKFLOW_SCHEDULE_INTERVAL,
    catchup=False
) as dag :

    extract_tables = KitchenOperator (
        pdi_conn_id='pdi_default',
        task_id="extract_tables",
        directory=PROJECT_DIRECTORY,
        job="jb_01_load_ods_tables",
        file=PROJECT_DIRECTORY+"jb_01_load_ods_tables.kjb",
        params={
            "date": '{{ ds }}'
        }
    )

If we remove "date": '{{ ds }}', the error in airflow disapears.

Is it possible that the issue is linked with airflow-pentaho-plugin?
