- KafkaJS
- Socket.IO (server & client)
- React
- dotenv
Atomic Kafka currently supports running Apache Kafka clusters either using a Docker image or by connecting to Confluent Cloud.
Docker:
- Download this .yml and run the following command in your terminal:

```bash
docker-compose up -d
```
Confluent Cloud:
- Follow the steps on Confluent Cloud to create a free account, then obtain your API_ACCESS_KEY, API_ACCESS_SECRET, and BOOTSTRAP_SERVER.
Include the following lines in your .env file, depending on your Kafka environment.
- Docker .env config (API_KEY and API_SECRET are intentionally left blank):

```
PORT=<USER_DEFINED>
API_KEY=
API_SECRET=
KAFKA_BOOTSTRAP_SERVER=localhost:9092
KAFKA_SSL=false
```
- Confluent Cloud .env config:

```
PORT=<USER_DEFINED>
API_KEY=<API_ACCESS_KEY>
API_SECRET=<API_ACCESS_SECRET>
KAFKA_BOOTSTRAP_SERVER=<BOOTSTRAP_SERVER>
```
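The variable names above must match exactly what the server reads from `process.env`. As a quick sanity check before starting the server, a small helper like the following can verify the config — note that `validateEnv` is a hypothetical helper for illustration, not part of Atomic Kafka:

```javascript
// Hypothetical helper (not part of Atomic Kafka): verifies that the
// required environment variables are present before the server starts.
// KAFKA_SSL arrives as a string, so it is parsed into a boolean here.
function validateEnv(env) {
  const required = ['PORT', 'KAFKA_BOOTSTRAP_SERVER'];
  const missing = required.filter((key) => !env[key]);
  if (missing.length > 0) {
    throw new Error(`Missing required env vars: ${missing.join(', ')}`);
  }
  return {
    port: Number(env.PORT),
    bootstrapServer: env.KAFKA_BOOTSTRAP_SERVER,
    apiKey: env.API_KEY || null, // blank in the Docker config
    apiSecret: env.API_SECRET || null,
    ssl: env.KAFKA_SSL === 'true', // string -> boolean
  };
}
```

After `require('dotenv').config()` has populated `process.env`, calling `validateEnv(process.env)` fails fast with a descriptive error instead of a confusing broker-connection timeout later.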
Initialize a server instance of your choice (HTTP, Node.js, etc.). The example below uses a Node.js Express server.
ATTENTION: a server instance must be created for every remote Atomic Kafka client.
- Initialize and configure expressApp according to desired specifications.
- Require in AtomicKafkaServer.
- Define a server that listens on the user-defined PORT environment variable.
- Initialize an AtomicKafkaServer instance aks by passing in the server.
```javascript
// initialize and configure expressApp according to user specifications
const AtomicKafkaServer = require('atomic-kafka/server');

const server = expressApp.listen(process.env.PORT, () => {
  console.log(`Listening on port ${process.env.PORT}`);
});

const aks = new AtomicKafkaServer(server);
```
- Initialize a newConsumer on the aks instance and pass in the group_ID_string.
- Enable the built-in websocket by invoking socketConsume and passing in the group_ID_string, an event_string, and the topic_string.

```javascript
aks.newConsumer('group_ID_string');
aks.socketConsume('group_ID_string', 'event_string', 'topic_string');
```
- Initialize a newProducer on the aks instance and pass in the topic_string.
- Enable the built-in websocket by invoking globalProduce and passing in an event_string and the topic_string.

```javascript
aks.newProducer('topic_string');
aks.globalProduce('event_string', 'topic_string');
```
```javascript
// ES module syntax
import AtomicKafkaClient from 'atomic-kafka/client';

// or, in TypeScript, with CommonJS syntax
declare function require(name: string);
const AtomicKafkaClient = require('atomic-kafka/client').default;
```
- Initialize akc as an AtomicKafkaClient, passing in the AtomicKafkaServer host's URI_STRING.
- Define a callback to process the message payload through the React state management tool of your choice.
- Implement useInterval to consume from the Kafka cluster on an interval.
- Invoke useInterval on the akc instance, passing a function that calls the consumer with a user-defined websocket event_string and the previously defined callback, along with the interval_delay in milliseconds.
```javascript
function Consumer_Component() {
  const akc = new AtomicKafkaClient(URI_STRING);
  const callback = (payload) => {
    // user-defined function definition
  };
  akc.useInterval(() => akc.consumer(<event_string>, callback), <interval_delay>);
}
```
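The callback above is where consumed messages enter React state. As one illustration of how it might be defined — the `appendToFeed` helper and the `{ id, text }` payload shape are assumptions for this sketch, not part of Atomic Kafka — a callback can fold each payload into an existing message list immutably, which pairs naturally with a useState setter:

```javascript
// Illustrative only: one way the user-defined callback might merge an
// incoming payload into existing component state. The payload shape
// ({ id, text }) is an assumption for this sketch.
function appendToFeed(messages, payload) {
  // skip duplicates, which can arrive when consuming on an interval
  if (messages.some((m) => m.id === payload.id)) return messages;
  return [...messages, payload]; // new array, so React re-renders
}
```

Inside the component this would typically be wired as `const callback = (payload) => setMessages((prev) => appendToFeed(prev, payload));`.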
- Initialize akc as an AtomicKafkaClient, passing in the AtomicKafkaServer host's URI_STRING.
- Generate a payload formatted as an arbitrarily-nested JSON object. The example below defines the payload inline, but it can be generated at any point in the client according to the user's specification.
- Invoke the producer function. Pass in the websocket event_string and the payload.
```javascript
function Producer_Component() {
  const akc = new AtomicKafkaClient(URI_STRING);
  const payload = {
    // arbitrarily nested key-value pairs
  };
  akc.producer(<event_string>, payload);
}
```
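Since the payload can be any arbitrarily-nested JSON object, it can help to wrap the user's data with metadata before producing. The `buildPayload` helper below is a hypothetical sketch of that pattern — the field names (`meta`, `sender`, `sentAt`, `data`) are illustrative assumptions, not an Atomic Kafka requirement:

```javascript
// Hypothetical sketch: wrap arbitrary user data in a nested payload
// with metadata before handing it to akc.producer(). Field names are
// illustrative, not required by Atomic Kafka.
function buildPayload(data, sender) {
  return {
    meta: {
      sender,
      sentAt: new Date().toISOString(),
    },
    data, // arbitrarily nested key-value pairs supplied by the caller
  };
}
```

A component could then call `akc.producer(<event_string>, buildPayload({ text: 'hello' }, 'user_1'));`.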