Enkelt bibliotek for strukturerte tasks som kan kjøres på et cluster av maskiner, i definerte rekkefølger (sekvensielt, parallellt), med transaksjonstøtte og feilhåndtering.
Denne modulen implementerer rammeverk for å kjøre og fordele Prosess Tasks. En prosess task kan anses som en jobb som kjører asynkront, transaksjonelt, og kan kjøre avhengig av andre jobber. Ved feil under kjøring kan applikasjonen beskrive en feilhåndteringsalgoritme som skal benyttes før rekjøring eller varsling.
Prosesstasks er asynkrone bakgrunnsjobber som kjøres fordelt utover tilgjengelig podder. Muliggjør prosessering av transaksjoner og arbeidsflyt som paralle og sekvensielle oppgaver og kan spres utover 1 eller flere jvmer på ulike podder.
Ytterligere bakgrunnsinformasjon finnes her: Automasjon.
ProsessTasks kan enten kjøres med en gang, i en definert rekkefølge, eller på angitt tid (eks. cron uttrykk).
-- legg til i root pom
<dependency>
<groupId>no.nav.vedtak.prosesstask</groupId>
<artifactId>prosesstask-root</artifactId>
<version>2.1.0-20200131125359-9b02742</version>
<scope>import</scope>
<type>pom</type>
</dependency>
Vanlig bruk. Krever at applikasjonen har en egen implementasjon av ProsessTaskDispatcher (se Konfigurasjon under)
<dependency>
<groupId>no.nav.vedtak.prosesstask</groupId>
<artifactId>prosesstask</artifactId>
</dependency>
Denne kommer med avhengighet til felles-sikkerhet, og autentiserer bruker ved hver kjøring av task (i AuthenticatedCdiProsessTaskDispatcher
).
<dependency>
<groupId>no.nav.vedtak.prosesstask</groupId>
<artifactId>prosesstask-legacy</artifactId>
</dependency>
En ProsessTaskGruppe
definerer et sett med ProsessTask som skal kjøres sekvensielt/parallelt
Det er mulig å definere en digraph av prosesstasks i en jafs.
Eksempel - task #1 kjøres først, deretter 4 ulike tasks parallell(#2), så 2 tasks (#3) i parallell, til slutt kjøres #4.
#2a
+
+
#2b #3a
+ +
#1 +---------------+ #4
+ +
#2c #3b
+
+
#2d
Eksempel kode for over å sette opp over
var pt1 = new ProsessTaskGruppe("oppgave 1");
var pt2 = new ProsessTaskGruppe("oppgave 2a");
...
var gruppe = new ProsessTaskGruppe();
gruppe
.addNesteSekvensiell(pt1)
.addNesteParallell(pt2a, pt2b, pt2c, pt2d);
.addNesteParallell(pt3a, pt3b);
.addNesteSekvensiell(pt4);
;
ProsessTaskRepository repo = ...;
repo.lagre(gruppe);
Se task/src/test/resources/db/migration/defaultDS
for eksempel tabell DDL (passer postgresql)
Nåværende implementasjon forventer at META-INF/pu-default.prosesstask.orm.xml
er definert i applikasjonens persistence.xml (eller tilsvarende) og dermed er en del av samme EntityManager som applikasjoen benytter. Det gir tilgang til å dele transaksjon for en kjøring (krever dermed ikke midlertidig bokføring for status på tasks eller egen opprydding når noe går galt utover å sette en task til KLAR igjen)
(NB: brukes ikke dersom prosesstask-legacy brukes) Dette benyttes til sporing for endring av prosesstasks.
@ApplicationScoped
public class MinSubjectProvider implements SubjectProvider{
public String getUserIdentity(){
// return user identity fra container el.
return "[email protected]";
}
}
(NB: brukes ikke dersom prosesstask-legacy brukes) Dette er kobling mellom prosesstask data som er satt opp og implementasjon av en task.
@ApplicationScoped
public class MinTaskDispatcher implements ProsessTaskDispatcher {
public void dispatch(ProsessTaskData task) throws Exception {
String taskType = task.getTaskType();
switch(taskType){
case "innhent.inntektsopplysninger.task":
new InnhentInntektsOpplysningerTask().doRun(task.getAktørId());
return;
default:
throw new UnsupportedOperationException("Ukjent task type " + taskType);
}
}
}
TaskManager startes f.eks. fra en WebListener
@WebListener
public class TaskManagerStarter implements ServletContextListener {
@Override
public void contextInitialized(ServletContextEvent sce) {
// kan gjøre programmatisk lookup siden TaskManager er ApplicationScoped (en per applikasjoninstans)
var taskManager = CDI.current().select(TaskManager.class).get();
taskManager.start();
}
@Override
public void contextDestroyed(ServletContextEvent sce) {
var taskManager = CDI.current().select(TaskManager.class).get();
taskManager.stop();
}
}
- Tasks polles fra database i rekkefølge de er definert (innenfor en gruppe)
- Kjøring av tasks foregår i en transaksjon, denne deles med andre database operasjoner som utføres i tasken, avgrenset av savepoints for bokføring (for feilhåndtering etc).
- En task gruppe kan definere både sekvensielle og parallelle tasks (se Bruk).
- Kun en maskin (jvm) vil kjøre en task til enhver tid. Dersom flere jvmer er satt opp fordeles tasks broderlig mellom dem.
- Ved polling har jvmen 30sekunder til å påstarte arbeid på tasken, etter det står andre jvmer fritt til å stjele denne.
- Skalerbarhet: avhenger av antall transaksjoner databasen kan kjøre samtidig og connections tilgjenglig på en pod. Hver JVM settes opp default med 3 worker threads for kjøring av tasks, og 1 poller thread. Dvs. connection pool for database bør tillate minimum 3 connections for utelukkende kjøre tasks ved default oppsett (trenger normalt noen flere dersom andre ting kjøres i samme jvm).
- Se TaskManager Polling SQL for algoritme som håndterer sekvensiell/parallell plukking av tasks, samt sørger for å fordele tasks utover ulike konsumenter. Dette gjøres vha. SQL WINDOW functions + SKIP LOCKED syntaks. Dette er hjertet av hele dette bibliotektet. Alt annet er støtte feilhåndtering, konfigurasjon, dispatch og API.
- Savepoints brukes til bokføring av kjørende tasks og feilhåndtering dersom noen tasks kaster exceptions. Enkelte exceptions (SQLTransientException etc) oppfører seg transient og vil automatisk retryes, mens andre er avhengig av definert feilhåndteringalgoritme. Hvilke defineres av angitt implementasjon av
ProsessTaskDispatcher
For updated information, always see LICENSE first!
Interne henvendelser kan sendes via Slack i kanalen #teamforeldrepenger.