This example demonstrates how to use a custom remote to send work to a Web Worker for processing.
Notes:
- The Worker registers for message events, and then sends 0 or more progress events, and finally some kind of finish event.
- The remote supports abort, so a worker can be killed
- You have to talk via JS-serializable messages, but the remote can convert to/from EDN so that the auto-merge facilities work; thus the mutation returning mechanism works.
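To make the serialization constraint concrete: postMessage copies its argument using the structured clone algorithm, so plain objects with namespaced string keys pass through unchanged, while functions are rejected. The following is a minimal sketch, assuming a runtime that exposes the global structuredClone (modern browsers, Node 17+); the payload contents are illustrative only.

```javascript
// structuredClone applies the same copying rules that postMessage uses.
const payload = {
  "worker-result/id": "abc",
  "worker-result/status": "working",
  nested: { items: [1, 2, 3] }
};

const copy = structuredClone(payload);

// The copy is a distinct object with the same keys, including the "ns/name"
// style keys that the remote converts to CLJ data.
const sameKeys =
  JSON.stringify(Object.keys(copy)) === JSON.stringify(Object.keys(payload));

// Functions are not cloneable, so they cannot be part of a message.
let functionRejected = false;
try {
  structuredClone({ callback: () => {} });
} catch (err) {
  functionRejected = err.name === "DataCloneError";
}
```

In practice this means mutation params and worker results should be limited to plain data (maps, vectors, strings, numbers, and similar).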
Remote
;; Assumed requires: [com.fulcrologic.fulcro.algorithms.tx-processing :as txn]
;;                   [edn-query-language.core :as eql]
;;                   [taoensso.timbre :as log]
(defn web-worker-remote
  "Create a Fulcro remote that forwards (only) mutations to web workers. The web worker
  is given by `js-file`.

  The worker will receive an event of the form: #js {:cmd (name mutation-symbol) :payload (clj->js params) :requestId unique-str}.

  The web worker MUST post messages of the form: `#js {:cmd 'xxx' :requestId unique-str-from-event :payload {}}`,
  where the payload must always be a map, and the requestId must be the one it is responding to (the one it received in the
  event it is processing). The special command `done` means the worker has finished the given mutation, and the `payload` is
  the return value of the mutation. ALL other commands are sent as UPDATES to the same mutation (e.g. progress reports).

  The returned map from the worker will be turned to CLJ, and will be available for normalization via the normal mutation
  `returning` mechanism. It is therefore recommended that you include a namespaced ID in the map for this
  purpose (e.g. #js {'project/id': id-to-normalize ...})."
  [js-file]
  (log/debug "Starting web worker remote" js-file)
  (let [active-requests (atom {})]
    {:transmit! (fn transmit! [_ {:keys [::txn/ast ::txn/result-handler ::txn/update-handler] :as send-node}]
                  (let [{:keys [dispatch-key params type] :as _mutation-node} (if (= :root (:type ast))
                                                                                (first (:children ast))
                                                                                ast)
                        request-id (str (random-uuid))
                        abort-id   (or
                                     (-> send-node ::txn/options ::txn/abort-id)
                                     (-> send-node ::txn/options :abort-id)
                                     request-id)
                        edn        (eql/ast->query ast)
                        ;; A fresh worker is started for every request.
                        worker     (js/Worker. js-file)]
                    (try
                      (when (not= type :call) (throw (ex-info "This remote only handles mutations" {})))
                      (let [msg      #js {:cmd       (name dispatch-key)
                                          :requestId request-id
                                          :payload   (clj->js params)}
                            listener (fn listener* [event]
                                       ;; Keywordize the keys so that the :keys destructuring works and so that
                                       ;; payload keys such as "worker-result/id" become namespaced keywords.
                                       (let [{:keys [cmd payload]} (js->clj (.-data event) :keywordize-keys true)]
                                         (log/debug "Received worker event with " cmd payload)
                                         (if (= cmd "done")
                                           (do
                                             (.removeEventListener worker "message" listener*)
                                             (.terminate worker)
                                             (swap! active-requests dissoc request-id)
                                             (try
                                               (result-handler {:status-code 200
                                                                :transaction edn
                                                                :body        {dispatch-key payload}})
                                               (catch :default e
                                                 (log/error e "Result handler for web worker remote failed."))))
                                           (try
                                             (update-handler {:transaction edn
                                                              :body        {dispatch-key payload}})
                                             (catch :default e
                                               (log/error e "Web worker update handler threw unexpected exception."))))))]
                        (swap! active-requests assoc request-id {:listener listener
                                                                 :abort-id abort-id
                                                                 :worker   worker})
                        (.addEventListener worker "message" listener)
                        (.postMessage worker msg)
                        request-id)
                      (catch :default e
                        (log/error e "Unexpected internal exception")
                        ;; Clean up before reporting the failure.
                        (.terminate worker)
                        (swap! active-requests dissoc request-id)
                        (result-handler {:status-code 500
                                         :status-text "Internal Error"})))))
     :abort!    (fn [_ id]
                  ;; Find the active request with the given abort id, remembering its request id.
                  (when-let [{:keys [worker listener request-id]} (reduce-kv
                                                                    (fn [_ k v]
                                                                      (when (= id (:abort-id v))
                                                                        (reduced (assoc v :request-id k))))
                                                                    nil
                                                                    @active-requests)]
                    (log/debug "Aborting request " request-id)
                    (.removeEventListener worker "message" listener)
                    (.terminate worker)
                    (swap! active-requests dissoc request-id)))}))
Notes: Here we start a new worker for each request. That way, you could also leverage comp/transact!'s option :parallel? true to submit multiple work items in parallel. Depending on your use case, you might want to have a single, reused worker or a pool of workers.
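If a single worker is reused instead, replies from different requests arrive on the same listener, so they have to be routed by requestId. The following is a minimal sketch of that routing idea in plain JavaScript (the remote itself is ClojureScript). The helper createRequestRouter and its option names onUpdate and onDone are hypothetical and not part of Fulcro; the sketch only assumes the message shape described in the remote's docstring.

```javascript
// Hypothetical helper: multiplex many requests over one long-lived worker.
// `worker` only needs addEventListener and postMessage, as a real Worker has.
function createRequestRouter(worker) {
  const pending = new Map(); // requestId -> { onUpdate, onDone }
  let nextId = 0;

  worker.addEventListener("message", (event) => {
    const { cmd, requestId, payload } = event.data;
    const handlers = pending.get(requestId);
    if (!handlers) return; // unknown, finished, or aborted request
    if (cmd === "done") {
      pending.delete(requestId);
      handlers.onDone(payload);
    } else {
      // Every other command is treated as an update (e.g. progress).
      handlers.onUpdate(payload);
    }
  });

  return {
    // Send a command; returns the requestId so the caller can abort later.
    send(cmd, payload, { onUpdate = () => {}, onDone = () => {} } = {}) {
      const requestId = `req-${nextId++}`;
      pending.set(requestId, { onUpdate, onDone });
      worker.postMessage({ cmd, requestId, payload });
      return requestId;
    },
    // A reused worker cannot simply be terminated, so aborting only
    // stops delivering its replies. Returns true if the request was pending.
    abort(requestId) {
      return pending.delete(requestId);
    }
  };
}
```

Note the trade-off: with a reused worker, abort can no longer kill the computation by terminating the worker, so the worker itself would need some cooperative cancellation if that matters for your use case.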
Worker
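// Report an intermediate status. The remote treats any cmd other than "done" as an update.
function progress(requestId, id, stage) {
  self.postMessage({
    cmd: "progress",
    requestId,
    payload: {
      "worker-result/status": stage,
      "worker-result/id": id
    }
  })
}

// Report the final result. The id is included so that the result can be normalized.
function done(requestId, id, output) {
  self.postMessage({
    cmd: "done",
    requestId,
    payload: {
      "worker-result/id": id,
      "worker-result/status": "success",
      "worker-result/output": output
    }
  })
}

// A failure also ends the request, so it is reported with "done" as well.
function failed(requestId, id, error) {
  self.postMessage({
    cmd: "done",
    requestId,
    payload: {
      "worker-result/id": id,
      "worker-result/status": "failed",
      // The UI parses the output of a failed result as JSON.
      "worker-result/output": JSON.stringify({ errors: [String(error)] })
    }
  })
}

self.addEventListener('message', (e) => {
  const { data: { cmd, requestId, payload } } = e
  const { id, text } = payload
  const doTheWork = () => {
    try { done(requestId, id, text.toUpperCase()); } catch (err) { failed(requestId, id, err); }
  }
  progress(requestId, id, "Starting...");
  // The remote sends (name mutation-symbol) as the command.
  if (cmd === "do-in-worker") {
    setTimeout(() => progress(requestId, id, "Still working..."), 1000);
    setTimeout(doTheWork, 2000);
  } else { failed(requestId, id, `Unknown command: ${cmd}`); }
})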
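To check the sequence of messages without a browser, the handler logic can be factored so that posting and scheduling are passed in as functions. The sketch below is a variant written for illustration, not the worker above: handleRequest, commands, post and schedule are hypothetical names, and the command table is keyed by whatever the remote sends as cmd. Failures are serialized as JSON because the UI in this example parses a failed result's output as JSON.

```javascript
// Hypothetical, environment-independent version of a worker message handler.
// `commands` is a Map from cmd name to a function of the payload; `post` stands
// in for self.postMessage and `schedule` for a setTimeout wrapper.
function handleRequest({ cmd, requestId, payload }, commands, post, schedule) {
  const reply = (replyCmd, status, output) =>
    post({
      cmd: replyCmd,
      requestId,
      payload: {
        "worker-result/id": payload.id,
        "worker-result/status": status,
        ...(output === undefined ? {} : { "worker-result/output": output })
      }
    });

  const work = commands.get(cmd);
  if (!work) {
    reply("done", "failed", JSON.stringify({ errors: [`Unknown command: ${cmd}`] }));
    return;
  }
  reply("progress", "Starting...");
  schedule(() => {
    try {
      reply("done", "success", work(payload));
    } catch (err) {
      reply("done", "failed", JSON.stringify({ errors: [String(err)] }));
    }
  });
}

// In a real worker this could be wired up roughly as:
// const commands = new Map([["do-in-worker", ({ text }) => text.toUpperCase()]]);
// self.addEventListener("message", (e) =>
//   handleRequest(e.data, commands,
//                 (msg) => self.postMessage(msg),
//                 (fn) => setTimeout(fn, 1000)));
```

Passing post and schedule in keeps the protocol logic (zero or more updates, then exactly one done) separate from the Worker globals, which makes it straightforward to exercise from an ordinary test runner.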
UI and mutation
(ns ex-worker-remote
  (:require
    [com.fulcrologic.fulcro.algorithms.merge :as merge]
    [com.fulcrologic.fulcro.dom :as dom]
    [com.fulcrologic.fulcro.mutations :as m]
    [com.fulcrologic.fulcro.components :as comp :refer [defsc]]
    [com.fulcrologic.fulcro.algorithms.normalized-state :as fns]
    [com.fulcrologic.fulcro.algorithms.tx-processing :as txn]))

(defsc WorkerResult [_ {:worker-result/keys [status output]}]
  {:query [:worker-result/id :worker-result/output :worker-result/status]
   :ident :worker-result/id}
  (dom/div
    (str "Result status: " (or status "none."))
    (if (= status "failed")
      (dom/div :.ui.red.message (dom/p "Something went wrong:" (pr-str (some-> output (js/JSON.parse) (js->clj :keywordize-keys true) :errors seq))))
      (dom/div "Output: " (dom/kbd output)))))

(def ui-worker-result (comp/factory WorkerResult))

(m/defmutation do-in-worker
  "Send work to a Web Worker & return the result. Input:

  id - The ID of the request. You make it up (SHA of source is recommended).
  input - the input (text) to process
  result-key - (optional) Where the result should be joined on `ref`.

  The mutation returns a WorkerResult (normalized as `[:worker-result/id id]`).
  "
  [{:keys [id input result-key] :or {result-key ::result}}]
  (action [{:keys [state ref] :as _env}]
    (fns/swap!-> state
      (assoc-in (conj ref :ui/working?) true)
      (merge/merge-component WorkerResult {:worker-result/id     id
                                           :worker-result/output {}
                                           :worker-result/status "working"}
        :replace (conj ref result-key))))
  (progress-action [{:keys [state progress] :as _env}]
    (when-let [status (-> progress :body (get `do-in-worker) :worker-result/status)]
      (swap! state assoc-in [:worker-result/id id :worker-result/status] status)))
  (ok-action [{:keys [state ref] ::txn/keys [options]}]
    (fns/swap!-> state (assoc-in (conj ref :ui/working?) false))
    (let [status (get-in @state [:worker-result/id id :worker-result/status])
          {:keys [on-success]} options]
      (when (and (= "success" status) on-success) (on-success))))
  (error-action [{:keys [state ref]}]
    (fns/swap!-> state
      (assoc-in (conj ref :ui/working?) false)
      (assoc-in [:worker-result/id id :worker-result/status] "failed")))
  ;; The section name matches the remote's key in the app's :remotes map.
  (web-worker [env]
    (-> env
      (m/with-params {:id id :text input})
      (m/returning WorkerResult))))
Putting it all together
Create a fulcro-app with remotes containing :web-worker (web-worker-remote "path/to/my-worker.js"), add a UI button to transact the do-in-worker mutation, and display the progress/result.