
azure-event-hubs-spark's Introduction

Azure Event Hubs + Apache Spark Connector


This is the source code of the Azure Event Hubs Connector for Apache Spark.

Azure Event Hubs is a highly scalable publish-subscribe service that can ingest millions of events per second and stream them into multiple applications. Spark Streaming and Structured Streaming are scalable and fault-tolerant stream processing engines that allow users to process huge amounts of data using complex algorithms expressed with high-level functions like map, reduce, join, and window. This data can then be pushed to filesystems, databases, or even back to Event Hubs.

By making Event Hubs and Spark easier to use together, we hope this connector makes building scalable, fault-tolerant applications easier for our users.

Latest Releases

Spark

Spark Version   Package Name                 Package Version
Spark 3.0       azure-eventhubs-spark_2.12   Maven Central
Spark 2.4       azure-eventhubs-spark_2.11   Maven Central
Spark 2.4       azure-eventhubs-spark_2.12   Maven Central

Databricks

Databricks Runtime Version   Artifact Id                  Package Version
Databricks Runtime 8.X       azure-eventhubs-spark_2.12   Maven Central
Databricks Runtime 7.X       azure-eventhubs-spark_2.12   Maven Central
Databricks Runtime 6.X       azure-eventhubs-spark_2.11   Maven Central

Roadmap

There is an open issue for each planned feature/enhancement.

FAQ

We maintain an FAQ - reach out to us via gitter if you think anything needs to be added or clarified!

Usage

Linking

For Scala/Java applications using SBT/Maven project definitions, link your application with the artifact below. Note: See Latest Releases to find the correct artifact for your version of Apache Spark (or Databricks)!

groupId = com.microsoft.azure
artifactId = azure-eventhubs-spark_2.11
version = 2.3.22

or

groupId = com.microsoft.azure
artifactId = azure-eventhubs-spark_2.12
version = 2.3.22
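
For SBT builds, an equivalent dependency can be declared as below (a minimal sketch; pick the artifact and version that match your Spark/Scala version from the Latest Releases table):

// SBT: "%%" appends the Scala binary version (_2.11 or _2.12) automatically
libraryDependencies += "com.microsoft.azure" %% "azure-eventhubs-spark" % "2.3.22"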

Documentation

Documentation for our connector can be found here. The integration guides there contain all the information you need to use this library.

If you're new to Apache Spark and/or Event Hubs, then we highly recommend reading their documentation first. You can read Event Hubs documentation here, documentation for Spark Streaming here, and, last but not least, Structured Streaming here.

Further Assistance

If you need additional assistance, please don't hesitate to ask! General questions and discussion should happen on our gitter chat. Please open an issue for bug reports and feature requests; all feedback is welcome!

Contributing

If you'd like to help contribute (we'd love to have your help!), then go to our Contributor's Guide for more information.

Build Prerequisites

In order to build and use the connector, you need to have:

  1. A Java 1.8 SDK installed (the connector is compiled against Java 1.8).
  2. Maven 3.x installed.

More details on building from source and running tests can be found in our Contributor's Guide.

Build Command

// Builds jar and runs all tests
mvn clean package

// Builds jar, runs all tests, and installs jar to your local maven repository
mvn clean install

azure-event-hubs-spark's People

Contributors

alexott, arerlend, arijitt, basilhariri, codingcat, duhuan, fokko, gison93, jaceklaskowski, jamesbirdsall, jgiardin, lenadroid, lucarosellini, microsoft-github-policy-service[bot], myasuka, nyaghma, ppatierno, qaemma, romitgirdhar, sabeegrewal, sardinois, shanyu, sjkwak, slyons, spacerangerwes, sreeramgarlapati, tilumi, vjrantal, xaviergeerinck, yamin-msft


azure-event-hubs-spark's Issues

fix the filtering semantics which is inconsistent with EventHubs

After talking with @SreeramGarlapati, we found that the EventHubs filtering semantics are actually an "exact search".

e.g. if you pass in a value of 4000 for filtering.enqueueTime and there is no message with an enqueueTime of exactly 4000, the behavior is undefined (from our observation, the result is an empty list in most cases), even if events were enqueued at 4100, 4200, etc.

Spark 2.0 compatible version

Hi, I was wondering, have you found time to test this with the new Spark 2.0 version?
I could test it out and make a pull request for any changes that are required to make this work on the new version :-)

Get offset from stream

Hi All,

Is there any way to get the offset of a partition from the stream, for a self-managed fault-tolerance implementation?

Backpressure support

Currently, when connecting to Event Hubs, the source appears to grab every message available for the partition. It would be useful to limit the number of incoming messages processed at a time, which would lead to backpressure support as provided by other streaming sources such as Kafka.
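
As a stopgap (an assumption on our part, not a connector feature), Spark's generic streaming rate-limit settings can at least cap how fast a receiver-based stream ingests:

// Standard Spark Streaming configuration, not connector-specific options
import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.streaming.backpressure.enabled", "true") // adapt the ingestion rate to processing speed
  .set("spark.streaming.receiver.maxRate", "10000")    // hard cap on records per second per receiver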

reuse receiver

We should be able to reuse the receiver to make maximal use of the prefetched data.

Question Regarding CreateDirectStream.

Hi, when I create a DirectStream using createDirectStream and run the job with --num-executors 18 --executor-memory 512MB --conf spark.executor.cores=5 and the properties below, I am able to pull the data, but not every 2 seconds.
If I get around 5,000 records in a batch, the job waits for those 5,000 records to be processed before starting the next one. Meanwhile, another 10,000 to 20,000 records get stacked up, and the next job takes much longer.
Is there any way I can increase performance, or optimize the job so that it processes in parallel?
I am unable to set --conf spark.streaming.concurrentJobs=4 when pulling data from Event Hubs.

val eventhubnamespace = "flightawarespark"
val progressdir = "/Event/DirectStream/"
val eventhubname_d = "flightaware"
val ehParams = Map[String, String](
  "eventhubs.policyname" -> "Test_listen",
  "eventhubs.policykey" -> "PolicyKey",
  "eventhubs.namespace" -> "namespace",
  "eventhubs.name" -> "name",
  "eventhubs.partition.count" -> "10",
  "eventhubs.consumergroup" -> "$default",
  "eventhubs.checkpoint.dir" -> "/EventCheckpoint_0.1",
  "eventhubs.checkpoint.interval" -> "2"
)
println("testing spark")
val conf = new SparkConf().set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")//.setMaster("local[4]").setAppName("Eventhubs_Test")
conf.registerKryoClasses(Array(classOf[PublishToTopic]))
conf.set("spark.streaming.stopGracefullyOnShutdown", "true")
val sc= new SparkContext(conf)
val hiveContext = new HiveContext(sc)
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
val pool:ExecutorService=Executors.newFixedThreadPool(10)
val ssc = new StreamingContext(sc, Seconds(2))
var dataString :RDD[String] =sc.emptyRDD
val stream=EventHubsUtils.createDirectStreams(ssc,eventhubnamespace,progressdir,Map(eventhubname_d -> ehParams))

Thank you for your help.

Thanks,
Ankush Reddy.

Offset check-pointing issues and recommendations.

Hi there, I am from the Azure Event Hubs team. One of our customers has reported an issue with their Spark adapter where the EH receiver kept getting initialized at the same offset again and again for a couple of hours. After reviewing the code, I noticed some issues, listed below, that you can easily address.

  1. When while(!stopMessageHandler) is triggered to exit, you are closing the offsetStore without check-pointing the last event processed. Consider check-pointing the last event processed in the finally block (see the sketch after this list).
  2. Instead of check-pointing based on elapsed time, consider check-pointing the last message of the received batch. The current implementation is prone to missing a checkpoint on non-UTC systems during a DST adjustment.
  3. Consider check-pointing the last message processed if processing the batch of messages throws.
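
A minimal sketch of recommendation 1, reusing names from the receiver loop quoted further down this page (stopMessageHandler, offsetToSave, savedOffset, myOffsetStore); this is illustrative only, not the actual connector code:

// Sketch: persist the last processed offset even when the handler loop exits or throws
try {
  while (!stopMessageHandler) {
    // ... receive and process messages, updating offsetToSave ...
  }
} finally {
  if (offsetToSave != savedOffset) {
    myOffsetStore.write(offsetToSave)  // checkpoint the last event processed before shutting down
  }
  myOffsetStore.close()
}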

Code not reaching after the line val stream=EventHubsUtils.createUnionStream(ssc, ehParams)

Hi,

I have followed the steps provided in the link https://azure.microsoft.com/en-us/documentation/articles/hdinsight-apache-spark-eventhub-streaming/#comment-2693663776. I have written a sample eventhub receiver program as given below,

package com.onerm.spark

import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.eventhubs.EventHubsUtils
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark._
import org.apache.spark.sql.hive.HiveContext
import java.util.concurrent.{Executors, ExecutorService}

object HiveEvents {

def b2s(a: Array[Byte]): String = new String(a)

def main(args: Array[String]): Unit = {

val ehParams = Map[String, String]( /* Event Hubs connection parameters omitted */ )

val conf = new SparkConf().setAppName("Eventhubs Onerm")
val sc= new SparkContext(conf)
val hiveContext = new HiveContext(sc)
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
val pool:ExecutorService=Executors.newFixedThreadPool(5)
val ssc = new StreamingContext(sc, Seconds(120))
var dataString :RDD[String] =sc.emptyRDD

val stream=EventHubsUtils.createUnionStream(ssc, ehParams)
//lines below are not getting executed until I stop the execution
stream.print()
stream.foreachRDD { rdd =>
  if (rdd.isEmpty()) {
    println("RDD IS EMPTY ")
  } else {
    dataString = rdd.map(line => b2s(line))
    println("COUNT" + dataString.count())
    sqlContext.read.json(dataString).registerTempTable("jsoneventdata")
    val filterData = sqlContext.sql("SELECT id,ClientProperties.PID,ClientProperties.Program,ClientProperties.Platform,ClientProperties.Version,ClientProperties.HWType,ClientProperties.OffVer,ContentID,Data,Locale,MappedSources,MarketingMessageContext.ActivityInstanceID,MarketingMessageContext.CampaignID,MarketingMessageContext.SegmentName,MarketingMessageContext.OneRMInstanceID,MarketingMessageContext.DateTimeSegmented,Source,Timestamp.Date,Timestamp.Epoch,TransactionID,UserAction,EventProcessedUtcTime,PartitionId,EventEnqueuedUtcTime from jsoneventdata")
    filterData.show(10)
    filterData.saveAsParquetFile("EventCheckpoint_0.1/ParquetEvent")
  }
}
ssc.start()
ssc.awaitTermination()
}
}

The issue is that the code after the line val stream=EventHubsUtils.createUnionStream(ssc, ehParams) is not reached, and it only executes when I manually stop the program with Ctrl+C.

After analyzing the logs of my spark-submit, I could observe that when I call EventHubsUtils.createUnionStream(ssc, ehParams), it internally calls the EventHubReceiver class. The EventHubReceiver class has the EventHubsMessageHandler() class, which internally runs the code below; this loops continuously until stopMessageHandler is set to true, and I guess that value only gets set to true when I stop the program with Ctrl+C.

Only then do the statements after the line "EventHubsUtils.createUnionStream(ssc, ehParams)" get executed. Please correct me if I am wrong. Thank you!

while (!stopMessageHandler) {
  val message = receiverClient.receive()
  if (message != null && message.getSequence > latestSequence) {
    latestSequence = message.getSequence
    processReceivedMessage(message)
  }

  val now = System.currentTimeMillis()
  if (now > nextTime) {
    if (offsetToSave != savedOffset) {
      logInfo("writing offset to store: " + offsetToSave + ", partition: " + partitionId)
      myOffsetStore.write(offsetToSave)
      savedOffset = offsetToSave
      nextTime = now + checkpointInterval
    }
  }
}

2 direct streams at the same time give ERROR "detected lost partitions"

I have a driver program that consumes from eventhubs via 2.0.4-SNAPSHOT.

Two different hubs in the same namespace, with different policies, each test fine independently via EventHubsUtils.createDirectStreams.

But when they are started in the same driver program with the same ssc, I get:

[JobGenerator] ERROR o.a.s.s.e.EventHubDirectDStream - detected lost partitions List(camera-partition-0, camera-partition-1, camera-partition-2, camera-partition-3)

Whichever stream I start first shows up in the partition error. If I comment out one, the other works fine.

Should multiple eventhubs be usable via a single ssc?

ps: building 2.0.4 for the new isLocal check on the progress_dir hdfs/adl enforcement and hoping to use wasbs.

Unsupported major.minor version 52.0

The current version of spark-eventhubs is only compatible with Java 1.8. On an HDInsight cluster deployed with Spark 1.6, the package "com.microsoft.azure:spark-streaming-eventhubs_2.10:1.6.0" is not compatible due to the version of Java used to compile it.

How to restrict createUnionStream

Hi, I am trying to pull data from Event Hubs.

We have 10 partitions in the event hub, and I am using createUnionStream to pull the events and save them as files. Nothing is written to the files until a shutdown hook is called.
I need to stream the data, or get the union of the streams, every 10 to 20 seconds instead of waiting for the shutdown hook to be called.

Could you suggest whether I am missing anything, or whether this functionality is already implemented?

package com.microsoft.spark.streaming.examples.workloads
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.log4j.{Level, Logger}
import org.apache.spark.streaming.eventhubs.EventHubsUtils
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark._
import org.apache.spark.sql.hive.HiveContext
import java.util.concurrent.{ExecutorService, Executors}
import com.microsoft.spark.streaming.examples.arguments.EventhubsArgumentKeys
import com.microsoft.spark.streaming.examples.workloads.EventhubsToHiveTable.createStreamingContext
import org.apache.spark.sql.SparkSession
object HiveEventsTest2 {
def b2s(a: Array[Byte]): String = new String(a)
def createStreamingContext(): StreamingContext = {
val ehParams = Map[String, String](
"eventhubs.policyname" -> "Test_listen",
"eventhubs.policykey" -> "",
"eventhubs.namespace" -> "",
"eventhubs.name" -> "",
"eventhubs.partition.count" -> "10",
"eventhubs.consumergroup" -> "$default",
"eventhubs.checkpoint.dir" -> "/EventCheckpoint_0.1",
"eventhubs.checkpoint.interval" -> "20"
)
val sparkConfiguration : SparkConf = EventHubsUtils.initializeSparkStreamingConfigurations
sparkConfiguration.setAppName("Eventhubs Onerm")
sparkConfiguration.set("spark.streaming.driver.writeAheadLog.allowBatching", "true")
sparkConfiguration.set("spark.streaming.driver.writeAheadLog.batchingTimeout", "60000")
sparkConfiguration.set("spark.streaming.receiver.writeAheadLog.enable", "true")
sparkConfiguration.set("spark.streaming.driver.writeAheadLog.closeFileAfterWrite", "true")
sparkConfiguration.set("spark.streaming.receiver.writeAheadLog.closeFileAfterWrite", "true")
sparkConfiguration.set("spark.streaming.stopGracefullyOnShutdown", "true")
val sparkSession : SparkSession = SparkSession.builder.config(sparkConfiguration).enableHiveSupport.getOrCreate
val ssc = new StreamingContext(sparkSession.sparkContext, Seconds(5))
ssc.checkpoint("/EventCheckpoint_0.1")
var dataString :RDD[String] =sparkSession.sparkContext.emptyRDD
val stream=EventHubsUtils.createUnionStream(ssc, ehParams)
//lines below are not getting executed until I stop the execution
stream.foreachRDD { rdd =>
  dataString = rdd.map(line => b2s(line))
  print("@@@@@@@@@@@@@@@@@@@@@@@@############################################@$#$@############################@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@")
  print("@@@@@@@@@@@@@@@@@@@@@@@@############################################@$#$@############################@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@")
  println("COUNT" + dataString.count())
  print("@@@@@@@@@@@@@@@@@@@@@@@@############################################@$#$@############################@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@")
  print("@@@@@@@@@@@@@@@@@@@@@@@@############################################@$#$@############################@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@")
  //dataString.saveAsTextFile("/EventCheckpoint_0.1/TextFile")
}
ssc
}
def main(inputArguments: Array[String]): Unit = {
val streamingContext = StreamingContext.getOrCreate("/EventCheckpoint_0.1",() => createStreamingContext())
sys.ShutdownHookThread {
println("Gracefully stopping Spark Streaming Application")
println("*****************************************************########################################################################################################################")
streamingContext.stop(true, true)
println("Application stopped")
}
streamingContext.start()
streamingContext.awaitTermination()
}
}

As the comment //lines below are not getting executed until I stop the execution indicates, the rest of the code after that line does not execute until I stop the script.
Sorry for the confusion.

Thank You for your help.

Thanks,
Ankush Reddy

Support pluggable progress store

Similar to the 1.0 version, it would be very useful to support an external progress store. It could be specified using a builder pattern, e.g.:

EventHubsUtils.direct.builder()
  .context(streamingContext)
  .namespace(namespace)
  .progressStore(foo)
  .hubConfig(Map(config.hubConfig.name -> eventHubsParameters))
  .create()

This would align more closely with the SparkSession builder approach.

It could also be specified through one of the EventHub config parameters, but that limits flexibility.

Either way, a standard interface would allow for a pluggable source of resiliency.
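
One possible shape for such an interface (a hypothetical sketch assuming progress is tracked per event hub and partition; none of these names exist in the connector today):

// Hypothetical pluggable progress store; implementations could back this with HDFS, a database, etc.
trait ProgressStore {
  // Last committed offset for a partition, if one has been recorded
  def read(eventHubName: String, partitionId: Int): Option[Long]
  // Durably record the offset up to which a partition has been processed
  def write(eventHubName: String, partitionId: Int, offset: Long): Unit
  def close(): Unit
}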

eventhubs.filter.enqueuetime documentation has incorrect unit

Based on what I am seeing in my program's behavior, it looks like the unit for eventhubs.filter.enqueuetime should be seconds, not milliseconds.

The docs are in these two files:

"eventhubs.filter.enqueuetime": Unix time, millisecond since epoch, default to "0"

https://github.com/hdinsight/spark-eventhubs/blob/ee7dcfe65843d624064308ce1da5be86cfdd3fa6/core/src/main/scala/org/apache/spark/eventhubscommon/Implicits.scala

https://github.com/hdinsight/spark-eventhubs/blob/170a90c562eb932c2a7afaa969fdbccd194f082a/core/src/main/scala/org/apache/spark/streaming/eventhubs/EventHubsUtils.scala
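
To illustrate the suspected mismatch (hypothetical values; ehParams stands for the usual parameter map):

// The docs describe milliseconds since epoch, but the observed behavior suggests seconds
val enqueueTimeMillis  = System.currentTimeMillis()        // what the documentation implies
val enqueueTimeSeconds = System.currentTimeMillis() / 1000 // what the observed behavior suggests
val params = ehParams + ("eventhubs.filter.enqueuetime" -> enqueueTimeSeconds.toString)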

Upgrading from 2.0.5 to 2.1.1 fails to load previous checkpoints

When upgrading from release 2.0.5 to 2.1.1 I get the following exception:

java.lang.IllegalStateException: detect corrupt progress tracking file at <time> <namespace> 0 <ehub> 9 2808934696544 9791652 it might be a bug in the implementation of underlying file system

Is there a better way to transition?

This might be another case to support external tracking stores so that schema evolution can happen if necessary.

java.util.NoSuchElementException when using enqueueTime

Some tasks are failing when using enqueueTime:

17/07/05 19:34:14 WARN TaskSetManager: Lost task 10.0 in stage 0.0 (TID 10, 10.0.0.9): java.util.NoSuchElementException
        at scala.collection.LinearSeqOptimized$class.last(LinearSeqOptimized.scala:148)
        at scala.collection.immutable.List.last(List.scala:84)
        at org.apache.spark.eventhubscommon.rdd.EventHubsRDD.compute(EventHubsRDD.scala:119)

Using driver version 2.1.1

upgrade Azure EventHub client to 0.13.1

The new version fixes a bug so that a null return value from the receiver definitively indicates there is no more data on the server side.

This is necessary in order to implement epoch-time filtering.

Can't get spark-streaming-eventhubs_2.11 to work with Jupyter in HDInsight 3.6 (Spark 2.1)

I can launch spark-shell with

spark-shell --packages com.microsoft.azure:spark-streaming-eventhubs_2.11:2.0.5

which works fine, but when I use the same package in Jupyter with

%%configure -f
{ "conf": {"spark.jars.packages": "com.microsoft.azure:spark-streaming-eventhubs_2.11:2.0.5" }}

It always fails with the cryptic error:

The code failed because of a fatal error:
Session 31 unexpectedly reached final status 'dead'. See logs:

Or sometimes with this one:

The code failed because of a fatal error: Status 'shutting_down' not supported by session..

It is also interesting that spark-streaming-eventhubs_2.10 does work in Jupyter.

NPE when receiving events

java.lang.NullPointerException
        at scala.collection.convert.Wrappers$JIterableWrapper.iterator(Wrappers.scala:54)
        at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
        at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
        at org.apache.spark.streaming.eventhubs.EventHubsReceiver$EventHubsMessageHandler.run(EventHubsReceiver.scala:134)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)

It would appear that the asScala conversion method does not check whether the value to be wrapped is null, so the receivedEvents val at line 128 in EventHubsClientWrapper.scala contains an instance of a Scala Iterable wrapper whose underlying java.lang.Iterable instance is null.

Using Spark 2.0.0, Scala 2.11 and streaming-eventhubs 2.0.0.
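
A minimal sketch of the kind of null guard described above (illustrative only; receiverClient, maxBatchSize, and the receiveSync call are assumptions about the underlying Event Hubs client, not the connector's actual code):

import scala.collection.JavaConverters._

// Guard against a null java.lang.Iterable before wrapping it with asScala
val receivedEvents =
  Option(receiverClient.receiveSync(maxBatchSize)) // receive may return null when no events are available
    .map(_.asScala)
    .getOrElse(Iterable.empty)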

Optimize the Connection to EventHubs

We might reuse clients in the EventHubs client layer to save TCP connection cost.

A proposed pattern from @SreeramGarlapati:

"if you can implement a layer which can make sure - 1 EHClient instance is being used only by 1 Receiver - at any given point of time - you could achieve both performance & be less error prone"

"keep the old EHclient alive and after you close the receiver- without any errors - release this to the pool"

"If you see any errors with closing the receiver- close the EHClient (discard it).."

Scala 2.11 version

Hi, I'm planning on using this software in a Spark 2.0 project that is based on Scala 2.11.
Sadly the project currently doesn't have a 2.11 version. Would it be possible to build a 2.11 version also?

Publish to Maven Central

This project should be published to Maven Central so that users do not get an unpleasant surprise when an older version is unexpectedly removed from the "private" repo.

Exception in thread "main"

I am getting the following error message when I try to use eventhubs.EventHubsUtils. I am using a local instance of Spark to develop against and cannot get my stream to run against Event Hubs.

Any help would be greatly appreciated.

Exception in thread "main" java.lang.NoClassDefFoundError: com/microsoft/eventhubs/client/IEventHubFilter
at org.apache.spark.streaming.eventhubs.EventHubsUtils$.createStream$default$6(EventHubsUtils.scala:80)
at org.apache.spark.streaming.eventhubs.EventHubsUtils$$anonfun$1.apply(EventHubsUtils.scala:58)
at org.apache.spark.streaming.eventhubs.EventHubsUtils$$anonfun$1.apply(EventHubsUtils.scala:58)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.immutable.Range.foreach(Range.scala:141)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.AbstractTraversable.map(Traversable.scala:105)
at org.apache.spark.streaming.eventhubs.EventHubsUtils$.createUnionStream(EventHubsUtils.scala:57)
at main.scala.com.datuh.spark.ConnectToEH$.createContext(ConnectToEH.scala:33)
at main.scala.com.datuh.spark.ConnectToEH$$anonfun$1.apply(ConnectToEH.scala:69)
at main.scala.com.datuh.spark.ConnectToEH$$anonfun$1.apply(ConnectToEH.scala:68)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.streaming.StreamingContext$.getOrCreate(StreamingContext.scala:633)
at main.scala.com.datuh.spark.ConnectToEH$.main(ConnectToEH.scala:67)
at main.scala.com.datuh.spark.ConnectToEH.main(ConnectToEH.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at com.intellij.rt.execution.application.AppMain.main(AppMain.java:144)
Caused by: java.lang.ClassNotFoundException: com.microsoft.eventhubs.client.IEventHubFilter
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
... 21 more

receiver-based connection undefined behavior

In the implementation of receiver-based connection, there is a race condition when the receiver is restarted.

The current implementation restarts the receiver in a new thread (within Spark Streaming internals), while the original MessageHandler thread closes the FileSystem.

This is undefined behavior according to the HDFS API documentation:

https://hadoop.apache.org/docs/current/api/org/apache/hadoop/fs/FileSystem.html#close--

Urgent fixes are needed.
