flink-scala-api's People

Contributors

buinauskas, ignasd, maggiesullivan, mzuehlke, novakov-alexey, scala-steward, shuttie, thedim47, wojciechmazur

flink-scala-api's Issues

Describe schema evolution for case classes

In one of the recent releases we added new functionality that allows case classes to evolve their schemas. #80

This issue is a TODO to document which case class changes are supported when a Flink job restores state written with a previous case class schema.
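
For illustration, the kind of change to document might look like this (a hypothetical example; confirming exactly which evolutions are supported is the point of this TODO):

// State originally written with this schema:
case class Click(userId: String, url: String)

// ...restored by a job compiled against an evolved schema, e.g. with a new
// optional field added (assumed to be one of the evolutions #80 enables):
case class ClickV2(userId: String, url: String, referrer: Option[String] = None)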

Runtime error when using a List in a case class

I'm trying to use List in my model; however, when I do, I get a runtime exception during my tests. I seem to end up with a broken model, with a List that throws whatever I do with it. I'm getting a java.util.NoSuchElementException: head of empty list from inside the Scala standard library, so the list is neither empty nor holding valid values.

I created a repro as a new test in this branch: erwan@2d339dc

Is there anything I should do differently here? Or is this perhaps a bug in the library?

(migrated from findify/flink-adt#71 as the same issue is present in this new lib)
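
The repro in the linked branch boils down to a round trip like the following (a sketch with hypothetical names, reusing the serializer calls that appear verbatim in other issues here):

import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.core.memory.{DataInputViewStreamWrapper, DataOutputViewStreamWrapper}
import org.apache.flinkx.api.serializers._
import java.io.{ByteArrayInputStream, ByteArrayOutputStream}

case class Model(items: List[String])

object ListRoundTrip extends App {
  val typeInfo: TypeInformation[Model] = deriveTypeInformation
  val serializer = typeInfo.createSerializer(null)

  // Serialize a model holding a non-empty List...
  val out = new ByteArrayOutputStream()
  serializer.serialize(Model(List("a", "b")), new DataOutputViewStreamWrapper(out))

  // ...then read it back; in the broken case the restored List throws
  // java.util.NoSuchElementException: head of empty list as soon as it is used.
  val in = new ByteArrayInputStream(out.toByteArray)
  val restored = serializer.deserialize(new DataInputViewStreamWrapper(in))
  println(restored.items)
}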

Mixed test runners

Tests are written either in JUnit or in ScalaTest. If there are no objections, I would like to migrate everything to ScalaTest.

Which Apache Flink versions will be supported

Which Flink versions does this library need to support? I assume Scala users might want to use not only the latest Flink version but also one or two previous ones. Any ideas on what we can do to cover multiple versions, or should we just stick to the latest one and rely on non-breaking changes in Flink's 1.x public Java API?

UUID type information support

There's no out-of-the-box UUID type information support. I'm happy to implement type information for it so that I and others can use it; the question is whether this is something this library intends to provide, or whether it's up to the user to deal with.

Thanks!
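
For reference, one possible shape for this (a sketch, assuming the library keeps the TypeMapper extension point from flink-adt; the BigDecMapper referenced in a later issue suggests such a mechanism exists):

import java.util.UUID
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flinkx.api.mapper.TypeMapper
import org.apache.flinkx.api.serializers._

// Map UUID onto a type the library already serializes, so derivation for
// case classes containing UUID fields picks it up from implicit scope.
implicit val uuidMapper: TypeMapper[UUID, String] = new TypeMapper[UUID, String] {
  override def map(a: UUID): String = a.toString
  override def contramap(b: String): UUID = UUID.fromString(b)
}

case class Event(id: UUID, payload: String)
implicit val eventInfo: TypeInformation[Event] = deriveTypeInformation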

Case class evolution: ArrayIndexOutOfBoundsException

First of all, thank you for bringing us this library to rescue me from Flink's schema/state migration.

I have a question about version compatibility with Flink. I notice the oldest version of this lib is 1.15.4_1.0.0, which suggests it needs at least Flink 1.15. But I tested it with the following:

ThisBuild / version := "0.0.1"
ThisBuild / scalaVersion := "2.12.17"

val flinkVersion = "1.13.2"

val flinkDependencies = Seq(
  //  "org.apache.flink" %% "flink-scala" % flinkVersion % "provided",
  //  "org.apache.flink" %% "flink-streaming-scala" % flinkVersion % "provided",
  ("org.apache.flink" %% "flink-clients" % flinkVersion)
    .exclude("org.apache.flink", "flink-runtime_2.12")
    .exclude("org.apache.flink", "flink-streaming-java_2.12"),
  "org.flinkextended" %% "flink-scala-api" % "1.15.4_1.1.1"
)

It works fine with simple case class schema evolution, like adding fields or changing field types, even though this is Flink 1.13.2 (aligned with AWS). I would like to confirm whether there are any potential issues with mixing this lib 1.15.4_1.0.0 with Flink 1.13.2. Currently we can re-bootstrap the job in prod, but we would like to avoid re-bootstrapping once it is fully deployed in prod, and instead use state migration with snapshots in AWS.

Also, is there any example of how to use this library's documented capability that it "can be extended with custom serializers even for deeply-nested types, as it uses implicitly available serializers in the current scope"?
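
For context, my current reading of that claim is the implicit-scope pattern below (a sketch built from the derivation calls shown in other issues here; whether this is the intended usage is exactly what I am asking):

import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flinkx.api.serializers._

case class Inner(value: String)
case class Outer(id: Long, inner: Inner)

// A custom or hand-tuned TypeInformation for the nested type placed in
// implicit scope first; the derivation for Outer should then pick it up
// instead of deriving its own.
implicit val innerInfo: TypeInformation[Inner] = deriveTypeInformation
implicit val outerInfo: TypeInformation[Outer] = deriveTypeInformation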

Thanks a lot in advance.

Concurrency issue in subclasses of TypeInformation

Hi,
I stumbled upon a race condition in an app using this library.

Description

As far as I know, a TypeInformation instance is okay to use as a singleton, whereas a TypeSerializer isn't. The JavaDoc of the TypeSerializer class reads:

The methods in this class are not necessarily thread safe. To avoid unpredictable side effects, it is recommended to call duplicate() method and use one serializer instance per thread.

However, the TypeInformation classes in the library just hand out a shared TypeSerializer instance, so the same instance ends up being used by multiple threads:

override def createSerializer(config: ExecutionConfig): TypeSerializer[T] = ser

Therefore, this can lead to data inconsistency when used with a thread-unsafe TypeSerializer such as CaseClassSerializer (it has a mutable variable used during deserialization).

Steps to reproduce

This example code shows that data inconsistency can happen when run in a multicore environment:

import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.core.memory.{DataInputViewStreamWrapper, DataOutputViewStreamWrapper}
import org.apache.flinkx.api.serializers.*
import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
import scala.collection.parallel.CollectionConverters.given
import scala.collection.parallel.ParSeq


object ExampleApp:

  case class TestClass(a: String, b: String)

  val typeInfo: TypeInformation[TestClass] = deriveTypeInformation

  def randomString: String = scala.util.Random.alphanumeric.take(100).mkString

  @main
  def main(): Unit =

    // Make field 'a' and 'b' always have the same value
    val values: ParSeq[TestClass] = (1 to 1000).map {_ =>
      val s = randomString
      TestClass(s, s)
    }.par

    values.foreach { value =>
      val serializer = typeInfo.createSerializer(null)

      val out = new ByteArrayOutputStream()
      val outView = new DataOutputViewStreamWrapper(out)
      serializer.serialize(value, outView)
      val bytes = out.toByteArray

      val in = new ByteArrayInputStream(bytes)
      val inView = new DataInputViewStreamWrapper(in)
      val value2 = serializer.deserialize(inView)

      // The assertion fails randomly
      assert(value2.a == value2.b)
    }

Suggested fix

Looking at the createSerializer() method implementation of PojoTypeInfo in flink-core, it creates a new TypeSerializer instance on each call.

In the above example, if the TypeSerializer instantiation is modified like this:

val serializer = typeInfo.createSerializer(null).duplicate()

the data inconsistency does not happen anymore.
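
In other words, the library-side fix could be as small as changing the override quoted above (a sketch of the idea, not the actual patch):

// Hand out a duplicate per call instead of sharing one instance across
// threads; duplicate() may return `this` for stateless serializers, so the
// overhead only applies to stateful ones like CaseClassSerializer.
override def createSerializer(config: ExecutionConfig): TypeSerializer[T] = ser.duplicate()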

I'll submit a pull request to fix the issue. Feel free to ask if you have questions about the issue or the fix.

Savepoint incompatibility between 1.1.4 and 1.1.5

After upgrading this library from 1.1.4 to 1.1.5, we've noticed that savepoints made with the older version became incompatible with the job running the newer version.

After digging in, I've traced the cause to PR #98. Within this PR, the serialized format for case classes was changed to include the arity in the blob. The deserializer now expects the arity in the first 4 bytes; however, in the pre-1.1.5 format those first 4 bytes contained field payload. This results in incorrect deserialization.

Are breaking changes to be expected within patch-level releases of this library?

To be honest, I'm not really sure how to get out of this. Since we're dealing with streamed input, a naive try { newFormat } catch { oldFormat } won't do the trick, at least not at the level where the issue was introduced.
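
For illustration, the incompatibility amounts to the following (a sketch reconstructing the described change, not the library's actual code):

import org.apache.flink.core.memory.DataOutputView

// 1.1.5+ layout: a 4-byte arity prefix comes first (added by PR #98);
// pre-1.1.5, field payload started at byte 0 of the blob.
def serializeWithArity(value: Product, target: DataOutputView): Unit = {
  target.writeInt(value.productArity) // new in 1.1.5
  // ...field serializers write their payload here, as before...
}
// Restoring a pre-1.1.5 blob with the new reader misreads the first 4 bytes
// of field payload as the arity, shifting everything that follows.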

Not being able to restore stateful operator

I've encountered an issue when restoring a job from a savepoint. It happens when I need to make some changes to an already existing job. The procedure is as follows:

  1. Stop the job and take a canonical savepoint
  2. Redeploy the job by providing the savepoint path
  3. Get the following exception

We're running Flink cluster in Kubernetes in session mode.

2024-02-29 09:20:39
java.lang.RuntimeException: Error while getting state
	at org.apache.flink.runtime.state.DefaultKeyedStateStore.getState(DefaultKeyedStateStore.java:62)
	at org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getState(StreamingRuntimeContext.java:203)
	at com.vinted.search.readmodel.translations.TranslationBuilder.open(TranslationBuilder.scala:18)
	at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:34)
	at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:101)
	at org.apache.flink.streaming.api.operators.co.CoStreamFlatMap.open(CoStreamFlatMap.java:46)
	at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:107)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:753)
	at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:728)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:693)
	at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:953)
	at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:922)
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:746)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562)
	at java.base/java.lang.Thread.run(Unknown Source)
Caused by: org.apache.flink.util.StateMigrationException: For heap backends, the new state serializer (org.apache.flinkx.api.serializer.ScalaCaseClassSerializer@ad4d1cde) must not be incompatible with the old state serializer (org.apache.flinkx.api.serializer.ScalaCaseClassSerializer@60d01b87).
	at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.tryRegisterStateTable(HeapKeyedStateBackend.java:247)
	at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.createOrUpdateInternalState(HeapKeyedStateBackend.java:326)
	at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.createOrUpdateInternalState(HeapKeyedStateBackend.java:313)
	at org.apache.flink.runtime.state.KeyedStateFactory.createOrUpdateInternalState(KeyedStateFactory.java:47)
	at org.apache.flink.runtime.state.ttl.TtlStateFactory.createStateAndWrapWithTtlIfEnabled(TtlStateFactory.java:70)
	at org.apache.flink.runtime.state.AbstractKeyedStateBackend.getOrCreateKeyedState(AbstractKeyedStateBackend.java:362)
	at org.apache.flink.runtime.state.AbstractKeyedStateBackend.getPartitionedState(AbstractKeyedStateBackend.java:413)
	at org.apache.flink.runtime.state.DefaultKeyedStateStore.getPartitionedState(DefaultKeyedStateStore.java:115)
	at org.apache.flink.runtime.state.DefaultKeyedStateStore.getState(DefaultKeyedStateStore.java:60)
	... 15 more

The classes I'm using as state:

import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flinkx.api.serializers._

case class TranslationKey(
  id: Long,
  entry_type: Long,
  entry_id: Long
)

object TranslationKey {
  implicit val typeInformation: TypeInformation[TranslationKey] = deriveTypeInformation
}

import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flinkx.api.serializers._

case class TranslationValue(
  id: Long,
  translation_key_id: Long,
  locale: String,
  value: String
)

object TranslationValue {
  implicit val typeInformation: TypeInformation[TranslationValue] = deriveTypeInformation
}

And the operator declares and uses the state as follows (I've omitted implementation details because they don't matter):

import org.apache.flink.api.common.state._
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction
import org.apache.flink.util.Collector
import org.apache.flinkx.api._
import org.apache.flinkx.api.serializers._

class TranslationBuilder extends RichCoFlatMapFunction[TranslationKey, TranslationValue, Translation] {
  private var translationKeyState: ValueState[TranslationKey] = _
  private var translationValuesState: MapState[Long, TranslationValue] = _

  override def open(parameters: Configuration): Unit = {
    translationKeyState = getRuntimeContext.getState(new ValueStateDescriptor("keys", TranslationKey.typeInformation))
    translationValuesState = getRuntimeContext.getMapState(new MapStateDescriptor("values", longInfo, TranslationValue.typeInformation))
  }

  override def flatMap1(value: TranslationKey, out: Collector[Translation]): Unit = ???

  override def flatMap2(value: TranslationValue, out: Collector[Translation]): Unit = ???
}

Is there something I'm doing wrong? This works with the deprecated Scala API. The exception also suggests the issue might be specific to heap state backends, which is what we use in production. If switching to RocksDB would solve the issue, I'd be glad to make the switch.

Possible case class serialization/deserialization bug

I have been using your library for many months without any issues, and thanks for making this project available. However, I am now encountering a weird issue: a MatchError on a None that seems to be a library issue, maybe. I have tried to debug my Flink job, but I cannot see any problem with the application code; the error does not show up in my unit or integration tests and seems to only happen under substantial load on production data. I can recreate the error both locally and in my Flink cluster when running on production data coming from Kafka. I am using Flink's KeyedCoProcessFunction to do data enrichment, and I am getting the following error:

[info] [2024-04-08 10:06:42,187] WARN Co-Keyed-Process (1/1)#0 (5d5cf71827a90860134cddc3077e286b_e95aaff440415b0a80c0a4fa5a9ff133_0_0) switched from RUNNING to FAILED with failure cause: (org.apache.flink.runtime.taskmanager.Task)
[info] scala.MatchError: None (of class scala.None$)
[info] 	at org.apache.flinkx.api.serializer.OptionSerializer.serialize(OptionSerializer.scala:68)
[info] 	at org.apache.flinkx.api.serializer.OptionSerializer.serialize(OptionSerializer.scala:63)
[info] 	at org.apache.flinkx.api.serializer.CaseClassSerializer.serialize(CaseClassSerializer.scala:104)
[info] 	at org.apache.flinkx.api.serializer.CaseClassSerializer.serialize(CaseClassSerializer.scala:99)
[info] 	at org.apache.flinkx.api.serializer.ListSerializer.serialize$$anonfun$1(ListSerializer.scala:20)
[info] 	at scala.runtime.function.JProcedure1.apply(JProcedure1.java:15)
[info] 	at scala.runtime.function.JProcedure1.apply(JProcedure1.java:10)
[info] 	at scala.collection.immutable.List.foreach(List.scala:333)
[info] 	at org.apache.flinkx.api.serializer.ListSerializer.serialize(ListSerializer.scala:20)
[info] 	at org.apache.flinkx.api.serializer.ListSerializer.serialize(ListSerializer.scala:18)
[info] 	at org.apache.flinkx.api.serializer.OptionSerializer.serialize(OptionSerializer.scala:66)
[info] 	at org.apache.flinkx.api.serializer.OptionSerializer.serialize(OptionSerializer.scala:63)
[info] 	at org.apache.flinkx.api.serializer.CaseClassSerializer.serialize(CaseClassSerializer.scala:104)
[info] 	at org.apache.flinkx.api.serializer.CaseClassSerializer.serialize(CaseClassSerializer.scala:99)
[info] 	at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.serialize(StreamElementSerializer.java:173)
[info] 	at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.serialize(StreamElementSerializer.java:44)
[info] 	at org.apache.flink.runtime.plugable.SerializationDelegate.write(SerializationDelegate.java:54)
[info] 	at org.apache.flink.runtime.io.network.api.writer.RecordWriter.serializeRecord(RecordWriter.java:152)
[info] 	at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:108)
[info] 	at org.apache.flink.runtime.io.network.api.writer.ChannelSelectorRecordWriter.emit(ChannelSelectorRecordWriter.java:55)
[info] 	at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:140)
[info] 	at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collectAndCheckIfChained(RecordWriterOutput.java:120)
[info] 	at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:101)
[info] 	at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:53)
[info] 	at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:60)
[info] 	at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:32)
[info] 	at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:52)
[info] 	at org.apache.flink.streaming.api.operators.co.KeyedCoProcessOperator.processElement1(KeyedCoProcessOperator.java:78)
[info] 	at org.apache.flink.streaming.runtime.io.RecordProcessorUtils.lambda$getRecordProcessor1$1(RecordProcessorUtils.java:88)
[info] 	at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessorFactory$StreamTaskNetworkOutput.emitRecord(StreamTwoInputProcessorFactory.java:254)
[info] 	at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:157)
[info] 	at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:114)
[info] 	at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
[info] 	at org.apache.flink.streaming.runtime.io.StreamMultipleInputProcessor.processInput(StreamMultipleInputProcessor.java:85)
[info] 	at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:579)
[info] 	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
[info] 	at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:909)
[info] 	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:858)
[info] 	at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)
[info] 	at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937)
[info] 	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:751)
[info] 	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
[info] 	at java.base/java.lang.Thread.run(Thread.java:840)
[info] [2024-04-08 10:06:42,256] WARN Failed to trigger or complete checkpoint 1 for job 926262f3df68336e0973bba535007918. (0 consecutive failed attempts so far) (org.apache.flink.runtime.checkpoint.CheckpointFailureManager)
[info] org.apache.flink.runtime.checkpoint.CheckpointException: Checkpoint Coordinator is suspending.
[info] 	at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.stopCheckpointScheduler(CheckpointCoordinator.java:2056)
[info] 	at org.apache.flink.runtime.checkpoint.CheckpointCoordinatorDeActivator.jobStatusChanges(CheckpointCoordinatorDeActivator.java:46)
[info] 	at org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.notifyJobStatusChange(DefaultExecutionGraph.java:1580)
[info] 	at org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.transitionState(DefaultExecutionGraph.java:1146)
[info] 	at org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.transitionState(DefaultExecutionGraph.java:1118)
[info] 	at org.apache.flink.runtime.scheduler.SchedulerBase.transitionExecutionGraphState(SchedulerBase.java:600)
[info] 	at org.apache.flink.runtime.scheduler.DefaultScheduler.addVerticesToRestartPending(DefaultScheduler.java:382)
[info] 	at org.apache.flink.runtime.scheduler.DefaultScheduler.restartTasksWithDelay(DefaultScheduler.java:358)
[info] 	at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeRestartTasks(DefaultScheduler.java:326)
[info] 	at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:268)
[info] 	at org.apache.flink.runtime.scheduler.DefaultScheduler.onTaskFailed(DefaultScheduler.java:261)
[info] 	at org.apache.flink.runtime.scheduler.SchedulerBase.onTaskExecutionStateUpdate(SchedulerBase.java:787)
[info] 	at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:764)
[info] 	at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:83)
[info] 	at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:488)
[info] 	at jdk.internal.reflect.GeneratedMethodAccessor20.invoke(Unknown Source)
[info] 	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
[info] 	at java.base/java.lang.reflect.Method.invoke(Method.java:568)
[info] 	at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.lambda$handleRpcInvocation$1(PekkoRpcActor.java:309)
[info] 	at org.apache.flink.runtime.concurrent.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:83)
[info] 	at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRpcInvocation(PekkoRpcActor.java:307)
[info] 	at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRpcMessage(PekkoRpcActor.java:222)
[info] 	at org.apache.flink.runtime.rpc.pekko.FencedPekkoRpcActor.handleRpcMessage(FencedPekkoRpcActor.java:85)
[info] 	at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleMessage(PekkoRpcActor.java:168)
[info] 	at org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:33)
[info] 	at org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:29)
[info] 	at scala.PartialFunction.applyOrElse(PartialFunction.scala:127)
[info] 	at scala.PartialFunction.applyOrElse$(PartialFunction.scala:126)
[info] 	at org.apache.pekko.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:29)
[info] 	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:175)
[info] 	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176)
[info] 	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176)
[info] 	at org.apache.pekko.actor.Actor.aroundReceive(Actor.scala:547)
[info] 	at org.apache.pekko.actor.Actor.aroundReceive$(Actor.scala:545)
[info] 	at org.apache.pekko.actor.AbstractActor.aroundReceive(AbstractActor.scala:229)
[info] 	at org.apache.pekko.actor.ActorCell.receiveMessage(ActorCell.scala:590)
[info] 	at org.apache.pekko.actor.ActorCell.invoke(ActorCell.scala:557)
[info] 	at org.apache.pekko.dispatch.Mailbox.processMailbox(Mailbox.scala:280)
[info] 	at org.apache.pekko.dispatch.Mailbox.run(Mailbox.scala:241)
[info] 	at org.apache.pekko.dispatch.Mailbox.exec(Mailbox.scala:253)
[info] 	at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:373)
[info] 	at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1182)
[info] 	at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1655)
[info] 	at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1622)
[info] 	at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:165)

I am using the following library versions:

  • scala -> 3.4.1
  • flink-scala-api -> 1.18.1_1.1.4
  • flink -> 1.19.0 (tried downgrading to 1.18.1, but the same thing happens)

Any ideas what could cause this?
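
For what it's worth, reading the serializer frames bottom-up (CaseClassSerializer, then OptionSerializer, ListSerializer, CaseClassSerializer, and the failing OptionSerializer) suggests a model shape roughly like this (a reconstruction from the stack trace, not the reporter's actual classes):

// Hypothetical shape: the None that trips the MatchError sits in a case
// class, inside a List, inside an Option, inside the outer case class.
case class Inner(maybeValue: Option[String])
case class Enriched(items: Option[List[Inner]])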

BigDecMapper not serializable

Hey, I've got this (simplified) class and encountered an issue with BigDecMapper not being serializable.

import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flinkx.api.serializers._

case class Item(
  id: Long,
  price: BigDecimal,
  currency: String,
)

object Item {
  implicit lazy val typeInformation: TypeInformation[Item] = deriveTypeInformation
}

This compiles well, but when I execute my test suite, this is being thrown:

org.apache.flink.api.common.InvalidProgramException: The implementation of the CaseClassSerializer is not serializable. The object probably contains or references non serializable fields.
  at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:170)
  at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:138)
  at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:138)
  at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:69)
  at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:2360)
  at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1997)
  at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1980)
  at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1959)
  at org.apache.flinkx.api.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.scala:618)
  at com.vinted.search.commons.test.fixtures.FlinkTest$TestSourceExtensions.fromList(FlinkTest.scala:84)
  ...
  Cause: java.io.NotSerializableException: org.apache.flinkx.api.mapper.BigDecMapper
  at java.base/java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1175)
  at java.base/java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1543)
  at java.base/java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1500)
  at java.base/java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1423)
  at java.base/java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1169)
  at java.base/java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1543)
  at java.base/java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1500)
  at java.base/java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1423)
  at java.base/java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1169)
  at java.base/java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1369)

I tried marking typeInformation as @transient so that Flink would not send it over the wire, but that did not help. How could I solve this?

Thank you 🙏
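
One possible user-side workaround, assuming the TypeMapper extension point sketched in the UUID issue above and that a mapper in implicit scope takes precedence over the built-in one (both assumptions):

import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flinkx.api.mapper.TypeMapper
import org.apache.flinkx.api.serializers._

// A Serializable mapper for BigDecimal, so the derived CaseClassSerializer
// no longer captures a non-serializable one (hypothetical workaround).
implicit val bigDecMapper: TypeMapper[BigDecimal, String] =
  new TypeMapper[BigDecimal, String] with Serializable {
    override def map(a: BigDecimal): String = a.toString
    override def contramap(b: String): BigDecimal = BigDecimal(b)
  }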

Configure GitHub CI build

Configure GitHub Actions to compile and test flink-scala-api for different JDK and Scala versions.
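
The Scala axis could start from sbt cross-building (a sketch with hypothetical version values), with the JDK axis coming from the CI job matrix running sbt +test on each JDK:

// build.sbt: `sbt +test` compiles and tests against every entry here.
ThisBuild / crossScalaVersions := Seq("2.12.17", "2.13.10", "3.3.0")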

Head of empty list error in tests

Hi. I got a "head of empty list" error in a simple test:

master...TheDIM47:flink-scala-api:head-of-empty-list-demo

head of empty list
java.util.NoSuchElementException: head of empty list
	at scala.collection.immutable.Nil$.head(List.scala:662)
	at scala.collection.immutable.Nil$.head(List.scala:661)
	at scala.collection.StrictOptimizedLinearSeqOps$$anon$1.next(LinearSeq.scala:267)
	at scala.collection.IterableOnceOps.addString(IterableOnce.scala:1249)
	at scala.collection.IterableOnceOps.addString$(IterableOnce.scala:1241)
	at scala.collection.AbstractIterable.addString(Iterable.scala:933)
	at scala.collection.IterableOnceOps.mkString(IterableOnce.scala:1191)
	at scala.collection.IterableOnceOps.mkString$(IterableOnce.scala:1189)
	at scala.collection.AbstractIterable.mkString(Iterable.scala:933)
	at scala.collection.Iterable.toString(Iterable.scala:78)
	at scala.collection.Iterable.toString$(Iterable.scala:78)
	at scala.collection.Seq.toString(Seq.scala:43)
	at scala.collection.Seq.toString$(Seq.scala:43)
	at scala.collection.AbstractSeq.toString(Seq.scala:1189)
	at java.base/java.lang.String.valueOf(String.java:2951)
	at java.base/java.lang.StringBuilder.append(StringBuilder.java:172)
	at scala.collection.IterableOnceOps.addString(IterableOnce.scala:1249)
	at scala.collection.IterableOnceOps.addString$(IterableOnce.scala:1241)
	at scala.collection.AbstractIterator.addString(Iterator.scala:1300)
	at scala.collection.IterableOnceOps.mkString(IterableOnce.scala:1191)
	at scala.collection.IterableOnceOps.mkString$(IterableOnce.scala:1189)
	at scala.collection.AbstractIterator.mkString(Iterator.scala:1300)
	at scala.runtime.ScalaRunTime$._toString(ScalaRunTime.scala:156)
	at org.apache.flink.api.ProcessFunctionTest$DataPackage.toString(ProcessFunctionTest.scala:28)
	at java.base/java.lang.String.valueOf(String.java:2951)
	at java.base/java.lang.StringBuilder.append(StringBuilder.java:172)
	at scala.collection.IterableOnceOps.addString(IterableOnce.scala:1246)
	at scala.collection.IterableOnceOps.addString$(IterableOnce.scala:1241)
	at scala.collection.AbstractIterable.addString(Iterable.scala:933)
	at scala.collection.IterableOnceOps.mkString(IterableOnce.scala:1191)
	at scala.collection.IterableOnceOps.mkString$(IterableOnce.scala:1189)
	at scala.collection.AbstractIterable.mkString(Iterable.scala:933)
	at scala.collection.Iterable.toString(Iterable.scala:78)
	at scala.collection.Iterable.toString$(Iterable.scala:78)
	at scala.collection.Seq.toString(Seq.scala:43)
	at scala.collection.Seq.toString$(Seq.scala:43)
	at scala.collection.AbstractSeq.toString(Seq.scala:1189)
	at org.scalactic.ColCompatHelper$.className(ColCompatHelper.scala:35)
	at org.scalactic.Prettifier$$anon$2.apply(Prettifier.scala:228)
	at org.scalatest.FailureMessages$.wasNotEqualTo(FailureMessages.scala:907)
	at org.scalatest.matchers.should.Matchers$AnyShouldWrapper.$anonfun$shouldBe$67(Matchers.scala:7021)
	at org.scalatest.matchers.MatchersHelper$.indicateFailure(MatchersHelper.scala:391)
	at org.scalatest.matchers.should.Matchers$AnyShouldWrapper.shouldBe(Matchers.scala:7021)
	at org.apache.flink.api.ProcessFunctionTest.$anonfun$new$1(ProcessFunctionTest.scala:22)
	at org.scalatest.OutcomeOf.outcomeOf(OutcomeOf.scala:85)
	at org.scalatest.OutcomeOf.outcomeOf$(OutcomeOf.scala:83)
	at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
	at org.scalatest.Transformer.apply(Transformer.scala:22)
	at org.scalatest.Transformer.apply(Transformer.scala:20)
	at org.scalatest.flatspec.AnyFlatSpecLike$$anon$5.apply(AnyFlatSpecLike.scala:1812)
	at org.scalatest.TestSuite.withFixture(TestSuite.scala:196)
	at org.scalatest.TestSuite.withFixture$(TestSuite.scala:195)
	at org.scalatest.flatspec.AnyFlatSpec.withFixture(AnyFlatSpec.scala:1685)
	at org.scalatest.flatspec.AnyFlatSpecLike.invokeWithFixture$1(AnyFlatSpecLike.scala:1810)
	at org.scalatest.flatspec.AnyFlatSpecLike.$anonfun$runTest$1(AnyFlatSpecLike.scala:1822)
	at org.scalatest.SuperEngine.runTestImpl(Engine.scala:306)
	at org.scalatest.flatspec.AnyFlatSpecLike.runTest(AnyFlatSpecLike.scala:1822)
	at org.scalatest.flatspec.AnyFlatSpecLike.runTest$(AnyFlatSpecLike.scala:1804)
	at org.scalatest.flatspec.AnyFlatSpec.runTest(AnyFlatSpec.scala:1685)
	at org.scalatest.flatspec.AnyFlatSpecLike.$anonfun$runTests$1(AnyFlatSpecLike.scala:1880)
	at org.scalatest.SuperEngine.$anonfun$runTestsInBranch$1(Engine.scala:413)
	at scala.collection.immutable.List.foreach(List.scala:333)
	at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:401)
	at org.scalatest.SuperEngine.runTestsInBranch(Engine.scala:390)
	at org.scalatest.SuperEngine.$anonfun$runTestsInBranch$1(Engine.scala:427)
	at scala.collection.immutable.List.foreach(List.scala:333)
	at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:401)
	at org.scalatest.SuperEngine.runTestsInBranch(Engine.scala:396)
	at org.scalatest.SuperEngine.runTestsImpl(Engine.scala:475)
	at org.scalatest.flatspec.AnyFlatSpecLike.runTests(AnyFlatSpecLike.scala:1880)
	at org.scalatest.flatspec.AnyFlatSpecLike.runTests$(AnyFlatSpecLike.scala:1879)
	at org.scalatest.flatspec.AnyFlatSpec.runTests(AnyFlatSpec.scala:1685)
	at org.scalatest.Suite.run(Suite.scala:1114)
	at org.scalatest.Suite.run$(Suite.scala:1096)
	at org.scalatest.flatspec.AnyFlatSpec.org$scalatest$flatspec$AnyFlatSpecLike$$super$run(AnyFlatSpec.scala:1685)
	at org.scalatest.flatspec.AnyFlatSpecLike.$anonfun$run$1(AnyFlatSpecLike.scala:1925)
	at org.scalatest.SuperEngine.runImpl(Engine.scala:535)
	at org.scalatest.flatspec.AnyFlatSpecLike.run(AnyFlatSpecLike.scala:1925)
	at org.scalatest.flatspec.AnyFlatSpecLike.run$(AnyFlatSpecLike.scala:1923)
	at org.scalatest.flatspec.AnyFlatSpec.run(AnyFlatSpec.scala:1685)
	at org.scalatest.tools.SuiteRunner.run(SuiteRunner.scala:47)
	at org.scalatest.tools.Runner$.$anonfun$doRunRunRunDaDoRunRun$13(Runner.scala:1321)
	at org.scalatest.tools.Runner$.$anonfun$doRunRunRunDaDoRunRun$13$adapted(Runner.scala:1315)
	at scala.collection.immutable.List.foreach(List.scala:333)
	at org.scalatest.tools.Runner$.doRunRunRunDaDoRunRun(Runner.scala:1315)
	at org.scalatest.tools.Runner$.$anonfun$runOptionallyWithPassFailReporter$24(Runner.scala:992)
	at org.scalatest.tools.Runner$.$anonfun$runOptionallyWithPassFailReporter$24$adapted(Runner.scala:970)
	at org.scalatest.tools.Runner$.withClassLoaderAndDispatchReporter(Runner.scala:1481)
	at org.scalatest.tools.Runner$.runOptionallyWithPassFailReporter(Runner.scala:970)
	at org.scalatest.tools.Runner$.run(Runner.scala:798)
	at org.scalatest.tools.Runner.run(Runner.scala)
	at org.jetbrains.plugins.scala.testingSupport.scalaTest.ScalaTestRunner.runScalaTest2or3(ScalaTestRunner.java:43)
	at org.jetbrains.plugins.scala.testingSupport.scalaTest.ScalaTestRunner.main(ScalaTestRunner.java:26)

flink-scala-api package name clashes with Flink's java package

There is a minor but quite weird compile-time issue on the user side. It happens when the user imports classes from the standard Java library, like "java.*".

For example, this does not compile, due to a "package not found" error:

import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectOutputStream}

The user must add the "_root_" prefix to skip the relative package resolution. This compiles:

import _root_.java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectOutputStream}

Why does this issue happen?

Since we decided to put this library into the same package as official Apache Flink, we clash with the existing Apache Flink package "org.apache.flink.api.java".

Below is what we ask a user to put in their code to use this library:

import org.apache.flink.api._

This import automatically brings the "org.apache.flink.api.java" package into scope in the same source file as just "java". As a result, an import of a non-Flink Java class such as "java.io.ByteArrayInputStream" cannot be resolved: it resolves to "org.apache.flink.api.java.io.ByteArrayInputStream", which does not exist. If the user adds the "_root_" prefix to the imported Java class, it works fine.
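
Putting the two snippets together, the clash looks like this inside a single source file:

import org.apache.flink.api._ // brings org.apache.flink.api.java into scope as `java`

// Does not compile: `java.io` now resolves against org.apache.flink.api.java.io,
// which has no ByteArrayInputStream.
// import java.io.ByteArrayInputStream

// Compiles: _root_ forces resolution from the root package.
import _root_.java.io.ByteArrayInputStream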

Solution

Use a different package name for this library than official Flink uses. For example:

// option 1
package org.apache.flinkx.api
// or  option 2
package org.apache.flinkextended.api
// or option 3
package org.flinkextended.api

Please let me know your thoughts.
