damavis / airflow-pentaho-plugin
Pentaho plugin for Apache Airflow - Orchestrate Pentaho transformations and jobs from Airflow
License: Apache License 2.0
We have a table in the database with constraints:
CREATE TABLE tst_constraints (id int PRIMARY KEY, val text)
We have a simple transformation that inserts data into this table.
Call the transformation in Airflow:
trans = CarteTransOperator(
    task_id='tst_trans_fail',
    pdi_conn_id='pentaho',
    trans='/public/test/tr_tst_failed_tab_constraint')
2.2. The task freezes, but the transformation has already failed.
CarteTransOperator works properly
After discussion in Issue #29:
If we wrap the transformation in a job, it works fine (a sketch of this workaround follows below), but I want to know whether it is possible to solve this issue without that.
Airflow 2.5.1 + Python 3.10 + Pentaho server CE 8.0
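For reference, a minimal sketch of the wrap-in-a-job workaround, assuming a thin job that only calls the transformation exists at a hypothetical repository path (the import path is inferred from the tracebacks elsewhere on this page):

from airflow_pentaho.operators.carte import CarteJobOperator

# Hedged sketch: run a wrapper job instead of the bare transformation,
# so the failure is reported. The job path is hypothetical.
trans = CarteJobOperator(
    task_id='tst_trans_fail',
    pdi_conn_id='pentaho',
    job='/public/test/jb_tst_failed_tab_constraint')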
Hello,
In some cases Pentaho's spoon.sh (used by pan.sh) returns EXIT_CODE 0 even though an inner transformation has failed.
E.g. create trans.ktr with a failing sub-transformation in it. In that case we cannot tell that it has failed except by looking at the logs.
Sometimes a sub-transformation or a transformation inside a job is allowed to fail, so it is fine for spoon.sh to return 0 as long as the overall job or transformation succeeds.
In such cases we go through the output line by line and detect error lines like this:
import re

contains_error = False
# Scan the subprocess output for Kettle ERROR lines while logging it.
for line in iter(self.sub_process.stdout.readline, b''):
    line = line.decode('utf-8').rstrip()
    if not contains_error:
        if re.search(r'-\s+ERROR\s+\(version', line):
            contains_error = True
    self.log.info(line)
self.sub_process.wait()
And then push the result to XCom so downstream Airflow tasks can detect such errors:
if self.xcom_push_flag:
    if not contains_error:
        return line
    else:
        return 'Untracked Error'
Could something like that (an extra return key, maybe) be added to airflow-pentaho-plugin for tracking such errors?
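As a follow-up illustration, a minimal sketch of how a downstream task could consume such an XCom value, assuming the operator pushes 'Untracked Error' as in the snippet above (the task ids are hypothetical):

from airflow.exceptions import AirflowException
from airflow.operators.python import PythonOperator

def check_pdi_output(ti):
    # Fail the run if the Pan task reported an untracked ERROR line.
    output = ti.xcom_pull(task_ids='run_pan_trans')  # hypothetical task id
    if output == 'Untracked Error':
        raise AirflowException('Pentaho transformation logged an ERROR line')

check = PythonOperator(task_id='check_pdi_output',
                       python_callable=check_pdi_output)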
I have a transformation that should be run several times according to the previous step's results. I am using the expand_kwargs feature to do this.
Simplified example:
t1 = CarteTransOperator.partial(
    task_id='tst_trans_params',
    pdi_conn_id='pentaho',
    trans='/public/test/tr_tst_input_params'
).expand_kwargs([{"params": {"PARAM": 1}}, {"params": {"PARAM": 2}}])
The transformation tr_tst_input_params just outputs the input parameter to the log.
Everything runs, but the logs for both map indexes show only the last parameter value.
Each map index should have its own valid log.
Airflow 2.5.1 + Python 3.10 + Pentaho server CE 8.0
Hi folks!
I'm trying to use the Pentaho library with CarteTransOperator. My problem is that I created the Airflow connection with all the params, but my connection is always refused with the message:
HTTPConnectionPool(host='localhost', port=8081): Max retries exceeded with url: /kettle/executeTrans/?user=&pass=&rep=repo&trans=test&level=Basic (Caused by NewConnectionError('<urllib3.connection.HTTPConnection object at 0x7f13c76d0c50>: Failed to establish a new connection: [Errno 111] Connection refused',).
Am I doing something wrong? I'm following the documentation.
The goal is to pass Airflow variables and/or XCom values into a Pentaho transformation, for example '{{ ds }}' and '{{ ti.xcom_pull("previous_task_id") }}'.
My DAG task:
trans = CarteTransOperator(
    dag=dag,
    task_id='test_trans_params',
    pdi_conn_id='pentaho_etl',
    subpath='/pentaho',
    trans='/public/test/tr_tst_input_params',
    params={'PARAM': '{{ ds }}'})
The transformation is simple: it writes the input parameter to the log.
Expected behaviour: input_param = 2023-03-03
If I put a constant or the result of a Python function instead of the Jinja template, it works fine.
For example:
params={'PARAM': str(datetime.today())}
Returns:
Can you help with this issue?
Airflow version: 2.5.1
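A hedged guess at the cause, not a confirmed fix: Airflow renders Jinja only in attributes listed in an operator's template_fields, so if the plugin's operator does not declare its parameter dict there, '{{ ds }}' is passed through literally. A hypothetical subclass as a sketch (the attribute name 'params' is an assumption, and it can collide with the params field reserved by BaseOperator in newer Airflow versions):

from airflow_pentaho.operators.carte import CarteTransOperator

class TemplatedCarteTransOperator(CarteTransOperator):
    # Assumption: the operator keeps its parameter dict in self.params.
    template_fields = ('params',)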
Hello,
thank you for your plugin.
I have been trying to use it and have some question.
I am wondering how you proceed with Airflow when you need to run a job or transformation to reload historical data. I have seen that it is possible to trigger the DAG manually with a "Configuration JSON". Are you using this?
Thank you.
Erwan
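A minimal sketch of the Configuration JSON approach, assuming the DAG is triggered with {"reload_date": "2020-01-01"} and that templated params render (the connection id and transformation path are hypothetical):

reload = CarteTransOperator(
    task_id='reload_history',
    pdi_conn_id='pdi_default',           # hypothetical connection id
    trans='/public/etl/tr_load_fact',    # hypothetical transformation path
    # Fall back to the schedule date when no Configuration JSON is given.
    params={'DATE': "{{ dag_run.conf.get('reload_date', ds) }}"})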
Hi,
I've just started using this plugin. I have created a transformation on the Pentaho Data Integration server and a connection from Airflow to PDI. I'm using PanOperator and KitchenOperator to trigger Pentaho transformations and jobs respectively. There is a dependency in the DAG like this: Transformation >> Job. Even when the transformation fails, its status is always green on the job graph, and the job still gets triggered. I was expecting the failure to be reported and the downstream task not to run. Any suggestions on what I am missing or doing incorrectly? My DAG is given below:
from datetime import timedelta
from airflow import DAG
from airflow.utils.dates import days_ago
from airflow_pentaho.operators.KitchenOperator import KitchenOperator
from airflow_pentaho.operators.PanOperator import PanOperator
from airflow_pentaho.operators.CarteJobOperator import CarteJobOperator
from airflow_pentaho.operators.CarteTransOperator import CarteTransOperator
_DAG_NAME = "pdi_example_2"
DEFAULT_ARGS = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': days_ago(2),
'email': ['[email protected]'],
'retries': 3,
'retry_delay': timedelta(minutes=10),
'email_on_failure': False,
'email_on_retry': False
}
with DAG(dag_id=DAG_NAME,
default_args=DEFAULT_ARGS,
dagrun_timeout=timedelta(hours=2),
schedule_interval='30 0 * * *') as dag:
trans = PanOperator(
queue="pdi_2",
task_id="pdi_example_2",
directory={},
file="/path/sample.ktr",
trans={},
params={},
dag=dag)
job = KitchenOperator(
queue="pdi_3",
task_id="average_spent",
directory={},
job={},
file="/path/sample.kjb",
params={}, # Date in yyyy-mm-dd format
dag=dag)_
trans >> job
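One hedged observation, not a confirmed diagnosis: the directory, trans, and job arguments above are empty dicts where strings are presumably expected, which may be why the failure never propagates. A sketch of the usage I would try (the directory and transformation name are hypothetical):

trans = PanOperator(
    queue="pdi_2",
    task_id="pdi_example_2",
    directory="/path",           # hypothetical repository directory
    file="/path/sample.ktr",
    trans="sample",              # hypothetical transformation name
    params={},
    dag=dag)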
Hi!
I (for now) have to use the old Airflow 1.10. I am getting an error that I have found little information about, and little even in the source code.
[2022-09-27 18:20:26,623] {taskinstance.py:1150} ERROR - __init__() missing 1 required positional argument: 'source'
Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 984, in _run_raw_task
    result = task_copy.execute(context=context)
  File "/usr/local/lib/python3.8/site-packages/airflow_pentaho/operators/carte.py", line 97, in execute
    conn = self._get_pentaho_carte_client()
  File "/usr/local/lib/python3.8/site-packages/airflow_pentaho/operators/carte.py", line 90, in _get_pentaho_carte_client
    return PentahoCarteHook(conn_id=self.pdi_conn_id,
  File "/usr/local/lib/python3.8/site-packages/airflow_pentaho/hooks/carte.py", line 167, in __init__
    super().__init__()
TypeError: __init__() missing 1 required positional argument: 'source'
The thing is, the DAG works on my own machine (Airflow 2.0 and a self-hosted Carte), but not on the server running 1.10.
The Airflow connections are all set up with the extra parameters.
Is the plugin no longer compatible with older Airflow?
The DAG:
work_params = {
    "pentaho_db_user": pentaho_dwh_db_user,
    "pentaho_dwh_db_pass": pentaho_dwh_db_pass,
    "pentaho_ods_db_pass": pentaho_ods_db_pass,
    "pentaho_dwh_host": db_dwh_conn_params.get("db_ip"),
    "pentaho_dwh_port": db_dwh_conn_params.get("db_port"),
    "pentaho_dwh_db_name": db_dwh_conn_params.get("db_name"),
    "pentaho_ods_host": db_ods_conn_params.get("db_ip"),
    "pentaho_ods_port": db_ods_conn_params.get("db_port"),
    "pentaho_ods_db_name": db_ods_conn_params.get("db_name"),
    "pentaho_job_path": pentaho_job_path,
    "insert_sql_path": insert_sql_path
}

DAG_NAME = "eelis_pdi_dwh_f_lkoht"
DEFAULT_ARGS = {
    'owner': 'me',
}

with DAG(dag_id=DAG_NAME,
         default_args=DEFAULT_ARGS,
         catchup=False,
         start_date=days_ago(1),
         schedule_interval='15 3 * * *') as dag:

    # Define the task using the CarteJobOperator
    insert_into_lkoht = CarteJobOperator(
        task_id="job_testing1",
        job=pentaho_job_path,
        params=work_params,
        dag=dag)

    # ... #

    insert_into_lkoht
In Airflow I can see that all the parameters are there and set; none are empty, and it finds the Pentaho job path as well.
The pip install is the newest version, 1.0.10, too, or should be (there are some traces of older source code).
In my environment Pentaho is not the only app on the server, so the URL to the server is more complex: http://<server_address>:<port>/subpath
I have implemented the possibility to add a subpath on my side, but it would be nice if this option were implemented in the major version.
Hi guys,
I think I found the right place to report a problem.
I'm facing this error when starting the Airflow webserver:
ERROR - Failed to import plugin airflow_pentaho
Traceback (most recent call last):
File "/usr/local/lib/python3.8/dist-packages/airflow/plugins_manager.py", line 210, in load_entrypoint_plugins
plugin_class = entry_point.load()
File "/usr/local/lib/python3.8/dist-packages/importlib_metadata/init.py", line 105, in load
module = import_module(match.group('module'))
File "/usr/lib/python3.8/importlib/init.py", line 127, in import_module
return _bootstrap._gcd_import(name[level:], package, level)
File "", line 1014, in _gcd_import
File "", line 991, in _find_and_load
File "", line 975, in _find_and_load_unlocked
File "", line 671, in _load_unlocked
File "", line 783, in exec_module
File "", line 219, in _call_with_frames_removed
File "/usr/local/lib/python3.8/dist-packages/airflow_pentaho/plugin.py", line 19, in
from airflow_pentaho.hooks.carte import PentahoCarteHook
File "/usr/local/lib/python3.8/dist-packages/airflow_pentaho/hooks/carte.py", line 20, in
from packaging import version
ModuleNotFoundError: No module named 'packaging'
My environment is:
Ubuntu 20.04
OpenJDK 8
PDI 9.1
Airflow 2.1.0 & 2.0.2
Python 3.8.5
airflow-pentaho-plugin 1.0.8
Cheers,
Hello.
Using airflow-pentaho-plugin version 1.0.4, I see that parameters are not passed to the transformation because they are built like "-param1=param_value". But as described in the Pentaho 9.1 docs, they should look like "-param:param1=param_value".
So I changed the PanOperator class like this:
def execute(self, context):  # pylint: disable=unused-argument
    conn = self._get_pentaho_client()
    arguments = {
        'dir': self.dir,
        'trans': self.trans,
        'level': self.level,
        'logfile': self.logfile,
        'safemode': 'true' if self.safemode else 'false',
        'maxloglines': str(self.maxloglines),
        'maxlogtimeout': str(self.maxlogtimeout)
    }
    # arguments.update(self.params)
    if self.file:
        arguments.update({'file': self.file})
        arguments.update({'norep': 'true'})
    self.command_line = conn.build_command('pan', arguments, self.params)
    output = self._run_command()
    if self.xcom_push_flag:
        return output
And the PentahoClient class:
def build_command(self, command, arguments, params):
    line = [self._build_tool_command(command),
            self._build_connection_arguments()]
    for k, val in arguments.items():
        line.append(self._build_argument(k, val))
    # Prefix parameters with 'param:' as the Pentaho 9.1 docs require.
    for k, val in params.items():
        line.append(self._build_argument(f'param:{k}', val))
    command_line = ' '.join(line)
    return command_line
And everything works now.
My task definition in the DAG:
trans1 = PanOperator(
    dag=dag,
    task_id='trans1',
    xcom_push=True,
    file='/home/airflow/repo/upload.ktr',
    trans="",
    params={"version_id": "102"})
My environment: Kubuntu 20.10, Python 3.8.6, apache airflow 2.0.1, pdi 9.1.0.0-324.
Hello,
We want to use the airflow-pentaho plugin with Amazon Managed Workflows for Apache Airflow (MWAA), but when we load the plugin as a custom plugin into the environment, the following error appears in the UI. The only available version of Airflow on AWS right now is 1.10.12.
Is there a possibility to use this plugin on AWS, with a requirements file or a custom plugin file?
"Broken DAG: [/usr/local/airflow/dags/dag_test.py] No module named 'airflow_pentaho.hooks'; 'airflow_pentaho' is not a package"
Thanks in advance.
Erinc
Hi, I'm trying to install airflow-pentaho-plugin and getting the following error. I also tried the python setup.py install command and the error remains the same. Could you help me unblock this installation process?
Environment - Python3.9
Pip version - 21.0.1
Command - sudo pip install airflow-pentaho-plugin --proxy=http://proxy:port
Error - ERROR: Could not find a version that satisfies the requirement setuptools-git-version
I've also run through requires.txt, and I see the following message when I rerun:
Requirement already satisfied: xmltodict>=0.10.0 in /usr/lib/python3.9/site-packages (from -r requires.txt (line 1)) (0.12.0)
Could you please let me know what I am missing here?
Thanks,
Pavan
Hello!
Pentaho 9.4 + Carte
When using CarteTransOperator, it cannot get the execution status:
airflow.exceptions.AirflowException: Unexpected server response: {"webresult": {"result": "ERROR", "message": "The specified transformation [trans.ktr] could not be found", "id": null}}
It's because the ".ktr" extension is passed along with the transformation name. If ".ktr" is stripped in _get_trans_name (in carte.py), it works.
That's not the case with CarteJobOperator, which works fine.
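A minimal sketch of the fix described above, assuming _get_trans_name receives the path given to the operator (this is a guess at the helper's shape, not the plugin's actual code):

import os

def _get_trans_name(path):
    # Strip the '.ktr' extension so Carte status lookups use the bare name.
    name = os.path.basename(path)
    return name[:-len('.ktr')] if name.endswith('.ktr') else name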
I get an error running a transformation. Can anyone help?
[2022-11-09, 16:59:28 -03] {base.py:71} INFO - Using connection ID 'pdi_default' for task execution.
[2022-11-09, 16:59:28 -03] {carte.py:175} INFO - Executing /home/rmendonc/data-integration/transformacoes/LOOP.ktr
[2022-11-09, 16:59:28 -03] {taskinstance.py:1851} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow_pentaho/operators/carte.py", line 183, in execute
    status = status_trans_rs['transstatus']
KeyError: 'transstatus'
[2022-11-09, 16:59:28 -03] {taskinstance.py:1406} INFO - Marking task as FAILED. dag_id=pdi_flow2, task_id=loop, execution_date=20221108T003000, start_date=20221109T195927, end_date=20221109T195928
[2022-11-09, 16:59:28 -03] {standard_task_runner.py:105} ERROR - Failed to execute job 256 for task loop ('transstatus'; 5457)
[2022-11-09, 16:59:28 -03] {local_task_job.py:164} INFO - Task exited with return code 1
[2022-11-09, 16:59:28 -03] {local_task_job.py:273} INFO - 0 downstream tasks scheduled from follow-on schedule check
We have just upgraded from Airflow v1.10.11 to Airflow v2.2.4.
We are now getting the following error in Airflow. This DAG worked correctly before the migration.
Broken DAG: [/opt/airflow/dags/dag1.py] Traceback (most recent call last):
  File "/usr/local/lib/python3.8/dist-packages/airflow/serialization/serialized_objects.py", line 578, in serialize_operator
    serialize_op['params'] = cls._serialize_params_dict(op.params)
  File "/usr/local/lib/python3.8/dist-packages/airflow/serialization/serialized_objects.py", line 451, in _serialize_params_dict
    if f'{v.__module__}.{v.__class__.__name__}' == 'airflow.models.param.Param':
AttributeError: 'str' object has no attribute '__module__'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/lib/python3.8/dist-packages/airflow/serialization/serialized_objects.py", line 939, in to_dict
    json_dict = {"__version": cls.SERIALIZER_VERSION, "dag": cls.serialize_dag(var)}
  File "/usr/local/lib/python3.8/dist-packages/airflow/serialization/serialized_objects.py", line 851, in serialize_dag
    raise SerializationError(f'Failed to serialize DAG {dag.dag_id!r}: {e}')
airflow.exceptions.SerializationError: Failed to serialize DAG 'dag1': 'str' object has no attribute '__module__'
The DAG looks like:
# DAG creation
with DAG(
    dag_id="dag1",
    default_args=DEFAULT_ARGS,
    max_active_runs=1,
    description='dag1',
    schedule_interval=WORKFLOW_SCHEDULE_INTERVAL,
    catchup=False
) as dag:
    extract_tables = KitchenOperator(
        pdi_conn_id='pdi_default',
        task_id="extract_tables",
        directory=PROJECT_DIRECTORY,
        job="jb_01_load_ods_tables",
        file=PROJECT_DIRECTORY + "jb_01_load_ods_tables.kjb",
        params={
            "date": '{{ ds }}'
        }
    )
If we remove "date": '{{ ds }}', the error in Airflow disappears.
Is it possible that the issue is linked to airflow-pentaho-plugin?
Hello everyone, I'm facing a problem when trying to use airflow_pentaho.operators.CarteTransOperator.
I am getting this error even though I have created the repository; CarteTransOperator just can't find my jobs. This is the log of my DAG:
ERROR Unexpected error executing the transformation:
org.pentaho.di.core.exception.KettleException: Unable to find repository: pentaho_server
    org.pentaho.di.www.ExecuteTransServlet.openRepository(ExecuteTransServlet.java:410)
    org.pentaho.di.www.ExecuteTransServlet.doGet(ExecuteTransServlet.java:261)
    javax.servlet.http.HttpServlet.service(HttpServlet.java:687)
    javax.servlet.http.HttpServlet.service(HttpServlet.java:790)
    org.eclipse.jetty.servlet.ServletHolder.handle(ServletHolder.java:873)
    org.eclipse.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:542)
    org.eclipse.jetty.server.handler.ScopedHandler.nextHandle(ScopedHandler.java:255)
    org.eclipse.jetty.server.session.SessionHandler.doHandle(SessionHandler.java:1700)
    org.eclipse.jetty.server.handler.ScopedHandler.nextHandle(ScopedHandler.java:255)
    org.eclipse.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:1345)
    org.eclipse.jetty.server.handler.ScopedHandler.nextScope(ScopedHandler.java:203)
    org.eclipse.jetty.servlet.ServletHandler.doScope(ServletHandler.java:480)
    org.eclipse.jetty.server.session.SessionHandler.doScope(SessionHandler.java:1667)
    org.eclipse.jetty.server.handler.ScopedHandler.nextScope(ScopedHandler.java:201)
    org.eclipse.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:1247)
    org.eclipse.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:144)
    org.eclipse.jetty.server.handler.ContextHandlerCollection.handle(ContextHandlerCollection.java:220)
    org.eclipse.jetty.server.handler.HandlerList.handle(HandlerList.java:61)
    org.eclipse.jetty.security.SecurityHandler.handle(SecurityHandler.java:513)
    org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:132)
    org.eclipse.jetty.server.Server.handle(Server.java:505)
    org.eclipse.jetty.server.HttpChannel.handle(HttpChannel.java:370)
    org.eclipse.jetty.server.HttpConnection.onFillable(HttpConnection.java:267)
    org.eclipse.jetty.io.AbstractConnection$ReadCallback.succeeded(AbstractConnection.java:305)
    org.eclipse.jetty.io.FillInterest.fillable(FillInterest.java:103)
    org.eclipse.jetty.io.ChannelEndPoint$2.run(ChannelEndPoint.java:117)
    org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.runTask(EatWhatYouKill.java:333)
    org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.doProduce(EatWhatYouKill.java:310)
    org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.tryProduce(EatWhatYouKill.java:168)
    org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.run(EatWhatYouKill.java:126)
    org.eclipse.jetty.util.thread.ReservedThreadExecutor$ReservedThread.run(ReservedThreadExecutor.java:366)
    org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:698)
    org.eclipse.jetty.util.thread.QueuedThreadPool$Runner.run(QueuedThreadPool.java:804)
    java.lang.Thread.run(Thread.java:750)
This is my connection:
This is my task using the CarteTransOperator:
job1 = CarteTransOperator(
    task_id="job1",
    trans="/public/home/my_asewome_ktr",
    params=None,
    pdi_conn_id="pdi_pentaho_server",
    level='Basic',
    dag=dag)
The same happens when I try to run a job through CarteJobOperator.
Hi.
I need to set a new KETTLE_HOME value for the DAG execution.
Is that possible?
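The plugin may not expose this directly, but a hedged workaround sketch: call pan.sh through a BashOperator, which lets you set KETTLE_HOME per task via its env argument (the PDI install path and home directory are hypothetical):

from airflow.operators.bash import BashOperator

run_trans = BashOperator(
    task_id='run_with_kettle_home',
    bash_command='/opt/pentaho/data-integration/pan.sh '  # hypothetical PDI path
                 '-file=/path/sample.ktr',
    # Note: env replaces the whole task environment unless append_env=True
    # (available from Airflow 2.3).
    env={'KETTLE_HOME': '/opt/kettle_homes/project_a'})   # hypothetical value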