Streamz

Streamz provides combinator libraries for integrating Functional Streams for Scala (FS2), Akka Streams and Apache Camel endpoints. They integrate

  • Apache Camel with Akka Streams: Camel endpoints can be integrated into Akka Stream applications with the Camel DSL for Akka Streams.
  • Apache Camel with FS2: Camel endpoints can be integrated into FS2 applications with the Camel DSL for FS2.
  • Akka Streams with FS2: Akka Stream Sources, Flows and Sinks can be converted to FS2 Streams, Pipes and Sinks, respectively, and vice versa with Stream converters.

Dependencies

Streamz artifacts are available for Scala 2.12 and 2.13 at:

resolvers += Resolver.bintrayRepo("streamz", "maven")
val streamzVersion = "0.13"

libraryDependencies ++= Seq(
  "com.github.krasserm" %% "streamz-camel-akka" % streamzVersion,
  "com.github.krasserm" %% "streamz-camel-fs2" % streamzVersion,
  "com.github.krasserm" %% "streamz-converter" % streamzVersion,
)

Streamz version    FS2 version
(not yet)          3.0.x
0.13               2.x
0.10-M2            1.x
0.9.1              0.10.x
0.8.1              0.9.x

Latest milestone release for FS2 1.0.x

val streamzVersion = "0.10-M2"

Latest stable release for FS2 0.10.x

val streamzVersion = "0.9.1"

Latest stable release for FS2 0.9.x

val streamzVersion = "0.8.1"

Documentation

Streamz 0.13

Streamz 0.10-M2

Streamz 0.9.1

Streamz 0.8.1

API docs

Latest version

Older versions

Streamz 0.10-M2

Not published. Run sbt unidoc on tag 0.10-M2 to generate the 0.10 API docs.

Streamz 0.9.1

Not published. Run sbt unidoc on branch r-0.9 to generate the 0.9 API docs.

Streamz 0.8.1

Not published. Run sbt unidoc on branch r-0.8 to generate the 0.8 API docs.

External examples

streamz's People

Contributors

ahjohannessen, chunjef, cquiroz, daenyth, danicheg, guersam, justwrote, krasserm, kubukoz, milanvdm, notxcain, sarahgerweck, volkerstampa


streamz's Issues

Apache Camel DSL for FS2

Completely rewrite the Camel integration for FS2 (use Apache Camel directly and not akka-camel)

[0.9] IllegalStateException "onNext must not be called after onError" in ActorPublisher

A rather unhelpful stack trace:

onNext must not be called after onError
ActorPublisher.scala:199:in `akka.stream.actor.ActorPublisher.onNext'
ActorPublisher.scala:190:in `akka.stream.actor.ActorPublisher.onNext$'
AkkaStreamPublisher.scala:34:in `streamz.converter.AkkaStreamPublisher.onNext'
AkkaStreamPublisher.scala:48:in `streamz.converter.AkkaStreamPublisher$$anonfun$receive$1.applyOrElse'
Actor.scala:517:in `akka.actor.Actor.aroundReceive'
Actor.scala:515:in `akka.actor.Actor.aroundReceive$'
AkkaStreamPublisher.scala:34:in `streamz.converter.AkkaStreamPublisher.akka$stream$actor$ActorPublisher$$super$aroundReceive'
ActorPublisher.scala:330:in `akka.stream.actor.ActorPublisher.aroundReceive'
ActorPublisher.scala:273:in `akka.stream.actor.ActorPublisher.aroundReceive$'
AkkaStreamPublisher.scala:34:in `streamz.converter.AkkaStreamPublisher.aroundReceive'
ActorCell.scala:588:in `akka.actor.ActorCell.receiveMessage'
ActorCell.scala:557:in `akka.actor.ActorCell.invoke'
Mailbox.scala:258:in `akka.dispatch.Mailbox.processMailbox'
Mailbox.scala:225:in `akka.dispatch.Mailbox.run'
Mailbox.scala:235:in `akka.dispatch.Mailbox.exec'
ForkJoinTask.java:260:in `akka.dispatch.forkjoin.ForkJoinTask.doExec'
ForkJoinPool.java:1339:in `akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask'
ForkJoinPool.java:1979:in `akka.dispatch.forkjoin.ForkJoinPool.runWorker'
ForkJoinWorkerThread.java:107:in `akka.dispatch.forkjoin.ForkJoinWorkerThread.run'

This was probably fixed by accident in #43, but I wanted to record it for anyone else on 0.9.

Unable to import library

[error]   not found: http://repo1.maven.org/maven2/com/github/krasserm/streamz-converter_2.12/0.10-M2/streamz-converter_2.12-0.10-M2.pom
[error]   not found: http://repo.addisonglobal.net/repository/maven-central/com/github/krasserm/streamz-converter_2.12/0.10-M2/streamz-converter_2.12-0.10-M2.pom
[error]   not found: http://dl.bintray.com/dnvriend/maven/com/github/krasserm/streamz-converter_2.12/0.10-M2/streamz-converter_2.12-0.10-M2.pom

[error]   not found: https://dl.bintray.com/emersonloureiro/sbt-plugins/com.github.krasserm/streamz-converter_2.12/0.10-M2/ivys/ivy.xml
[error]   not found: https://dl.bintray.com/evolutiongaming/maven/com/github/krasserm/streamz-converter_2.12/0.10-M2/streamz-converter_2.12-0.10-M2.pom
[error]   not found: http://repo.typesafe.com/typesafe/ivy-releases/com.github.krasserm/streamz-converter_2.12/0.10-M2/ivys/ivy.xml
[error]   not found: http://repo.scala-sbt.org/scalasbt/sbt-plugin-releases/com.github.krasserm/streamz-converter_2.12/0.10-M2/ivys/ivy.xml
[error]   unauthorized: https://jitpack.io/com/github/krasserm/streamz-converter_2.12/0.10-M2/streamz-converter_2.12-0.10-M2.pom (JitPack)
[error] (bnkTransactionView / ssExtractDependencies) sbt.librarymanagement.ResolveException: Error downloading com.github.krasserm:streamz-converter_2.12:0.10-M2

Resolver added according to the README:

resolvers += Resolver.url(
  "krasserm at bintray",
  url("http://dl.bintray.com/krasserm/maven")
)(Resolver.ivyStylePatterns)

Conversion from `fs2.Stream` to Akka `Source` Requires `ContextShift[IO]`

Hello!

The following code does not compile with the implicits and imports specified in the documentation/examples:

    AkkaSource.fromGraph(
      (stream: fs2.Stream[IO, Int]).toSource
    )

with the error:

value toSource is not a member of fs2.Stream[cats.effect.IO, Int]

However, this code:

    // An explicit type annotation on the implicit avoids inference surprises.
    implicit val cs: ContextShift[IO] = IO.contextShift(ec)
    AkkaSource.fromGraph(
      (stream: fs2.Stream[IO, Int]).toSource
    )

compiles as expected. It would be great to add the requirement for a ContextShift[IO] to the documentation!
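A minimal, self-contained sketch of one plausible cause of this error message, assuming the converter exposes toSource through an implicit class whose constructor takes an implicit parameter. The names Shift, MiniStream and MiniConverter are hypothetical stand-ins for ContextShift[IO], fs2.Stream and the streamz converter; they are not the library's API. In Scala 2, when the implicit parameter is missing, the conversion is not applicable and the compiler typically reports only that the method is not a member.

```scala
// Hypothetical stand-in for cats.effect.ContextShift[IO].
final class Shift(val name: String)

// Hypothetical stand-in for fs2.Stream[IO, A].
final case class MiniStream[A](values: List[A])

object MiniConverter {
  // The extension method lives in an implicit class whose constructor
  // needs an implicit Shift. Without a Shift in scope the conversion is
  // not applicable, so `toSource` appears not to exist at all.
  implicit class MiniStreamOps[A](stream: MiniStream[A])(implicit shift: Shift) {
    def toSource: String = {
      val joined = stream.values.mkString(",")
      s"Source($joined) via ${shift.name}"
    }
  }
}

object ContextShiftDemo {
  import MiniConverter._

  // Comment this line out and the `toSource` call below stops compiling.
  implicit val shift: Shift = new Shift("global")

  val converted: String = MiniStream(List(1, 2, 3)).toSource

  def main(args: Array[String]): Unit = println(converted)
}
```

If the real converter is structured this way, listing the required implicit next to the import in the documentation would make the failure much easier to diagnose.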

Update to latest scalaz-stream and akka-streams

I am not sure what the long-term prospects for this project are. I use Akka and Camel quite extensively in production and I think this project could be very valuable. I have forked the project to try to apply the updates to the above libraries, but a number of API changes make it more than a trivial change-over. I would be happy to assist if I can get some guidance while familiarizing myself with the code.
Regards
Ian

How to use jms component in application ack mode?

Hello. Is there a way to use the jms component in application ack mode? In akka-camel there is a def autoAck: Boolean method. It would be nice if, for example, I could use Sink.actorRefWithAck in a way that also sends the ack message to the endpoint. Thanks.

Request for fs2 Support

scalaz-stream's redesign to fs2 will be ready soon, and it would be fantastic to support it in this library. Particularly for me, the akka-persistence module.

What do you think?

`null` in `receiveBody` from AWS-KINESIS

I have a relatively simple test harness that, among other things, tries to send some JSON through Kinesis. It does this by providing a TypeConverter for Circe's Json type based on Java serialization. A property-based test of the TypeConverter passes, and sending a message succeeds, but receiveBody returns a null body, although the headers are fine and the Camel logs show the base64-encoded body before decoding and conversion.

Also, I found it necessary to retain the SHARD_ID returned by sending and pass that through the receiveBody URL; otherwise the AWS-KINESIS component appears to sit and poll the wrong shard repeatedly. In the general case, I won't know what shard to request, and would expect receiveBody to return any messages from all shards.

So on one hand, I may not understand Kinesis or the AWS-KINESIS component. On the other, I don't see how the receiveBody result can possibly be null.

You can run my test suite with it:test in a valid Docker environment. It relies on testcontainers-localstack for testing against AWS APIs.

Any advice would be welcome!

Thanks!
data-playground.zip

Application level ACK of in-only message exchanges

In addition to replying to consumed in-out message exchanges, also support delayed acknowledgement of consumed in-only message exchanges, e.g. by extending the DSL with an .ack element. The implementation of this feature should also cover #32.

Stream errors (see also difference between errors and failures) should be translatable to negative acknowledgements (exceptions or faults) on individual message exchanges while stream failures should fail all message exchanges that are currently being processed.
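The distinction above can be sketched with a small, self-contained model. All names here (Outcome, Ack, Nack, AckDemo) are hypothetical and exist only for illustration; this is not the streamz DSL, just one way to picture how per-element errors and whole-stream failures might map onto acknowledgements.

```scala
// Hypothetical model of acknowledgement outcomes, not part of any library.
sealed trait Outcome
case object Ack extends Outcome
final case class Nack(cause: Throwable) extends Outcome

object AckDemo {
  // A stream error concerns a single element, so only the exchange that
  // produced it is negatively acknowledged; the others are acked.
  def settle(results: List[(String, Either[Throwable, Unit])]): Map[String, Outcome] =
    results.map {
      case (id, Left(cause)) => id -> (Nack(cause): Outcome)
      case (id, Right(_))    => id -> (Ack: Outcome)
    }.toMap

  // A stream failure terminates the stream, so every exchange that is
  // still in flight is failed with the same cause.
  def failAll(inFlight: List[String], cause: Throwable): Map[String, Outcome] =
    inFlight.map(id => id -> (Nack(cause): Outcome)).toMap

  def main(args: Array[String]): Unit = {
    val boom = new RuntimeException("boom")
    println(settle(List("a" -> Right(()), "b" -> Left(boom))))
    println(failAll(List("c", "d"), boom))
  }
}
```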

Sbt dependencies don't work

The README.md lists several dependencies, but they do not seem to be published. Only streamz.jar itself appears to be published, and it doesn't contain anything.

Generalize fs2<->akka compat to F: Effect instead of IO

I'd like to use streamz-converter with any Effect instead of just IO.

I glanced at the code and haven't spotted anywhere that this would be difficult, though I could have missed it of course.

If this change is acceptable I might submit a PR for it.

timeout overrides configured greater delay value

After some debugging I found out that the hard-coded timeout (https://github.com/krasserm/streamz/blob/master/streamz-camel-akka/src/main/scala/streamz/camel/akka/EndpointConsumer.scala#L52) overrides the configured delay (http://camel.apache.org/file2.html). This only happens if the delay value is greater than the 500 ms timeout. Since we need a greater value, it would be nice to remove the timeout altogether and use the receive(uri) method (delay seems to have the same effect), or at least to make it configurable. If you wish, I'm happy to create a PR.

Intended streamz-camel-akka dependency on FS2, Cats-Effect & Cats-Core?

When I look at the discussion in akka/alpakka#74, I get the impression that streamz-camel-akka was intended to have no dependencies except Akka Streams and Apache Camel. However, when I look at the published POM for version 0.9, it declares that this module depends on streamz-camel-context, and both streamz-camel-akka and streamz-camel-context depend on FS2 0.10.1.

This dependency on FS2 then triggers dependencies on cats-effect, cats-core, cats-kernel, cats-macros and machinist.

Is it intended that all these dependencies are required just to use the Camel-to-Akka bridge that's advertised in Alpakka? If not, you probably need to either modify your build so streamz-camel-akka doesn't depend on streamz-camel-context or modify streamz-camel-context so it doesn't declare any of these dependencies.

This is more than an academic concern for me: I have other dependencies on these libraries and can't justify the diamond dependencies for this bridge, which otherwise seems quite suitable for my use case.

Bug in producing to aws-kinesis?

I don't really know if this is a streamz issue; my apologies for the error if not.

I'm attaching a small project that uses test-containers and LocalStack to try to do some very preliminary experimentation with streamz-camel-fs2. Specifically, it just tries to send a String to a Kinesis stream, being careful to first create the stream in beforeAll. This fails in the guts of the KinesisProducer with a NullPointerException that strongly suggests the PARTITION_KEY header is not set on the message. And that might be true--I certainly don't do anything to set it. But neither does the documentation suggest I need to, or tell me how to.

To run the test, you'll need to have a good Docker environment running, and in the sbt shell, do it:test.

So again, my apologies for some vagueness here. But I'm happy to help with whatever the path forward is, whether it's documentation, chasing down a bug, or what have you.

data.tar.gz

Creating race conditions when converting akka streams to fs2 streams is too easy

The problem:
Akka streams have materialized values, but the default .toStream() or .toSink() discard them. I've created race conditions by converting sinks with a materialized Future result and not waiting on that future.
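The hazard can be illustrated without Akka or FS2. In the sketch below, runSink is a hypothetical stand-in for running a sink whose materialized value is a Future: the call returns immediately, while the work finishes later. It only uses the Scala standard library and is not taken from streamz.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object MaterializedValueDemo {
  // Hypothetical stand-in for running a sink: the call returns at once,
  // and the returned Future completes when the sink has actually finished.
  def runSink(elements: List[Int]): Future[Int] = Future {
    Thread.sleep(50) // simulate slow downstream work
    elements.sum
  }

  // Discards the Future, as a converter that drops the materialized value
  // would. The caller may continue before the sink has finished.
  def fireAndForget(elements: List[Int]): Unit = {
    runSink(elements)
    ()
  }

  // Keeps the Future and waits on it, so completion and failure are observed.
  def runAndWait(elements: List[Int]): Int =
    Await.result(runSink(elements), 5.seconds)

  def main(args: Array[String]): Unit = {
    fireAndForget(List(1, 2, 3)) // returns before the sum is computed
    println(runAndWait(List(1, 2, 3)))
  }
}
```

Any code that runs after fireAndForget and assumes the sink is done is racing against it; runAndWait removes the race at the cost of blocking, which a real conversion would replace with a non-blocking wait.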

Ad-hoc solution:
I created this:

implicit class RichAkkaSink[A, B](val sink: AkkaSink[A, Future[B]])
        extends AnyVal {

      /** Converts an akka sink with a success-status-indicating Future[B]
        * materialized result into an fs2 stream which will fail if the Future fails.
        * The stream returned by this will emit the Future's value one time at the end,
        * then terminate.
        */
      def toSinkWithStatusMat(
          implicit ec: ExecutionContext,
          m: Materializer
      ): Pipe[IO, A, B] =
        Fs2AkkaCompat.toSinkWithStatusMat(sink)

      /** The same as toSinkWithStatusMat, but ignoring the materialized value */
      def toSinkWithStatusMat_(
          implicit ec: ExecutionContext,
          m: Materializer
      ): Sink[IO, A] =
        in => in.through(Fs2AkkaCompat.toSinkWithStatusMat(sink)).void
    }

  /** Converts an akka sink with a success-status-indicating Future[B]
    * materialized result into an fs2 stream which will fail if the Future fails.
    * The stream returned by this will emit the Future's value one time at the end,
    * then terminate.
    */
  def toSinkWithStatusMat[A, B](
      akkaSink: AkkaSink[A, Future[B]]
  )(
      implicit ec: ExecutionContext,
      m: Materializer
  ): Pipe[IO, A, B] = {
    val mkPromise = Promise.empty[IO, Either[Throwable, B]]
    // `Sink` is just a function of Stream[F, A] => Stream[F, Unit], so we take a stream as input.
    in =>
      Stream.eval(mkPromise).flatMap { p =>
        // Akka streams produce a materialized value as a side effect of being run.
        // streamz-converters allows us to have a `Future[Done] => Unit` callback when that materialized value is created.
        // This callback tells the akka materialized future to store its result status into the Promise
        val captureMaterializedResult: Future[B] => Unit = _.onComplete {
          case Failure(ex)    => p.complete(Left(ex)).unsafeRunSync
          case Success(value) => p.complete(Right(value)).unsafeRunSync
        }
        // toSink is from streamz-converters; convert an akka sink to fs2 sink with a callback for the materialized values
        val fs2Sink: Sink[IO, A] = akkaSink.toSink(captureMaterializedResult)

        val fs2Stream: Stream[IO, Unit] = fs2Sink.apply(in)
        val materializedResultStream: Stream[IO, B] = Stream.eval {
          p.get // Async wait on the promise to be completed; => IO[Either[Throwable, B]]
          .rethrow // IO[Either[Throwable, B]] => IO[B]
        }
        // Run the akka sink for its effects and then run stream containing the effect of getting the Promise results
        fs2Stream.drain ++ materializedResultStream
      }
  }

The problem with my approach is that the user is still required to realize they need the alternate converter method.

A more comprehensive solution:
I'd like to make a breaking API change before we make the 1.x release.

// currently
def toSink(onMaterialization: M => Unit = _ => ())

Proposed:

package streamz.converter {
  // For convenience on wildcard imports
  val Discard: M => Unit = _ => ()
}

def toSink(onMaterialization: M => Unit)
def toSink()(implicit ev: M =:= akka.NotUsed)

This guarantees that a caller must either provide an explicit onMaterialization, for which Discard will be wildcard imported for convenience, or if they don't want to pass one, that M must be NotUsed.
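A self-contained sketch of how the type-equality evidence could enforce this, using only the Scala standard library. MiniSink and the local NotUsed are hypothetical stand-ins for an Akka sink and akka.NotUsed; the real converter may need a different shape, and this sketch invokes the callback eagerly only to keep the example short.

```scala
// Hypothetical stand-in for akka.NotUsed.
sealed trait NotUsed
case object NotUsed extends NotUsed

// Hypothetical stand-in for an Akka sink with materialized value type M.
final class MiniSink[A, M](materialize: () => M) {
  // Explicit variant: the caller must decide what happens to M.
  def toSink(onMaterialization: M => Unit): A => Unit = {
    onMaterialization(materialize())
    _ => ()
  }

  // No-argument variant: compiles only when M is provably NotUsed.
  def toSink()(implicit ev: M =:= NotUsed): A => Unit = _ => ()
}

object ToSinkDemo {
  // Function1 is contravariant in its input, so Any => Unit fits any M => Unit.
  val Discard: Any => Unit = _ => ()

  var seen: Option[Int] = None
  val record: Int => Unit = n => { seen = Some(n) }

  val counting = new MiniSink[String, Int](() => 42)
  val plain = new MiniSink[String, NotUsed](() => NotUsed)

  val recorded: String => Unit = counting.toSink(record)   // value is captured
  val discarded: String => Unit = counting.toSink(Discard) // discarded on purpose
  val unused: String => Unit = plain.toSink()              // nothing to discard

  // counting.toSink() // does not compile: no evidence that Int =:= NotUsed

  def main(args: Array[String]): Unit = println(seen)
}
```

The point of the design is that silently dropping a meaningful materialized value becomes a compile error, while the common NotUsed case stays as terse as before.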

I haven't tested this yet, but I believe the general approach should work.

If you're willing to take this, I'll send a PR implementing it.
