
esq's Introduction

Embeddable Simple Queue

The library implements a persistent queue data structure for Erlang applications.


Inspiration

Queuing is an essential feature required to implement scalable and fault-tolerant applications. Any asynchronous communication is built around queues. There are various queuing systems on the market: RabbitMQ, Kafka, AWS SQS, AWS Kinesis, etc. Each Erlang process has an in-memory queue: its mailbox. Sometimes persistence of messages is required for robustness and reliability. The library implements an embeddable queue (data structure) with message persistence that enhances the traditional mailbox features of Erlang processes.

Getting started

The latest version of the library is available on its master branch. All development, including new features and bug fixes, takes place on the master branch using forking and pull requests, as described in the contribution guidelines.

Installation

The stable library release is available via Hex packages. Add the library as a dependency to rebar.config:

{deps, [esq]}.

Usage

The library exposes its public interface through the exports of the esq.erl module. Just call the required function with the required arguments; check out Key features for details.

Build the library and run the development console:

make
make run

Key features

Queue-compatible interface

The library implements a mutable queue data structure; it is mutable because it relies on file I/O. The data structure is a product of an in-memory head and an on-disk persistent tail. The head is kept in memory using a deque (double-ended queue) data structure, and its capacity is limited to C messages. The disk queue is built as a chain of files (64MB per segment). The queue rotates a file segment when the head is fully consumed by the application.
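The head/tail split can be illustrated with a small, purely in-memory model. This is a sketch under stated assumptions, not esq's implementation. The module name head_tail_sketch is hypothetical, the on-disk tail is simulated by a second stdlib queue, and segment files are ignored. Enqueue always appends to the tail. Dequeue serves from the head and, once the head is empty, refills it with at most C messages.

```erlang
%% head_tail_sketch.erl: hypothetical in-memory model of the head/tail
%% layout; the "on-disk" tail is simulated by a second stdlib queue.
-module(head_tail_sketch).
-export([new/1, enq/2, deq/1]).

%% Cap plays the role of the head capacity C.
new(Cap) when is_integer(Cap), Cap > 0 ->
    #{cap => Cap, head => queue:new(), tail => queue:new()}.

%% Enqueue always appends to the tail, never to the head.
enq(Msg, #{tail := Tail} = Q) ->
    Q#{tail := queue:in(Msg, Tail)}.

%% Dequeue serves from the head; an empty head is first refilled
%% from the front of the tail. Returns {Msg, Q} or {empty, Q}.
deq(#{head := Head} = Q0) ->
    Q = case queue:is_empty(Head) of
            true  -> refill(Q0);
            false -> Q0
        end,
    case queue:out(maps:get(head, Q)) of
        {{value, Msg}, Rest} -> {Msg, Q#{head := Rest}};
        {empty, _}           -> {empty, Q}
    end.

%% Move at most Cap messages from the front of the tail into the head.
refill(#{cap := Cap, tail := Tail} = Q) ->
    N = min(Cap, queue:len(Tail)),
    {Chunk, Rest} = queue:split(N, Tail),
    Q#{head := Chunk, tail := Rest}.
```

Because every call returns a new map, this model is immutable. In esq, file I/O makes the real queue mutable.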

Let's take a short tour of the queue interface:

%%
%% create an empty queue data structure
{ok, Q} = esq:new("/tmp/q").

%%
%% enqueue a message to the queue; the queue is mutable, so the call returns ok
ok = esq:enq(a, Q).

%%
%% enqueue multiple messages to queue
[esq:enq(X, Q) || X <- [b, c, d, e]].


%%
%% dequeue message(s) from the queue; it returns a list of elements
%% each element is a map #{payload => payload()} that carries the payload
%% and other message properties
[#{payload := a}] = esq:deq(Q).

%%
%% dequeue multiple messages from queue
_ = esq:deq(4, Q).

Message persistency

Queue persistence uses sequential disk I/O. The queue is implemented as a collection of file segments. Messages are appended to the last file segment during an enqueue operation and read from the first segment.

         head             tail                                
         +----------+     +---+   +---+       +---+            
deq <----+    C     <-----+ q |   | q |  ...  | q <-------+ enq
         +----------+     +---+   +---+       +---+            
                             ro      ro          wr

|-----( in-memory )-----|---------( on-disk )---------| 
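Sequential append and read can be sketched with the standard file module. The record layout below is an assumption made for illustration: a 32-bit size prefix followed by term_to_binary/1 output. The module name segment_sketch is also hypothetical. esq's actual segment format is not described here and may differ.

```erlang
%% segment_sketch.erl: hypothetical segment file with size-prefixed records.
-module(segment_sketch).
-export([open_writer/1, append/2, read_all/1]).

%% A segment open for writing: every write goes to the end of the file.
open_writer(File) ->
    file:open(File, [append, raw, binary]).

%% Append one message as <<Size:32, Payload/binary>>.
append(Fd, Msg) ->
    Bin = term_to_binary(Msg),
    file:write(Fd, [<<(byte_size(Bin)):32>>, Bin]).

%% Read a segment front to back and decode every complete record.
read_all(File) ->
    {ok, Bin} = file:read_file(File),
    decode(Bin, []).

decode(<<Size:32, Payload:Size/binary, Rest/binary>>, Acc) ->
    decode(Rest, [binary_to_term(Payload) | Acc]);
decode(_EmptyOrTorn, Acc) ->
    %% an empty remainder, or a truncated record at the end, stops decoding
    lists:reverse(Acc).
```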

At any point in time, a segment is open either for writing or for reading. Segments are rotated with a frequency defined by the time-to-sync (tts) timer. Note that this timer might make overflow messages temporarily invisible if the dequeue rate is higher than the enqueue rate. The queue always writes messages to the last disk segment.
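The visibility caveat follows from readers consuming only rotated segments. The hypothetical rotation_sketch module below models this with plain lists. Messages sit in the segment open for writing (wr) and become readable only after rotate/1 moves that segment into the read-only chain (ro). In a running system a tts timer would trigger the rotation; here it is an explicit call.

```erlang
%% rotation_sketch.erl: hypothetical pure model of segment rotation.
-module(rotation_sketch).
-export([new/0, enq/2, rotate/1, deq/1]).

%% ro: read-only segments, oldest first; wr: the segment open for writing.
new() ->
    #{ro => [], wr => []}.

%% Writes always go to the writer segment (stored newest-first).
enq(Msg, #{wr := Wr} = Q) ->
    Q#{wr := [Msg | Wr]}.

%% Close the writer and append it to the read-only chain.
%% An empty writer is not rotated.
rotate(#{wr := []} = Q) ->
    Q;
rotate(#{ro := Ro, wr := Wr} = Q) ->
    Q#{ro := Ro ++ [lists:reverse(Wr)], wr := []}.

%% Read from the first read-only segment; messages still in wr are invisible.
deq(#{ro := [[Msg | Rest] | Segs]} = Q) ->
    Ro = case Rest of
             [] -> Segs;              % segment fully consumed: drop it
             _  -> [Rest | Segs]
         end,
    {Msg, Q#{ro := Ro}};
deq(Q) ->
    {empty, Q}.
```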

In-flight capabilities

A message is in flight after it has been dequeued by a consumer but not yet acknowledged. In a distributed system there is no guarantee that the consumer receives and processes the message. Thus, the consumer must explicitly acknowledge the message using its receipt identity.

         head             tail                                
         +----------+     +---+   +---+       +---+            
deq <-+--+    C     <-----+ q |   | q |  ...  | q <-------+ enq
      |  +----------+     +---+   +---+       +---+            
      |   ^                  ro      ro          wr            
      |   |                                                    
      |  ++---------+                                          
ack +-+-->    C     |                                          
         +----------+                                          
         in-flight heap                                                  
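One way to model the in-flight heap is as a priority structure ordered by delivery deadline. The sketch below is an assumption, not esq's code. The module inflight_sketch is hypothetical, gb_trees stands in for a heap, and the receipt is simply the tree key {Deadline, UniqueId}. Acknowledging a message deletes its entry. expired/2 pops anything whose deadline has passed so the caller can make it visible again.

```erlang
%% inflight_sketch.erl: hypothetical in-flight heap keyed by {Deadline, Id}.
-module(inflight_sketch).
-export([new/0, lease/4, ack/2, expired/2]).

new() ->
    gb_trees:empty().

%% Park Msg in flight until Now + Ttf (milliseconds); return its receipt.
lease(Msg, Now, Ttf, Heap) ->
    Receipt = {Now + Ttf, erlang:unique_integer([monotonic, positive])},
    {Receipt, gb_trees:insert(Receipt, Msg, Heap)}.

%% Acknowledge a message; unknown or already-expired receipts are ignored.
ack(Receipt, Heap) ->
    gb_trees:delete_any(Receipt, Heap).

%% Pop every message whose deadline has passed, earliest deadline first.
expired(Now, Heap) ->
    expired(Now, Heap, []).

expired(Now, Heap, Acc) ->
    case gb_trees:is_empty(Heap) of
        true ->
            {lists:reverse(Acc), Heap};
        false ->
            case gb_trees:take_smallest(Heap) of
                {{Deadline, _}, Msg, Rest} when Deadline =< Now ->
                    expired(Now, Rest, [Msg | Acc]);
                _ ->
                    {lists:reverse(Acc), Heap}
            end
    end.
```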

Let's evaluate the in-flight feature.

%%
%% create a queue and enable the in-flight feature using time-to-flight (ttf)
%% (in the same shell session, call f(Q). first because Q is already bound)
{ok, Q} = esq:new("/tmp/q", [{capacity, 10}, {ttf, 5000}]).

%%
%% enqueue multiple messages to queue
[esq:enq(X, Q) || X <- [a, b, c, d, e, f, g, h]].

%%
%% dequeue a message and read its receipt
[#{receipt := Receipt}] = esq:deq(Q).

%%
%% acknowledge the message to queue
esq:ack(Receipt, Q).

%%
%% the message becomes visible to consumers again if the acknowledgement
%% is not delivered within time-to-flight
[#{payload := X}] = esq:deq(Q).

timer:sleep(6000).

[#{payload := X}] = esq:deq(Q).

Queue timers

  • ttl: message time-to-live in milliseconds. Expired messages are evicted from the queue; eviction is executed during reads.
  • ttf: message time-to-flight in milliseconds, the time within which a message acknowledgment must be delivered before the message reappears to client(s). If the parameter is not defined, the in-flight heap is not used and message acknowledgment is not required.
  • tts: queue time-to-sync (rotate) interval for file segments, in milliseconds. Any enqueued message might remain invisible until a sync is performed.
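Lazy ttl eviction during reads can be sketched as follows. The module name ttl_sketch and the {EnqueuedAt, Payload} message shape are hypothetical. Times are plain integers in milliseconds so the example stays deterministic.

```erlang
%% ttl_sketch.erl: hypothetical read-time eviction of expired messages.
-module(ttl_sketch).
-export([deq/3]).

%% Return the first message still alive at Now, discarding expired ones
%% in front of it along the way; `empty` if nothing alive remains.
deq(Now, Ttl, Q0) ->
    case queue:out(Q0) of
        {{value, {At, _}}, Q} when Now - At > Ttl ->
            deq(Now, Ttl, Q);            % expired: evict and keep reading
        {{value, {_, Payload}}, Q} ->
            {Payload, Q};
        {empty, Q} ->
            {empty, Q}
    end.
```

Eviction here costs nothing until a consumer reads, which matches the statement that eviction happens during reads.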

Performance

The queue performance is evaluated using basho_bench with a 25% dequeue and 75% enqueue workload on a MacBook Pro (Intel Core i7 2.8GHz, 16GB 2133 MHz LPDDR3, 256GB SSD).

Figure: queue performance

How to Contribute

The library is Apache 2.0 licensed and accepts contributions via GitHub pull requests:

  • Fork the repository on GitHub
  • Read build instructions
  • Make a pull request

The build process requires Erlang/OTP version 19.0 or later and essential build tools.

Build and run the service in your development console. The following commands boot the Erlang virtual machine and open the Erlang shell.

git clone https://github.com/fogfish/esq
cd esq
make
make run

Now you are able to create queues and debug them.

commit message

The commit message helps us write good release notes and speeds up the review process. The message should address two questions: what changed and why. The project follows the template defined in the chapter Contributing to a Project of the Git book.

Short (50 chars or less) summary of changes

More detailed explanatory text, if necessary. Wrap it to about 72 characters or so. In some contexts, the first line is treated as the subject of an email and the rest of the text as the body. The blank line separating the summary from the body is critical (unless you omit the body entirely); tools like rebase can get confused if you run the two together.

Further paragraphs come after blank lines.

Bullet points are okay, too

Typically a hyphen or asterisk is used for the bullet, preceded by a single space, with blank lines in between, but conventions vary here

bugs

If you experience any issues with the library, please let us know via GitHub issues. We appreciate detailed and accurate reports that help us to identify and replicate the issue.

  • Specify the configuration of your environment. Include which operating system you use and the versions of runtime environments.

  • Attach logs, screenshots and exceptions, if possible.

  • Describe the steps you took to reproduce the problem.

Changelog

The library uses semantic versioning to identify stable releases.

  • 1.0.0 - a simplified persistent queue
  • 0.8.5 - a stable release of queue that supports various back-ends

License

Copyright 2013 Dmitry Kolesnikov

Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0.

Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.

esq's People

Contributors

fogfish


esq's Issues

Head functionality to handle multiple requests

Hi,

Currently, with deq/2, I can dequeue multiple elements at a time. Is there any way I can read multiple elements at a time using the head operation, similar to the way deq works?

Fails to restart after process crash

terminated with reason: no match of right hand value {error,eexist} in esq_writer:open/1 line 75

The error is caused by Out-Of-Disk space. The segment rotation shall handle errors carefully.

How to build locally with otp-25?

Hi,

I wanted to give this a go locally, but I'm unable to build it with the recent otp.
I haven't been following erlang too closely lately; do you perhaps know if there's a quick fix for the following error?

Thanks!

Erlang/OTP 25 [erts-13.1.2] [source] [64-bit] [smp:8:8] [ds:8:8:10] [async-threads:1] [jit:ns]
12:33:32 carb ~/dev/esq
master $ make
==> install rebar (3.20.0)
===> Verifying dependencies...
===> Analyzing applications...
===> Compiling datum
===> Compiling _build/default/lib/datum/src/maplike/heap.erl failed
_build/default/lib/datum/src/maplike/heap.erl:69:2: type heap() is unused
_build/default/lib/datum/src/maplike/heap.erl:72:2: type rank() is unused

make: *** [erlang.mk:84: compile] Error 1

Validate queue options and exit if option is not acceptable

Hello,

Having the following queue:

{ok, Q}= esq:new( "priv/client_producer", [{tts, 0}, {capacity, 0}]).

After I add few items inside the queue and restart the app and do:

esq:head(Q).

I always receive undefined, but esq:deq always returns items. I'm expecting head to return the same item as deq, but without removing it from the queue.

I'm using [{tts, 0}, {capacity, 0}] to make sure all items go directly to disk. Am I wrong?

Silviu

Any new release planned

Hello,

I see master is behind the other branch. Is any release with on-disk-overflow planned?

why there is a need for storing a hash for each message

Hello @fogfish,

Looking at the code, I see you are storing a hash for each message when writing to the file, and you also check that it matches when reading.

I understand that you are checking data integrity, but what is the concern that justifies this overhead?

Silviu

broken build

Hello,

In the last change you removed feta, but it's still in app.src:

{applications,[kernel,stdlib,feta,datum,pipe,uid]},

Silviu

Length of the queue

A function -spec esq:length(_) -> integer() would be a nice feature. There is no rocket science in implementing it for the in-memory queue. However, the file queue requires persisting a counter, which would cause extra, unnecessary disk I/O.

Alternative option: -spec esq:empty(_) -> true | false.

Larger Data packets are not getting de-queued

Hi,

I was trying to enqueue and dequeue a large data packet using esq. I was able to enqueue a 4MB file, but when trying to dequeue it, esq returns an empty list [], and the data is lost from the disk.

The README states:
The head is kept in memory using dequeue data structure. It's capacity is limited to C messages.
In my tests, this limit is 64KB.

Is there any way we can increase the in-flight memory to hold larger packets?

rebar2.config is missing from hex.

This package is a dependency of erlkaf, and when erlkaf is built as a dependency in a Mix project, there is an error:

Error evaluating Rebar config script ./rebar.config.script:22: evaluation failed with reason error:{badmatch,{error,enoent}} and stacktrace [{erl_eval,expr,5,[{file,[101,114,108,95,101,118,97,108,46,101,114,108]},{line,450}]},{erl_eval,exprs,5,[{file,[101,114,108,95,101,118,97,108,46,101,114,108]},{line,123}]},{erl_eval,expr_list,6,[{file,[101,114,108,95,101,118,97,108,46,101,114,108]},{line,893}]},{erl_eval,expr,5,[{file,[101,114,108,95,101,118,97,108,46,101,114,108]},{line,426}]},{file,eval_stream2,6,[{file,[102,105,108,101,46,101,114,108]},{line,1504}]},{file,script,2,[{file,[102,105,108,101,46,101,114,108]},{line,1142}]},{'Elixir.File','cd!',2,[{file,[108,105,98,47,102,105,108,101,46,101,120]},{line,1560}]},{'Elixir.Mix.Rebar',eval_script,2,[{file,[108,105,98,47,109,105,120,47,114,101,98,97,114,46,101,120]},{line,204}]}]
Any dependencies defined in the script won't be available unless you add them to your Mix project

If I add the rebar2.config file manually, the error is gone.

I don't know rebar very well, nor how Erlang apps are managed in Hex, but would adding the rebar2.config file to the Hex package be a solution?

Thank you

Wrap a queue data structure into process.

A pure data structure is usable only if the queue is immutable. The use of a file descriptor makes the structure mutable, and the existing API is complicated enough. Wrapping the queue in an Erlang process would simplify the client interface.

Lock a disk segment if in-flight heap contains its message

The in-flight capability implements a simple protocol that ensures message retransmission in case of consumer failure. The in-flight heap is a pure in-memory segment. The existing queue behaviour deletes an on-disk segment once its last message is read (deletion happens during rotation). A crash of the Erlang node causes the in-flight heap to be lost. We need to ensure that disk segments are deleted only when their messages have been acknowledged by the consumer.
