csharpclient-for-kafka's Introduction

This project will be archived to be in read-only mode as no new development/maintenance is expected. Please look at the Note section below for alternate libraries.

Note

This library was created for Kafka 0.8 with an intention to have a native library built from scratch. With Kafka protocol getting updated frequently with new features (which is expected until it reaches version 1.0), it doesn't seem beneficial to maintain a library built from scratch. The right approach (and as suggested by Confluent) for now would be to use a C# wrapper around the librdkafka C-Library, which the confluent-kafka-dotnet client is doing.

So, if you are using Kafka 0.9 or higher, please move to using the confluent-kafka-dotnet client library.

CSharpClient-for-Kafka

Join the chat at https://gitter.im/Microsoft/Kafkanet

.NET implementation of the Apache Kafka protocol that provides basic functionality through Producer/Consumer classes. The project also offers a balanced consumer implementation. The project is a fork of ExactTarget's Kafka-net client.

Related documentation

Build CSharpClient-for-Kafka

  • Clone CSharpClient-for-Kafka through git clone https://github.com/Microsoft/CSharpClient-for-Kafka.git
  • Open src\KafkaNETLibraryAndConsole.sln in Visual Studio
  • Build Solution

Run Unit Tests

  • Open Test Window in Visual Studio: Test>Windows>Test Explorer
  • Run all

Using Console

  • Setup local Kafka and Zookeeper

Console Options

    topic                           Dump topic metadata, such as: earliest/latest offset, replica, ISR.
    consumesimple                   Consume data in a single thread.
    consumegroup                    Monitor latest offset gap and speed of consumer group.
    consumegroupmonitor             Monitor latest offset gap and speed of consumer group.
    producesimple                   Produce data in a single thread.
    produceperftest                 Produce data in multiple threads.
    eventserverperftest             HTTP POST data to event server in multiple threads.
    producemonitor                  Monitor latest offset.
    test                            Run some ad hoc test cases.

Using the library

Producer

The Producer can send a single message or an entire batch to Kafka. When sending a batch, you can send to multiple topics at once.

Producer Usage

var brokerConfig = new BrokerConfiguration()
{
    BrokerId = this.brokerId,
    Host = this.kafkaServerName,
    Port = this.kafkaPort
};
var config = new ProducerConfiguration(new List<BrokerConfiguration> { brokerConfig });
kafkaProducer = new Producer(config);
// here you construct your batch or a single message object
var batch=ConstructBatch();
kafkaProducer.Send(batch);

Simple Consumer

The simple Consumer gives full control over retrieving data. You can instantiate a Consumer directly by providing a ConsumerConfiguration and then calling Fetch. CSharpClient-for-Kafka also has a higher-level wrapper around Consumer, KafkaSimpleManager, which allows consumer reuse and other benefits.

Consumer Usage

// create the Consumer higher level manager
var managerConfig = new KafkaSimpleManagerConfiguration()
{
    FetchSize = FetchSize,
    BufferSize = BufferSize,
    Zookeeper = m_zookeeper
};
m_consumerManager = new KafkaSimpleManager<int, Kafka.Client.Messages.Message>(managerConfig);
// get all available partitions for a topic through the manager
var allPartitions = m_consumerManager.GetTopicPartitionsFromZK(m_topic);
// Refresh metadata and grab a consumer for desired partitions
m_consumerManager.RefreshMetadata(0, m_consumerId, 0, m_topic, true);
var partitionConsumer = m_consumerManager.GetConsumer(m_topic, partitionId);

Balanced Consumer

The balanced consumer manages partition assignment for each instance in the same consumer group. Rebalances are triggered by ZooKeeper changes.

Balanced Consumer Usage

// Here we create a balanced consumer on one consumer machine for consumerGroupId. All machines consuming for this group will get balanced together
ConsumerConfiguration config = new ConsumerConfiguration
{
    AutoCommit = false,
    GroupId = consumerGroupId,
    ConsumerId = uniqueConsumerId,
    MaxFetchBufferLength = m_BufferMaxNoOfMessages,
    FetchSize = fetchSize,
    AutoOffsetReset = OffsetRequest.LargestTime,
    NumberOfTries = 20,
    ZooKeeper = new ZooKeeperConfiguration(zookeeperString, 30000, 30000, 2000)
};
var balancedConsumer = new ZookeeperConsumerConnector(config, true, m_ConsumerRebalanceHandler, m_ZKDisconnectHandler, m_ZKExpireHandler);
// grab streams for desired topics
var streams = balancedConsumer.CreateMessageStreams(m_TopicMap, new DefaultDecoder());
var kafkaMessageStream = streams[m_Topic][0];
// start consuming stream
foreach (Message message in kafkaMessageStream.GetCancellable(cancellationTokenSource.Token))
....

Contribute

Contributions to CSharpClient-for-Kafka are welcome. Here is how you can contribute to CSharpClient-for-Kafka:

csharpclient-for-kafka's People

Contributors

alexandrnikitin, algorithmsarecool, danielli90, ducas, edenhill, epayet, gitter-badger, johnstark, jthelin, mhorstma, msftgits, riamandii, soumyajit-sahu, wha-deploy, yixuanliu

csharpclient-for-kafka's Issues

Low performance

Basically, it's a question that could grow to issues: is performance in focus? Any plans to work on it?

ZKSessionExpireListener - HandleStateChanged error in code

In ZKSessionExpireListener.cs - HandleStateChanged method the condition for the new state seems to be reversed.

The condition is "if (newState != Disconnected)" and then the code inside refers to the new state being a disconnected connection. The condition should be "if (newState == Disconnected)"

Roadmap and support

Hi all,

I was looking into the various .NET Kafka clients out there and stumbled across this one. I was wondering if there is any official or unofficial commitment from Microsoft for this client? Where is it used? What is the roadmap for this library? Would be cool to get some insights.

Thanks

Multiple consumers for multiple ZooKeepers

Dears,

We have an application that creates two consumers. Each consumer reads messages from a different set of ZooKeeper servers.
The problem is that the second consumer either does not read from its own ZooKeeper servers or reads from the first consumer's ZooKeeper servers instead.

We have created two consumer configuration objects, two ZookeeperConsumerConnector instances, and two streams.

Best regards

Nuget Package

When will the NuGet package for this Kafka client be released?

Messages received multiple times when using ZookeeperConsumerConnector.CommitOffset

When using a balanced consumer and manually committing offsets for a topic/partition, the CommitOffset method sets the fetcher's position to the offset being committed. If this offset is behind the most recently received message(s) (e.g. because it's still being processed) then the message(s) will be fetched again.

e.g.

// assumes consumer created for topic named "test.topic"
stream.iterator.MoveNext() // offset = 1, partition 0
stream.iterator.Current // get message with offset 1, p 0
stream.iterator.MoveNext() // offset = 2, partition 0
stream.iterator.Current // get message with offset 2, p 0

consumer.Commit("test.topic", 0, 1)

stream.iterator.MoveNext()
stream.iterator.Current // gets message with offset 2, p0

In the above case, I would expect to receive message with offset 3. However, this is not the case because of lines 244 and 245 in ZookeeperConsumerConnector.cs:

   partitionTopicInfo.ConsumeOffset = offset;
   partitionTopicInfo.FetchOffset = offset;

This behaviour is desirable in many circumstances, but should be configurable/overridable.
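
The configurable behaviour requested above could be sketched roughly as follows. This is a minimal, self-contained illustration and not the library's code: PartitionOffsets is a hypothetical stand-in for PartitionTopicInfo, and the rewindFetcher flag is an assumed option that does not exist in the library.

```csharp
using System;

// Hypothetical stand-in for the library's PartitionTopicInfo; only the two
// offsets named in the issue are modelled.
public sealed class PartitionOffsets
{
    public long ConsumeOffset { get; set; }
    public long FetchOffset { get; set; }
}

public static class CommitSketch
{
    // Returns the offset that would be persisted. The in-memory positions are
    // rewound only when the caller asks for it (assumed flag, not a library option).
    public static long Commit(PartitionOffsets info, long offset, bool rewindFetcher)
    {
        if (info == null) throw new ArgumentNullException(nameof(info));
        if (offset < 0) throw new ArgumentOutOfRangeException(nameof(offset));

        if (rewindFetcher)
        {
            // Behaviour described in the issue: both positions jump to the committed offset.
            info.ConsumeOffset = offset;
            info.FetchOffset = offset;
        }

        return offset;
    }
}
```

With rewindFetcher set to false, committing offset 1 after receiving offsets 1 and 2 leaves the fetch position untouched, so the next read would continue after offset 2 rather than returning it again.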

FailedToSendMessageException when RequiredAcks != 0

Hi everyone,

I'm using CSharpClient-for-Kafka NuGet 1.0.47 against Kafka 0.9.0.1 to implement a simple producer.

Every time I try to send a message with non-default ProducerConfiguration.RequiredAcks, the client fails after 3 attempts with FailedToSendMessageException.
Interesting fact is that the message is actually delivered to the broker three times, and even passed to the consumers three times. When I don't use any acknowledgements, sending message works perfectly fine, but for my use case I need delivery guarantees, so RequiredAcks=0 is not really an option.

I've tried using the Kafka built-in kafka-console-producer.sh and it works as expected even with --request-required-acks set to 1 or -1 - the message is delivered once and with no error.

This leads me to believe there's either an issue with the Producer (not processing acks correctly?) or the way I'm using it.

The topic has 5 partitions on 2 brokers, replication factor 1.

My code to reproduce:

var brokers = new[] { ... };    
var config = new ProducerConfiguration(brokers){RequiredAcks=1};
var producer = new Producer(config);
var message= new Message(Encoding.UTF8.GetBytes("Hi from C#"));
var batch = new ProducerData<string, Message>("test", "key", message);
producer.Send(batch);

The exception thrown on producer.Send:

Failed to send messages after 3 tries. FailedProducerDatas not empty. Success Count:0 Failed Count: 1.
================Failed sent message key: Topic: test Key: key
================Failed Detail: Broker:0,Topic:test PartitionID:1,Error:UnknownCode Offset:-1

Stack trace:

   at Kafka.Client.Producers.DefaultCallbackHandler`2.Handle(IEnumerable`1 events)
   at Kafka.Client.Producers.Producer`2.Send(IEnumerable`1 data)
   at Kafka.Client.Producers.Producer`2.Send(ProducerData`2 data)

kafka-console-consumer.sh output after a single call:

Hi from C#
Hi from C#
Hi from C#

Thank you for any advice!

ConsumerIterator Lag Bad managed

Consuming messages from one topic does not manage lag properly. After consuming all messages, I get Lag=1 for the consumer group (using the native ConsumerOffsetChecker tool), and when I produce new messages I get this exception:

ERROR [8] consumed offset: 34 doesn't match fetch offset: 35 for topic:0: fetched offset = 35: consumed offset = 34; consumer may lose data

Could the cause be in ConsumerIterator.cs (MakeNext function)?

MessageAndOffset item = current.Current;
consumedOffset = item.MessageOffset;

I saw that other .NET libraries (like ExactTargetDev/kafka-net) use the next offset when committing:

this.consumedOffset = item.NextOffset;

Here is my code:

// CONSTRUCTOR
public KafkaSimpleConsumer(ConsumerConfiguration configuration, string topic)
{
    try
    {
        zkConsumerConnector = new ZookeeperConsumerConnector(configuration, true);
        consumerStreams = zkConsumerConnector.CreateMessageStreams(new Dictionary<string, int> { { topic, 1 } }, new DefaultDecoder());
        streams = consumerStreams[topic];
        it = streams[0].GetEnumerator();
    }
    catch (Exception)
    {
        // rethrow without resetting the stack trace
        throw;
    }
}

// FUNCTION FOR READING
public Message GetNextKafkaMsg()
{
    try
    {
        if (it.MoveNext())
            return it.Current;
        else
            return null;
    }
    catch (ConsumerTimeoutException)
    {
        // on timeout, obtain a fresh enumerator and report that no message was available
        it = streams[0].GetEnumerator();
        return null;
    }
}

log4net dependency

This component would be much better if it didn't depend on log4net. It causes version conflicts if the application doesn't reference the same version, and assembly binding redirection is not always possible because log4net switched strong name keys with version 1.2.11.

Possible issue in Message.cs

I am very new to this codebase, but this code seems odd to me in Message.cs:

        /// <summary>
        /// Try to show the payload as decoded to UTF-8.
        /// </summary>
        /// <returns>The decoded payload as string.</returns>
        public override string ToString()
        {
            var sb = new StringBuilder();
            sb.Append("Magic: ");
            sb.Append(this.Magic);
            if (this.Magic == 1)
            {
                sb.Append(", Attributes: ");
                sb.Append(this.Attributes);
            }

            sb.Append(", topic: ");
            try
            {
                sb.Append(Encoding.UTF8.GetString(this.Payload));
            }
            catch (Exception)
            {
                sb.Append("n/a");
            }

            return sb.ToString();
        }

Specifically the code that inserts topic: {payload} . (i.e. the label implies it is the topic, but in fact it shows the payload not the topic).

Kerberos support

Hello, I would like to know if you are planning to publish the support of kerberos in this CSharpClient? If yes, is there any planned date for it? Thanks a lot!

ZooKeeper can't load log4net assembly

var consumerConfig = ConsumerConfiguration.Configure("consumer");
var zkConsConnector = new ZookeeperConsumerConnector(consumerConfig, true); //exceptionThrownHere

Exception message:

ZookeeperConsumerConnector exception: Exception Message: Unable to connect to
SERVERSTRINGS
Source: KafkaNET.Library
Stack Trace:
at Kafka.Client.ZooKeeperIntegration.ZooKeeperConnection.Connect(IWatcher watcher)
at Kafka.Client.ZooKeeperIntegration.ZooKeeperClient.Connect()
at Kafka.Client.Consumers.ZookeeperConsumerConnector.ConnectZk()
at Kafka.Client.Consumers.ZookeeperConsumerConnector..ctor(ConsumerConfiguration config, Boolean enableFetcher, EventHandler rebalanceHandler, EventHandler zkDisconnectedHandler, EventHandler zkExpiredHandler)

---- Inner Exception ----
Exception Message: Could not load file or assembly "log4net, Version=1.2.15.0, Culture=neutral, PublicKeyToken=1b44e1d426115821" or one of its dependencies. The located assembly's manifest definition does not match the assembly reference. (Exception from HRESULT: 0x80131040)
Source: ZooKeeperNet
Stack Trace:
at ZooKeeperNet.ZooKeeper..ctor(String connectstring, TimeSpan sessionTimeout, IWatcher watcher)
at Kafka.Client.ZooKeeperIntegration.ZooKeeperConnection.Connect(IWatcher watcher)

---- Inner Exception ----
Exception Message: Could not load file or assembly "log4net, Version=1.2.10.0, Culture=neutral, PublicKeyToken=1b44e1d426115821" or one of its dependencies. The located assembly's manifest definition does not match the assembly reference. (Exception from HRESULT: 0x80131040)
Source:
Stack Trace:

App.config:

<runtime>
    <assemblyBinding xmlns="urn:schemas-microsoft-com:asm.v1">
      <dependentAssembly>
        <assemblyIdentity name="log4net" publicKeyToken="1b44e1d426115821" culture="neutral" />
      </dependentAssembly>
        <bindingRedirect oldVersion="0.0.0.0-1.2.10.0" newVersion="1.2.15.0"/>
    </assemblyBinding>
  </runtime>

Please tell me how I can fix it. And why does the library depend on log4net directly instead of an ILog interface?
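
One detail in the App.config quoted above stands out: the bindingRedirect element sits outside dependentAssembly, whereas the .NET Framework configuration schema expects it as a child of that element. A sketch of the same fragment with only the nesting changed (all values are taken unchanged from the report) might look like this:

```xml
<runtime>
  <assemblyBinding xmlns="urn:schemas-microsoft-com:asm.v1">
    <dependentAssembly>
      <assemblyIdentity name="log4net" publicKeyToken="1b44e1d426115821" culture="neutral" />
      <!-- bindingRedirect moved inside dependentAssembly; values unchanged from the report -->
      <bindingRedirect oldVersion="0.0.0.0-1.2.10.0" newVersion="1.2.15.0" />
    </dependentAssembly>
  </assemblyBinding>
</runtime>
```

This may not be sufficient on its own. A binding redirect only maps versions of an assembly with the same public key token, so if the installed log4net build is signed with a different key than the one referenced, the load would still be expected to fail.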

Offsets handling inconsistent with Java consumers

Use Cases

As a developer
I want to be able to use Java and .NET consumers in the same group
So that I can test them side by side

As a developer
I want lags to be reported with .NET consumers in the same way Java consumers report them
So that I don't have to change my monitoring tools depending on the client

Description

Java consumers commit the offset of the next expected message and start from there when next consuming. Instead, this library commits the offset of the last message received. E.g.

  • Java consumer
    • Receives message with P:0 O:1
    • Commits to P:0 with O:2
  • kafkanet consumer
    • Receives message with P:0 O:1
    • Commits to P:0 with O:1

This means that all monitoring tools (e.g. Kafka Manager) report a lag of 1 when the consumer is at the end of the topic instead of a lag of 0. It also means that when I switch from a Java consumer to a kafkanet consumer I'll miss a message, because it will try to start at P:0 O:3.

Proposed fix

A configuration flag to change the behaviour of the ZookeeperConsumerConnector and Fetcher to allow it to use offsets the same way that Java consumers do.
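
The difference between the two conventions can be made concrete with a small, self-contained sketch. The class and method names below are hypothetical and are not part of the library; the lag formula (log end offset minus committed offset) is the usual way monitoring tools compute lag, and the resume rule for the last-received convention follows the description above.

```csharp
public static class OffsetConventions
{
    // Java convention: commit the offset of the next message to read.
    public static long JavaStyleCommit(long lastReceivedOffset) => lastReceivedOffset + 1;

    // Convention described for this library: commit the offset of the last message received.
    public static long LastReceivedCommit(long lastReceivedOffset) => lastReceivedOffset;

    // Lag as reported by monitoring tools: log end offset minus committed offset.
    public static long Lag(long logEndOffset, long committedOffset) => logEndOffset - committedOffset;

    // A consumer using the last-received convention resumes one past the committed offset.
    public static long ResumeAfterLastReceivedCommit(long committedOffset) => committedOffset + 1;
}
```

For a partition holding offsets 0 and 1 (log end offset 2), a consumer that has read offset 1 shows a lag of 0 under the Java convention and a lag of 1 under the last-received convention. Resuming with the last-received rule from a Java-style commit of 2 yields offset 3, which skips offset 2.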

Fetch offset repeating from some random previously committed offset after reaching end of messages, in loop.

I have been hitting this problem intermittently. I have 40 msgs in a topic (1 partition), reading with Consumer Group, ZookeeperConnectorCount =1, CommitBatchSize = 2 (3, 5, 17, 25 etc didn't make diff w.r.t this issue).

From the filtered log snippet below, you can see that Message keeps going more than 40 and Offset value resets to some value already seen (Offset:4 below), and this continues. When I reach end of offset, again it will be reset to some other value - this repeats indefinitely.

If I kill the consumer and run again at later time, it will work just fine - will read till last offset and waits for new msgs.

Have you seen this behavior? Are there any settings that can control this? I am blocked by this, appreciate ANY help.
Repro_Detail_log.txt

1922:2016-05-17 01:05:56,781 [3] INFO MDM.MessageBrokerManager.ConsumerGroupHelperUnit [(null)] - Message 39 from Partition:0, Offset:38, key:(null), value:{
1971:2016-05-17 01:05:56,996 [3] INFO MDM.MessageBrokerManager.ConsumerGroupHelperUnit [(null)] - Message 40 from Partition:0, Offset:39, key:(null), value:{
2020:2016-05-17 01:05:57,162 [3] INFO MDM.MessageBrokerManager.ConsumerGroupHelperUnit [(null)] - Message 41 from Partition:0, Offset:4, key:(null), value:{
2067:2016-05-17 01:05:57,349 [3] INFO MDM.MessageBrokerManager.ConsumerGroupHelperUnit [(null)] - Message 42 from Partition:0, Offset:5, key:(null), value:{

Any plans for a DotNetCore implementation?

I am just wondering if there are any plans to convert this project to (or add support for) .NET Core, in order to support ASP.NET Core, UWP, and future application models.

Thank you

VersionId used in PartitionLeaderFinder when creating topic metadata request seems to be incorrect

In file "CSharpClient-for-Kafka/src/KafkaNET.Library/Consumers/PartitionLeaderFinder.cs", on line 80, when creating a TopicMetadataRequest to get consumer metadata, a hard-coded value of 1 is passed in as the versionId:

IEnumerable<TopicMetadata> metaData = consumer.GetMetaData(TopicMetadataRequest.Create(new[] { partition.Topic }, 1, 0, clientId));

Looking at how VersionId is used in other classes, it seems this should be set to 0.

Having it set to 1 means that no leader is found for the partition and results in the following exception, which fails silently:

2018-09-26 16:11:24.6465 [21] WARN: Kafka.Client.Consumers.PartitionLeaderFinder:Error retrieving meta data from broker 0: Exception Message: Unable to read data from the transport connection: A connection attempt failed because the connected party did not properly respond after a period of time, or established connection failed because connected host has failed to respond.
Source: System
Stack Trace:
at System.Net.Sockets.NetworkStream.Read(Byte[] buffer, Int32 offset, Int32 size)
at System.IO.BinaryReader.ReadBytes(Int32 count)
at Kafka.Client.Utils.BitWorks.ReadShortString(KafkaBinaryReader reader, String encoding)
at Kafka.Client.Cluster.Broker.ParseFrom(KafkaBinaryReader reader)
at Kafka.Client.Requests.TopicMetadataRequest.Parser.ParseFrom(KafkaBinaryReader reader)
at Kafka.Client.KafkaConnection.Handle[T](Byte[] data, IResponseParser`1 parser, Boolean shouldParse)
at Kafka.Client.KafkaConnection.Send(TopicMetadataRequest request)
at Kafka.Client.Consumers.Consumer.GetMetaData(TopicMetadataRequest request)
at Kafka.Client.Consumers.PartitionLeaderFinder.Start()

---- Inner Exception ----
Exception Message: A connection attempt failed because the connected party did not properly respond after a period of time, or established connection failed because connected host has failed to respond
Source: System
Stack Trace:
at System.Net.Sockets.NetworkStream.Read(Byte[] buffer, Int32 offset, Int32 size)

Examples and sample code are very complex to use

The examples are very complex to understand and use.

Also, as noted in another bug, the producer example on the home page needs a broker id, and we cannot get the broker id directly.
Please give workable examples or, if we need to request topic metadata to find the broker id, please give a complete example.

[PVS-Studio] V3005 - ConsumeGroupFindNewLeaderSleepIntervalMs variable assigned to itself

The kind people at Viva64 gave an evaluation license for PVS-Studio to try out on some OSS projects, and running it on Kafkanet source code found this problem.
http://www.viva64.com/en/pvs-studio/

https://github.com/Microsoft/Kafkanet/blob/master/src/KafkaNET.Library/Cfg/ConsumerConfiguration.cs#L127

Error V3005. The 'ConsumeGroupFindNewLeaderSleepIntervalMs' variable is assigned to itself
http://www.viva64.com/en/d/0403/print/

I imagine that line should probably be same as Line 93 ?

this.ConsumeGroupFindNewLeaderSleepIntervalMs = DefaultConsumeGroupFindNewLeaderSleepIntervalMs;

Zookeeper ReadData function not working properly

I have been using the ReadData function to query the zookeeper for brokers, offsets, etc.

I am trying to query the zookeeper for a list of consumer groups by using the ReadData function and passing the path: "/consumers". For some reason I get returned null back.

I also have a nodejs app that connects to zookeeper and by doing the same thing (passing the same path to their function) I am able to query all the consumer groups.

Am I doing something wrong in the C# version?

The exact command is this: zk.ReadData("/consumers", true);
where zk is my zookeeper client

Consumer method does not work

I'm using version 0.9.0. I can send messages to the queue successfully, but I cannot read messages.

Neither this method:

var managerConfig = new KafkaSimpleManagerConfiguration()
{
    FetchSize = FetchSize,
    BufferSize = BufferSize,
    Zookeeper = m_zookeeper
};
m_consumerManager = new KafkaSimpleManager<int, Kafka.Client.Messages.Message>(managerConfig);
// get all available partitions for a topic through the manager
var allPartitions = m_consumerManager.GetTopicPartitionsFromZK(m_topic);
// Refresh metadata and grab a consumer for desired partitions
m_consumerManager.RefreshMetadata(0, m_consumerId, 0, m_topic, true);
var partitionConsumer = m_consumerManager.GetConsumer(m_topic, partitionId);

nor this one:

ConsumerConfiguration config = new ConsumerConfiguration
{
    AutoCommit = false,
    GroupId = consumerGroupId,
    ConsumerId = uniqueConsumerId,
    MaxFetchBufferLength = m_BufferMaxNoOfMessages,
    FetchSize = fetchSize,
    AutoOffsetReset = OffsetRequest.LargestTime,
    NumberOfTries = 20,
    ZooKeeper = new ZooKeeperConfiguration(zookeeperString, 30000, 30000, 2000)
};
var balancedConsumer = new ZookeeperConsumerConnector(config, true, m_ConsumerRebalanceHandler, m_ZKDisconnectHandler, m_ZKExpireHandler);
// grab streams for desired topics
var streams = m_ZooKeeperConsumerConnector.CreateMessageStreams(m_TopicMap, new DefaultDecoder());
var KafkaMessageStream = streams[m_Topic][0];
// start consuming stream
foreach (Message message in m_KafkaMessageStream.GetCancellable(cancellationTokenSource.Token))

works.

Balanced consumer in new group doesn't retrieve messages from topics that don't start at offset 0

Use Case

As a new balanced consumer
I want to consume messages from an old topic
So that I can start to perform operations based on these messages

Description

I have a topic with 2 partitions that has been published to for weeks now. The logs have been cleaned due to retention and it now has a starting offset greater than 0 on both partitions. When I start building an application that will use a balanced consumer via the ZookeeperConsumerConnector and try to consume messages from this topic, I expect to (by default) start at the first message on each partition. Instead I find that I receive no messages.

Repro Steps

  1. Find a topic that has had its logs cleaned up so no longer starts at 0
  2. Create a new balanced consumer with a new group id using ZookeeperConsumerConnector and open a stream to this topic
  3. Wait for messages...

Notes

  • Wireshark shows that some comms with kafka are happening, but they quickly end and only zookeeper is being talked to.
  • If I manually create the path structure and an offset in Zookeeper for the consumer (e.g. create /consumers/my.consumer/offsets/my.topic/0 100) and restart the application it starts to receive messages.
  • Here is a log file that shows pretty much nothing happening -KafkaNet.Lib._[15696]Detail.log2016_03_31.txt

Switch Zookeeper.NET dependency to ZooKeeperNetEx

I've tried this library and while it works really well when Zookeeper is available, it falls down tremendously when the Zookeeper hostname is invalid, or there is no Zookeeper up to handle the connections. When the hostname is invalid, it throws an unhandled null ptr exception and I had to write some hacky code to make sure the hostname was valid before trying to connect. The library also seems to be unmaintained. Please switch to using this library instead https://www.nuget.org/packages/ZooKeeperNetEx/

Is there any retry mechanism in the kafka-net DLL when sending a message fails?

@edenhill , @Jroland
I have used kafka-net to send messages to a Kafka topic. I am running the producer on a Windows Server 2012 R2 machine and the consumer on a CentOS machine.

While sending messages to Kafka, some messages are dropped even though messages are not being sent continuously. After checking the log files of the Kafka topic, I found that the dropped messages were not written to its logs.

How can I ensure that my message has been sent to the Kafka topic successfully?

If sending a message to Kafka fails, how can I resend the same message?

Port in CoreCLR

Hi,
I'm curious why this project hasn't been created with CoreCLR support as an xproj project.
I know that the project is in Release Candidate, but it supports .NET Framework very well.
Do you plan to migrate the project to an xproj one?

ConsumedOffset is handled incorrectly when an OffsetOutOfRange error is received by the fetcher

Description

In cases where a consumer cannot keep up with the incoming message rate and the fetcher attempts to begin fetching using an offset that no longer exists on the broker because it has been truncated, Kafka returns an OffsetOutOfRange error. This error is handled in FetcherRunnable.cs around line 120, where the partitionTopicInfo.FetchOffset and ConsumeOffset are updated to the new value computed by ResetConsumerOffsets(). The bug occurs later in the ConsumerIterator class, where the MakeNext() method updates the currentTopicInfo.ConsumeOffset to the currentDataChunk.FetchOffset value (ConsumerIterator.cs, line 256).

In cases where the OffsetOutOfRange error was handled, this will cause the ConsumeOffset to be set forward (correctly) by FetcherRunnable, and then reset back to the old value when MakeNext() is next called. The result is seen in log lines that look like this:

DEBUG Kafka.Client.Consumers.PartitionTopicInfo - reset fetch offset of xxxx:y: fetched offset = 111101027: consumed offset = 110956806 to 111101027
DEBUG Kafka.Client.Consumers.PartitionTopicInfo - reset consume offset of xxxx:y: fetched offset = 111101027: consumed offset = 111101027 to 111101027
INFO  Kafka.Client.Consumers.FetcherRunnable - xxxx:y: fetched offset = 111101027: consumed offset = 111101027 marked as done.
DEBUG Kafka.Client.Messages.BufferedMessageSet - MakeNext() in deepIterator: innerDone = True
DEBUG Kafka.Client.Messages.BufferedMessageSet - Message is uncompressed. Valid byte count = 8361220
DEBUG Kafka.Client.Consumers.PartitionTopicInfo - reset consume offset of xxxx:y: fetched offset = 111101027: consumed offset = 110956807 to 110956807

This causes the ConsumerIterator to start timing out whenever calling channel.TryTake() and the consumer process stalls, since it is then unable to make forward progress.

Proposed "Fix"

ConsumerIterator needs to be aware when FetcherRunnable updates the partitionTopicInfo ConsumeOffset in error situations like the OffsetOutOfRange error. If the value is updated, ConsumerIterator should not overwrite the updated value and should reset itself to the new offset appropriately so it can continue to make forward progress.
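
One way to picture the "do not overwrite" part of this proposal is a guard that never moves the consume offset backwards. The sketch below is a self-contained illustration under that assumption; ConsumeOffsetGuard is a hypothetical helper, not library code, and a real fix would also have to discard the stale data chunk and cope with deliberate rewinds.

```csharp
using System;

public static class ConsumeOffsetGuard
{
    // currentConsumeOffset: the value held by the shared partition info, which the
    //                       fetcher may have moved forward after OffsetOutOfRange.
    // iteratorOffset:       the value the iterator is about to write from an older chunk.
    // Returns the value the consume offset should hold afterwards.
    public static long NextConsumeOffset(long currentConsumeOffset, long iteratorOffset)
    {
        // Keep whichever position is further ahead, so a reset performed by the
        // fetcher is not undone by a stale chunk.
        return Math.Max(currentConsumeOffset, iteratorOffset);
    }

    // True when a chunk was fetched from a position older than the current consume
    // offset and should therefore be skipped (assumed policy).
    public static bool IsStaleChunk(long chunkFetchOffset, long currentConsumeOffset)
    {
        return chunkFetchOffset < currentConsumeOffset;
    }
}
```

Using the offsets from the log lines above, NextConsumeOffset(111101027, 110956807) keeps 111101027 instead of falling back to 110956807.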

Note: This is a very similar issue to Issue #41

Balanced Consumer Usage throw TimeoutException

I am trying to set up a balanced consumer with the following code.

ConsumerConfiguration consumer = new ConsumerConfiguration()
{
    AutoCommit = false,
    GroupId = "testgroupid",
    ConsumerId = "testconsumerid",
    ZooKeeper = new ZooKeeperConfiguration("192.168.198.3:2181", 0, 6000, 2000)
};
ZookeeperConsumerConnector zoo = new ZookeeperConsumerConnector(consumer, true);
IDictionary<string, int> topicMap = new Dictionary<string, int> { { "paidorder", 1 } };
var streams = zoo.CreateMessageStreams(topicMap, new DefaultDecoder());
var KafkaMessageStream = streams["paidorder"][0];

But it throws a TimeoutException. Has this happened to anyone else?

[TimeoutException: The request '/consumers/testgroupid/ids' timed out while waiting for a response from the server.]
ZooKeeperNet.ClientConnection.SubmitRequest(RequestHeader h, IRecord request, IRecord response, WatchRegistration watchRegistration) +200
ZooKeeperNet.ZooKeeper.Exists(String path, IWatcher watcher) +177
ZooKeeperNet.ZooKeeper.Exists(String path, Boolean watch) +48
Kafka.Client.ZooKeeperIntegration.ZooKeeperConnection.Exists(String path, Boolean watch) in C:\Users\yidepiao\Documents\CSharpClient-for-Kafka\src\KafkaNET.Library\ZooKeeperIntegration\ZooKeeperConnection.cs:187
Kafka.Client.ZooKeeperIntegration.<>c__DisplayClass30_0.<Exists>b__0() in C:\Users\yidepiao\Documents\CSharpClient-for-Kafka\src\KafkaNET.Library\ZooKeeperIntegration\ZooKeeperClient.cs:461
Kafka.Client.ZooKeeperIntegration.ZooKeeperClient.RetryUntilConnected(Func`1 callback) in C:\Users\yidepiao\Documents\CSharpClient-for-Kafka\src\KafkaNET.Library\ZooKeeperIntegration\ZooKeeperClient.cs:327
Kafka.Client.ZooKeeperIntegration.ZooKeeperClient.Exists(String path, Boolean watch) in C:\Users\yidepiao\Documents\CSharpClient-for-Kafka\src\KafkaNET.Library\ZooKeeperIntegration\ZooKeeperClient.cs:460
Kafka.Client.ZooKeeperIntegration.ZooKeeperClient.Exists(String path) in C:\Users\yidepiao\Documents\CSharpClient-for-Kafka\src\KafkaNET.Library\ZooKeeperIntegration\ZooKeeperClient.cs:440
Kafka.Client.ZooKeeperIntegration.<>c__DisplayClass82_0.b__0() in C:\Users\yidepiao\Documents\CSharpClient-for-Kafka\src\KafkaNET.Library\ZooKeeperIntegration\ZooKeeperClient.Watcher.cs:373
Kafka.Client.ZooKeeperIntegration.ZooKeeperClient.RetryUntilConnected(Func`1 callback) in C:\Users\yidepiao\Documents\CSharpClient-for-Kafka\src\KafkaNET.Library\ZooKeeperIntegration\ZooKeeperClient.cs:327
Kafka.Client.ZooKeeperIntegration.ZooKeeperClient.WatchForChilds(String path) in C:\Users\yidepiao\Documents\CSharpClient-for-Kafka\src\KafkaNET.Library\ZooKeeperIntegration\ZooKeeperClient.Watcher.cs:370
Kafka.Client.ZooKeeperIntegration.ZooKeeperClient.Subscribe(String path, IZooKeeperChildListener listener) in C:\Users\yidepiao\Documents\CSharpClient-for-Kafka\src\KafkaNET.Library\ZooKeeperIntegration\ZooKeeperClient.Watcher.cs:253
Kafka.Client.Consumers.ZookeeperConsumerConnector.Consume(IDictionary`2 topicCountDict, IDecoder`1 decoder) in C:\Users\yidepiao\Documents\CSharpClient-for-Kafka\src\KafkaNET.Library\Consumers\ZookeeperConsumerConnector.cs:618
Kafka.Client.Consumers.ZookeeperConsumerConnector.CreateMessageStreams(IDictionary`2 topicCountDict, IDecoder`1 decoder) in C:\Users\yidepiao\Documents\CSharpClient-for-Kafka\src\KafkaNET.Library\Consumers\ZookeeperConsumerConnector.cs:330

Producer example is not complete

When I try to use

var config = new ProducerConfiguration(new List<BrokerConfiguration> { brokerConfig });
kafkaProducer = new Producer(config);

It gives me

Failed to load configuration The type initializer for 'Kafka.Client.Producers.Sync.SyncProducerPool' threw an exception.

From the README it's not clear what needs to be set to initialize the Producer properly.

An existing consumer can become locked if consumed offset is too low

Use Case

As a consumer in an existing group
I want to know when my offset is out of range
So that I can make a decision about how to proceed

Description

As described in issue #39, a balanced consumer will stop receiving messages if its committed offset is too far behind the start of the topic. This line seems to be causing this problem - https://github.com/Microsoft/Kafkanet/blob/master/src/KafkaNET.Library/Consumers/FetcherRunnable.cs#L83

IEnumerable fetchablePartitionTopicInfos = _partitionTopicInfos.Where(pti => pti.NextRequestOffset - pti.ConsumeOffset < _fetchBufferLength);

If the consume offset is too low then the FetcherRunnable will consider it not fetchable and not issue any fetches...

We need to either present this as an issue to the consuming library or move on our merry way by resetting it to whatever the value of auto.offset.reset is.

Repro Steps

  1. Create a balanced consumer with a max fetch batch size of 1000 (default) and start consuming messages from a topic with a short retention period
  2. Stop the consumer, push a lot of messages in and wait until the retention period has passed and the consumed offset is more than 1000 behind the start of the topic
  3. Start the consumer
  4. Observe nothing happening...
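
The lock-up in the repro steps can be illustrated with a small self-contained model of the quoted filter. This is only a sketch: `PartitionOffsets` is a hypothetical stand-in for the library's partition info type, the offset values are made up, and it assumes that after the restart the fetch offset sits at the earliest available offset while the consume offset still holds the stale committed value. `IsOutOfRange` is likewise a hypothetical helper showing one way the condition could be detected and surfaced to the caller.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical stand-in for the library's partition info type; only the two
// offsets used by the quoted filter are modelled.
public sealed class PartitionOffsets
{
    public int PartitionId { get; set; }
    public long NextRequestOffset { get; set; } // offset the fetcher would request next
    public long ConsumeOffset { get; set; }     // offset the application has consumed up to
}

public static class FetchFilterSketch
{
    // Same predicate as the line quoted from FetcherRunnable.cs.
    public static List<PartitionOffsets> Fetchable(
        IEnumerable<PartitionOffsets> infos, long fetchBufferLength)
    {
        return infos
            .Where(pti => pti.NextRequestOffset - pti.ConsumeOffset < fetchBufferLength)
            .ToList();
    }

    // Hypothetical check (not library behaviour): a consume offset outside
    // [earliest, latest] could be reported to the caller or reset according
    // to auto.offset.reset instead of being silently skipped.
    public static bool IsOutOfRange(long consumeOffset, long earliest, long latest)
    {
        return consumeOffset < earliest || consumeOffset > latest;
    }

    public static void Main()
    {
        var infos = new List<PartitionOffsets>
        {
            // Stale partition: committed offset 500, but the log now starts at 5000.
            new PartitionOffsets { PartitionId = 0, NextRequestOffset = 5000, ConsumeOffset = 500 },
            // Healthy partition: consumer is 200 messages behind the fetcher.
            new PartitionOffsets { PartitionId = 1, NextRequestOffset = 5200, ConsumeOffset = 5000 },
        };

        foreach (var pti in Fetchable(infos, 1000))
        {
            Console.WriteLine("fetchable partition: " + pti.PartitionId);
        }

        Console.WriteLine("partition 0 out of range: " + IsOutOfRange(500, 5000, 9000));
    }
}
```

With these values only partition 1 passes the filter. Partition 0 has a gap of 4500, which is not below the buffer length of 1000, so it is never fetched and the gap never shrinks.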

Consumer unable to connect to ZooKeeper

ConsumerConfiguration consumer = new ConsumerConfiguration()
{
    AutoCommit = true,
    GroupId = "microsoft_console_group",
    ZooKeeper = new ZooKeeperConfiguration("10.0.0.4:2181", 0, 6000, 2000)
};
ZookeeperConsumerConnector zoo = new ZookeeperConsumerConnector(consumer, true);

The error is thrown in
ZooKeeperIntegration\ZooKeeperConnection.cs at
this._zkclient = new ZooKeeper(this.Servers, new TimeSpan(0, 0, 0, 0, this.SessionTimeout), watcher);
and says it is unable to connect to ZooKeeper.
I checked my ZooKeeper and it is working fine: I can telnet to 10.0.0.4 and connect with the terminal console. Is there any way to solve this?

In BrokerPartitionInfo.cs the list of brokers is never refreshed from ZooKeeper

BrokerPartitionInfo.UpdateInfo() is used by KafkaSimpleManager.RefreshMetadataInternal() to find the new leader of partitions.

The problem is that even if ZkUtils.GetTopicMetadataInzookeeper() is called, the list of brokers used for the metadata call comes from syncProducerPool and is never refreshed from ZooKeeper, because ZkUtils.GetAllBrokersInCluster() is only called once.

Multiple fetcher threads created when partition leader changes

We have found ourselves receiving duplicate messages within the same consumer instance. When looking through the logs I noticed the following series of events:

  1. Broker no longer the leader for a partition causing a rebalance
  2. Consumer rebalances and receives n partitions (2 in this case)
  3. Consumer creates n+1 fetcher threads
  4. 2 Fetcher threads consistently retrieving messages from kafka for the same topic and partition

I've attached a scrubbed log excerpt... Any ideas would be helpful -
consumer_log.txt

Digging into the source, it looks like the issue might be a race condition in Fetcher.cs. On line 189 a topicInfo dictionary is built, but how does it know which broker to use...? On line 201 a new PartitionLeaderFinder is spun up, which ideally finds the leaders for the newly added partitions and starts fetchers. Could this finish before the loop through partitions on line 209 starts, so that multiple fetchers are created...? Shouldn't this just be completed synchronously before the new fetchers are created...?

I'm a bit lost in this part of the code so please excuse the ignorance... 😄

Support of Kafka offsets vs Zookeeper offsets

Hi there. I couldn't find anything in the documentation, and I was wondering if the client supports Kafka offsets in addition to ZooKeeper offsets.

Ideally I would like to enable double commits to both types until all the topics are migrated, then switch completely to Kafka offsets.

Thank you in advance for your time

The lag cannot be committed to zero

Hi all,
I have tried this library and it works very well except for one small problem.
The lag cannot be committed to zero. For example, if there are 20 messages in the partition, then after I call the commit method the committed offset for that partition is always 19; one message still cannot be committed and the lag is 1.
I have tried to solve the problem as best I can, but without success, so I need your help.
Thanks very much!
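
For reference, the arithmetic behind the reported numbers can be sketched as follows. Kafka's usual convention is that the committed offset is the offset of the next message to consume (last processed offset + 1), and lag is typically computed as the log end offset minus the committed offset. Whether this library's commit method follows that convention is not confirmed here; `Lag` and `OffsetToCommit` are hypothetical helper names used only for illustration.

```csharp
using System;

public static class LagSketch
{
    // Lag as monitoring tools usually compute it:
    // log end offset minus committed offset.
    public static long Lag(long logEndOffset, long committedOffset)
    {
        return logEndOffset - committedOffset;
    }

    // Offset to commit under the "next offset to consume" convention.
    public static long OffsetToCommit(long lastProcessedOffset)
    {
        return lastProcessedOffset + 1;
    }

    public static void Main()
    {
        long logEndOffset = 20;  // 20 messages occupy offsets 0..19
        long lastProcessed = 19; // offset of the last message handled

        // Committing the last processed offset itself leaves a lag of 1.
        Console.WriteLine(Lag(logEndOffset, lastProcessed));

        // Committing last processed + 1 brings the lag to 0.
        Console.WriteLine(Lag(logEndOffset, OffsetToCommit(lastProcessed)));
    }
}
```

If the commit call stores the offset of the last message read rather than the next one to read, a lag of exactly 1 on a fully consumed partition is what this arithmetic predicts.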
