flink-extended / flink-scala-api
Flink Scala API is a thin wrapper on top of the Flink Java API that supports Scala types for serialization as well as the latest Scala version
License: Apache License 2.0
In one of the recent releases, we added new functionality that allows case class schemas to evolve (#80).
This issue is a TODO to document which case class changes are supported when a Flink job restores state that was written with a previous case class schema.
We have one test which fails randomly in GitHub CI. Example: https://github.com/flink-extended/flink-scala-api/actions/runs/6855224529
I'm trying to use List in my model, but when I do, I get a runtime exception during my tests. It seems like I end up with a broken model containing a List that throws whatever I do with it. I'm getting a java.util.NoSuchElementException: head of empty list exception from inside the Scala standard library, so the list is neither empty nor does it contain valid values.
I created a repro as a new test in this branch: erwan@2d339dc
Is there anything I should do differently here? Or maybe that would be a bug in the library?
(migrated from findify/flink-adt#71 as the same issue is present in this new lib)
Implement a Giter8 project template for flink-scala-api.
Tests are written either in JUnit or in ScalaTest. If there are no objections, I would like to migrate everything to ScalaTest.
Based on the chosen project groupId and artifactId, configure SBT to publish released versions to the Sonatype repo.
Which Flink versions does this library need to support? I assume Scala users might want to use not only the latest Flink version but also one or two previous versions. Any ideas what we can do to cover multiple versions, or should we just stick to the latest one and rely on non-breaking changes in the Flink 1.x public Java API?
There's no out-of-the-box type information support for UUID. I'm happy to implement type information for it so that others and I can use it; the question is whether this is something this library intends to provide or whether it's up to the user to deal with.
Thanks!
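For reference, one possible wire format for UUID is sketched below, assuming a UUID is stored as its two 64-bit halves (a fixed 16 bytes). UuidCodec is a hypothetical helper, not part of flink-scala-api. It targets java.io.DataOutput and java.io.DataInput, which Flink's DataOutputView and DataInputView extend, so a custom TypeSerializer[UUID] could presumably delegate its serialize/deserialize methods to it.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInput, DataInputStream, DataOutput, DataOutputStream}
import java.util.UUID

// Hypothetical helper (not a library API): fixed-length UUID encoding.
object UuidCodec {
  // Two longs, 8 bytes each.
  val Length: Int = 16

  def write(uuid: UUID, out: DataOutput): Unit = {
    out.writeLong(uuid.getMostSignificantBits)
    out.writeLong(uuid.getLeastSignificantBits)
  }

  def read(in: DataInput): UUID = {
    // Read order must match write order: most significant half first.
    val msb = in.readLong()
    val lsb = in.readLong()
    new UUID(msb, lsb)
  }

  // Convenience round-trip helpers over byte arrays.
  def toBytes(uuid: UUID): Array[Byte] = {
    val buffer = new ByteArrayOutputStream(Length)
    val out = new DataOutputStream(buffer)
    write(uuid, out)
    out.flush()
    buffer.toByteArray
  }

  def fromBytes(bytes: Array[Byte]): UUID =
    read(new DataInputStream(new ByteArrayInputStream(bytes)))
}
```

A fixed length also means such a serializer could report a constant length (16) instead of -1, which is one reason to prefer this layout over writing the UUID as a string.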
First of all, thank you for this library; it rescued me from Flink's schema/state migration problems.
I have a question about compatibility with Flink versions. I noticed the oldest version of this lib is 1.15.4_1.0.0, which means it needs at least Flink 1.15. But I tested it with the following:
ThisBuild / version := "0.0.1"
ThisBuild / scalaVersion := "2.12.17"
val flinkVersion = "1.13.2"
val flinkDependencies = Seq(
  // "org.apache.flink" %% "flink-scala" % flinkVersion % "provided",
  // "org.apache.flink" %% "flink-streaming-scala" % flinkVersion % "provided",
  ("org.apache.flink" %% "flink-clients" % flinkVersion)
    .exclude("org.apache.flink", "flink-runtime_2.12")
    .exclude("org.apache.flink", "flink-streaming-java_2.12"),
  "org.flinkextended" %% "flink-scala-api" % "1.15.4_1.1.1"
)
It works fine with simple case class schema evolution, like adding fields or changing field types, even though it is Flink 1.13.2 (to align with AWS). I would like to confirm whether there are any potential issues with mixing this lib 1.15.4_1.0.0 with Flink 1.13.2. Currently we can re-bootstrap the job in prod, but we want to avoid re-bootstrapping once it is fully deployed in prod and relies on state migration with snapshots in AWS.
Also, is there an example of how to use this library for the following feature: "can be extended with custom serializers even for deeply-nested types, as it uses implicitly available serializers in the current scope"?
Thanks a lot in advance.
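The idea behind "uses implicitly available serializers in the current scope" can be illustrated with a toy type class. The sketch below is a hypothetical analogue, not the library's API: Encoder stands in for a serializer, container instances are assembled from whatever element instance is in implicit scope, and so a custom instance for the hypothetical Money type is picked up inside List[Option[Money]] without extra wiring. Derivation in the library presumably follows the same resolution rules, but that is an assumption here.

```scala
// Toy type class standing in for a serializer (hypothetical, not the library's API).
trait Encoder[A] {
  def encode(a: A): String
}

object Encoder {
  // Summoner: Encoder[T] resolves whatever instance is in implicit scope.
  def apply[A](implicit e: Encoder[A]): Encoder[A] = e

  implicit val intEncoder: Encoder[Int] = (a: Int) => a.toString
  implicit val stringEncoder: Encoder[String] = (a: String) => "\"" + a + "\""

  // Container instances are built from the element instance found in implicit scope,
  // so a custom leaf instance propagates into arbitrarily nested types.
  implicit def optionEncoder[A](implicit inner: Encoder[A]): Encoder[Option[A]] =
    (o: Option[A]) => o.fold("null")(inner.encode)

  implicit def listEncoder[A](implicit inner: Encoder[A]): Encoder[List[A]] =
    (xs: List[A]) => xs.map(inner.encode).mkString("[", ",", "]")
}

// Hypothetical user type with a custom instance in its companion object.
final case class Money(cents: Long)

object Money {
  implicit val moneyEncoder: Encoder[Money] = (m: Money) => m.cents.toString + "c"
}
```

Defining the custom instance in the companion object makes it available everywhere without an import; defining it as a local implicit val would instead scope the override to that file or block.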
Hi,
I stumbled upon race conditions in an app using this library.
As far as I know, the TypeInformation class is fine to use as a singleton, whereas TypeSerializer isn't. The JavaDoc of the TypeSerializer class reads:
The methods in this class are not necessarily thread safe. To avoid unpredictable side effects, it is recommended to call duplicate() method and use one serializer instance per thread.
However, the TypeInformation classes in the library just pass along a single TypeSerializer instance, so that instance ends up being used from multiple threads.
Therefore, this can lead to data inconsistency when used with a thread-unsafe TypeSerializer such as CaseClassSerializer (it has a mutable variable used during deserialization).
This example code shows that data inconsistency can happen when run in a multicore environment:
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.core.memory.{DataInputViewStreamWrapper, DataOutputViewStreamWrapper}
import org.apache.flinkx.api.serializers.*
import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
import scala.collection.parallel.CollectionConverters.given
import scala.collection.parallel.ParSeq
object ExampleApp:
  case class TestClass(a: String, b: String)
  val typeInfo: TypeInformation[TestClass] = deriveTypeInformation
  def randomString: String = scala.util.Random.alphanumeric.take(100).mkString
  @main
  def main(): Unit =
    // Make field 'a' and 'b' always have the same value
    val values: ParSeq[TestClass] = (1 to 1000).map {_ =>
      val s = randomString
      TestClass(s, s)
    }.par
    values.foreach { value =>
      val serializer = typeInfo.createSerializer(null)
      val out = new ByteArrayOutputStream()
      val outView = new DataOutputViewStreamWrapper(out)
      serializer.serialize(value, outView)
      val bytes = out.toByteArray
      val in = new ByteArrayInputStream(bytes)
      val inView = new DataInputViewStreamWrapper(in)
      val value2 = serializer.deserialize(inView)
      // The assertion fails randomly
      assert(value2.a == value2.b)
    }
Looking at the createSerializer() method implementation of PojoTypeInfo in flink-core, it creates a new instance of TypeSerializer.
In the above example, if the TypeSerializer instantiation is modified like this:
val serializer = typeInfo.createSerializer(null).duplicate()
the data inconsistency does not happen anymore.
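The underlying hazard can be reproduced without Flink. In the sketch below, StatefulPairSerializer is a hypothetical stand-in for a serializer with mutable scratch state (it is not the library's CaseClassSerializer), and its duplicate() mirrors the TypeSerializer contract quoted above: when each task works on its own copy, no state is shared between threads and every round trip stays consistent.

```scala
import java.util.concurrent.{Executors, TimeUnit}
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical stand-in for a serializer with mutable per-instance state.
final class StatefulPairSerializer {
  // Scratch buffer reused across calls: safe on one thread, racy when shared.
  private val scratch = new Array[String](2)

  def roundTrip(a: String, b: String): (String, String) = {
    scratch(0) = a
    scratch(1) = b
    // A concurrent caller may overwrite `scratch` between the writes and these reads.
    (scratch(0), scratch(1))
  }

  // Mirrors the TypeSerializer.duplicate() contract: a new instance with its own state.
  def duplicate(): StatefulPairSerializer = new StatefulPairSerializer
}

object PerThreadSerializers {
  // Runs `tasks` round trips on a 4-thread pool and counts results whose fields differ.
  // With duplicatePerTask = false all tasks share `base`, so mismatches may
  // (nondeterministically) appear; with true each task uses its own copy.
  def countMismatches(base: StatefulPairSerializer, tasks: Int, duplicatePerTask: Boolean): Int = {
    val pool = Executors.newFixedThreadPool(4)
    val mismatches = new AtomicInteger(0)
    try {
      (1 to tasks).foreach { i =>
        pool.execute(() => {
          val serializer = if (duplicatePerTask) base.duplicate() else base
          val s = i.toString
          val (a, b) = serializer.roundTrip(s, s)
          if (a != b) mismatches.incrementAndGet()
          ()
        })
      }
    } finally {
      pool.shutdown()
      pool.awaitTermination(30, TimeUnit.SECONDS)
    }
    mismatches.get()
  }
}
```

Applied to the library, this suggests that createSerializer() should hand out a fresh or duplicated serializer rather than a shared instance, which matches the duplicate() workaround above.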
I'll submit a pull request to fix the issue. Feel free to ask if you have question about the issue or the fix.
The current version of flink-scala-api does not seem to support all features of Java's Async I/O API. Implementing unorderedWaitWithRetry would allow using retry support in asynchronous operations.
As far as I can see, only unorderedWait and orderedWait are currently implemented.
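The requested behaviour is, roughly, "re-issue the async call until it succeeds or the attempts run out". A minimal Future-based sketch of that idea is below; AsyncRetry.withRetry is a hypothetical helper, not Flink's or the library's API. As far as I know, the Java unorderedWaitWithRetry takes an AsyncRetryStrategy, which additionally covers delays between attempts and result/exception predicates; those are omitted here.

```scala
import scala.concurrent.{ExecutionContext, Future}

// Hypothetical helper illustrating a fixed-attempts retry policy for async calls.
object AsyncRetry {
  // Re-invokes `call` until it succeeds or `maxAttempts` is exhausted.
  // The last failure is propagated unchanged once no attempts remain.
  def withRetry[T](maxAttempts: Int)(call: () => Future[T])(implicit ec: ExecutionContext): Future[T] = {
    require(maxAttempts >= 1, "maxAttempts must be at least 1")
    call().recoverWith {
      case _ if maxAttempts > 1 => withRetry(maxAttempts - 1)(call)
    }
  }
}
```

Passing the call as a thunk (() => Future[T]) matters: a Future value is already running, so retrying requires a way to start a new one.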
I think you guys forgot to mention where it was actually forked from.
This project is a community-maintained fork of findify/flink-scala-api, cross-built for Scala 2.12, 2.13 and 3.x.
After upgrading this library from 1.1.4 to 1.1.5, we've noticed that savepoints made with the older version became incompatible with the job built against the newer version.
After digging in, I've traced the cause to PR #98. In this PR, the serialized format for case classes was changed to include the arity in the blob. The deserializer now expects the arity in the first 4 bytes, but in the pre-1.1.5 format those first 4 bytes contained the payload of some field(s). This results in incorrect deserialization.
Are breaking changes to be expected within patch-level releases of this library?
To be honest, I'm not really sure how to get out of this. Since we're dealing with streamed input, a naive try { newFormat } catch { oldFormat } won't do the trick, at least not at the level where the issue was introduced.
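One way out, sketched below under stated assumptions, is to choose the layout from out-of-band metadata instead of guessing from the record bytes. In Flink, that metadata would presumably be the serializer snapshot stored in the savepoint. Point and VersionedPointFormat are hypothetical, and the two layouts are simplified versions of the pre- and post-#98 formats described above.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}

// Hypothetical two-field record with two wire layouts (illustration only):
//   v1: [a: Int][b: Int]              fields only
//   v2: [arity: Int][a: Int][b: Int]  arity prefix added
final case class Point(a: Int, b: Int)

object VersionedPointFormat {
  val CurrentVersion: Int = 2

  def write(p: Point, version: Int, out: DataOutputStream): Unit = {
    if (version >= 2) out.writeInt(2) // the arity prefix only exists in v2
    out.writeInt(p.a)
    out.writeInt(p.b)
  }

  // `version` comes from metadata recorded when the state was written,
  // never from inspecting the record bytes themselves.
  def read(version: Int, in: DataInputStream): Point = {
    if (version >= 2) {
      val arity = in.readInt()
      require(arity == 2, s"unexpected arity $arity")
    }
    Point(in.readInt(), in.readInt())
  }

  def encode(p: Point, version: Int): Array[Byte] = {
    val buffer = new ByteArrayOutputStream()
    val out = new DataOutputStream(buffer)
    write(p, version, out)
    out.flush()
    buffer.toByteArray
  }

  def decode(bytes: Array[Byte], version: Int): Point =
    read(version, new DataInputStream(new ByteArrayInputStream(bytes)))
}
```

Reading v1 bytes as v2 treats the first field as the arity, which is the failure mode described in the issue. If that field happens to equal the arity, the read silently goes wrong instead of failing, which is why per-record guessing is unreliable.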
Push initial version from https://github.com/findify/flink-scala-api
I've encountered an issue when restoring a job from a savepoint. This happens when I need to make some changes to an already existing job; the procedure is as follows:
We're running a Flink cluster in Kubernetes in session mode.
2024-02-29 09:20:39
java.lang.RuntimeException: Error while getting state
at org.apache.flink.runtime.state.DefaultKeyedStateStore.getState(DefaultKeyedStateStore.java:62)
at org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getState(StreamingRuntimeContext.java:203)
at com.vinted.search.readmodel.translations.TranslationBuilder.open(TranslationBuilder.scala:18)
at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:34)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:101)
at org.apache.flink.streaming.api.operators.co.CoStreamFlatMap.open(CoStreamFlatMap.java:46)
at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:107)
at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:753)
at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:728)
at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:693)
at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:953)
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:922)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:746)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562)
at java.base/java.lang.Thread.run(Unknown Source)
Caused by: org.apache.flink.util.StateMigrationException: For heap backends, the new state serializer (org.apache.flinkx.api.serializer.ScalaCaseClassSerializer@ad4d1cde) must not be incompatible with the old state serializer (org.apache.flinkx.api.serializer.ScalaCaseClassSerializer@60d01b87).
at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.tryRegisterStateTable(HeapKeyedStateBackend.java:247)
at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.createOrUpdateInternalState(HeapKeyedStateBackend.java:326)
at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.createOrUpdateInternalState(HeapKeyedStateBackend.java:313)
at org.apache.flink.runtime.state.KeyedStateFactory.createOrUpdateInternalState(KeyedStateFactory.java:47)
at org.apache.flink.runtime.state.ttl.TtlStateFactory.createStateAndWrapWithTtlIfEnabled(TtlStateFactory.java:70)
at org.apache.flink.runtime.state.AbstractKeyedStateBackend.getOrCreateKeyedState(AbstractKeyedStateBackend.java:362)
at org.apache.flink.runtime.state.AbstractKeyedStateBackend.getPartitionedState(AbstractKeyedStateBackend.java:413)
at org.apache.flink.runtime.state.DefaultKeyedStateStore.getPartitionedState(DefaultKeyedStateStore.java:115)
at org.apache.flink.runtime.state.DefaultKeyedStateStore.getState(DefaultKeyedStateStore.java:60)
... 15 more
The classes I'm using as state:
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flinkx.api.serializers._
case class TranslationKey(
  id: Long,
  entry_type: Long,
  entry_id: Long
)
object TranslationKey {
  implicit val typeInformation: TypeInformation[TranslationKey] = deriveTypeInformation
}
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flinkx.api.serializers._
case class TranslationValue(
  id: Long,
  translation_key_id: Long,
  locale: String,
  value: String
)
object TranslationValue {
  implicit val typeInformation: TypeInformation[TranslationValue] = deriveTypeInformation
}
And the operator uses the state as follows (I've omitted implementation details because they don't matter):
import org.apache.flink.api.common.state._
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction
import org.apache.flink.util.Collector
import org.apache.flinkx.api._
import org.apache.flinkx.api.serializers._
class TranslationBuilder extends RichCoFlatMapFunction[TranslationKey, TranslationValue, Translation] {
  private var translationKeyState: ValueState[TranslationKey] = _
  private var translationValuesState: MapState[Long, TranslationValue] = _
  override def open(parameters: Configuration): Unit = {
    translationKeyState = getRuntimeContext.getState(new ValueStateDescriptor("keys", TranslationKey.typeInformation))
    translationValuesState = getRuntimeContext.getMapState(new MapStateDescriptor("values", longInfo, TranslationValue.typeInformation))
  }
  override def flatMap1(value: TranslationKey, out: Collector[Translation]): Unit = ???
  override def flatMap2(value: TranslationValue, out: Collector[Translation]): Unit = ???
}
Is there something I'm doing wrong? This works with the deprecated Scala API. The exception also says that the issue might be related to heap state, which I'm using in production. If switching to RocksDB would solve the issue, I'd be glad to make the switch.
Configure GH Actions to run existing steps for all supported Flink versions (see release.sh)
I have been using your library for many months without any issues, and thanks for making this project available. However, I am encountering a weird MatchError on a None that may be a library issue. I have tried to debug my Flink job, but I cannot see any issue in the application code: the error does not show up in my unit or integration tests and seems to happen only under substantial load on production data. I can reproduce the error both locally and in my Flink cluster when running on the production data coming from Kafka. I am using Flink's KeyedCoProcessFunction to do data enrichment, and I am getting the following error:
[info] [2024-04-08 10:06:42,187] WARN Co-Keyed-Process (1/1)#0 (5d5cf71827a90860134cddc3077e286b_e95aaff440415b0a80c0a4fa5a9ff133_0_0) switched from RUNNING to FAILED with failure cause: (org.apache.flink.runtime.taskmanager.Task)
[info] scala.MatchError: None (of class scala.None$)
[info] at org.apache.flinkx.api.serializer.OptionSerializer.serialize(OptionSerializer.scala:68)
[info] at org.apache.flinkx.api.serializer.OptionSerializer.serialize(OptionSerializer.scala:63)
[info] at org.apache.flinkx.api.serializer.CaseClassSerializer.serialize(CaseClassSerializer.scala:104)
[info] at org.apache.flinkx.api.serializer.CaseClassSerializer.serialize(CaseClassSerializer.scala:99)
[info] at org.apache.flinkx.api.serializer.ListSerializer.serialize$$anonfun$1(ListSerializer.scala:20)
[info] at scala.runtime.function.JProcedure1.apply(JProcedure1.java:15)
[info] at scala.runtime.function.JProcedure1.apply(JProcedure1.java:10)
[info] at scala.collection.immutable.List.foreach(List.scala:333)
[info] at org.apache.flinkx.api.serializer.ListSerializer.serialize(ListSerializer.scala:20)
[info] at org.apache.flinkx.api.serializer.ListSerializer.serialize(ListSerializer.scala:18)
[info] at org.apache.flinkx.api.serializer.OptionSerializer.serialize(OptionSerializer.scala:66)
[info] at org.apache.flinkx.api.serializer.OptionSerializer.serialize(OptionSerializer.scala:63)
[info] at org.apache.flinkx.api.serializer.CaseClassSerializer.serialize(CaseClassSerializer.scala:104)
[info] at org.apache.flinkx.api.serializer.CaseClassSerializer.serialize(CaseClassSerializer.scala:99)
[info] at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.serialize(StreamElementSerializer.java:173)
[info] at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.serialize(StreamElementSerializer.java:44)
[info] at org.apache.flink.runtime.plugable.SerializationDelegate.write(SerializationDelegate.java:54)
[info] at org.apache.flink.runtime.io.network.api.writer.RecordWriter.serializeRecord(RecordWriter.java:152)
[info] at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:108)
[info] at org.apache.flink.runtime.io.network.api.writer.ChannelSelectorRecordWriter.emit(ChannelSelectorRecordWriter.java:55)
[info] at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:140)
[info] at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collectAndCheckIfChained(RecordWriterOutput.java:120)
[info] at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:101)
[info] at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:53)
[info] at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:60)
[info] at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:32)
[info] at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:52)
[info] at org.apache.flink.streaming.api.operators.co.KeyedCoProcessOperator.processElement1(KeyedCoProcessOperator.java:78)
[info] at org.apache.flink.streaming.runtime.io.RecordProcessorUtils.lambda$getRecordProcessor1$1(RecordProcessorUtils.java:88)
[info] at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessorFactory$StreamTaskNetworkOutput.emitRecord(StreamTwoInputProcessorFactory.java:254)
[info] at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:157)
[info] at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:114)
[info] at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
[info] at org.apache.flink.streaming.runtime.io.StreamMultipleInputProcessor.processInput(StreamMultipleInputProcessor.java:85)
[info] at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:579)
[info] at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
[info] at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:909)
[info] at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:858)
[info] at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)
[info] at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937)
[info] at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:751)
[info] at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
[info] at java.base/java.lang.Thread.run(Thread.java:840)
[info] [2024-04-08 10:06:42,256] WARN Failed to trigger or complete checkpoint 1 for job 926262f3df68336e0973bba535007918. (0 consecutive failed attempts so far) (org.apache.flink.runtime.checkpoint.CheckpointFailureManager)
[info] org.apache.flink.runtime.checkpoint.CheckpointException: Checkpoint Coordinator is suspending.
[info] at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.stopCheckpointScheduler(CheckpointCoordinator.java:2056)
[info] at org.apache.flink.runtime.checkpoint.CheckpointCoordinatorDeActivator.jobStatusChanges(CheckpointCoordinatorDeActivator.java:46)
[info] at org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.notifyJobStatusChange(DefaultExecutionGraph.java:1580)
[info] at org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.transitionState(DefaultExecutionGraph.java:1146)
[info] at org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.transitionState(DefaultExecutionGraph.java:1118)
[info] at org.apache.flink.runtime.scheduler.SchedulerBase.transitionExecutionGraphState(SchedulerBase.java:600)
[info] at org.apache.flink.runtime.scheduler.DefaultScheduler.addVerticesToRestartPending(DefaultScheduler.java:382)
[info] at org.apache.flink.runtime.scheduler.DefaultScheduler.restartTasksWithDelay(DefaultScheduler.java:358)
[info] at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeRestartTasks(DefaultScheduler.java:326)
[info] at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:268)
[info] at org.apache.flink.runtime.scheduler.DefaultScheduler.onTaskFailed(DefaultScheduler.java:261)
[info] at org.apache.flink.runtime.scheduler.SchedulerBase.onTaskExecutionStateUpdate(SchedulerBase.java:787)
[info] at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:764)
[info] at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:83)
[info] at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:488)
[info] at jdk.internal.reflect.GeneratedMethodAccessor20.invoke(Unknown Source)
[info] at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
[info] at java.base/java.lang.reflect.Method.invoke(Method.java:568)
[info] at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.lambda$handleRpcInvocation$1(PekkoRpcActor.java:309)
[info] at org.apache.flink.runtime.concurrent.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:83)
[info] at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRpcInvocation(PekkoRpcActor.java:307)
[info] at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRpcMessage(PekkoRpcActor.java:222)
[info] at org.apache.flink.runtime.rpc.pekko.FencedPekkoRpcActor.handleRpcMessage(FencedPekkoRpcActor.java:85)
[info] at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleMessage(PekkoRpcActor.java:168)
[info] at org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:33)
[info] at org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:29)
[info] at scala.PartialFunction.applyOrElse(PartialFunction.scala:127)
[info] at scala.PartialFunction.applyOrElse$(PartialFunction.scala:126)
[info] at org.apache.pekko.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:29)
[info] at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:175)
[info] at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176)
[info] at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176)
[info] at org.apache.pekko.actor.Actor.aroundReceive(Actor.scala:547)
[info] at org.apache.pekko.actor.Actor.aroundReceive$(Actor.scala:545)
[info] at org.apache.pekko.actor.AbstractActor.aroundReceive(AbstractActor.scala:229)
[info] at org.apache.pekko.actor.ActorCell.receiveMessage(ActorCell.scala:590)
[info] at org.apache.pekko.actor.ActorCell.invoke(ActorCell.scala:557)
[info] at org.apache.pekko.dispatch.Mailbox.processMailbox(Mailbox.scala:280)
[info] at org.apache.pekko.dispatch.Mailbox.run(Mailbox.scala:241)
[info] at org.apache.pekko.dispatch.Mailbox.exec(Mailbox.scala:253)
[info] at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:373)
[info] at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1182)
[info] at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1655)
[info] at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1622)
[info] at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:165)
I am using the following library versions:
scala -> 3.4.1
flink-scala-api -> 1.18.1_1.1.4
flink -> 1.19.0 (I tried downgrading to 1.18.1, but the same thing happens)
Any ideas what could cause this?
Hey, I've got this (simplified) class and encountered an issue with BigDecMapper not being serializable.
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flinkx.api.serializers._
case class Item(
  id: Long,
  price: BigDecimal,
  currency: String,
)
object Item {
  implicit lazy val typeInformation: TypeInformation[Item] = deriveTypeInformation
}
This compiles fine, but when I execute my test suite, the following is thrown:
org.apache.flink.api.common.InvalidProgramException: The implementation of the CaseClassSerializer is not serializable. The object probably contains or references non serializable fields.
at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:170)
at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:138)
at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:138)
at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:69)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:2360)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1997)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1980)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1959)
at org.apache.flinkx.api.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.scala:618)
at com.vinted.search.commons.test.fixtures.FlinkTest$TestSourceExtensions.fromList(FlinkTest.scala:84)
...
Cause: java.io.NotSerializableException: org.apache.flinkx.api.mapper.BigDecMapper
at java.base/java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1175)
at java.base/java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1543)
at java.base/java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1500)
at java.base/java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1423)
at java.base/java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1169)
at java.base/java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1543)
at java.base/java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1500)
at java.base/java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1423)
at java.base/java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1169)
at java.base/java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1369)
I tried marking typeInformation as @transient so that Flink would not send it over the wire, but that did not help. How could I solve this?
Thank you!
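The root cause can be reproduced with plain Java serialization, which is what the ClosureCleaner check in the trace relies on: a Serializable holder fails as soon as any object it references is not Serializable. Presumably that is also why @transient on the companion's typeInformation does not help, since the failing object is a field of the serializer captured by the source, not the companion field itself. PlainMapper, SerializableMapper and Holder are hypothetical stand-ins (Holder plays the role of CaseClassSerializer, PlainMapper that of BigDecMapper).

```scala
import java.io.{ByteArrayOutputStream, ObjectOutputStream}
import scala.util.Try

// Hypothetical mapper that does NOT implement java.io.Serializable.
class PlainMapper {
  def toJava(v: BigDecimal): java.math.BigDecimal = v.bigDecimal
}

// Hypothetical mapper that does.
class SerializableMapper extends java.io.Serializable {
  def toJava(v: BigDecimal): java.math.BigDecimal = v.bigDecimal
}

// Case classes are Serializable themselves, but serialization still
// fails if any referenced field is not.
final case class Holder(mapper: AnyRef)

object SerializationCheck {
  // Java-serializes `obj` and returns the number of bytes written, or the failure.
  def javaSerializable(obj: AnyRef): Try[Int] = Try {
    val buffer = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(buffer)
    try out.writeObject(obj) finally out.close()
    buffer.size()
  }
}
```

If this reading is right, the fix belongs in the library (the mapper class extending java.io.Serializable) rather than in user code.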
Configure GitHub Actions to compile and test flink-scala-api for different JDK and Scala versions.
Hi. I got "head of empty list" in a simple test:
master...TheDIM47:flink-scala-api:head-of-empty-list-demo
head of empty list
java.util.NoSuchElementException: head of empty list
at scala.collection.immutable.Nil$.head(List.scala:662)
at scala.collection.immutable.Nil$.head(List.scala:661)
at scala.collection.StrictOptimizedLinearSeqOps$$anon$1.next(LinearSeq.scala:267)
at scala.collection.IterableOnceOps.addString(IterableOnce.scala:1249)
at scala.collection.IterableOnceOps.addString$(IterableOnce.scala:1241)
at scala.collection.AbstractIterable.addString(Iterable.scala:933)
at scala.collection.IterableOnceOps.mkString(IterableOnce.scala:1191)
at scala.collection.IterableOnceOps.mkString$(IterableOnce.scala:1189)
at scala.collection.AbstractIterable.mkString(Iterable.scala:933)
at scala.collection.Iterable.toString(Iterable.scala:78)
at scala.collection.Iterable.toString$(Iterable.scala:78)
at scala.collection.Seq.toString(Seq.scala:43)
at scala.collection.Seq.toString$(Seq.scala:43)
at scala.collection.AbstractSeq.toString(Seq.scala:1189)
at java.base/java.lang.String.valueOf(String.java:2951)
at java.base/java.lang.StringBuilder.append(StringBuilder.java:172)
at scala.collection.IterableOnceOps.addString(IterableOnce.scala:1249)
at scala.collection.IterableOnceOps.addString$(IterableOnce.scala:1241)
at scala.collection.AbstractIterator.addString(Iterator.scala:1300)
at scala.collection.IterableOnceOps.mkString(IterableOnce.scala:1191)
at scala.collection.IterableOnceOps.mkString$(IterableOnce.scala:1189)
at scala.collection.AbstractIterator.mkString(Iterator.scala:1300)
at scala.runtime.ScalaRunTime$._toString(ScalaRunTime.scala:156)
at org.apache.flink.api.ProcessFunctionTest$DataPackage.toString(ProcessFunctionTest.scala:28)
at java.base/java.lang.String.valueOf(String.java:2951)
at java.base/java.lang.StringBuilder.append(StringBuilder.java:172)
at scala.collection.IterableOnceOps.addString(IterableOnce.scala:1246)
at scala.collection.IterableOnceOps.addString$(IterableOnce.scala:1241)
at scala.collection.AbstractIterable.addString(Iterable.scala:933)
at scala.collection.IterableOnceOps.mkString(IterableOnce.scala:1191)
at scala.collection.IterableOnceOps.mkString$(IterableOnce.scala:1189)
at scala.collection.AbstractIterable.mkString(Iterable.scala:933)
at scala.collection.Iterable.toString(Iterable.scala:78)
at scala.collection.Iterable.toString$(Iterable.scala:78)
at scala.collection.Seq.toString(Seq.scala:43)
at scala.collection.Seq.toString$(Seq.scala:43)
at scala.collection.AbstractSeq.toString(Seq.scala:1189)
at org.scalactic.ColCompatHelper$.className(ColCompatHelper.scala:35)
at org.scalactic.Prettifier$$anon$2.apply(Prettifier.scala:228)
at org.scalatest.FailureMessages$.wasNotEqualTo(FailureMessages.scala:907)
at org.scalatest.matchers.should.Matchers$AnyShouldWrapper.$anonfun$shouldBe$67(Matchers.scala:7021)
at org.scalatest.matchers.MatchersHelper$.indicateFailure(MatchersHelper.scala:391)
at org.scalatest.matchers.should.Matchers$AnyShouldWrapper.shouldBe(Matchers.scala:7021)
at org.apache.flink.api.ProcessFunctionTest.$anonfun$new$1(ProcessFunctionTest.scala:22)
at org.scalatest.OutcomeOf.outcomeOf(OutcomeOf.scala:85)
at org.scalatest.OutcomeOf.outcomeOf$(OutcomeOf.scala:83)
at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
at org.scalatest.Transformer.apply(Transformer.scala:22)
at org.scalatest.Transformer.apply(Transformer.scala:20)
at org.scalatest.flatspec.AnyFlatSpecLike$$anon$5.apply(AnyFlatSpecLike.scala:1812)
at org.scalatest.TestSuite.withFixture(TestSuite.scala:196)
at org.scalatest.TestSuite.withFixture$(TestSuite.scala:195)
at org.scalatest.flatspec.AnyFlatSpec.withFixture(AnyFlatSpec.scala:1685)
at org.scalatest.flatspec.AnyFlatSpecLike.invokeWithFixture$1(AnyFlatSpecLike.scala:1810)
at org.scalatest.flatspec.AnyFlatSpecLike.$anonfun$runTest$1(AnyFlatSpecLike.scala:1822)
at org.scalatest.SuperEngine.runTestImpl(Engine.scala:306)
at org.scalatest.flatspec.AnyFlatSpecLike.runTest(AnyFlatSpecLike.scala:1822)
at org.scalatest.flatspec.AnyFlatSpecLike.runTest$(AnyFlatSpecLike.scala:1804)
at org.scalatest.flatspec.AnyFlatSpec.runTest(AnyFlatSpec.scala:1685)
at org.scalatest.flatspec.AnyFlatSpecLike.$anonfun$runTests$1(AnyFlatSpecLike.scala:1880)
at org.scalatest.SuperEngine.$anonfun$runTestsInBranch$1(Engine.scala:413)
at scala.collection.immutable.List.foreach(List.scala:333)
at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:401)
at org.scalatest.SuperEngine.runTestsInBranch(Engine.scala:390)
at org.scalatest.SuperEngine.$anonfun$runTestsInBranch$1(Engine.scala:427)
at scala.collection.immutable.List.foreach(List.scala:333)
at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:401)
at org.scalatest.SuperEngine.runTestsInBranch(Engine.scala:396)
at org.scalatest.SuperEngine.runTestsImpl(Engine.scala:475)
at org.scalatest.flatspec.AnyFlatSpecLike.runTests(AnyFlatSpecLike.scala:1880)
at org.scalatest.flatspec.AnyFlatSpecLike.runTests$(AnyFlatSpecLike.scala:1879)
at org.scalatest.flatspec.AnyFlatSpec.runTests(AnyFlatSpec.scala:1685)
at org.scalatest.Suite.run(Suite.scala:1114)
at org.scalatest.Suite.run$(Suite.scala:1096)
at org.scalatest.flatspec.AnyFlatSpec.org$scalatest$flatspec$AnyFlatSpecLike$$super$run(AnyFlatSpec.scala:1685)
at org.scalatest.flatspec.AnyFlatSpecLike.$anonfun$run$1(AnyFlatSpecLike.scala:1925)
at org.scalatest.SuperEngine.runImpl(Engine.scala:535)
at org.scalatest.flatspec.AnyFlatSpecLike.run(AnyFlatSpecLike.scala:1925)
at org.scalatest.flatspec.AnyFlatSpecLike.run$(AnyFlatSpecLike.scala:1923)
at org.scalatest.flatspec.AnyFlatSpec.run(AnyFlatSpec.scala:1685)
at org.scalatest.tools.SuiteRunner.run(SuiteRunner.scala:47)
at org.scalatest.tools.Runner$.$anonfun$doRunRunRunDaDoRunRun$13(Runner.scala:1321)
at org.scalatest.tools.Runner$.$anonfun$doRunRunRunDaDoRunRun$13$adapted(Runner.scala:1315)
at scala.collection.immutable.List.foreach(List.scala:333)
at org.scalatest.tools.Runner$.doRunRunRunDaDoRunRun(Runner.scala:1315)
at org.scalatest.tools.Runner$.$anonfun$runOptionallyWithPassFailReporter$24(Runner.scala:992)
at org.scalatest.tools.Runner$.$anonfun$runOptionallyWithPassFailReporter$24$adapted(Runner.scala:970)
at org.scalatest.tools.Runner$.withClassLoaderAndDispatchReporter(Runner.scala:1481)
at org.scalatest.tools.Runner$.runOptionallyWithPassFailReporter(Runner.scala:970)
at org.scalatest.tools.Runner$.run(Runner.scala:798)
at org.scalatest.tools.Runner.run(Runner.scala)
at org.jetbrains.plugins.scala.testingSupport.scalaTest.ScalaTestRunner.runScalaTest2or3(ScalaTestRunner.java:43)
at org.jetbrains.plugins.scala.testingSupport.scalaTest.ScalaTestRunner.main(ScalaTestRunner.java:26)
It makes sense to combine flink-adt with scala-api to get one artefact for our users
There is a minor but quite weird compile-time issue on the user side. It happens when the user imports classes from the standard Java library, i.e. "java.*".
For example, this does not compile due to a "package not found" error:
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectOutputStream}
The user must add the "_root_" prefix to skip relative package resolution. This compiles:
import _root_.java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectOutputStream}
Since we decided to put this library into the same package as official Apache Flink, we clash with Apache Flink's existing package "org.apache.flink.api.java".
Below is the import we ask users to put in their code to use this library:
import org.apache.flink.api._
This import brings the "org.apache.flink.api.java" package into scope in the same source file as just "java".
As a result, an import of a non-Flink Java class such as "java.io.ByteArrayInputStream" cannot be resolved, because it is resolved as "org.apache.flink.api.java.io.ByteArrayInputStream", which does not exist. If the user adds the "_root_" prefix to the imported Java class, it works fine.
Use a different package name for this library than the one official Flink uses. For example:
// option 1
package org.apache.flinkx.api
// or option 2
package org.apache.flinkextended.api
// or option 3
package org.flinkextended.api
Please let me know your thoughts.