
engine's Introduction

Engine

A processing framework for executing jobs using the OpenFn ecosystem of language packs.

Installation

The package can be installed from GitHub by adding engine to your list of dependencies in mix.exs:

def deps do
  [
    {:engine, github: "OpenFn/engine", tag: "v0.7.2"}
  ]
end

Using

As part of your application

With this approach the available jobs and their triggers are stored in memory and loaded in via a YAML file.

  1. Add a module
defmodule MyApp.Engine do
  use Engine.Application, otp_app: :my_app
end
  2. Add it to your supervision tree
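# application.ex
  def start(_type, _args) do
    children = [
      # ... your other children
      # The child spec is a {module, options} tuple.
      {MyApp.Engine,
       [
         project_config: "file://" <> project_yaml_path,
         adaptors_path: Path.join(project_dir, "priv/openfn")
       ]}
    ]

    Supervisor.start_link(children, strategy: :one_for_one)
  end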
  3. Add calls to process messages

Call handle_message wherever you need to process a message, for example in a Phoenix controller:

  ...
  alias Engine.Message

  def receive(conn, _other) do
    body =
      conn
      |> Map.fetch!(:body_params)
      |> Jason.encode!()
      |> Jason.decode!()

    runs = MyApp.Engine.handle_message(%Message{body: body})

    {status, data} =
      {:accepted,
       %{
         "meta" => %{"message" => "Data accepted and processing has begun."},
         "data" => Enum.map(runs, fn run -> run.job.name end),
         "errors" => []
       }}

    conn
    |> put_status(status)
    |> json(data)
  end

  ...
  4. Add some callbacks (optional)
defmodule MyApp.GenericHandler do
  use Engine.Run.Handler
  require Logger

  def on_log_emit(str, _context) do
    Logger.debug("#{inspect(str)}")
  end
end

Running without a supervisor

It's possible to use Engine without it being in your supervision tree. A common reason would be using some other queueing mechanism.

This can be achieved by calling start/2 directly on a handler:

defmodule MyApp.Handler do
  use Engine.Run.Handler
end

MyApp.Handler.start(run_spec)
# => %Result{...}

Configuration

A note on adaptors_path:

By default everything is installed into $PWD/priv/openfn.

Currently with the ShellRuntime module, we require NPM modules to be installed in a global style. Just like with npm install -g, except we control where those packages will be installed using the --prefix argument. Without using global installs you run the risk of new packages installed by Adaptor.Service overwriting all currently installed packages.
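
As a rough illustration of the "global style with --prefix" install described above, the sketch below only builds the npm argument list. The module name and the package names are placeholders chosen for this example, not part of Engine.

```elixir
defmodule NpmPrefixExample do
  # Hypothetical helper, not part of Engine: builds the argument list for a
  # "global style" npm install that is confined to one directory via --prefix.
  def install_args(adaptors_path, packages) when is_list(packages) do
    ["install", "--prefix", adaptors_path, "--global" | packages]
  end
end

# Prints the command that would be run; nothing is installed here.
# The package names are placeholders.
args = NpmPrefixExample.install_args("priv/openfn", ["@openfn/core", "@openfn/language-common"])
IO.puts(Enum.join(["npm" | args], " "))
```

Keeping --global together with --prefix is what places every package side by side under the chosen directory rather than in a project-local node_modules.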

Callbacks

When using the Handler module, there are several callbacks that you can provide to hook into various steps in the processing pipeline:

  • on_log_emit/2
    Receives log chunks as they come out of the Log Agent. Strings are emitted for each complete grapheme (i.e. a complete and decodable chunk of UTF-8 data). Depending on the job being executed, this callback may be called very frequently, so it's up to you to buffer the output before forwarding it.

  • on_start/1
    When a job is about to be processed, this is called with the context that was provided to it with start/2.

  • on_finish/2
    Called when a job has ended, with the %Result{} struct and the context as arguments.
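
Since on_log_emit/2 can fire for very small chunks, one possible way to buffer them is to collect chunks in a process and release only complete lines. The sketch below is an assumption about how that might look; LogBufferExample is a hypothetical module and is not provided by Engine.

```elixir
defmodule LogBufferExample do
  # Hypothetical buffer, not part of Engine: collects small chunks and
  # releases only complete lines.
  use Agent

  def start_link(_opts \\ []) do
    Agent.start_link(fn -> "" end)
  end

  # Appends a chunk and returns the complete lines now available.
  # Any trailing partial line stays in the buffer.
  def push(buffer, chunk) when is_binary(chunk) do
    Agent.get_and_update(buffer, fn pending ->
      parts = String.split(pending <> chunk, "\n")
      {lines, [rest]} = Enum.split(parts, -1)
      {lines, rest}
    end)
  end
end

{:ok, buffer} = LogBufferExample.start_link()
[] = LogBufferExample.push(buffer, "Hello, ")
["Hello, world"] = LogBufferExample.push(buffer, "world\npartial")
```

A handler could call push/2 from on_log_emit/2 and forward only the returned lines, keeping the buffer pid in the context it was started with.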

Mix Tasks

  • openfn.install.runtime
    Assuming NodeJS is installed, it will install the latest versions of the most basic language packs.

Documentation can be generated with ExDoc and published on HexDocs. Once published, the docs can be found at https://hexdocs.pm/openfn_engine.

engine's People

Contributors

stuartc, taylordowns2000

Forkers

wanecode

engine's Issues

project.yaml can use URI scheme for job expressions and credential bodies

In a project.yaml, users should be able to specify URI paths to job expressions, rather than expressions as strings. See portability proposal v3: https://docs.openfn.org/documentation/portability#proposal-v3

Note how credentials are shaped: they either have keys and values inside a keyed object, or only have a path to the full body (i.e., some-credential.json)

jobs:
  job-1:
    expression: "alterState(state => ({ ...state, changed: true }))"
    trigger: trigger-1
  job-2:
    expression: file:///home/taylor/cool-job.js
    trigger: trigger-1
  job-3:
    expression: github:openfn/sample/myJob.js
    trigger: trigger-1
  job-4:
    expression: http://www.github.com/openfn/sample2.js
    trigger: trigger-1

credentials:
  my-secret-credential:
    username: '******'
    password: '******'
  my-other-credential: 'file://gcp_credential.json'

We should start with standard URIs, but maybe include some of the above.
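
One way to tell URI-style expressions apart from inline ones is to parse the value and check its scheme against an allowlist. The following is only a sketch: the module name and the scheme list are assumptions drawn from the examples in this issue, and it does not fetch or read anything.

```elixir
defmodule ExpressionSourceExample do
  # Hypothetical classifier, not part of Engine. The scheme list is an
  # assumption taken from the examples in this issue.
  @schemes ["file", "http", "https", "github"]

  def classify(expression) when is_binary(expression) do
    case URI.parse(expression) do
      %URI{scheme: scheme} when scheme in @schemes -> {:uri, scheme, expression}
      _ -> {:inline, expression}
    end
  end
end

IO.inspect(ExpressionSourceExample.classify("file:///home/taylor/cool-job.js"))
IO.inspect(ExpressionSourceExample.classify("alterState(state => ({ ...state, changed: true }))"))
```

Using an allowlist rather than "any scheme" avoids treating an inline expression that happens to contain a colon as a URI.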

Ensure adaptor version always gets logged when calling core

Right now when job adaptors are set to "auto-upgrade" and engine is used via platform, we don't see the actual version that was used:
image.png

When they're version locked (as most are) we do see it, but it's not formatted how we'd expect:
image.png

This output should be clean and clear; it's critical for debugging. (Not putting this in core because it seems to work fine when being called by the cli.)

AdaptorService crashes if there are no adaptors

** (Mix) Could not start application open_fn: OpenFn.start(:normal, []) returned an error: shutdown: failed to start child: Engine.Adaptor.Service
    ** (EXIT) an exception was raised:
        ** (File.Error) could not read file "package.json": no such file or directory
            (elixir 1.11.4) lib/file.ex:355: File.read!/1
            (engine 0.3.1) lib/engine/adaptor/repo.ex:12: anonymous fn/1 in Engine.Adaptor.Repo.list_local/2
            (elixir 1.11.4) lib/enum.ex:1411: Enum."-map/2-lists^map/1-0-"/2
            (engine 0.3.1) lib/engine/adaptor/service.ex:85: Engine.Adaptor.Service.State.refresh_list/1
            (engine 0.3.1) lib/engine/adaptor/service.ex:98: Engine.Adaptor.Service.start_link/1
            (stdlib 3.14.2.2) supervisor.erl:385: :supervisor.do_start_child_i/3
            (stdlib 3.14.2.2) supervisor.erl:371: :supervisor.do_start_child/2
            (stdlib 3.14.2.2) supervisor.erl:355: anonymous fn/3 in :supervisor.start_children/2
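
The trace points at File.read!/1 raising when package.json is missing. A possible direction, shown here as a standalone sketch rather than Engine's actual listing code, is to use File.read/1 and skip directories that have no package.json. SafeListExample and its return shape are assumptions made for this example.

```elixir
defmodule SafeListExample do
  # Hypothetical stand-in for the listing step, not Engine's actual code:
  # reads package.json from each directory and skips any that lack one.
  def list_local(dirs) when is_list(dirs) do
    Enum.flat_map(dirs, fn dir ->
      case File.read(Path.join(dir, "package.json")) do
        {:ok, contents} -> [{dir, contents}]
        {:error, _reason} -> []
      end
    end)
  end
end

# A directory without a package.json yields an empty list instead of a crash.
tmp = Path.join(System.tmp_dir!(), "engine_example_#{System.unique_integer([:positive])}")
File.mkdir_p!(tmp)
IO.inspect(SafeListExample.list_local([tmp]))
```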

Keep config in state

When calling Engine.handle_message/2 you are required to pass a Config struct; let's store that in a process instead.
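
A minimal sketch of the idea, assuming a plain Agent is enough: the config is handed over once at startup and read back wherever it is needed. ConfigStoreExample and the map used as config are hypothetical stand-ins, not Engine's Config struct.

```elixir
defmodule ConfigStoreExample do
  # Hypothetical process, not part of Engine: holds a config value so
  # callers do not have to pass it with every message.
  use Agent

  def start_link(config) do
    Agent.start_link(fn -> config end, name: __MODULE__)
  end

  def get, do: Agent.get(__MODULE__, & &1)
end

{:ok, _pid} = ConfigStoreExample.start_link(%{jobs: [], triggers: []})
IO.inspect(ConfigStoreExample.get())
```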

Fail gracefully when shellruntime exits without final state

Currently a final state is all but guaranteed. If the command fails because the runtime couldn't be executed, there will be no final state, so we should check and warn when there is no state to copy.

This does beg the question of why there is now an exception to the rule of saving a failed run's state. And if we save failed states (which don't carry the failure reason), then subsequent failure triggers get either no state or a failed state. We should discuss what kind of expectations we have around failure flow triggers.

Installing a new adaptor can hold up concurrent requests

Installing adaptors (Engine.Adaptor.Service.install!/2) is mostly non-blocking for the agent - however the refresh step is not - and keeps the agent busy while the filesystem is scanned.

Let's make this happen outside the agent with a get and an update, and briefly consider an ETS table.
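
The "scan outside the agent, then update" idea could look roughly like this. It is a sketch under the assumption that the Agent only needs to hold the resulting list; AdaptorListExample and scan_fun are hypothetical names, and the real service may differ.

```elixir
defmodule AdaptorListExample do
  # Hypothetical sketch, not Engine's actual service. The Agent only stores
  # the list; the slow scan runs in the calling process.
  use Agent

  def start_link(_opts \\ []), do: Agent.start_link(fn -> [] end)

  def list(agent), do: Agent.get(agent, & &1)

  # `scan_fun` stands in for the filesystem scan. It is called before the
  # Agent is touched, so concurrent `list/1` calls are not blocked by it.
  def refresh(agent, scan_fun) when is_function(scan_fun, 0) do
    adaptors = scan_fun.()
    Agent.update(agent, fn _old -> adaptors end)
  end
end

{:ok, agent} = AdaptorListExample.start_link()
:ok = AdaptorListExample.refresh(agent, fn -> ["language-common"] end)
IO.inspect(AdaptorListExample.list(agent))
```

The trade-off is that two concurrent refreshes can race, with the last update winning; that is usually acceptable when each scan produces a full list.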

Does /usr/bin/env cause problems in docker containers with non-standard app users?

I'm still getting a spooky error on lightning:

NODE_PATH=./priv/openfn/lib PATH=./priv/openfn/bin:/app/erts-12.2.1/bin:/app/bin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin
cmd:
core execute \
 -o /tmp/output-1654009907-7-3fgxeg.json
 -s /tmp/state-1654009907-7-f6usl6.json \
 -l \
 -e /tmp/expression-1654009907-7-9zc4q1.json \
sh: 1: core: not found

Two things jump out at me: first, the language pack is blank (and that's likely a deployment/config issue with lightning), but second, the NODE_PATH seems 👌 but we're getting an error (sh: 1: core: not found) that I'd expect to see if the NODE_PATH was not properly set. I can confirm this when I run the following commands by hand:

$ NODE_PATH=./priv/openfn/lib PATH=./priv/openfn/bin:/app/erts-12.2.1/bin:/app/bin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin core execute
core execute

run an expression

Options:
      --version          Show version number                           [boolean]
  -l, --language         resolvable language/adaptor path             [required]
  -e, --expression       target expression to execute                 [required]
  -s, --state            Path to initial state file.                  [required]
  -o, --output           Path to write output.
  -t, --test             test mode, no HTTP requests
      --nc, --noConsole  if set to true, removes console.log for security
      --help             Show help                                     [boolean]

Examples:
  core execute -l salesforce -e foo.js -s   Using the salesforce language pack,
  state.json                                execute foo.js to STDOUT

Missing required arguments: language, expression, state
$ core execute
/bin/sh: 3: core: not found

@stuartc , is it possible that Rambo isn't respecting the NODE_PATH stuff because it's expecting something to be accessible first in /usr/bin/env?

"/usr/bin/env",

Aligning "meta" with platform?

If users of platform-app make use of state.meta to access things like the runId, there's not much we can do about it on microservice... in that sense, it is possible to write jobs which run on platform but not on microservice. I think that's always going to be possible, and we'll have even less control over how folks build and pass around state on their own implementations.

However, I think that including a last_success timestamp is arguably valuable enough to be shared between platform and microservice... it's very useful for keeping a "cursor" between runs, allowing a cron job to access only resources that have been created/updated since the last_success -ful run of the job.

Thoughts?

Raise an error if npm install fails during "install runtime" mix task

The System.cmd() call in lib/mix/engine.install.runtime.ex should be wrapped in a case statement that checks the exit status of the npm install command and raises an error if it fails. Otherwise, folks could run mix openfn.install.runtime and get a false success despite npm errors.

    System.cmd(
      "/usr/bin/env",
      [
        "sh",
        "-c",
        "npm install --prefix $NODE_PATH --global #{package_list}"
      ],
      env: [{"NODE_PATH", @default_path}],
      stderr_to_stdout: true,
      into: IO.stream(:stdio, :line)
    )
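
System.cmd/3 returns a {collected_output, exit_status} tuple, so the check can be a case on the status. The sketch below uses a hypothetical wrapper and a shell command standing in for npm install; it is not the Mix task itself.

```elixir
defmodule CheckedCmdExample do
  # Hypothetical wrapper, not the actual Mix task: runs a command and raises
  # when it exits with a non-zero status.
  def run!(command, args, opts \\ []) do
    case System.cmd(command, args, opts) do
      {output, 0} -> output
      {_output, status} -> raise "#{command} exited with status #{status}"
    end
  end
end

# `sh -c "exit 3"` stands in for a failing npm install.
try do
  CheckedCmdExample.run!("sh", ["-c", "exit 3"])
rescue
  error in RuntimeError -> IO.puts("failed: " <> error.message)
end
```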

Disable adaptor listing conditionally

In some environments the adaptor list scan takes a few seconds (on a fast disk), and any parent app using engine has to wait for it, including test runners.

It would be nice to be able to control this startup step in environments like tests.

In addition, in order to rely on existing packages for tests, would it be possible to lazy load/check for a given package?

Write all temporary files to the basedir

We currently have errors on circle when trying to read from the state repo (either permissions or the file doesn't exist)

Perhaps we consolidate the state/tmp files into the same folder so it's easier to track why something is failing

A higher-level interface (like get_last_state) should be used when checking for the current job state, rather than calling stat! directly.

Validate for adaptors_path when building env for ShellRuntime

When RunSpec.adaptors_path isn't set then Rambo gets:

[
  timeout: 3000,
  log: #Function<1.77575986/1 in OpenFn.RunHandler.start/2>,
  env: %{
    "NODE_PATH" => nil,
    "PATH" => "./assets/node_mod...

The nil env var causes Rambo to raise an exception. We need to either enforce the use of the adaptors_path field or skip adding NODE_PATH to the env.
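
For the "skip adding NODE_PATH" option, one simple approach is to drop nil values before the env map is handed to the command runner. This is a sketch with a hypothetical helper name; enforcing adaptors_path instead would be an equally valid choice.

```elixir
defmodule EnvBuilderExample do
  # Hypothetical helper, not part of Engine: drops unset (nil) variables so
  # only string values reach the command runner.
  def build(vars) do
    vars
    |> Enum.reject(fn {_name, value} -> is_nil(value) end)
    |> Map.new()
  end
end

env = EnvBuilderExample.build(%{"NODE_PATH" => nil, "PATH" => "/usr/bin:/bin"})
IO.inspect(env)
```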

Provide replacement for execute_sync/2

Since the introduction of Run.Handler and some of the pre & post logic from RunTask (no longer available) being moved into RunDispatcher, we need a simple entrypoint for processing a Message and a Job.

The original implementation for execute_sync/2 didn't perform any matching nor did it register state (as it technically handled what is late in the pipeline).

Provide a centralised point for accessing app configuration

We currently have some 'sane' defaults like :adaptor_service_name etc, for looking up the Adaptor service process. However changing it for any reason makes things pretty difficult to track.

Oban has some lean and useful examples on how to deal with this:

I think we can benefit significantly by adopting this kind of pattern, our lack of centralized app config is getting unwieldy.
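
As a rough sketch of what a single config entry point might look like, a struct with defaults and a constructor keeps every setting in one place. Only :adaptor_service_name comes from the text above; the module name, the other field, and the default values are assumptions for illustration.

```elixir
defmodule EngineConfigExample do
  # Hypothetical struct, not Engine's actual configuration. Only
  # :adaptor_service_name comes from the text; the other field and the
  # defaults are assumptions.
  defstruct adaptor_service_name: :adaptor_service, adaptors_path: "priv/openfn"

  # struct!/2 raises KeyError for unknown keys, so typos fail early.
  def new(opts \\ []), do: struct!(__MODULE__, opts)
end

config = EngineConfigExample.new(adaptors_path: "/tmp/openfn")
IO.inspect(config.adaptor_service_name)
```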

"ShellRuntime task exited without a value" during Kubernetes rollouts during high-traffic periods

We're seeing this error on occasion during restarts or kubernetes rollouts during high traffic periods... as if Engine isn't quite ready when it's getting work sent to it by an individual pod that's just come online?

Oban error

{
  "at": "2021-11-01T22:45:10.383540Z",
  "error": "** (RuntimeError) ShellRuntime task exited without a value\n (open_fn 1.97.44) 
lib/engine/run/handler.ex:95: OpenFn.RunHandler.wait/1\n (open_fn 1.97.44)
lib/dispatcher.ex:68: Dispatcher.execute/2\n (open_fn 1.97.44)
lib/open_fn/job_runner/processor.ex:99: OpenFn.Processor.process/1\n (open_fn 1.97.44) 
lib/open_fn/job_runner/processor.ex:63: OpenFn.Processor.perform/1\n (oban 2.9.2) 
lib/oban/queue/executor.ex:212: Oban.Queue.Executor.perform_inline/2\n (oban 2.9.2) 
lib/oban/queue/executor.ex:200: Oban.Queue.Executor.perform_inline/2\n (elixir 1.11.4) 
lib/task/supervised.ex:90: Task.Supervised.invoke_mfa/2\n (elixir 1.11.4)
lib/task/supervised.ex:35: Task.Supervised.reply/5\n",
  "attempt": 1}

Sentry log

https://sentry.io/organizations/openfn/issues/2598294287/?project=118735&query=is%3Aunresolved

AdaptorService enhancements

@stuartc , how much of this is already addressed by the latest on main?

  • Version matching
    • What does latest actually mean?
    • Do we have a plan for refreshing/revoking the 'latest'?
  • Make sure the find_adaptor functions make sense.
  • Test against microservice
  • Module docs
    • Make sure the suffix {adaptors_path}/node_modules is explained.

Start as an application, execute, match and get a result

As a 'tracer-bullet' issue, we want to be able to:

  1. be able to start the application (as an app in a supervision tree of a phoenix app?) with a config file to the application (triggers, jobs, credentials?)
  2. be able to call OpenFn.Engine.handle_payload from the bigger phoenix app.
  3. it returns "result" (which the bigger phoenix app won't know how to do ANYTHING with...)
applications: [
  supervisor(OpenFn.Repo),
  worker(OpenFn.Engine, appConfig)
]

In order to validate this: OpenFn/microservice should use this library via git and be able to trigger a specific expression using something like this:

httpRequest
|> platformCE.acceptRequest
|> OpenFn.Engine.handle_payload()
|> Logger.info()
:ok

A side effect of this work will be the resulting datatypes, including (but not limited to) %OpenFn.Result{} as the return type of handle_payload/1.
