Git Product home page Git Product logo

alamut.kafka's Introduction

Alamut.Kafka

The purpose of this library is to easy to use Apache.Kafka in Dotnet (Console, ASP.NET, Web API) application on top of Dotnet Core Dependency Injection infrastructure.
Actually it's a wrapper of Confluent's Apache Kafka .NET client

Documentation is based on v0.8.1


Installing Alamut.Kafka

You should install Alamut.Kafka with NuGet:

Install-Package Alamut.Kafka

Or via the .NET Core command-line interface:

dotnet add package Alamut.Kafka

Either commands, from Package Manager Console or .NET Core CLI, will download and install all required dependencies.


Basic Configuration

These simple basic settings are needed to communicate with Kafka
*This configuration will be used for both ProducerConfig and ConsumerConfig *

"KafkaConfig": {
    "BootstrapServers": "10.104.51.12:9092,10.104.51.13:9092,10.104.51.14:9092",
    "GroupId": "alamut.group",
    // All Producer and Consumer configuration
  }

You have to inject configuration into your DI :

// for Consumer configuration
services.AddPoco<ConsumerConfig>(Configuration, "KafkaConfig");
// for Producer configuration
services.AddPoco<ProducerConfig>(Configuration, "KafkaConfig");

AddPoco is an Alamut Helper

comprehensive samples


Producer

Producer publish a message into specified topic. We ususally use IPublisher as a producer in our application. The publisher could publish a message in a variety types of data structure:

  • String (use native Kafka Client's serializer)
  • Object (serialize it to JSON)
  • IMessage

Producer Sample

IPublisher publisher = new KafkaProducer(*/dependencies provided by DI*/);

// string message
await publisher.Publish("alamut.messaging.kafka", "a string message");

// object message
var objectMessage = new Foo
{
    Bar = message
};
await publisher.Publish("alamut.messaging.kafka", objectMessage);

// IMessage message
var typedMessage = new Foo
{
    Bar = message
};
await publisher.Publish("alamut.messaging.kafka", MessageFactory.Build(typedMessage));

we will talk about the MessageFactory in more details latter.

Register Producer
If you want to get IPublisher through DI you should register it in project Startup:

services.AddSingleton<IPublisher, KafkaProducer>();

Consumer

Consumer subscribes to the specified topic(s) and works as a Background Hosted Service.

Consumer automatically calls the classes that implemented IMessageHandler interface that decorated with TopicsAttribute.

Message Handler Sample:

using System.Threading;
using System.Threading.Tasks;

using Alamut.Abstractions.Messaging;
using Alamut.Kafka.Models;
using Microsoft.Extensions.Logging;

namespace Alamut.Kafka.Consumer.Subscribers
{
    [Topics("alamut.messaging.kafka")]
    public class SendSmsGeneric : IMessageHandler<Message<Foo>>
    {
        private readonly ILogger _logger;

        public SendSmsGeneric(ILogger<SendSmsGeneric> logger)
        {
            _logger = logger;

        }
        public Task Handle(Message<Foo> message, CancellationToken token)
        {
            _logger.LogInformation($"Received message <{ message.Body.Bar }>");

            return Task.CompletedTask;
        }
    }
}

In the example above SendSmsGeneric handles FooMessage that published in alamut.messaging.kafka Topic. (other Message handlers have not yet documented)

Consumer Registration and Wiring

  • First of all, you need a simple configuration that described above.
  • Register Message Handlers:
    services.RegisterMessageHandlers(typeof(SendSmsGeneric).Assembly);
    registers all classes that implemented IMessageHandler in the specified assembly.
  • Then you have to register your Hosted Service to subscribe to Kafka Messages, There are two ways:
    • Register Hosted Service with default GroupId and specified Topics that discovered in RegisterMessageHandlers section:
      services.AddHostedSubscriber();
    • Register Hosted Service for specifics Topic(s):
      • services.AddNewHostedSubscriber("alamut.messaging.kafka", ... );
        in this case, a Hosted Services registered and handles just provided Topic(s).
      • services.AddNewHostedSubscriber(KafkaHelper.GetAllTopics(typeof(SendSmsGeneric).Assembly));
        a Hosted Service registered and handles topics provide by KafkaHelper.GetAllTopics

With these two steps your wiring will be completed. (example)
There are other ways to do this that will be explained later

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    ๐Ÿ–– Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. ๐Ÿ“Š๐Ÿ“ˆ๐ŸŽ‰

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google โค๏ธ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.