memphis.js's Introduction

Simple as RabbitMQ, Robust as Apache Kafka, and Perfect for microservices.

CNCF Silver Member

Memphis is a next-generation message broker:
a simple, robust, and durable cloud-native message broker wrapped with
an entire ecosystem that enables fast and reliable development of next-generation event-driven use cases.

Memphis enables building modern applications that require large volumes of streamed and enriched data,
modern protocols, zero ops, rapid development, extreme cost reduction,
and significantly less development time for data-oriented developers and data engineers.

Installation

$ npm install memphis-dev

Importing

For JavaScript, you can use either the import or the require keyword:

const memphis = require('memphis-dev');

For TypeScript, use the import keyword so that type checking is available:

import memphis from 'memphis-dev';
import type { Memphis } from 'memphis-dev/types';

To use the NestJS dependency injection feature:

import { Module } from '@nestjs/common';
import { MemphisModule, MemphisService } from 'memphis-dev/nest';
import type { Memphis } from 'memphis-dev/types';

Connecting to Memphis

First, connect to Memphis by calling memphis.connect.

/* JavaScript and TypeScript projects */
await memphis.connect({
    host: "<memphis-host>",
    port: <port>, // defaults to 6666
    username: "<username>", // (root/application type user)
    connectionToken: "<broker-token>", // you will get it on application type user creation
    reconnect: true, // defaults to true
    maxReconnect: 3, // defaults to 3
    reconnectIntervalMs: 1500, // defaults to 1500
    timeoutMs: 1500, // defaults to 1500
    // for TLS connection:
    keyFile: '<key-client.pem>',
    certFile: '<cert-client.pem>',
    caFile: '<rootCA.pem>'
});
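The connection options above are often taken from the environment rather than hard-coded. The following is a minimal sketch of that idea, not part of memphis-dev: the helper name buildConnectOptions and the MEMPHIS_* variable names are assumptions made for illustration. It only builds the options object using the keys listed above.

```javascript
// Hypothetical helper: buildConnectOptions and the MEMPHIS_* variable
// names are illustrative assumptions, not part of the memphis-dev package.
function buildConnectOptions(env) {
    if (!env.MEMPHIS_HOST) {
        throw new Error('MEMPHIS_HOST is required');
    }
    const options = {
        host: env.MEMPHIS_HOST,
        username: env.MEMPHIS_USERNAME,
        connectionToken: env.MEMPHIS_TOKEN,
    };
    // Only set the port when one is provided, so the client default applies otherwise.
    if (env.MEMPHIS_PORT !== undefined) {
        const port = Number(env.MEMPHIS_PORT);
        if (!Number.isInteger(port) || port <= 0) {
            throw new Error(`Invalid MEMPHIS_PORT: ${env.MEMPHIS_PORT}`);
        }
        options.port = port;
    }
    return options;
}

// Usage (assumes `memphis` was imported as shown in the Importing section):
// const memphisConnection = await memphis.connect(buildConnectOptions(process.env));
```

Keeping the result of memphis.connect in a variable such as memphisConnection matches how the later sections refer to the connection.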

Nest injection

@Module({
    imports: [MemphisModule.register()],
})
class ConsumerModule {
    constructor(private memphis: MemphisService) {}

    startConnection() {
        // Arrow function so that `this` still refers to the module instance.
        (async () => {
            let memphisConnection: Memphis | undefined;

            try {
                memphisConnection = await this.memphis.connect({
                    host: "<memphis-host>",
                    username: "<application type username>",
                    connectionToken: "<broker-token>",
                });
            } catch (ex) {
                console.log(ex);
                // The connection is still undefined if connect() itself failed.
                memphisConnection?.close();
            }
        })();
    }
}

Once connected, all of the functionality offered by Memphis is available.

Disconnecting from Memphis

To disconnect from Memphis, call close() on the Memphis connection object.

memphisConnection.close();

Creating a Station

If the station already exists, nothing happens and the new configuration is not applied.

const station = await memphis.station({
    name: '<station-name>',
    schemaName: '<schema-name>',
    retentionType: memphis.retentionTypes.MAX_MESSAGE_AGE_SECONDS, // defaults to memphis.retentionTypes.MAX_MESSAGE_AGE_SECONDS
    retentionValue: 604800, // defaults to 604800
    storageType: memphis.storageTypes.DISK, // defaults to memphis.storageTypes.DISK
    replicas: 1, // defaults to 1
    idempotencyWindowMs: 0, // defaults to 120000
    sendPoisonMsgToDls: true, // defaults to true
    sendSchemaFailedMsgToDls: true // defaults to true
});

Creating a station with Nestjs dependency injection

@Module({
    imports: [MemphisModule.register()],
})
class StationModule {
    constructor(private memphis: MemphisService) { }

    createStation() {
        // Arrow function so that `this` still refers to the module instance.
        // `memphis` below is the default import from 'memphis-dev'.
        (async () => {
            const station = await this.memphis.station({
                name: "<station-name>",
                schemaName: "<schema-name>",
                retentionType: memphis.retentionTypes.MAX_MESSAGE_AGE_SECONDS, // defaults to memphis.retentionTypes.MAX_MESSAGE_AGE_SECONDS
                retentionValue: 604800, // defaults to 604800
                storageType: memphis.storageTypes.DISK, // defaults to memphis.storageTypes.DISK
                replicas: 1, // defaults to 1
                idempotencyWindowMs: 0, // defaults to 120000
                sendPoisonMsgToDls: true, // defaults to true
                sendSchemaFailedMsgToDls: true // defaults to true
            });
        })();
    }
}

Retention types

Memphis currently supports the following types of retention:

memphis.retentionTypes.MAX_MESSAGE_AGE_SECONDS;

Every message persists for the number of seconds set in the retention value field.

memphis.retentionTypes.MESSAGES;

Once the number of stored messages exceeds the maximum set in the retention value, the oldest messages are deleted.

memphis.retentionTypes.BYTES;

Once the stored messages exceed the maximum number of bytes set in the retention value, the oldest messages are deleted.
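To make the three retention policies concrete, here is a small in-memory model of what each one keeps. It is only an illustration of the semantics described above, not the broker's implementation. The function name applyRetention, the message shape, and the use of plain strings in place of the memphis.retentionTypes values are all assumptions.

```javascript
// Illustrative model only. Each message is assumed to look like
// { payload: Uint8Array, storedAt: <ms since epoch> }, ordered oldest first.
function applyRetention(messages, retentionType, retentionValue, nowMs) {
    switch (retentionType) {
        case 'MAX_MESSAGE_AGE_SECONDS':
            // Keep messages that are no older than retentionValue seconds.
            return messages.filter((m) => nowMs - m.storedAt <= retentionValue * 1000);
        case 'MESSAGES':
            // Keep only the newest retentionValue messages.
            return retentionValue > 0 ? messages.slice(-retentionValue) : [];
        case 'BYTES': {
            // Walk from newest to oldest, keeping messages while the byte budget allows.
            const kept = [];
            let total = 0;
            for (let i = messages.length - 1; i >= 0; i--) {
                total += messages[i].payload.byteLength;
                if (total > retentionValue) break;
                kept.unshift(messages[i]);
            }
            return kept;
        }
        default:
            throw new Error(`Unknown retention type: ${retentionType}`);
    }
}
```

In all three cases the oldest messages are the first to go, which matches the descriptions above.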

Storage types

Memphis currently supports the following types of message storage:

memphis.storageTypes.DISK;

Messages are persisted on disk.

memphis.storageTypes.MEMORY;

Messages are kept in main memory.

Destroying a Station

Destroying a station will remove all its resources (producers/consumers)

await station.destroy();

Attaching a Schema to an Existing Station

await memphisConnection.attachSchema({ name: '<schema-name>', stationName: '<station-name>' });

Detaching a Schema from Station

await memphisConnection.detachSchema({ stationName: '<station-name>' });

Produce and Consume messages

The most common client operations are produce to send messages and consume to receive messages.

Messages are published to a station and consumed from it by creating a consumer. Consumers are pull-based and consume all the messages in a station, unless they belong to a consumer group, in which case the messages are spread across all members of the group.
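The difference between standalone consumers and consumer groups can be modeled in a few lines. This is a sketch of the delivery semantics only, not of the broker's code. The function name distribute is hypothetical, and round-robin assignment within a group is an assumption, since the text above only says that messages are spread across the members.

```javascript
// Illustrative model only. `consumers` is an array of { name, group? }.
// Returns a Map from consumer name to the messages that consumer receives.
function distribute(messages, consumers) {
    const received = new Map(consumers.map((c) => [c.name, []]));

    // A consumer without an explicit group forms a group of its own
    // (consumerGroup defaults to the consumer name).
    const groups = new Map();
    for (const c of consumers) {
        const key = c.group ?? c.name;
        if (!groups.has(key)) groups.set(key, []);
        groups.get(key).push(c.name);
    }

    messages.forEach((msg, i) => {
        // Every group sees every message; within a group exactly one member gets it.
        // Round-robin selection is an assumption made for this sketch.
        for (const members of groups.values()) {
            received.get(members[i % members.length]).push(msg);
        }
    });
    return received;
}

// Usage: "a" and "b" share a group, "c" consumes on its own.
const result = distribute(['m1', 'm2', 'm3', 'm4'], [
    { name: 'a', group: 'workers' },
    { name: 'b', group: 'workers' },
    { name: 'c' },
]);
console.log(result.get('c').length); // 4: the standalone consumer receives every message
```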

Memphis messages are payload agnostic. Payloads are Uint8Arrays.
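Because payloads are plain bytes, the application decides how to serialize its data. A minimal sketch, assuming JSON over UTF-8 as the encoding, follows. The helper names encodeJson and decodeJson are hypothetical; TextEncoder and TextDecoder are standard globals in Node.js and browsers.

```javascript
// Hypothetical helpers: JSON over UTF-8 is just one possible encoding.
function encodeJson(value) {
    // TextEncoder.encode returns a Uint8Array.
    return new TextEncoder().encode(JSON.stringify(value));
}

function decodeJson(bytes) {
    return JSON.parse(new TextDecoder().decode(bytes));
}

const payload = encodeJson({ orderId: 42, status: 'created' });
const roundTripped = decodeJson(payload);
console.log(payload instanceof Uint8Array, roundTripped.orderId);
```

A payload built this way can be passed as the message field when producing, and the bytes returned by message.getData() on the consumer side can be decoded the same way.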

To stop receiving messages, call consumer.destroy(). destroy() terminates the consumer regardless of whether there are messages in flight for the client.

Creating a Producer

const producer = await memphisConnection.producer({
    stationName: '<station-name>',
    producerName: '<producer-name>',
    genUniqueSuffix: false
});

Creating producers with NestJS dependency injection

@Module({
    imports: [MemphisModule.register()],
})
class ProducerModule {
    constructor(private memphis: MemphisService) { }

    createProducer() {
        // Arrow function so that `this` still refers to the module instance.
        // Assumes this.memphis has already been connected (see "Connecting to Memphis").
        (async () => {
            const producer = await this.memphis.producer({
                stationName: "<station-name>",
                producerName: "<producer-name>"
            });
        })();
    }
}

Producing a message

await producer.produce({
    message: 'Uint8Arrays/object/string/DocumentNode graphql', // Uint8Arrays/object (schema validated station - protobuf) or Uint8Arrays/object (schema validated station - json schema) or Uint8Arrays/string/DocumentNode graphql (schema validated station - graphql schema)
    ackWaitSec: 15 // defaults to 15
});

Add Header

const headers = memphis.headers();
headers.add('<key>', '<value>');
await producer.produce({
    message: 'Uint8Arrays/object/string/DocumentNode graphql', // Uint8Arrays/object (schema validated station - protobuf) or Uint8Arrays/object (schema validated station - json schema) or Uint8Arrays/string/DocumentNode graphql (schema validated station - graphql schema)
    headers: headers // defaults to empty
});

or

const headers = { key: "value" }
await producer.produce({
    message: 'Uint8Arrays/object/string/DocumentNode graphql', // Uint8Arrays/object (schema validated station - protobuf) or Uint8Arrays/object (schema validated station - json schema) or Uint8Arrays/string/DocumentNode graphql (schema validated station - graphql schema)
    headers: headers
});

Async produce

With async produce, your application does not wait for the broker's acknowledgement. Use it only if you can tolerate data loss.

await producer.produce({
    message: 'Uint8Arrays/object/string/DocumentNode graphql', // Uint8Arrays/object (schema validated station - protobuf) or Uint8Arrays/object (schema validated station - json schema) or Uint8Arrays/string/DocumentNode graphql (schema validated station - graphql schema)
    ackWaitSec: 15, // defaults to 15
    asyncProduce: true // defaults to false
});

Message ID

Stations are idempotent by default for 2 minutes (configurable). Idempotency is achieved by adding a message ID.

await producer.produce({
    message: 'Uint8Arrays/object/string/DocumentNode graphql', // Uint8Arrays/object (schema validated station - protobuf) or Uint8Arrays/object (schema validated station - json schema) or Uint8Arrays/string/DocumentNode graphql (schema validated station - graphql schema)
    ackWaitSec: 15, // defaults to 15
    msgId: 'fdfd' // defaults to null
});
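The effect of a message ID within the idempotency window can be pictured with a small in-memory model. This is a sketch of the idea only, not how the broker implements it. The class name IdempotencyWindow is hypothetical, and the exact expiry rule (a duplicate does not extend the window) is an assumption.

```javascript
// Illustrative model only: remembers message IDs for windowMs milliseconds
// and rejects an ID that is seen again inside that window.
class IdempotencyWindow {
    constructor(windowMs) {
        this.windowMs = windowMs;
        this.seen = new Map(); // msgId -> time first accepted (ms)
    }

    // Returns true if the message is accepted, false if it is a duplicate.
    accept(msgId, nowMs) {
        // Forget IDs whose window has elapsed.
        for (const [id, acceptedAt] of this.seen) {
            if (nowMs - acceptedAt >= this.windowMs) this.seen.delete(id);
        }
        if (this.seen.has(msgId)) return false;
        this.seen.set(msgId, nowMs);
        return true;
    }
}

// Usage with the default window of 120000 ms (2 minutes):
const idempotency = new IdempotencyWindow(120000);
console.log(idempotency.accept('fdfd', 0));      // true: first time this ID is seen
console.log(idempotency.accept('fdfd', 1000));   // false: duplicate inside the window
console.log(idempotency.accept('fdfd', 120000)); // true: the window has elapsed
```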

Destroying a Producer

await producer.destroy();

Creating a Consumer

const consumer = await memphisConnection.consumer({
    stationName: '<station-name>',
    consumerName: '<consumer-name>',
    consumerGroup: '<group-name>', // defaults to the consumer name.
    pullIntervalMs: 1000, // defaults to 1000
    batchSize: 10, // defaults to 10
    batchMaxTimeToWaitMs: 5000, // defaults to 5000
    maxAckTimeMs: 30000, // defaults to 30000
    maxMsgDeliveries: 10, // defaults to 10
    genUniqueSuffix: false,
    startConsumeFromSequence: 1, // start consuming from a specific sequence. defaults to 1
    lastMessages: -1 // consume the last N messages, defaults to -1 (all messages in the station)
});

Setting up a connection in NestJS

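import { NestFactory } from '@nestjs/core';
import { MicroserviceOptions } from '@nestjs/microservices';
import { MemphisServer } from 'memphis-dev/nest';
// AppModule is your application's root module; the path below is an assumption.
import { AppModule } from './app.module';

async function bootstrap() {
  const app = await NestFactory.createMicroservice<MicroserviceOptions>(
    AppModule,
    {
      strategy: new MemphisServer({
        host: '<memphis-host>',
        username: '<application type username>',
        connectionToken: '<broker-token>'
      }),
    },
  );

  await app.listen();
}
bootstrap();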

Consuming messages in NestJS

// Imports must be at the top level of the file, not inside the class body.
import { consumeMessage } from 'memphis-dev/nest';
import { Message } from 'memphis-dev/types';

export class Controller {
    @consumeMessage({
        stationName: '<station-name>',
        consumerName: '<consumer-name>',
        consumerGroup: ''
    })
    async messageHandler(message: Message) {
        console.log(message.getData().toString());
        message.ack();
    }
}

Passing context to message handlers

consumer.setContext({key: "value"});

Processing messages

consumer.on('message', (message, context) => {
    // processing
    console.log(message.getData());
    message.ack();
});

Acknowledge a message

Acknowledging a message tells the Memphis server not to re-send that message to the same consumer or consumer group.

message.ack();
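A common pattern is to acknowledge only after processing has succeeded. The sketch below shows that pattern under some assumptions: the payload is UTF-8 JSON, and the function name handleMessage and the processFn callback are hypothetical. Only message.getData() and message.ack() come from the API shown above.

```javascript
// Hypothetical handler: decodes a JSON payload, runs processFn on it,
// and acknowledges the message only if no error was thrown.
function handleMessage(message, processFn) {
    try {
        const text = new TextDecoder().decode(message.getData());
        processFn(JSON.parse(text));
        message.ack();
        return true;
    } catch (err) {
        // The message is left unacknowledged, so the broker may deliver it again
        // (see maxAckTimeMs and maxMsgDeliveries in the consumer options).
        console.error('processing failed:', err.message);
        return false;
    }
}

// Usage with a real consumer:
// consumer.on('message', (message) => handleMessage(message, (data) => console.log(data)));
```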

Get headers

Get the headers of a message:

const headers = message.getHeaders();

Get message sequence number

Get the sequence number of a message:

const sequenceNumber = message.getSequenceNumber();

Catching async errors

consumer.on('error', (error) => {
    // error handling
});

Destroying a Consumer

await consumer.destroy();
