jet / kafunk
Kafunk: F# Kafka client
Home Page: https://jet.github.io/kafunk/
License: Other
Create a CSharp.fs with namespace Kafunk.CSharp, containing adapters for idiomatic C#. Some thoughts:
- Convert Async to Task.
- A KafkaConnection class wrapping KafkaConn, with methods for each API operation.
- A KafkaConsumer class wrapping Consumer, with methods to start consumption, accepting a callback.
- A KafkaProducer class wrapping Producer, with a method to produce a message.
Hi there,
What version of Kafka does this repo support?
What are the plans for releasing a 'stable' version of the client to NuGet (given that the major version can be bumped for breaking changes)?
Prototype using .NET BufferManager here. The interface is:
type IBufferPool
  val alloc : int -> byte[]
  val free : byte[] -> unit
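The prototype uses BufferManager. As an alternative (swapped in here, not the prototype), a minimal sketch of the same alloc/free interface backed by System.Buffers.ArrayPool<byte>.Shared, assuming .NET Core or the System.Buffers package. Note that Rent can return an array longer than requested, so the logical length has to be tracked separately.

```fsharp
open System.Buffers

/// The pooling interface from the issue, written as an F# interface.
type IBufferPool =
  abstract alloc : int -> byte[]
  abstract free : byte[] -> unit

/// Sketch: IBufferPool backed by the shared ArrayPool<byte>
/// (an assumption for illustration; the prototype used BufferManager).
let arrayPoolBufferPool : IBufferPool =
  let pool = ArrayPool<byte>.Shared
  { new IBufferPool with
      // Rent may return an array longer than the requested size.
      member __.alloc size = pool.Rent size
      // After Return, the array may be handed to another caller; do not keep using it.
      member __.free buf = pool.Return buf }
```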
Pooling buffers for requests is trivial because the lifecycle is controlled explicitly by the Chan module; the region is well known.
Pooling buffers for responses is more challenging, because the response lifecycle is in the hands of the user. For example, take the Consumer.stream operation, which yields control to the caller.
Some options:
- Pass a decoding function (ConsumerMessageSet -> 'a) into the consumer such that after decoding, the buffers could be freed.
- Consumer.consume and a contract to free buffers corresponding to the message set after the user-provided handler completes.
- Have ConsumerMessageSet implement IDisposable, along with a finalizer, and urge users to dispose when done.
- … IDisposable … IBufferPool and have the responsibility to free buffers for ConsumerMessageSet.
- … AsyncSeq which would eschew buffering, thereby giving us a controlled lifecycle for its elements.
We need a framework for fault tolerance which will capture Kafka error codes and react accordingly:
Ideally, we would build on fault tolerance mechanisms based on Async.
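As a starting point, a minimal sketch of an Async-based handler that classifies Kafka error codes into reactions. The numeric codes are standard Kafka protocol error codes; the ErrorAction type, the classify policy and withErrorHandling are hypothetical, and the operation is assumed to return its error code alongside its result.

```fsharp
/// Hypothetical reactions to a failed response.
type ErrorAction =
  | Retry                 // transient: retry the same request
  | RefreshMetadata       // routing is stale: refresh metadata, then retry
  | RediscoverCoordinator // group coordinator moved or is unavailable
  | Rejoin                // consumer group generation/membership is invalid
  | Fail                  // not recoverable here: surface to the caller

/// Maps a Kafka protocol error code to an action (the policy is an assumption).
let classify (errorCode:int16) : ErrorAction =
  match errorCode with
  | 7s | 14s -> Retry                  // RequestTimedOut, GroupLoadInProgress
  | 3s | 5s | 6s -> RefreshMetadata    // UnknownTopicOrPartition, LeaderNotAvailable, NotLeaderForPartition
  | 15s | 16s -> RediscoverCoordinator // GroupCoordinatorNotAvailable, NotCoordinatorForGroup
  | 22s | 25s | 27s -> Rejoin          // IllegalGeneration, UnknownMemberId, RebalanceInProgress
  | _ -> Fail

/// Runs op (returning errorCode * result); on a recoverable error, runs the reaction and retries,
/// giving up after maxAttempts attempts in total.
let rec withErrorHandling (maxAttempts:int) (onAction:ErrorAction -> Async<unit>) (op:Async<int16 * 'a>) : Async<'a> = async {
  let! (code, res) = op
  if code = 0s then return res
  else
    match classify code with
    | Fail -> return failwithf "Kafka error code %d" code
    | _ when maxAttempts <= 1 -> return failwithf "Kafka error code %d; attempts exhausted" code
    | action ->
      do! onAction action
      return! withErrorHandling (maxAttempts - 1) onAction op }
```

Keeping the classification separate from the retry loop lets the policy be tested without a broker.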
When running the tutorial, I get:
System.Exception
at [email protected](Unit unitVar) in D:\code\kafunk\src\kafunk\Kafka.fs:line 286
at [email protected](AsyncParams`1 args)
--- End of stack trace from previous location where exception was thrown ---
at Microsoft.FSharp.Control.AsyncBuilderImpl.commit[a](Result`1 res)
at Microsoft.FSharp.Control.CancellationTokenOps.RunSynchronously[a](CancellationToken token, FSharpAsync`1 computation, FSharpOption`1 timeout)
at Microsoft.FSharp.Control.FSharpAsync.RunSynchronously[T](FSharpAsync`1 computation, FSharpOption`1 timeout, FSharpOption`1 cancellationToken)
at <StartupCode$FSI_0002>.$FSI_0002.main@() in D:\code\kafunk\docs\content\index.fsx:line 103
> Stopped due to error
There are a couple of places like this:
match byGroupId |> Dict.tryGet r.groupId with
| Some send -> return! send req
| None -> return failwith ""
this should be changed to provide additional information.
Related question: why is it failing for me? ;-)
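For example, the failure could name the missing key and the keys that are known. A sketch using a plain IDictionary in place of the client's Dict helper; routeByGroupId and the message text are hypothetical.

```fsharp
open System.Collections.Generic

/// Looks up the send function routed for a consumer group, failing with a descriptive message.
let routeByGroupId (byGroupId:IDictionary<string, 'send>) (groupId:string) : 'send =
  match byGroupId.TryGetValue groupId with
  | true, send -> send
  | false, _ ->
    let known = String.concat ", " byGroupId.Keys
    invalidOp (sprintf "No group coordinator route for group_id=%s (known group ids: [%s])" groupId known)
```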
Getting this error when I try to use localhost:9092 to connect to my local Kafka broker:
2017-01-12 15:01:58:2242|INFO|Kafunk.Chan|tcp_connecting|remote_endpoint=127.0.0.1:9092 client_id=21ca81fd3f31488e98a8f8055c9f144b
2017-01-12 15:01:58:2553|INFO|Kafunk.Chan|tcp_connected|remote_endpoint=127.0.0.1:9092 local_endpoint=127.0.0.1:11226
Unhandled Exception: System.AggregateException: One or more errors occurred. ---> System.ArgumentException: The option value was None
Parameter name: option
at Microsoft.FSharp.Core.OptionModule.GetValue[T](FSharpOption`1 option)
at Kafunk.Routes.addGroupCoordinator(String gid, String host, Int32 port, Routes rt)
at <StartupCode$Kafunk>[email protected](Routes arg10)
at Kafunk.ConnState.updateRoutes(FSharpFunc`2 f, ConnState s)
at <StartupCode$Kafunk>[email protected](GroupCoordinatorResponse _arg6)
at [email protected](a a)
--- End of inner exception stack trace ---
at Microsoft.FSharp.Control.AsyncBuilderImpl.commit[a](Result`1 res)
at Microsoft.FSharp.Control.CancellationTokenOps.RunSynchronously[a](CancellationToken token, FSharpAsync`1 computation, FSharpOption`1 timeout)
at Microsoft.FSharp.Control.FSharpAsync.RunSynchronously[T](FSharpAsync`1 computation, FSharpOption`1 timeout, FSharpOption`1 cancellationToken)
at KafunkData.createConsumer() in C:\Users\erich\Documents\Visual Studio 2015\Projects\learn_marvel_tablestorage\learn_marvel_tablestorage\KafunkData.fs:line 41
at KafunkData.testConsumerStream() in C:\Users\erich\Documents\Visual Studio 2015\Projects\learn_marvel_tablestorage\learn_marvel_tablestorage\KafunkData.fs:line 92
at Program.main(String[] argv) in C:\Users\erich\Documents\Visual Studio 2015\Projects\learn_marvel_tablestorage\learn_marvel_tablestorage\Program.fs:line 89
Start with GZip -
Currently, a single consumer group id can be used to consume a single topic (with multiple consumer instances). However, it may be desirable to have a consumer instance consume a single topic, but have a leader of a consumer group be aware of multiple topics.
AsyncSeq.mergeAll deadlocks on new generations.
The cancellation of a generation needs to be linked to the cancellation of the merge Task.WaitAny or the merge Async computation.
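The linking can be sketched with CancellationTokenSource.CreateLinkedTokenSource: each generation gets its own source linked to the token of the enclosing merge, so cancelling the merge also cancels the generation, while a single generation can still be cancelled on its own when a rebalance starts a new one. startGeneration is a hypothetical name, and this shows only the linking, not a fix for mergeAll itself.

```fsharp
open System.Threading

/// Starts a generation whose cancellation is linked to the parent (merge) token.
/// Returns the generation's source so a rebalance can cancel just this generation.
let startGeneration (parent:CancellationToken) (work:Async<unit>) : CancellationTokenSource =
  let cts = CancellationTokenSource.CreateLinkedTokenSource parent
  Async.Start(work, cts.Token)
  cts
```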
@jturek said Apache makes sense
There are a few nuances to the protocol:
These need to be handled gracefully.
Currently, Kafunk logs a good deal of information to STDOUT, and the events are structured loosely as follows:
type Evt =
  struct
    val name : string
    val data : Map<string, obj>
    val msg : string
  end
But the structure isn't explicit and there isn't an easy way to subscribe to specific events. We should consider a more appropriate way to expose these events, for example:
- IObservable
- Logary
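A sketch of the IObservable option: events are published through an FSharp.Core Event<_> and exposed as IObservable<Evt>, so subscribers can filter by name with Observable.filter. The Evt struct from above is repeated with a constructor added, and the ClientEvents module is hypothetical.

```fsharp
/// The event shape from the issue, with a constructor so events can be created.
[<Struct>]
type Evt =
  val name : string
  val data : Map<string, obj>
  val msg : string
  new (name:string, data:Map<string, obj>, msg:string) = { name = name; data = data; msg = msg }

/// Hypothetical event hub: the client publishes, users subscribe to what they need.
module ClientEvents =
  let private evt = Event<Evt>()

  /// All events, as an IObservable.
  let events : System.IObservable<Evt> = evt.Publish :> System.IObservable<Evt>

  /// Only the events with the given name, e.g. "tcp_connected".
  let named (name:string) = events |> Observable.filter (fun e -> e.name = name)

  /// Called by the client; subscribers are invoked synchronously.
  let publish (e:Evt) = evt.Trigger e
```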
Currently, decoding of individual fields returns a tuple of the value and the new state of the read buffer. Since tuples are heap-allocated (for now), this incurs a notable GC cost. The remedy is to create a purpose-built struct to contain the pair.
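A minimal sketch of such a struct: each reader returns the decoded value together with the next offset, so no tuple is heap-allocated per field. ReadResult, readInt16 and readInt32 are hypothetical names; integers are read big-endian, as Kafka encodes them. Newer F# versions also offer struct tuples (struct (a, b)), which serve the same purpose.

```fsharp
open System

/// Decoded value plus the offset of the next unread byte; a struct, so not heap-allocated.
[<Struct>]
type ReadResult<'a> =
  val Value : 'a
  val Next : int
  new (value:'a, next:int) = { Value = value; Next = next }

/// Reads a big-endian int16 at the given offset within the segment.
let readInt16 (buf:ArraySegment<byte>) (offset:int) : ReadResult<int16> =
  let a = buf.Array
  let i = buf.Offset + offset
  let v = (int16 a.[i] <<< 8) ||| int16 a.[i + 1]
  ReadResult<int16>(v, offset + 2)

/// Reads a big-endian int32 at the given offset within the segment.
let readInt32 (buf:ArraySegment<byte>) (offset:int) : ReadResult<int32> =
  let a = buf.Array
  let i = buf.Offset + offset
  let v =
    (int a.[i] <<< 24) ||| (int a.[i + 1] <<< 16) ||| (int a.[i + 2] <<< 8) ||| int a.[i + 3]
  ReadResult<int32>(v, offset + 4)
```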
The current code significantly increases build time. This is partly due to the statically resolved generic dispatch for read/write/size. These could be replaced with explicitly called implementations for primitives like int64, int32, int16, byte, and so forth.
During this process, we should track the performance of protocol serialization. I plan on using a few simple message samples in a round-trip loop. It's certainly primitive, but better than nothing for the most performance-sensitive parts of the code.
We'll want to keep an eye on allocations here too. The structs help, but we pack them into arrays in some cases, moving them off the stack. It may be cheaper to use an ArraySegment-style API to allow unpacking structures on demand rather than returning values immediately. For message sets, this could allow better streamed processing of larger batches.
As the community looks to contribute or simply follow along, I think writing up something more substantial than the current issues list would help dramatically.
Use case: send messages to different partitions in batches, so that messages with the same key are consumed in the same order. Presumably this performs better than sending them one by one.
let produceBatch (p:Producer) (createBatch:PartitionCount -> Partition * ProducerMessage[])
only allows sending to a single partition, but the messages should go to different partitions.
So produceBatch must be called multiple times, but it is unknown how many: the distribution can't be determined until the partition count is known, and the partition count is only available inside createBatch, after produceBatch has been called, since it isn't exposed in any other way.
It would be fine, for example, if batches could be formed before calling produceBatch.
Alternatively, provide the following API in kafunk/src/kafunk/Producer.fs:
++ let produceBatch (p:Producer) (msgs:ProducerMessage[])
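A sketch of how per-partition batches could be formed once the partition count is known: messages are grouped by a hash of their key, so all messages with the same key land in the same partition and keep their relative order. Msg, partitionByKey and batchByPartition are hypothetical stand-ins; the hash is FNV-1a chosen for illustration, not the Java client's murmur2-based default partitioner.

```fsharp
/// Hypothetical stand-in for a producer message with a key.
type Msg = { Key : string; Value : string }

/// Deterministic partition for a key. FNV-1a over the UTF-8 bytes keeps the result stable
/// across processes (String.GetHashCode is randomized per process on .NET Core).
let partitionByKey (partitionCount:int) (key:string) : int =
  let mutable h = 2166136261u
  for b in System.Text.Encoding.UTF8.GetBytes key do
    h <- (h ^^^ uint32 b) * 16777619u
  int (h % uint32 partitionCount)

/// Groups messages into per-partition batches; order within each batch follows the input order.
let batchByPartition (partitionCount:int) (msgs:Msg[]) : (int * Msg[])[] =
  msgs |> Array.groupBy (fun m -> partitionByKey partitionCount m.Key)
```

Each resulting (partition, batch) pair could then be sent with its own produce call.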
The current topic metadata handling is hacked together to allow progress to be made on other parts of the client. Right now we need to refine how we manage the metadata and then do proper ejection of stale values based on error code feedback from Kafka.
Stack traces sometimes write the entire message content to the log file. For large messages this can generate a lot of logs, which may impact log forwarding systems. I suggest truncating logs unless a special flag is set.
Thanks
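A sketch of the suggested truncation: payloads are cut to a maximum length unless a flag is set. truncateForLog, maxLen and verbose are hypothetical names.

```fsharp
/// Truncates a logged payload to maxLen characters unless verbose logging is enabled.
let truncateForLog (verbose:bool) (maxLen:int) (s:string) : string =
  if verbose || isNull s || s.Length <= maxLen then s
  else sprintf "%s... (truncated, %d chars total)" (s.Substring(0, maxLen)) s.Length
```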
We need:
Currently, TCP framing is implemented by transforming an octet stream from the socket to a stream of messages decoded via length-prefix framer. This may be inefficient due to allocations of async sequence nodes. This is worth a concentrated benchmark in addition to benchmarking the entire path.
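For the benchmark, it may help to have the framing logic isolated from the socket. A minimal sketch of a length-prefix framer over a byte array, where each Kafka frame is a 4-byte big-endian size followed by the payload. It returns complete frames as ArraySegment views (no copies) plus the number of bytes consumed; `frames` is a hypothetical name and this is not the AsyncSeq-based implementation.

```fsharp
open System

/// Splits buf into complete length-prefixed frames (4-byte big-endian size, then payload).
/// Returns the frames and the number of bytes consumed; any remaining bytes belong to an
/// incomplete frame and must be kept for the next read.
let frames (buf:byte[]) : ArraySegment<byte> list * int =
  let readSize i =
    (int buf.[i] <<< 24) ||| (int buf.[i + 1] <<< 16) ||| (int buf.[i + 2] <<< 8) ||| int buf.[i + 3]
  let rec loop pos acc =
    if buf.Length - pos < 4 then List.rev acc, pos
    else
      let size = readSize pos
      if size < 0 then invalidArg "buf" (sprintf "negative frame size %d at offset %d" size pos)
      elif buf.Length - pos - 4 < size then List.rev acc, pos
      else loop (pos + 4 + size) (ArraySegment<byte>(buf, pos + 4, size) :: acc)
  loop 0 []
```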
We need to complete the consumer groups protocol. There is a POC implementation, but a great deal of work remains. There is a dependency on #7.
The link to library design notes is a 404 on https://jet.github.io/kafunk/.
Implement logging.
Any preferences or dislikes regarding loggers? (I generally use Serilog behind the Microsoft.Framework logging generic interface these days.)
Regarding the logger, we use NLog internally, but we can revisit this decision for the Kafka client. I'll take a look at Serilog.
Currently, we expose the wire protocol to consumers of the client. This may not be ideal. A few options are:
Currently, the consumer callback is as follows:
val callback : TopicName -> Partition -> MessageSet -> commit:Async<unit> -> Async<unit>
The commit computation will commit offsets corresponding to the message set in the topic-partition. This should be invoked after the message set has been processed to ensure at-least-once delivery. However, this commit does not need to complete before processing of the next message set can begin. A commit mode should be supported where commits are queued on a per-topic-partition basis, keeping only the latest, and invoked periodically. This would ensure commits are performed in the correct order and remove them from the critical path of message set processing.
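A sketch of the proposed commit mode: offsets are recorded per topic-partition, only the highest pending offset is kept, and a periodic Flush hands the pending offsets to a commit function in one batch. CommitQueue and its commit callback are hypothetical, not Kafunk's API; the caller is assumed to schedule Flush (for example from a timer loop).

```fsharp
open System.Collections.Generic

/// Keeps only the latest offset per (topic, partition) and commits them in batches.
type CommitQueue (commit : (string * int * int64)[] -> Async<unit>) =
  let sync = obj ()
  let mutable pending = Dictionary<string * int, int64>()

  /// Records an offset; a higher offset for the same topic-partition replaces a lower one.
  member __.Enqueue (topic:string, partition:int, offset:int64) =
    lock sync (fun () ->
      let key = (topic, partition)
      if not (pending.ContainsKey key) || pending.[key] < offset then
        pending.[key] <- offset)

  /// Takes everything pending and commits it, off the message-processing path.
  member __.Flush () : Async<unit> = async {
    let batch =
      lock sync (fun () ->
        let b = [| for KeyValue ((t, p), o) in pending -> (t, p, o) |]
        pending <- Dictionary<string * int, int64>()
        b)
    if batch.Length > 0 then do! commit batch }
```

Swapping the dictionary under the lock keeps Enqueue cheap; in this sketch, offsets taken by a Flush whose commit fails are not re-queued.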
Above the lower-level APIs, we should provide a managed producer API. This will handle concerns such as connection multiplexing, topic metadata refresh (and creation, if enabled), batching single messages into message sets, and error handling (retry, back-off, throttling).
The design will be done from the application's point of view. We'll start by building up some module signatures and go from there. The implementation will try to align with the mechanisms we already use (like AsyncSeq and DVar?).
Currently, there is a single script in the tests project which invokes various protocol operations on a running Kafka instance. This should be split up to capture individual operations.
Add exponential and randomized constructors for RetryPolicy
https://github.com/jet/kafunk/blob/master/src/kafunk/Utility/Faults.fs#L72
Something like (taken from another codebase):
let private checkOverflow x =
  if x = System.Int32.MinValue then 2000000000
  else x

/// Creates a back-off strategy which increases the interval exponentially.
let exp (initialIntervalMs:int) (multiplier:float) : Backoff =
  fun i -> (float initialIntervalMs) * (pown multiplier i) |> int |> checkOverflow |> Some

/// Randomizes the output produced by a back-off strategy:
/// randomizedInterval = retryInterval * (random in range [1 - randomizationFactor, 1 + randomizationFactor])
let rand (randomizationFactor:float) =
  let rand = new System.Random()
  let maxRand,minRand = (1.0 + randomizationFactor), (1.0 - randomizationFactor)
  map (fun x -> (float x) * (rand.NextDouble() * (maxRand - minRand) + minRand) |> int)
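The snippet relies on a Backoff type and a map function that aren't shown. A self-contained sketch, assuming Backoff is `int -> int option` (0-based attempt number to delay in milliseconds, None meaning stop retrying). It clamps in the float domain instead of checking for Int32.MinValue, since out-of-range float-to-int conversion is not guaranteed to yield Int32.MinValue on every .NET runtime. maxAttempts is an added hypothetical helper.

```fsharp
/// Assumed shape: 0-based attempt number -> delay in ms; None means stop retrying.
type Backoff = int -> int option

module Backoff =

  /// Transforms every interval produced by a strategy.
  let map (f:int -> int) (b:Backoff) : Backoff =
    fun i -> b i |> Option.map f

  /// Stops after n attempts.
  let maxAttempts (n:int) (b:Backoff) : Backoff =
    fun i -> if i < n then b i else None

  /// initialIntervalMs * multiplier^i, clamped to Int32.MaxValue before converting to int.
  let exp (initialIntervalMs:int) (multiplier:float) : Backoff =
    fun i ->
      let ms = float initialIntervalMs * pown multiplier i
      Some (int (min ms (float System.Int32.MaxValue)))

  /// Scales each interval by a random factor in [1 - randomizationFactor, 1 + randomizationFactor).
  let rand (randomizationFactor:float) (b:Backoff) : Backoff =
    let rng = System.Random()
    let lo, hi = 1.0 - randomizationFactor, 1.0 + randomizationFactor
    b |> map (fun x -> float x * (lo + rng.NextDouble() * (hi - lo)) |> int)
```

Usage: `Backoff.exp 100 2.0 |> Backoff.rand 0.5 |> Backoff.maxAttempts 5` gives up to five randomized, exponentially growing delays.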
Currently, GZip is supported. For Snappy, we need to find a good .NET library.
It seems both .yml files are already in place, so this is just a matter of Jet activating the AppVeyor and Travis build servers.
Currently, CodecTests.fs tests a few of the message types. We should have tests for all of them, ideally.
We've incorporated some great changes from a branch which has much more of the protocol working. The main issue is that it has evolved quite a bit from master, making a direct merge very hard. In the spirit of getting the core protocol working, it seems best to establish this code as a base and port contributions to this branch.
We'll use this issue to track items to port. A few main areas come to mind: logging, documentation, and using external libraries in some places. Some of these may be applied a bit differently; for example, we'll use some of the event-driven instrumentation ideas to add logging support for multiple adapters, including Logary.
Currently, offsets are auto-committed only if they've changed from the previously committed offsets. However, due to the way offset retention works, as described in
https://issues.apache.org/jira/browse/KAFKA-3806
offsets should be committed even if they aren't changing, to prevent them from being lost.
An interim workaround is to set offsetRetentionTime in ConsumerConfig to a value other than the default of -1, which uses the server's offset retention time. This value is expressed in milliseconds.
In the event that a consumer offset isn't present for a given partition, the current calculation of high watermark offset minus consumer offset does not produce an accurate lag. It should be changed to high watermark offset minus low watermark offset.
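The proposed calculation as a small function; partitionLag is a hypothetical name, and the missing offset is modelled as an option (Kafunk may represent it differently).

```fsharp
/// Lag for one partition: high watermark minus the consumer offset, or, when no offset has
/// been committed, high watermark minus low watermark (everything retained is unconsumed).
let partitionLag (highWatermark:int64) (lowWatermark:int64) (consumerOffset:int64 option) : int64 =
  match consumerOffset with
  | Some offset -> highWatermark - offset
  | None -> highWatermark - lowWatermark
```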
Currently, the consumer makes individual fetch requests per partition. This doesn't take advantage of request batching. Some steps:
Expose the Metadata field on OffsetCommitRequest and OffsetFetchResponse.
Some ideas for names:
A logo would be cool too.
Thoughts?
Looking at https://github.com/jet/kafunk/blob/master/support/Cluster.fsx I think it's high time to create Docker helpers for FAKE.
I talked about it with Steffen some time ago, but nobody else was using it or asking about this feature. But since it looks like you are using Docker and FAKE together, I think it might be a good idea to create proper, nicer helpers for Docker.
CC: @forki
In the consumer module, make the assignment strategy configurable.
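One way to make the strategy configurable is to pass it in the consumer configuration as a function from group members and partitions to an assignment. AssignmentStrategy and roundRobin are hypothetical sketches, not Kafunk's API; round-robin itself is one of the standard Kafka assignors (alongside range).

```fsharp
/// Hypothetical: given member ids and a topic's partitions, decide which member gets which partitions.
type AssignmentStrategy = string[] -> int[] -> Map<string, int[]>

/// Round-robin: sort members and partitions, then deal partitions out in turn.
/// Members that receive no partitions do not appear in the resulting map.
let roundRobin : AssignmentStrategy =
  fun members partitions ->
    if Array.isEmpty members then Map.empty
    else
      let members = Array.sort members
      partitions
      |> Array.sort
      |> Array.mapi (fun i p -> members.[i % members.Length], p)
      |> Array.groupBy fst
      |> Array.map (fun (m, ps) -> m, Array.map snd ps)
      |> Map.ofArray
```

Sorting both inputs makes every member compute the same assignment from the same group state.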
Add TCP SSL support to the Chan module.
After installing the Mono framework, I executed build.sh and ended up with the following errors and failures. Is there a way to build kafunk without executing the tests?
Runtime Environment
OS Version: MacOSX 14.5.0.0
CLR Version: 4.0.30319.42000
Test Files
/Users/jacek/dev/oss/fsharp-dev-area/kafunk/tests/kafunk.Tests/bin/Release/Kafunk.Tests.dll
Errors and Failures
1) Failed : FaultsTests.AsyncFunc.timeoutResult should return timeout result and cancel when past timeout
expected=true
actual=false
at Kafunk.Testing.shouldEqual[a] (a expected, a actual, Microsoft.FSharp.Core.FSharpOption`1[T] msg) [0x0005e] in <586a5f4d6cd901b4a74503834d5f6a58>:0
at FaultsTests.AsyncFunc.timeoutResult should return timeout result and cancel when past timeout () [0x000cc] in <586a5f4d6cd901b4a74503834d5f6a58>:0
Run Settings
DisposeRunners: True
DefaultTimeout: 1200000
WorkDirectory: /Users/jacek/dev/oss/fsharp-dev-area/kafunk
ImageRuntimeVersion: 4.0.30319
ImageRequiresX86: False
ImageRequiresDefaultAppDomainAssemblyResolver: False
NumberOfTestWorkers: 8
Test Run Summary
Overall result: Failed
Test Count: 21, Passed: 20, Failed: 1, Inconclusive: 0, Skipped: 0
Failed Tests - Failures: 1, Errors: 0, Invalid: 0
Start time: 2017-01-02 14:10:23Z
End time: 2017-01-02 14:10:33Z
Duration: 10.438 seconds
Results (nunit3) saved as TestResult.xml
Running build failed.
Error:
NUnit test failed (1).
---------------------------------------------------------------------
Build Time Report
---------------------------------------------------------------------
Target Duration
------ --------
Clean 00:00:00.0036445
AssemblyInfo 00:00:00.0185139
Build 00:00:12.4788000
CopyBinaries 00:00:00.0075849
Total: 00:00:24.2204278
Status: Failure
---------------------------------------------------------------------
1) Fake.UnitTestCommon+FailedTestsException: NUnit test failed (1).
at Fake.Testing.NUnit3.NUnit3 (Microsoft.FSharp.Core.FSharpFunc`2[T,TResult] setParams, System.Collections.Generic.IEnumerable`1[T] assemblies) [0x00118] in <5864dd88ccf1c534a745038388dd6458>:0
at [email protected] (Microsoft.FSharp.Core.Unit _arg7) [0x00014] in <4cdaca18880e4ee180217aec4ce648f0>:0
at Fake.TargetHelper+targetFromTemplate@195[a].Invoke (Microsoft.FSharp.Core.Unit unitVar0) [0x00001] in <5864dd88ccf1c534a745038388dd6458>:0
at Fake.TargetHelper.runSingleTarget (Fake.TargetHelper+TargetTemplate`1[a] target) [0x00040] in <5864dd88ccf1c534a745038388dd6458>:0
---------------------------------------------------------------------
Currently, the logic to route requests to brokers is contained in the connection. Fetch, Produce and Offset requests can be addressed to multiple brokers based on the partitions involved. When the consumer sends fetch requests, it does so for the entire set of partitions assigned to it. The connection then routes them to the appropriate brokers in a fork-join fashion. This join, however, is needless and reduces utilization and parallelism. It is better to have a fetch process per broker, independent of the fetch processes of other brokers. Either way, the fetch processes publish messages into an exchange which distributes them into per-partition streams.