flink-extended / ai-flow
AI Flow is an open source framework that bridges big data and artificial intelligence.
License: Apache License 2.0
Configure AIFlow webserver with aiflow-server.yaml
Currently, the airflow event-based scheduler has only been tested with LocalExecutor, which may not meet the production deployment requirement.
It is crucial to have the event-based scheduler tested with the Kubernetes executor. This might involve some development.
Add a config init CLI command to initialize the AIFlow configuration.
Add a server CLI command that starts the notification server.
update_dataset should send an event to trigger the job defined by action_on_dataset_event.
AirFlowScheduler should get airflow_deploy_path from airflow config if it is not set.
proto_to_dataset_meta converts the properties field of the DatasetProto incorrectly, which causes a pickle exception when read_dataset uses the dataset meta. The exception is as follows:
TypeError: can't pickle google.protobuf.pyext._message.ScalarMapContainer objects
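A common workaround for this class of error is to copy the protobuf map into a plain dict before storing it on the meta object. A minimal sketch, assuming the properties field is the protobuf map shown in the error (this is not AIFlow's actual fix):

    def properties_to_dict(dataset_proto):
        # ScalarMapContainer is not picklable; a built-in dict copy is.
        return {key: value for key, value in dataset_proto.properties.items()}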
Currently, Airflow does not allow LocalExecutor to be used with SQLite, under the impression that SQLite cannot support multiple connections link. However, according to link, it is allowed.
Allowing LocalExecutor to run with SQLite would relieve first-time users of the burden of setting up a MySQL database just to try out our project.
Running LocalExecutor with SQLite is strongly discouraged in production, and we should let the user know in the log.
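A minimal sketch of what such a warning could look like, assuming access to the executor name and the sql_alchemy_conn setting (the function and its placement are assumptions, not the actual Airflow patch):

    import logging

    log = logging.getLogger(__name__)

    def warn_if_sqlite_with_local_executor(executor, sql_alchemy_conn):
        # SQLite + LocalExecutor is fine for trying out the project,
        # but should never be used in production.
        if executor == "LocalExecutor" and sql_alchemy_conn.startswith("sqlite"):
            log.warning(
                "LocalExecutor is running with a SQLite backend. This is only "
                "intended for local trials and is strongly discouraged in production."
            )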
As CLI tools are added to AIFlow and the Notification Service, the Helm chart should use them to start the services.
If two processes download the same OSS file, one process may still be downloading (leaving the file incomplete) while the other process is already extracting it, which corrupts the result.
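One way to avoid the race is to download to a temporary file and atomically rename it into place, so a reader never sees a partially written file. A minimal sketch, with download_fn standing in for the actual OSS download call (a hypothetical name):

    import os
    import tempfile

    def download_atomically(download_fn, dest_path):
        # Write the whole file to a temp path first, then atomically swap it in.
        fd, tmp_path = tempfile.mkstemp(dir=os.path.dirname(dest_path) or ".")
        os.close(fd)
        try:
            download_fn(tmp_path)            # hypothetical OSS download call
            os.replace(tmp_path, dest_path)  # atomic rename: readers see old or new, never partial
        finally:
            if os.path.exists(tmp_path):
                os.remove(tmp_path)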
======================================================================
ERROR: setUpClass (api.ut_workflows.workflows.test_ops.test_ops.TestOps)
----------------------------------------------------------------------
Traceback (most recent call last):
File "/home/runner/work/ai-flow/ai-flow/ai_flow/test/api/ut_workflows/workflows/test_ops/test_ops.py", line 51, in setUpClass
init_ai_flow_context()
File "/home/runner/work/ai-flow/ai-flow/ai_flow/api/ai_flow_context.py", line 47, in init_ai_flow_context
raise Exception('init_ai_flow_client and init_ai_flow_context cannot be called at the same time.')
Exception: init_ai_flow_client and init_ai_flow_context cannot be called at the same time.
Steps to reproduce the behavior (if you can):
At present, the Flink job plugin only supports Flink 1.11; it needs to support multiple Flink versions, such as 1.14.
Make the Notification Service able to raise exceptions when errors occur in start_listen_events.
Exceptions like the one below happen, but no ERROR log is found in the main thread.
>>> _start_listen_events(None)
>>> Exception in thread Thread-1:
Traceback (most recent call last):
File "/usr/local/lib/python3.7/threading.py", line 926, in _bootstrap_inner
self.run()
File "/usr/local/lib/python3.7/threading.py", line 870, in run
self._target(*self._args, **self._kwargs)
File "/usr/local/lib/python3.7/site-packages/notification_service/client.py", line 538, in listen
NOTIFICATION_TIMEOUT_SECONDS)
File "/usr/local/lib/python3.7/site-packages/notification_service/client.py", line 511, in list_events_from_version
response = client.notification_stub.listAllEvents(request)
File "/usr/local/lib/python3.7/site-packages/grpc/_channel.py", line 923, in __call__
return _end_unary_response_blocking(state, call, False, None)
File "/usr/local/lib/python3.7/site-packages/grpc/_channel.py", line 826, in _end_unary_response_blocking
raise _InactiveRpcError(state)
grpc._channel._InactiveRpcError: <_InactiveRpcError of RPC that terminated with:
status = StatusCode.RESOURCE_EXHAUSTED
details = "Received message larger than max (5592234 vs. 4194304)"
debug_error_string = "{"created":"@1636961660.153916701","description":"Received message larger than max (5592234 vs. 4194304)","file":"src/core/ext/filters/message_size/message_size_filter.cc","file_line":204,"grpc_status":8}"
>
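A minimal sketch of how the listener thread could surface such failures to the main thread instead of dying silently (the class and method names below are assumptions, not the Notification Service client's real API):

    import threading

    class ListenerThread(threading.Thread):
        """Runs the event listener and remembers any exception it raised."""

        def __init__(self, target, *args, **kwargs):
            super().__init__(daemon=True)
            self._listen_target = target
            self._listen_args = args
            self._listen_kwargs = kwargs
            self.exception = None

        def run(self):
            try:
                self._listen_target(*self._listen_args, **self._listen_kwargs)
            except Exception as e:
                self.exception = e  # keep the failure for the main thread

        def check(self):
            # Called periodically from the main thread; re-raise so the
            # error is logged there instead of being lost in the worker.
            if self.exception is not None:
                raise self.exception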
Add a server CLI command that starts the AIFlow server.
The DingTalk group cannot be joined by scanning the QR code; it can only be joined through the team group. Please change the DingTalk group settings.
Currently, a job triggered by an event can only access that event by calling list_events and then taking the latest event. Is there a more intuitive way?
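For reference, the current workaround looks roughly like the sketch below (the list_events signature and the version field are assumptions based on the description above):

    def get_triggering_event(client, key):
        # List all events for the key and take the one with the highest version.
        events = client.list_events(key)
        return max(events, key=lambda e: e.version) if events else None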
[Notification Service] Support database version migration
Currently, an Artifact is identified by its name. As a result, different projects cannot register artifacts with the same name.
It is worth discussing whether this behavior is by design (i.e. artifacts are globally unique) or whether artifacts should be per-project.
Workflow operations need to support command-line interfaces for maintenance.
The current notification service is an embedded service in AI Flow.
Currently there are the following shortcomings:
The improvements that need to be made are as follows:
The advantages of the improvement are:
We removed the CLI arguments for configuring the notification URI and database of the Airflow webserver, in favor of using the configuration file airflow.cfg. However, the Airflow webserver does not read the notification URI and database from airflow.cfg.
[AI Flow] The sklearn batch train and stream predict example calls an invalid function, get_notification_service_uri.
Add a config init CLI command to initialize the notification server configuration.
Currently, creating a workflow with the same code but different parameters requires copying the code to another directory, which is painful. We would like to offer the ability to generate workflows from workflow templates and parameters.
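The idea is roughly the following sketch (not an AIFlow API): render one workflow per parameter set from a shared template instead of copying the code directory.

    from string import Template

    # Hypothetical workflow config template shared by all generated workflows.
    WORKFLOW_TEMPLATE = Template("workflow_name: $name\nbatch_size: $batch_size\n")

    def generate_workflow(params, out_path):
        # Write a concrete workflow config for one parameter set.
        with open(out_path, "w") as f:
            f.write(WORKFLOW_TEMPLATE.substitute(params))

    generate_workflow({"name": "train_small", "batch_size": 32}, "train_small.yaml")
    generate_workflow({"name": "train_large", "batch_size": 512}, "train_large.yaml")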
Currently, the links in the project README file still point to the wiki pages in flink-ai-extended. The links should be updated to point to this project, and the docs in flink-ai-extended should probably be removed.
Currently, ai-flow logs are scattered across many locations, which makes it very inconvenient to locate problems through the logs.
We could display all ai-flow related logs in the AIFlow Web UI.
At runtime the workflow directory is not set correctly.
FileTaskHandler, a Python log handler that handles and reads task instance logs, supports the current log mechanism, which uses seq_num and try_number to name the log file. Remote log handlers such as S3TaskHandler should support the same log mechanism.
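For illustration, a filename scheme using both fields might look like the sketch below (the template string is an assumption); any remote handler such as S3TaskHandler would need to render the same name to locate the file:

    # Hypothetical template: one log file per (seq_num, try_number) pair.
    FILENAME_TEMPLATE = "{dag_id}/{task_id}/{execution_date}/{seq_num}_{try_number}.log"

    def render_log_filename(dag_id, task_id, execution_date, seq_num, try_number):
        return FILENAME_TEMPLATE.format(
            dag_id=dag_id,
            task_id=task_id,
            execution_date=execution_date,
            seq_num=seq_num,
            try_number=try_number,
        )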
Add examples in Airflow to Nightly CI
If you are using AIFlow, we would first like to thank you. Here, we sincerely invite you to take a minute to give feedback on your usage scenario.
We are always interested in finding out who is using AIFlow, what attracted you to it, how we can listen to your needs, and, if you are interested, in helping promote your organization.
Please submit a comment in this issue including the following information:
You can refer to the following sample answer for the format:
* Organization: XX Company
* Location: Seoul, South Korea
* Contact: [email protected]
* Version: v1.0.0
* Status: production
* Expectations(optional): Data ingest service
Thanks again for your participation!
AIFlow Community
Currently, the init function of AIFlowClient takes 3 params; however, the server_uri and notification_service_uri can be obtained from project_config, so the first 2 params are unnecessary and can be removed.
def __init__(self,
             server_uri=_SERVER_URI,
             notification_service_uri=None,
             project_config: ProjectConfig = None):
Remove the first 2 params from the init function of AIFlowClient.
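A minimal sketch of the proposed signature (get_server_uri already exists on ProjectConfig, as seen in a traceback further below; get_notification_service_uri is an assumption here):

    def __init__(self, project_config: ProjectConfig):
        # Both URIs are derived from the project configuration.
        self.server_uri = project_config.get_server_uri()
        self.notification_service_uri = project_config.get_notification_service_uri()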
When a DAG is submitted to the scheduler but no DAG runs have been generated, the scheduler, after rebooting for a new loop, will attempt to remove periodic events if the DAG is periodic. The bug then shows up as follows.
Debian
MariaDB
Python 3.7.5
Steps to reproduce the behavior (if you can):
When the current AI Flow version is upgraded, there is no migration plan for historical data, so AI Flow needs to support version upgrades and downgrades.
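One plausible shape for this, sketched with Alembic (the script location and config keys are assumptions, not AIFlow's actual migration setup):

    from alembic import command
    from alembic.config import Config

    def _alembic_config(db_uri):
        cfg = Config()
        cfg.set_main_option("script_location", "ai_flow:migrations")  # assumed location
        cfg.set_main_option("sqlalchemy.url", db_uri)
        return cfg

    def upgrade(db_uri, revision="heads"):
        # Bring the metadata database up to the given revision.
        command.upgrade(_alembic_config(db_uri), revision)

    def downgrade(db_uri, revision):
        # Roll the metadata database back to an earlier revision.
        command.downgrade(_alembic_config(db_uri), revision)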
The response of the listEvents service interface becomes too long and exceeds the gRPC maximum message length for the Airflow caller when too many events are listed. A countEvents interface should be introduced for the Airflow caller.
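A hypothetical client-side shape of the proposed interface (count_events does not exist yet; the name and arguments are assumptions): the Airflow caller only needs to know how many new events exist, so a count avoids shipping the full event list over gRPC.

    def has_new_events(client, key, last_version):
        # Ask only for the number of events newer than last_version.
        return client.count_events(key=key, version=last_version) > 0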
Steps to reproduce the behavior (if you can):
The Airflow frontend currently shows all the events of the Notification Service by reading the Airflow database, but these events should be queried from the database of the Notification Service. The Notification Service database URI should be separated from the Airflow webserver, so that the backend of the Airflow UI can query the events of the Notification Service from the Notification Service database.
Please support pause_job_scheduling, since a job may be periodic; currently only pause_workflow_scheduling is supported.
To fix the scheduling bug of model_version_control_dependency after introducing the sender field, we set the sender of events generated by model_version_control_dependency to '*'. But after running the wide-and-deep demo, we found that this change makes the UI fail to work, as it cannot find the correct sender.
Run the wide-and-deep demo with the latest master code.
Blob Manager: support an S3 storage backend.
Suggestion: Python SDK for S3: http://boto.readthedocs.org/en/latest/s3_tut.html
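A minimal sketch of what an S3 backend could look like, shown here with boto3, the newer SDK (class and method names are assumptions, not the Blob Manager's actual interface):

    import boto3

    class S3BlobManager:
        """Hypothetical S3 storage backend for the Blob Manager."""

        def __init__(self, bucket):
            self.bucket = bucket
            self.client = boto3.client("s3")

        def upload_project(self, local_path, key):
            # Upload the packaged project to s3://<bucket>/<key>.
            self.client.upload_file(local_path, self.bucket, key)
            return "s3://{}/{}".format(self.bucket, key)

        def download_project(self, key, local_path):
            # Download the packaged project for execution.
            self.client.download_file(self.bucket, key, local_path)
            return local_path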
Celery Executor throws an error and stops updating the task state.
Exception in thread Thread-3:
Traceback (most recent call last):
File "/usr/local/lib/python3.7/threading.py", line 926, in _bootstrap_inner
self.run()
File "/usr/local/lib/python3.7/threading.py", line 870, in run
self._target(*self._args, **self._kwargs)
File "/usr/local/lib/python3.7/site-packages/airflow/contrib/jobs/event_based_scheduler_job.py", line 116, in sync
self.timers.run()
File "/usr/local/lib/python3.7/sched.py", line 151, in run
action(*argument, **kwargs)
File "/usr/local/lib/python3.7/site-packages/airflow/contrib/jobs/event_based_scheduler_job.py", line 104, in repeat
action(*args, **kwargs)
File "/usr/local/lib/python3.7/site-packages/airflow/executors/celery_executor.py", line 367, in sync
self.update_all_task_states()
File "/usr/local/lib/python3.7/site-packages/airflow/executors/celery_executor.py", line 430, in update_all_task_states
state, info = state_and_info_by_celery_task_id.get(async_result.task_id)
TypeError: cannot unpack non-iterable NoneType object
Linux
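Judging from the traceback, a defensive guard along these lines could avoid the crash (a sketch only, with assumed attribute and helper names, not the actual Airflow fix): skip Celery task ids that have no recorded state instead of unpacking None.

    def update_all_task_states(self):
        # Sketch of the loop body implied by the traceback, with a guard added.
        for async_result in self.tasks.values():  # assumed collection of Celery AsyncResults
            result = self.state_and_info_by_celery_task_id.get(async_result.task_id)
            if result is None:
                # The backend returned no state for this task id; skip it
                # instead of unpacking None (the TypeError above).
                continue
            state, info = result
            self.update_task_state(async_result.task_id, state, info)  # assumed existing helper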
Show AIFlow version in AIFlow web UI
Jupyter Notebook is a common tool for AI developers. AIFlow should support workflow development in Jupyter Notebook.
Move Airflow-related scripts from aiflow/bin to airflow/bin.
ubuntu 20.04
ai-flow 0.3.dev0
Traceback (most recent call last):
File "./demo/workflows/simple_workflow/simple_workflow.py", line 27, in
main()
File "./demo/workflows/simple_workflow/simple_workflow.py", line 6, in main
af.init_ai_flow_context()
File "/root/miniconda3/lib/python3.7/site-packages/ai_flow/api/ai_flow_context.py", line 46, in init_ai_flow_context
ensure_project_registered()
File "/root/miniconda3/lib/python3.7/site-packages/ai_flow/api/ai_flow_context.py", line 57, in ensure_project_registered
client = get_ai_flow_client()
File "/root/miniconda3/lib/python3.7/site-packages/ai_flow/client/ai_flow_client.py", line 61, in get_ai_flow_client
current_uri = current_project_config().get_server_uri()
File "/root/miniconda3/lib/python3.7/site-packages/ai_flow/project/project_config.py", line 31, in get_server_uri
if len(self["server_uri"].split(',')) > 1 and not self.get_enable_ha():
KeyError: 'server_uri'
Steps to reproduce the behavior (if you can):
INFO [alembic.runtime.migration] Context impl MySQLImpl.
INFO [alembic.runtime.migration] Will assume non-transactional DDL.
Traceback (most recent call last):
File "/usr/local/bin/airflow", line 8, in
sys.exit(main())
File "/usr/local/lib/python3.7/site-packages/airflow/main.py", line 40, in main
args.func(args)
File "/usr/local/lib/python3.7/site-packages/airflow/cli/cli_parser.py", line 48, in command
return func(*args, **kwargs)
File "/usr/local/lib/python3.7/site-packages/airflow/utils/cli.py", line 90, in wrapper
return f(*args, **kwargs)
File "/usr/local/lib/python3.7/site-packages/airflow/cli/commands/db_command.py", line 48, in upgradedb
db.upgradedb()
File "/usr/local/lib/python3.7/site-packages/airflow/utils/db.py", line 688, in upgradedb
command.upgrade(config, 'heads')
File "/usr/local/lib/python3.7/site-packages/alembic/command.py", line 320, in upgrade
script.run_env()
File "/usr/local/lib/python3.7/site-packages/alembic/script/base.py", line 563, in run_env
util.load_python_file(self.dir, "env.py")
File "/usr/local/lib/python3.7/site-packages/alembic/util/pyfiles.py", line 92, in load_python_file
module = load_module_py(module_id, path)
File "/usr/local/lib/python3.7/site-packages/alembic/util/pyfiles.py", line 108, in load_module_py
spec.loader.exec_module(module) # type: ignore
File "", line 728, in exec_module
File "", line 219, in _call_with_frames_removed
File "/usr/local/lib/python3.7/site-packages/airflow/migrations/env.py", line 108, in
run_migrations_online()
File "/usr/local/lib/python3.7/site-packages/airflow/migrations/env.py", line 102, in run_migrations_online
context.run_migrations()
File "", line 8, in run_migrations
File "/usr/local/lib/python3.7/site-packages/alembic/runtime/environment.py", line 851, in run_migrations
self.get_context().run_migrations(**kw)
File "/usr/local/lib/python3.7/site-packages/alembic/runtime/migration.py", line 608, in run_migrations
for step in self._migrations_fn(heads, self):
File "/usr/local/lib/python3.7/site-packages/alembic/command.py", line 309, in upgrade
return script._upgrade_revs(revision, rev)
File "/usr/local/lib/python3.7/site-packages/alembic/script/base.py", line 439, in _upgrade_revs
for script in reversed(list(revs))
File "/usr/local/lib/python3.7/site-packages/alembic/script/revision.py", line 798, in iterate_revisions
assert_relative_length=assert_relative_length,
File "/usr/local/lib/python3.7/site-packages/alembic/script/revision.py", line 1396, in _collect_upgrade_revisions
assert_relative_length=assert_relative_length,
File "/usr/local/lib/python3.7/site-packages/alembic/script/revision.py", line 1193, in parse_upgrade_target
return self.get_revisions(target)
File "/usr/local/lib/python3.7/site-packages/alembic/script/revision.py", line 528, in get_revisions
id # type:ignore [arg-type]
File "/usr/local/lib/python3.7/site-packages/alembic/script/revision.py", line 747, in _resolve_revision_number
self._revision_map
File "/usr/local/lib/python3.7/site-packages/sqlalchemy/util/langhelpers.py", line 893, in get
obj.__dict__[self.__name__] = result = self.fget(obj)
File "/usr/local/lib/python3.7/site-packages/alembic/script/revision.py", line 189, in _revision_map
for revision in self._generator():
File "/usr/local/lib/python3.7/site-packages/alembic/script/base.py", line 136, in _load_revisions
script = Script.from_filename(self, vers, file)
File "/usr/local/lib/python3.7/site-packages/alembic/script/base.py", line 999, in from_filename
module = util.load_python_file(dir, filename)
File "/usr/local/lib/python3.7/site-packages/alembic/util/pyfiles.py", line 92, in load_python_file
module = load_module_py(module_id, path)
File "/usr/local/lib/python3.7/site-packages/alembic/util/pyfiles.py", line 108, in load_module_py
spec.loader.exec_module(module) # type: ignore
File "", line 728, in exec_module
File "", line 219, in _call_with_frames_removed
File "/usr/local/lib/python3.7/site-packages/airflow/migrations/versions/849da589634d_prefix_dag_permissions.py", line 27, in
from flask_appbuilder import SQLA
File "/usr/local/lib/python3.7/site-packages/flask_appbuilder/init.py", line 6, in
from .base import AppBuilder # noqa: F401
File "/usr/local/lib/python3.7/site-packages/flask_appbuilder/base.py", line 8, in
from .api.manager import OpenApiManager
File "/usr/local/lib/python3.7/site-packages/flask_appbuilder/api/manager.py", line 7, in
from flask_appbuilder.baseviews import BaseView
File "/usr/local/lib/python3.7/site-packages/flask_appbuilder/baseviews.py", line 21, in
from .forms import GeneralModelConverter
File "/usr/local/lib/python3.7/site-packages/flask_appbuilder/forms.py", line 16, in
from .fields import EnumField, QuerySelectField, QuerySelectMultipleField
File "/usr/local/lib/python3.7/site-packages/flask_appbuilder/fields.py", line 6, in
from wtforms.compat import string_types, text_type
ModuleNotFoundError: No module named 'wtforms.compat'
stop_workflow_execution sets the state of a workflow execution that is already FINISHED to KILLED.
Steps to reproduce the behavior (if you can):
stop_workflow_execution should only kill running workflow executions.
Instead, it sets the state of a workflow execution that is already FINISHED to KILLED.
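The expected behavior, sketched in code (state names and the function shape are assumptions, not the real AIFlow implementation):

    def stop_workflow_execution(execution):
        # Do not overwrite a terminal state; only running executions get killed.
        if execution.state == "FINISHED":
            return execution.state
        execution.state = "KILLED"
        return execution.state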
Currently, the AIFlow and Airflow start scripts check whether the configuration file exists. If the AIFlow server configuration doesn't exist, they create one in the AIFLOW_HOME directory, and some of the config options are taken from environment variables, such as AIFLOW_DB_CONN.
Such behavior can be confusing, since the environment variables are only used while generating the configuration, not while starting the AIFlow server.
Therefore, we propose to remove all usage of environment variables. The user should run a script to generate the default configuration file, modify it if needed, and then start the AIFlow server. The Airflow server will follow the same pattern.
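A minimal sketch of the proposed flow (the file name and default contents are assumptions): an explicit init step writes the default configuration file, and the server start step only reads it, never the environment.

    import os

    DEFAULT_SERVER_CONFIG = "server_port: 50051\ndb_uri: sqlite:///aiflow.db\n"  # assumed defaults

    def init_default_config(aiflow_home):
        # Generate the default configuration file once; never overwrite it.
        config_path = os.path.join(aiflow_home, "aiflow_server.yaml")
        if os.path.exists(config_path):
            raise SystemExit("Configuration already exists: " + config_path)
        with open(config_path, "w") as f:
            f.write(DEFAULT_SERVER_CONFIG)
        return config_path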