Git Product home page Git Product logo

azure-event-hubs-java's Introduction

Microsoft Azure Event Hubs

Microsoft Azure Event Hubs Client for Java

Azure Event Hubs is a highly scalable publish-subscribe service that can ingest millions of events per second and stream them into multiple applications. This lets you process and analyze the massive amounts of data produced by your connected devices and applications. Once Event Hubs has collected the data, you can retrieve, transform and store it by using any real-time analytics provider or with batching/storage adapters.

The Azure Events Hubs client library for .NET allows for both sending and receiving of events. Most common scenarios call for an application to act as either an event publisher or an event consumer, but rarely both.

An event publisher is a source of telemetry data, diagnostics information, usage logs, or other log data, as part of an embedded device solution, a mobile device application, a game title running on a console or other device, some client or server based business solution, or a web site.

An event consumer picks up such information from the Event Hub and processes it. Processing may involve aggregation, complex computation and filtering. Processing may also involve distribution or storage of the information in a raw or transformed fashion. Event Hub consumers are often robust and high-scale platform infrastructure parts with built-in analytics capabilities, like Azure Stream Analytics, Apache Spark, or Apache Storm.

We've moved!

The Microsoft Azure Event Hubs Client for Java has joined the unified Azure Developer Platform and can now be found in the Azure SDK for Java repository. To view the latest source, participate in the development process, report issues, or engage with the community, please visit our new home.

This repository has been archived and is intended to provide historical reference and context for the Microsoft Azure Event Hubs Client for Java.

Source code in the folders beginning with "microsoft-azure-" | Client JAR (Maven) | Event Processor Host JAR (Maven) | Product documentation

azure-event-hubs-java's People

Contributors

clemensv avatar clguiman avatar dciborow avatar dependabot[bot] avatar devigned avatar dlstucki avatar dmoebius avatar fokko avatar hmlam avatar jamesbirdsall avatar jlleitschuh avatar jtaubensee avatar microsoft-github-policy-service[bot] avatar mssfang avatar pgbhagat avatar sabeegrewal avatar scottmacwatters avatar serkantkaraca avatar shubhavijayasarathy avatar simplesteph avatar sjkwak avatar sjoerdsmink avatar sreeramgarlapati avatar stuartleeks avatar velocipedist avatar

Stargazers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

Watchers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

azure-event-hubs-java's Issues

Need way to enable tracing on a running process

Actual Behavior

  1. The only way to enable tracing for Event Processor Host, client, or Proton-J is by restarting the JVM.

Expected Behavior

  1. Not all customers may even have control of their JVM, and for those that do, restarting whatever long-running process with additional parameters to turn on tracing may be awkward at best. If there is an issue, the restart may also lose the repro.

Since we don't know anything about the customer's environment, not even the operating system, it is hard for EPH or the Java client to look for some external trigger to control tracing. The best way may be to provide an API which the customer's code can call to control tracing dynamically. The customer's code can then call the API in response to whatever external trigger makes the most sense for their environment and application.

Some Issues when ser/deser EventData

Actual Behavior

  1. Sample code: https://github.com/CodingCat/eventhubs-sample-event-producer/blob/master/src/main/scala/com/microsoft/azure/eventhubs/client/example/LoremSer2.scala

This program just serialize EventData and deserialize from the file, it seems that the part of the body data which is beyond 1024 turns to garbage data after deserialization

Expected Behavior

The expected behavior is to deserialize correctly.....

Versions

  • OS platform and version: Ubuntu/Linux
  • Maven package version or commit ID: maven 3,
  • Client Version: observed in 0.9 but I believe it exists in 0.10 0.11 as well

ServiceBusExceptions thrown corresponding to missing messages

Actual Behavior

  1. com.microsoft.azure.servicebus.ServiceBusException exceptions thrown at unpredictable times that correspond to times that clients reading from event hub skip over messages

Examples:
com.microsoft.azure.servicebus.ServiceBusException: The message container is being closed (823). TrackingId:1ade7936-4120-4e5c-a658-d2c3bb941b92_B23, SystemTracker:NoSystemTracker, Timestamp:3/2/2017 6:29:12 PM, errorContext[NS: XXX, PATH: XXX/Partitions/4, REFERENCE_ID: b9b87c_00cbc20143e1403a860cc926e2006001_G19, LAST_OFFSET: 2240952, PREFETCH_COUNT: 999, LINK_CREDIT: 992, PREFETCH_Q_LEN: 0, R_TYPE: EPOCH]
at com.microsoft.azure.servicebus.ExceptionUtil.toException(ExceptionUtil.java:93)
at com.microsoft.azure.servicebus.MessageReceiver.onError(MessageReceiver.java:393)
at com.microsoft.azure.servicebus.MessageReceiver.onClose(MessageReceiver.java:646)
at com.microsoft.azure.servicebus.amqp.BaseLinkHandler.processOnClose(BaseLinkHandler.java:83)
at com.microsoft.azure.servicebus.amqp.BaseLinkHandler.onLinkRemoteClose(BaseLinkHandler.java:52)
at org.apache.qpid.proton.engine.BaseHandler.handle(BaseHandler.java:176)
at org.apache.qpid.proton.engine.impl.EventImpl.dispatch(EventImpl.java:108)
at org.apache.qpid.proton.reactor.impl.ReactorImpl.dispatch(ReactorImpl.java:309)
at org.apache.qpid.proton.reactor.impl.ReactorImpl.process(ReactorImpl.java:276)
at com.microsoft.azure.servicebus.MessagingFactory$RunReactor.run(MessagingFactory.java:340)
at java.lang.Thread.run(Thread.java:745)

com.microsoft.azure.servicebus.ServiceBusException: com.microsoft.azure.servicebus.amqp.AmqpException: The link 'G25:32590172:65c6ee_4544660c07c8411e92e1c8a43f7a9ef3_G25' is force detached by the broker due to errors occurred in consumer(link221668). Detach origin: InnerMessageReceiver was closed. TrackingId:290bcb3500020337000361e458b82c36_G25_B23, SystemTracker:XXX~19662|XXX, Timestamp:3/2/2017 3:49:21 PM, errorContext[NS: XXX, PATH: XXX/Partitions/11, REFERENCE_ID: 65c6ee_4544660c07c8411e92e1c8a43f7a9ef3_G25, LAST_OFFSET: 279304, PREFETCH_COUNT: 999, LINK_CREDIT: 988, PREFETCH_Q_LEN: 0, R_TYPE: EPOCH]
at com.microsoft.azure.servicebus.ExceptionUtil.toException(ExceptionUtil.java:86)
at com.microsoft.azure.servicebus.MessageReceiver.onError(MessageReceiver.java:393)
at com.microsoft.azure.servicebus.MessageReceiver.onClose(MessageReceiver.java:646)
at com.microsoft.azure.servicebus.amqp.BaseLinkHandler.processOnClose(BaseLinkHandler.java:83)
at com.microsoft.azure.servicebus.amqp.BaseLinkHandler.onLinkRemoteClose(BaseLinkHandler.java:52)
at org.apache.qpid.proton.engine.BaseHandler.handle(BaseHandler.java:176)
at org.apache.qpid.proton.engine.impl.EventImpl.dispatch(EventImpl.java:108)
at org.apache.qpid.proton.reactor.impl.ReactorImpl.dispatch(ReactorImpl.java:309)
at org.apache.qpid.proton.reactor.impl.ReactorImpl.process(ReactorImpl.java:276)
at com.microsoft.azure.servicebus.MessagingFactory$RunReactor.run(MessagingFactory.java:340)
at java.lang.Thread.run(Thread.java:745)

Expected Behavior

  1. No exceptions are thrown, or
  2. Some way to recognize when the exception occurs so we don't skip over the messages and just restart our process or do some other clean up task

Versions

  • OS platform and version: Windows Server 2012 R2
  • Maven package version or commit ID: 0.9.0

Using Azure Event Hubs in Android Studio

Hi All,

I am new at building Android Applications. In my first application I would like to push data to an Azure event hub. I have read the tutorial here: https://docs.microsoft.com/en-us/azure/event-hubs/event-hubs-java-ephcs-getstarted

In Android studio I added azure-eventhubs-0.10.0.jar and then added it as a library to the project. Unfortunately, when testing the app this leads to a build error:

Error:Error converting bytecode to dex:
Cause: Dex cannot parse version 52 byte code.
This is caused by library dependencies that have been compiled using Java 8 or above.
If you are using the 'java' gradle plugin in a library submodule add
targetCompatibility = '1.7'
sourceCompatibility = '1.7'
to that submodule's build.gradle file.

In response I added the lines:
targetCompatibility = '1.7'
sourceCompatibility = '1.7'

to build.gradle but the error remains. I think I am not correctly adding the azure event hub library to android studio. Can someone explain to me how to add maven repositories in android studio?

PartitionReceiver should expose Name/Path

From @hmlam on February 10, 2016 21:49

Given a group of receivers I don't see any way to get which receiver is for which partitions. Some identifier is needed.

Copied from original issue: Azure/azure-event-hubs#39

Use of these libraries on server side Java are danger

From @yoshioterada on April 26, 2016 14:41

In order to use these libraries on server side Java with more safety,
please add the support of JSR-236 of Concurrency utilities for Java EE ?
https://docs.oracle.com/javaee/7/api/javax/enterprise/concurrent/package-summary.html

Note :
Until, Java EE 6(not only EE but also simple Servlet environment)
it was not recommended to create the new Thread on server side.
Because the new created Thread is not managed by Java EE container
(both Servlet container and EJB container).
As a result, there is a possibility to occur some trouble.

Please refer to the following documents which I explained at JavaOne?
The new created Thread is not proxy object but simple Java Thread Object.
http://www.slideshare.net/OracleMiddleJP/javaone-2013-san-francisco-asynconcurrencyonee7

For example, following code is good for Java SE environment,
but it is not good on server side.
https://github.com/Azure/azure-event-hubs/blob/master/java/azure-eventhubs/src/main/java/com/microsoft/azure/servicebus/Timer.java


private static final ScheduledThreadPoolExecutor executor
= new ScheduledThreadPoolExecutor(Math.max(Runtime.getRuntime().availableProcessors(), 4));
executor.schedule(runnable, seconds, TimeUnit.SECONDS);

For example, we will implement like follows on Java EE 7 environment.


public class MyManagedScheduledExecutorService{
@resource(name = "concurrent/
DefaultManagedScheduledExecutorService")
ManagedScheduledExecutorService managedScheduledExecsvc;

public void execScheduledExecutorService() {
MyRunnableTask task = new MyRunnableTask();
managedScheduledExecsvc.schedule(
task, 60L, TimeUnit.SECONDS);
}


And I think that there is a problem on following code too.


https://github.com/Azure/azure-event-hubs/blob/master/java/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/PartitionSender.java
private CompletableFuture createInternalSender() throws ServiceBusException
{
return MessageSender.create(this.factory, StringUtil.getRandomString(),
String.format("%s/Partitions/%s", this.eventHubName, this.partitionId))
.thenAcceptAsync(new Consumer()
{
public void accept(MessageSender a) { PartitionSender.this.internalSender = a;}
});
}


From Java SE 8 API point of view,
you didn't pass the Executor to the "thenAcceptAsync" of CompletableFuture method.
As a result, it will be unmanaged(danger) code on Java EE environment.

I recommend you to use the following API.
https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/CompletableFuture.html
Java SE8 Env :
thenAcceptAsync(Consumer<? super T> action);
Java SE8/EE7 Env :
thenAcceptAsync(Consumer<? super T> action, Executor executor);

For example, it will be wrote like following code.
@resource(name = "concurrent/DefaultManagedExecutorService")
ManagedExecutorService managedExecsvc;

CompletableFuture tasks = CompletableFuture
.supplyAsync(execMyAsyncTask1(i), managedExecsvc)
.thenAccept(System.out::println, managedExecsvc);

I think that these libraries is very good for send/receive the data to Event Hub.
However I will not be able to use these libraries on Java server side environment.
It means that we can't use these lib on Java Web Application.

Copied from original issue: Azure/azure-event-hubs#124

ServiceBusException while creating client

From @Sshubb on September 15, 2016 12:28

Hi all,
I am using javaeventhublibrary version 0.8.2 latest version and i have written the following code for sending data:

ConnectionStringBuilder connStr = new ConnectionStringBuilder("connectionstring"); 
byte[] payloadBytes = valueChangeMessage.data.get().toString().getBytes("UTF-8");
EventData sendEvent = new EventData(payloadBytes);
EventHubClient ehClient = EventHubClient.createFromConnectionStringSync(connStr.toString());
ehClient.sendSync(sendEvent);

And after executing this code i am getting following error:

Sep 15, 2016 5:32:44 PM com.microsoft.azure.servicebus.MessagingFactory$RunReactor run
WARNING: UnHandled exception while processing events in reactor:
java.lang.NullPointerException
org.apache.qpid.proton.engine.impl.EventImpl.dispatch(EventImpl.java:112)
org.apache.qpid.proton.reactor.impl.ReactorImpl.dispatch(ReactorImpl.java:309)
org.apache.qpid.proton.reactor.impl.ReactorImpl.process(ReactorImpl.java:276)
com.microsoft.azure.servicebus.MessagingFactory$RunReactor.run(MessagingFactory.java:332)
java.lang.Thread.run(Thread.java:745)Cause: null
org.apache.qpid.proton.engine.impl.SaslImpl.plain(SaslImpl.java:446)
com.microsoft.azure.servicebus.amqp.ConnectionHandler.onConnectionBound(ConnectionHandler.java:74)
org.apache.qpid.proton.engine.BaseHandler.handle(BaseHandler.java:131)
org.apache.qpid.proton.engine.impl.EventImpl.dispatch(EventImpl.java:108)
org.apache.qpid.proton.reactor.impl.ReactorImpl.dispatch(ReactorImpl.java:309)
org.apache.qpid.proton.reactor.impl.ReactorImpl.process(ReactorImpl.java:276)
com.microsoft.azure.servicebus.MessagingFactory$RunReactor.run(MessagingFactory.java:332)
java.lang.Thread.run(Thread.java:745)
Sep 15, 2016 5:32:44 PM com.cognizant.iotworkbench.Channel.CatChannel sendPOST
SEVERE: null
com.microsoft.azure.servicebus.ServiceBusException: java.lang.NullPointerException, TrackingId: fcf793be-47b9-48a1-ac05-fb036fe6f69e, at: 2016-09-15T17:32:44.917+05:30[Asia/Calcutta]
    at com.microsoft.azure.servicebus.MessagingFactory$RunReactor.run(MessagingFactory.java:357)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.NullPointerException
    at org.apache.qpid.proton.engine.impl.SaslImpl.plain(SaslImpl.java:446)
    at com.microsoft.azure.servicebus.amqp.ConnectionHandler.onConnectionBound(ConnectionHandler.java:74)
    at org.apache.qpid.proton.engine.BaseHandler.handle(BaseHandler.java:131)
    at org.apache.qpid.proton.engine.impl.EventImpl.dispatch(EventImpl.java:108)
    at org.apache.qpid.proton.reactor.impl.ReactorImpl.dispatch(ReactorImpl.java:309)
    at org.apache.qpid.proton.reactor.impl.ReactorImpl.process(ReactorImpl.java:276)
    at com.microsoft.azure.servicebus.MessagingFactory$RunReactor.run(MessagingFactory.java:332)
    ... 1 more

Please let me know what can be the possible issue???

Copied from original issue: Azure/azure-event-hubs#263

Failing Appveyor build

Adding a ticket to track. I believe we need to configure Maven to run the tests in a synchronous fashion.

Validating connection string

Right now if we want to validate whether a connection string is valid or not (especially whether the sas token is valid), we need to actually create a receiver, specifying the consumer group, partition id, etc.

Is there an cleaner way to do the connection string validation?

This might be beyond the java library. Since I didn't find anything in c# as well. Let me know if I should bring this issue to the general repository.

Bug in PartitonContext.checkpoint(EventData event) method

Actual Behavior

The method PartitonContext.checkpoint(EventData event) calls the deprecated method "setOffsetAndSequenceNumber" which makes it impossible to save a checkpoint that is before "this.sequenceNumber"

Expected Behavior

Remove call to "setOffsetAndSequenceNumber" in "PartitonContext.checkpoint(EventData event)"

Versions

  • OS platform and version: Linux
  • Maven package version or commit ID: 0.10.0

Service bus exception, onTransportError when connecting to Event Hub

Hello

I'm trying to send a payload to a newly created Azure Event Hub, and I receive this error:

"Feb 07, 2017 10:27:19 AM com.microsoft.azure.servicebus.amqp.ConnectionHandler onTransportError
WARNING: Connection.onTransportError: hostname[myeventhub.servicebus.windows.net:5671], error[connection aborted]
Service bus exception: connection aborted"

(where myeventhub replaces the actual name of my event hub)

My method of setting up and sending to the hub is this:


final String namespaceName = "----ServiceBusNamespaceName-----";
final String eventHubName = "----EventHubName-----";
final String sasKeyName = "-----SharedAccessSignatureKeyName-----";
final String sasKey = "---SharedAccessSignatureKey----";
ConnectionStringBuilder connStr = new ConnectionStringBuilder(namespaceName, eventHubName, sasKeyName, sasKey);

try{
	byte[] payloadBytes = "Test message".getBytes("UTF-8");
	EventData sendEvent = new EventData(payloadBytes);
	EventHubClient ehClient = EventHubClient.createFromConnectionStringSync(connStr.toString());
}
catch(ServiceBusException sbe){
	traceln("Service bus exception: " + sbe.getMessage());
}
catch(UnsupportedEncodingException uee){
	traceln("Unsupported Encoding Exception: " + uee.getMessage());
}
catch(IOException ioe){
	traceln("IO Exception: " + ioe.getMessage());
}

I'm using:

  • Windows 7 Enterprise
  • Java client
  • proton-j-0.15.0.jar, azure.eventhubs-0.10.0.jar, bcp-jdk15on-156.jar

This seems to be similar to this issue: #7

My guess is that this may be a firewall thing, but I really want it not to be. I'm going to try from outside the firewall another day, but for the moment, I'd really appreciate someone's help on this, either to:

  • solve the issue
  • suggest ways of handling company firewalls

Many thanks

Supporting GetRuntimeInformationAsync in Java

Right now the java client library doesn't provide the information about count of partitions for a specific EventHub entity. For our use case, this is very important. We would like to have something similar to the GetRuntimeInformationAsync() in C#:

https://docs.microsoft.com/en-us/dotnet/api/microsoft.azure.eventhubs.eventhubclient#Microsoft_Azure_EventHubs_EventHubClient_GetRuntimeInformationAsync

Or let me know if there is other way to obtain the number of partitions in the java library.

Thanks,
Hai

Request to create Event Hub Client using existing Shared Access Signature token

From @jbrodeur on April 19, 2016 21:22

JAVA - Event Hub Client Example:

final String namespaceName = "----ServiceBusNamespaceName-----";
final String eventHubName = "----EventHubName-----";
final String sasToken = "-----SharedAccessSignature token-----"; // A previously issued Shared Access Signature token

ConnectionStringBuilder connStr = new ConnectionStringBuilder(namespaceName, eventHubName, sasToken);

EventHubClient ehClient = EventHubClient.createFromConnectionString(connStr.toString()).get();

Copied from original issue: Azure/azure-event-hubs#119

EPH crashes due to high memory usage by large number of threads

From @serkantkaraca on July 18, 2016 18:46

EPH client process memory usage hit almost 3GB and then process crashed due to not able to allocate more resources. Attached screenshot shows the memory footprint during the run.

<2790868.22> 07/18/2016 18:24:16 - Error: JAVA-Receiver_1: Java HotSpot(TM) 64-Bit Server VM warning: Attempt to allocate stack guard pages failed.
<2790868.22> 07/18/2016 18:24:16 - Error: JAVA-Receiver_1: Java HotSpot(TM) 64-Bit Server VM warning: Attempt to allocate stack guard pages failed.
<2790868.22> 07/18/2016 18:24:16 - Error: JAVA-Receiver_1: Java HotSpot(TM) 64-Bit Server VM warning: Attempt to unguard stack red zone failed.
<2790868.22> 07/18/2016 18:24:16 - Error: JAVA-Receiver_1: Java HotSpot(TM) 64-Bit Server VM warning: Attempt to allocate stack guard pages failed.
<2790868.22> 07/18/2016 18:24:16 - Error: JAVA-Receiver_1: Java HotSpot(TM) 64-Bit Server VM warning: Attempt to unguard stack red zone failed.
<2790868.22> 07/18/2016 18:24:16 - Error: JAVA-Receiver_1: Java HotSpot(TM) 64-Bit Server VM warning: Attempt to unguard stack red zone failed.

capture

Copied from original issue: Azure/azure-event-hubs#192

Timeline of 0.11 release?

Hi, I am from HDInsight Spark team and we are building a new connector based on the eventhubs client lib

I'm wondering is there any timeline for the 0.11 release, specifically, when will #50 get merged and release?

Java class is not able to resolve EventData and EventHubClient

I am using following code to send data to Azure event hub and EventData and EventHubClient are not getting resolved in the code and the code is not being compiled.

import java.io.IOException;
import java.nio.charset.;
import java.time.Instant;
import java.util.
;
import java.util.concurrent.ExecutionException;

import com.microsoft.azure.eventhubs.;
import com.microsoft.azure.servicebus.
;
public class Send {

public static void main(String[] args) 
		throws ServiceBusException, ExecutionException, InterruptedException, IOException
{
	// TODO Auto-generated method stub
	final String namespaceName = "ns-1234-dt";
    final String eventHubName = "evt-12345-dt";
    final String sasKeyName = "sas-12345-dt";
    final String sasKey = "<Key>";
    ConnectionStringBuilder connStr = new ConnectionStringBuilder(namespaceName, eventHubName, sasKeyName, sasKey);
    byte[] payloadBytes = "Test AMQP message from JMS".getBytes("UTF-8");
    EventData sendEvent = new EventData(payloadBytes);

    EventHubClient ehClient = EventHubClient.createFromConnectionStringSync(connStr.toString());
    ehClient.sendSync(sendEvent);

}

}

I have added below dependency in pom.xml file while creating Maven project

<dependency>
	<groupId>com.microsoft.azure</groupId>
	<artifactId>azure-eventhubs</artifactId>
	<version>0.13.1</version>
</dependency>

Host should deliver empty list instead of NULL when InvokeProcessorAfterReceiveTimeout is set

Java client's empty message delivery contract is different than SB client. SB client returns an empty list on timeout where Java returns NULL. Better to make behavior consistent across EH clients.

Actual Behavior

  1. OnEvents delivered with NULL messages on timeout.

Expected Behavior

  1. OnEvents delivered with empty list.

Versions

  • OS platform and version: Any
  • Maven package version or commit ID: Any

Reactor thread is not a deamon thread and remans running after EventHubClient is closed

From @ialexandrov on October 21, 2016 7:19

There is already an issue about unmanaged threads used in the JavaClient implementation.

java / azure-eventhubs / src / main / java / com / microsoft / azure / servicebus / MessagingFactory.java

134 final Thread reactorThread = new Thread(new RunReactor(newReactor));
135 reactorThread.start();

Thread is started and then forgoten. Later it prevents JVM from exiting. This is even a bigger issue in the EE environment.

  1. Please switch to using ExecutorService
  2. Please shutdown the executor upon EventHubClient close

Copied from original issue: Azure/azure-event-hubs#283

Incompatibility of C# and Java client

Actual Behavior

  1. send some message containing properties (String, Int) with C# sender
  2. receive the message with Java receiver
  3. Program crashes due to the following ClassCastException (the following example is from a Spark application )
java.lang.Integer cannot be cast to java.lang.String
Serialization trace:
properties (com.microsoft.azure.eventhubs.EventData)
        at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:101)
        at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:518)
        at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:628)
        at org.apache.spark.serializer.KryoSerializationStream.writeObject(KryoSerializer.scala:195)
        at org.apache.spark.serializer.SerializationStream.writeValue(Serializer.scala:135)
        at org.apache.spark.storage.DiskBlockObjectWriter.write(DiskBlockObjectWriter.scala:185)
        at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:150)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
        at org.apache.spark.scheduler.Task.run(Task.scala:86)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.ClassCastException: java.lang.Integer cannot be cast to java.lang.String
        at com.esotericsoftware.kryo.serializers.DefaultSerializers$StringSerializer.write(DefaultSerializers.java:184)
        at com.esotericsoftware.kryo.Kryo.writeObjectOrNull(Kryo.java:606)
        at com.esotericsoftware.kryo.serializers.MapSerializer.write(MapSerializer.java:109)
        at com.esotericsoftware.kryo.serializers.MapSerializer.write(MapSerializer.java:39)
        at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:552)
        at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:80)
        ... 13 more

Expected Behavior

  1. Client implemented with different languages shall be compatible

Simple Analysis

From the above stack we can see that, the deserializer of Java client is trying to interpret some integer as string. I looked at the C# client code, and found that:

https://github.com/Azure/azure-event-hubs-dotnet/blob/dev/src/Microsoft.Azure.EventHubs/EventData.cs#L51 in c# client, the properties is a Map<String, Object>; in Java client, https://github.com/Azure/azure-event-hubs-java/blob/master/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/EventData.java#L38 , it is Map<String, String>; when they send (string, int) pair in c#, Java receiver will try to deserialize as <String, String>and of course fail on that

From @SreeramGarlapati:

This is a violation of Amqp SPEC & would be a breaking change:

3.2.5 Application Properties
….
The keys of this map are restricted to be of type string (which excludes the possibility of a null key) and the values are restricted to be of simple types only, that is, excluding map, list, and array types.

Versions

  • OS platform and version: Linux/Windows
  • Maven package version or commit ID: 0.9+

MessageSender and ReplayableWorkItem are not thread-safe but got accessed from various threads

From @ialexandrov on October 21, 2016 11:32

MessageSender has set of fields which are not final. At the same time methods of MessageSender are called from different threads without any state synchronization. This will lead to ocasional phantom isssues.
Either make MessageSender thread safe or make sure all members of it are only accessed from reactor thread,

For example:
MessageSender.lastKnownLinkError is set by reactor thread in onError method and read in sendCore() method which is called from ForJoinPool due to (probably) bug in EventHubClient.send(..,) methods where thenComposeAsync is called instead on thenCompose.

More of less the same problem is with ReplayableWorkItem.lastKnownException which is again is set by reactor thread but read from timer thread pool thread:

java / azure-eventhubs / src / main / java / com / microsoft / azure / servicebus / MessageSender.java
778 MessageSender.this.throwSenderTimeout(sendData.getWork(), sendData.getLastKnownException());

Copied from original issue: Azure/azure-event-hubs#284

Only a single instance of IEventProcessor gets created?

After talking to Dan Rosanova and @jtaubensee, I was asked to created this issue last night - which is also related to #52 that @adriancole created.

Basically, from conversations it seems the design of the EPH focused on creating an instance of IEventProcessor impl per partition. But in the tests I have had so far only a single instance gets created. This would mean that any implementation of IEventProcessor has to be thread-safe and also have a bearing on the checkpointing.

So can you please confirm the instancing policy and thread-safety of IEventProcessor implementations?

Checkpoint immediately before host shutdown may be lost

The semantics of the ILeaseManager and ICheckpointManager interfaces are that the two are completely separate. However, the Azure Storage implementation puts them both in the same storage blob because that is the way the .NET EPH was implemented, which can lead to unexpected interdependencies.

When PartitionContext.checkpoint is called, the lease blob in storage is updated. However, PartitionManager has a separate representation of that blob, as a Lease/AzureBlobLease object, which now has stale data in its checkpoint-related fields. Under most circumstances that doesn't matter because PartitionManager gets all leases every ten seconds, which downloads from storage and replaces the stale object. However, if checkpoint is called and then EPH is shut down before the next ten-second sweep, the act of releasing the leases as part of cleanup causes the stale checkpoint info to be written to storage, overwriting any checkpoints done since the last sweep.

Client should support TokenProvider model

From @hmlam on January 26, 2016 19:20

Just like the .Net counter part, we should have some form of TokenProvider model to support

  1. token generation from key so that we can use token across the wire protocol instead of key
  2. renewal/re-generations of tokens

Copied from original issue: Azure/azure-event-hubs#15

Are there any guarantees about single-threaded access to IEventProcessor callbacks?

Actual Behavior

Right now, there's no documentation about whether instances of IEventProcessor are invoked in a single thread or not. This means that callers need to handle shared state defensively, which is particularly difficult to test as code involved is in package protected signed jars.

Expected Behavior

Ideally, I'd like to see the creation of an IEventProcessor and all callbacks on a single-thread. This makes the programming model very simple, especially if batching is done. Otherwise, I'd at least like to see some documentation on the interface of the factor or the processor about how someone should handle concurrency.

Versions

0.10.0 event hubs. Here's the code I can't quite understand

Pull request in mention, which can be dramatically simplified if single-threaded call backs are made used openzipkin-attic/zipkin-azure#17

Unable to use custom RetryPolicy

From @nirmalpshah on September 22, 2016 20:39

I'd like to use a retry policy with some custom settings. I can create the policy and set it in the ConnectionStringBuilder, but get an error when trying to receive events using EPH.

ConnectionStringBuilder connection = new ConnectionStringBuilder(eventHubConfig.getNamespace(),
eventHubConfig.getName(), eventHubConfig.getPolicyName(), eventHubConfig.getPolicyKey());
connection.setRetryPolicy(new RetryExponential(
Duration.ofMillis(config.backoffMinMillis()),
Duration.ofMillis(config.backoffMaxMillis()),
Integer.MAX_VALUE,
"retryForever"));
String connectionString = connection.toString();
client = com.microsoft.azure.eventhubs.EventHubClient.createFromConnectionStringSync(connectionString);

// stack trace
com.microsoft.azure.servicebus.IllegalConnectionStringFormatException: Connection string parameter 'RetryPolicy'='retryForever' is not recognized
at com.microsoft.azure.servicebus.ConnectionStringBuilder.parseConnectionString(ConnectionStringBuilder.java:384) ~[azure-eventhubs-0.8.2.jar:na]
at com.microsoft.azure.servicebus.ConnectionStringBuilder.(ConnectionStringBuilder.java:149) ~[azure-eventhubs-0.8.2.jar:na]
at com.microsoft.azure.eventhubs.EventHubClient.createFromConnectionString(EventHubClient.java:96) ~[azure-eventhubs-0.8.2.jar:na]
at com.microsoft.azure.eventhubs.EventHubClient.createFromConnectionStringSync(EventHubClient.java:52) ~[azure-eventhubs-0.8.2.jar:na]
...

Copied from original issue: Azure/azure-event-hubs#266

EventHubClient.send() returns a result with partition and message position

Actual Behavior

  1. EventHubClient.send() returns CompletableFuture. We can not know any information, such as: which partition this event is sent to, message position (offset) in stream.

Expected Behavior

  1. EventHubClient.send() returns a result. We can get some information from this result, such as: which partition this event is sent to, message position (offset) in stream.

Versions

  • OS platform and version:
  • Maven package version or commit ID:

EPH checkpoint store initialization creating unwanted checkpoints with offset -1

From @JamesBirdsall on September 15, 2016 0:50

It is legal to never create a checkpoint for a partition. Now that the initial offset provider supports timestamps, it can be a useful scenario to not have a checkpoint and just receive events by time. The checkpoint store initialization is creating unwanted checkpoints which interfere with that scenario.

Copied from original issue: Azure/azure-event-hubs#262

ClassCastException when I consume a message

Actual Behavior

java.lang.ClassCastException: org.apache.qpid.proton.amqp.messaging.AmqpValue cannot be cast to org.apache.qpid.proton.amqp.messaging.Data
at com.microsoft.azure.eventhubs.EventData.(EventData.java:85) ~[azure-eventhubs-0.10.0.jar:na]
at com.microsoft.azure.eventhubs.EventDataUtil.toEventDataCollection(EventDataUtil.java:46) ~[azure-eventhubs-0.10.0.jar:na]
at com.microsoft.azure.eventhubs.PartitionReceiver$3.apply(PartitionReceiver.java:291) ~[azure-eventhubs-0.10.0.jar:na]
at com.microsoft.azure.eventhubs.PartitionReceiver$3.apply(PartitionReceiver.java:287) ~[azure-eventhubs-0.10.0.jar:na]
at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:602) ~[na:1.8.0_111]
at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577) ~[na:1.8.0_111]
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) ~[na:1.8.0_111]
at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962) ~[na:1.8.0_111]
at com.microsoft.azure.servicebus.MessageReceiver.onReceiveComplete(MessageReceiver.java:401) ~[azure-eventhubs-0.10.0.jar:na]
at com.microsoft.azure.servicebus.amqp.ReceiveLinkHandler.onDelivery(ReceiveLinkHandler.java:99) ~[azure-eventhubs-0.10.0.jar:na]
at org.apache.qpid.proton.engine.BaseHandler.handle(BaseHandler.java:185) ~[proton-j-0.14.0.jar:na]
at org.apache.qpid.proton.engine.impl.EventImpl.dispatch(EventImpl.java:108) ~[proton-j-0.14.0.jar:na]
at org.apache.qpid.proton.reactor.impl.ReactorImpl.dispatch(ReactorImpl.java:309) ~[proton-j-0.14.0.jar:na]
at org.apache.qpid.proton.reactor.impl.ReactorImpl.process(ReactorImpl.java:276) ~[proton-j-0.14.0.jar:na]
at com.microsoft.azure.servicebus.MessagingFactory$RunReactor.run(MessagingFactory.java:409) ~[azure-eventhubs-0.10.0.jar:na]
at java.lang.Thread.run(Thread.java:745) [na:1.8.0_111]

Expected Behavior

To receive the message

Versions

  • OS platform and version: Ubuntu 16.04, Java 8
  • Maven package version or commit ID: 0.10.0

Not able to push messages to Event Hub

From @vedhera on February 15, 2017 17:4

Hi,
I am using the following code to send messages to event hub and getting following error:

com.microsoft.azure.servicebus.ServiceBusException: connection aborted

package com.adf.eventhub;

import java.io.IOException;
import java.nio.charset.*;
import java.util.*;
import java.util.concurrent.ExecutionException;

import com.microsoft.azure.eventhubs.*;
import com.microsoft.azure.servicebus.*;

public class Send
{
    public static void main(String[] args)
            throws ServiceBusException, ExecutionException, InterruptedException, IOException {
        final String namespaceName = "mcdp-event-hub";
        final String eventHubName = "akhilesh-event-hub";
        final String sasKeyName = "mysendpolicy";
        final String sasKey = "<Key>";
        ConnectionStringBuilder connStr = new ConnectionStringBuilder(namespaceName, eventHubName, sasKeyName, sasKey);


        byte[] payloadBytes = "Test AMQP message from JMS".getBytes("UTF-8");
        EventData sendEvent = new EventData(payloadBytes);

        EventHubClient ehClient = EventHubClient.createFromConnectionStringSync(connStr.toString());
        ehClient.sendSync(sendEvent);
    }

}

Kindly suggest what is wrong in my implementation.

Copied from original issue: Azure/azure-event-hubs#298

EPH stopped receiving from all partitions after hitting IllegalArgumentException couple of times.

From @serkantkaraca on July 26, 2016 22:22

  • Single process
  • Single host
  • 4 partitions EH

Host ran quite fine for 3 hours in our stress tests and then hit multiple IllegalArgumentExceptions then completely stopped delivering messages.

<8084.20> 07/25/2016 20:32:04 - Error: JAVA-Receiver_0: Jul 25, 2016 8:32:04 PM com.microsoft.azure.servicebus.MessagingFactory$RunReactor run
<8084.20> 07/25/2016 20:32:04 - Error: JAVA-Receiver_0: WARNING: UnHandled exception while processing events in reactor:
<8084.20> 07/25/2016 20:32:04 - Error: JAVA-Receiver_0: java.lang.IllegalArgumentException
<8084.20> 07/25/2016 20:32:04 - Error: JAVA-Receiver_0: org.apache.qpid.proton.engine.impl.EventImpl.dispatch(EventImpl.java:112)
<8084.20> 07/25/2016 20:32:04 - Error: JAVA-Receiver_0: org.apache.qpid.proton.reactor.impl.ReactorImpl.dispatch(ReactorImpl.java:307)
<8084.20> 07/25/2016 20:32:04 - Error: JAVA-Receiver_0: org.apache.qpid.proton.reactor.impl.ReactorImpl.process(ReactorImpl.java:276)
<8084.20> 07/25/2016 20:32:04 - Error: JAVA-Receiver_0: com.microsoft.azure.servicebus.MessagingFactory$RunReactor.run(MessagingFactory.java:355)
<8084.20> 07/25/2016 20:32:04 - Error: JAVA-Receiver_0: java.lang.Thread.run(Unknown Source)Cause: null
<8084.20> 07/25/2016 20:32:04 - Error: JAVA-Receiver_0: java.nio.Buffer.position(Unknown Source)
<8084.20> 07/25/2016 20:32:04 - Error: JAVA-Receiver_0: org.apache.qpid.proton.engine.impl.ssl.SimpleSslTransportWrapper.pop(SimpleSslTransportWrapper.java:411)
<8084.20> 07/25/2016 20:32:04 - Error: JAVA-Receiver_0: org.apache.qpid.proton.engine.impl.ssl.SslImpl$UnsecureClientAwareTransportWrapper.pop(SslImpl.java:185)
<8084.20> 07/25/2016 20:32:04 - Error: JAVA-Receiver_0: org.apache.qpid.proton.engine.impl.TransportImpl.pop(TransportImpl.java:1476)
<8084.20> 07/25/2016 20:32:04 - Error: JAVA-Receiver_0: org.apache.qpid.proton.reactor.impl.SelectorImpl.select(SelectorImpl.java:145)
<8084.20> 07/25/2016 20:32:04 - Error: JAVA-Receiver_0: org.apache.qpid.proton.reactor.impl.IOHandler.handleQuiesced(IOHandler.java:57)
<8084.20> 07/25/2016 20:32:04 - Error: JAVA-Receiver_0: org.apache.qpid.proton.reactor.impl.IOHandler.onUnhandled(IOHandler.java:381)
<8084.20> 07/25/2016 20:32:04 - Error: JAVA-Receiver_0: org.apache.qpid.proton.engine.BaseHandler.onReactorQuiesced(BaseHandler.java:87)
<8084.20> 07/25/2016 20:32:04 - Error: JAVA-Receiver_0: org.apache.qpid.proton.engine.BaseHandler.handle(BaseHandler.java:206)
<8084.20> 07/25/2016 20:32:04 - Error: JAVA-Receiver_0: org.apache.qpid.proton.engine.impl.EventImpl.dispatch(EventImpl.java:108)
<8084.20> 07/25/2016 20:32:04 - Error: JAVA-Receiver_0: org.apache.qpid.proton.reactor.impl.ReactorImpl.dispatch(ReactorImpl.java:307)
<8084.20> 07/25/2016 20:32:04 - Error: JAVA-Receiver_0: org.apache.qpid.proton.reactor.impl.ReactorImpl.process(ReactorImpl.java:276)
<8084.20> 07/25/2016 20:32:04 - Error: JAVA-Receiver_0: com.microsoft.azure.servicebus.MessagingFactory$RunReactor.run(MessagingFactory.java:355)
<8084.20> 07/25/2016 20:32:04 - Error: JAVA-Receiver_0: java.lang.Thread.run(Unknown Source)

Copied from original issue: Azure/azure-event-hubs#200

Move logging framework from logger to slf4j

From @SreeramGarlapati on May 16, 2016 17:1

One open Item here is - if the log4j appender has to push events to EventHubs using the JavaClient - how to ignore the eventhub events (am sure there should be a way)

Copied from original issue: Azure/azure-event-hubs#148

Logging level not configurable in EPH

Actual Behavior

  1. EPH will only ever log SEVERE-level traces and cannot be configured.

Expected Behavior

  1. Logging level can be configured via config file and -Djava.util.logging.config.file=file

Versions

  • OS platform and version: Windows 10
  • Maven package version or commit ID: 0.10.0

This is happening because the code forces the level to SEVERE. That was done because otherwise by default everything INFO and above goes to stderr, which got spammy, and I didn't know that it couldn't be overridden by the config file. One potential approach is to redo the levels so that INFO is not used. WARNING and above occur rarely under normal circumstances and it's OK if they go to stderr by default.

CBSChannel does not complete/return

Actual Behavior

  1. Attempt to send a sas token for cbs
  2. cbsChannel never completes or throws.

Expected Behavior

  1. The action completes and either the error is handled or the completed result is returned

Versions

  • OS platform and version: UBUNTU 14.04
  • Maven package version or commit ID:
    'com.microsoft.azure:azure-eventhubs-eph:0.13.0',
    'com.microsoft.azure:azure-eventhubs:0.13.0',

I cannot use the pre-canned message classes because I need explicit access around the way the sas tokens are controlled as this is part of our process of generating Sas tokens for distribution to untrusted devices.


    public static void main(String[] args) throws IOException,InterruptedException,ExecutionException{

        ConnectionStringBuilder builder  =new ConnectionStringBuilder("ns",
                "metrics", "RootManageSharedAccessKey", "redacted");
        CompletableFuture<MessagingFactory> factoryFuture = MessagingFactory.createFromConnectionString(builder.toString());
        MessagingFactory factory = factoryFuture.get();
        Reactor reactor = new ReactorImpl();
        reactor.start();
        ReactorDispatcher dispatcher = new ReactorDispatcher(reactor);
        CBSChannel channel =  factory.getCBSChannel();
        IOperationResult<Void,Exception> callback = new IOperationResult<Void, Exception>() {
            @Override
            public void onComplete(Void aVoid) {
                System.out.println("Request Completed");
            }

            @Override
            public void onError(Exception e) {
                System.out.println(e.toString());
            }
        };

// I know the values here are wrong but that should not matter
// Hangs inside here indefinitely.
// I do see the request is sent though
        channel.sendToken(dispatcher,"foo","audience", callback);


    }

initial offset provider ignored

From @PeterKelecom on September 9, 2016 15:1

I've set my function to return the current datetime so that the processor will only start processing the new incoming messages. However, the processor still starts from the start of the stream.
Code below:

EventProcessorHost host = new EventProcessorHost(eventHubName, consumerGroupName, eventHubConnectionString.toString(), storageConnectionString);
EventProcessorOptions options = new EventProcessorOptions();
options.setInitialOffsetProvider((partitionId) -> { return Instant.now();});
try {
host.registerEventProcessorFactory(factory, options).get();
}

Copied from original issue: Azure/azure-event-hubs#256

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. 📊📈🎉

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❤️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.