
Iris-BufferQueue

Iris-BufferQueue is a message queue implemented as a Java library. It is primarily intended for buffering data on the local host before it is processed asynchronously.

Iris-BufferQueue has the following characteristics:

  • It's an in-process queue, although writing to the queue and consuming from it can happen in two different processes
  • It's persisted on disk, preserving data across application restarts and crashes
  • Publishing to the queue can happen in parallel from multiple threads; no synchronisation is required
  • Consumption from the queue can only be done by a single thread
  • Consumption can happen in batches

These characteristics are explained in greater detail in the documentation.

Usage

Get the Jar

Maven

Add the following repository to the 'repositories' section of your pom.xml:

<repository>
  <id>clojars</id>
  <name>Clojars repository</name>
  <url>https://clojars.org/repo</url>
</repository>

And add the following dependency to the 'dependencies' section of your pom.xml.

<dependency>
  <groupId>com.flipkart.iris</groupId>
  <artifactId>bufferqueue</artifactId>
  <version>0.1</version>
</dependency>

Download

You can download the jar and find all the dependencies on Clojars.

Create an instance

File file = new File("test.ibq");
if (!file.exists()) {
    int maxDataLength = 4 * 1024; // max size of data that can be written to the queue
    long numMessages = 1000000; // maximum number of unconsumed messages that can be kept in the queue
    MappedBufferQueueFactory.format(file, maxDataLength, numMessages);
}
BufferQueue bufferQueue = MappedBufferQueueFactory.getInstance(file);

Publish

Let's publish a simple message to the queue.

byte[] data = "Hello world!".getBytes();

High-level API

bufferQueue.publish(data);

Low-level API

BufferQueueEntry entry = bufferQueue.next().orNull();
if (entry != null) {
    try {
        entry.set(data);
    }
    finally {
        entry.markPublished();
    }
}
else {
    System.out.println("Queue full, cannot write message");
}

It is important that the markPublished() call is done within a finally block to ensure that it is always made.
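A toy model helps show why. The issue reports further down describe consume(n) as stopping at the first entry whose isPublished() returns false, so an entry that was obtained from next() but never marked published would appear to block every entry published after it. The sketch below is hypothetical (ToySlots and consumable are illustrative names, not part of the Iris-BufferQueue API) and is single-threaded for clarity.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical toy model, NOT the Iris-BufferQueue implementation.
// Each slot is either published (true) or still unpublished (false).
final class ToySlots {

    // Returns the indices a single in-order consumer could read, stopping
    // at the first unpublished slot (as the issue describes for consume(n)).
    static List<Integer> consumable(boolean[] published, int readCursor, int max) {
        List<Integer> batch = new ArrayList<>();
        for (int i = readCursor; i < published.length && batch.size() < max; i++) {
            if (!published[i]) {
                break; // an unpublished slot holds back everything after it
            }
            batch.add(i);
        }
        return batch;
    }

    public static void main(String[] args) {
        // Slot 1 was reserved, but its publisher threw before markPublished().
        boolean[] published = {true, false, true, true};
        System.out.println(consumable(published, 0, 10)); // prints [0]
    }
}
```

Slots 2 and 3 are published, yet in this model they stay unreachable until slot 1 is marked published, which is exactly what the finally block guarantees.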

Consuming from the queue

Simple API

BufferQueueEntry entry = bufferQueue.consume().orNull();
if (entry != null) {
    try {
        byte[] data = entry.get();
        System.out.println(new String(data));
    }
    finally {
        entry.markConsumed();
    }
}
else {
    System.out.println("Nothing to consume");
}

It is important that the markConsumed() call is done within a finally block to ensure that it is always made.
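A similar toy model shows what a missed markConsumed() could cost. In the next() code quoted in the issues below, a publisher is refused when writeCursor - readCursor >= capacity() even after forwardReadCursor(). Assuming forwardReadCursor() only advances past entries that have been marked consumed (an assumption; the README does not say so), an entry that is never marked consumed pins the read cursor and the queue eventually reports full. ToyRing and its methods are hypothetical and deliberately not thread-safe.

```java
// Hypothetical single-threaded toy ring buffer, NOT the Iris-BufferQueue
// implementation. Assumes the read cursor can only move past consumed entries.
final class ToyRing {
    private final boolean[] consumed;
    private long readCursor = 0;
    private long writeCursor = 0;

    ToyRing(int capacity) {
        consumed = new boolean[capacity];
    }

    int capacity() {
        return consumed.length;
    }

    // Advance the read cursor over a contiguous run of consumed entries.
    private void forwardReadCursor() {
        while (readCursor < writeCursor && consumed[(int) (readCursor % capacity())]) {
            readCursor++;
        }
    }

    // Returns the reserved sequence number, or -1 if the queue is full.
    long publish() {
        if (writeCursor - readCursor >= capacity()) {
            forwardReadCursor();
            if (writeCursor - readCursor >= capacity()) {
                return -1;
            }
        }
        long n = writeCursor++;
        consumed[(int) (n % capacity())] = false; // slot reused for a new entry
        return n;
    }

    void markConsumed(long n) {
        consumed[(int) (n % capacity())] = true;
    }

    public static void main(String[] args) {
        ToyRing ring = new ToyRing(2);
        long a = ring.publish();             // 0
        long b = ring.publish();             // 1
        ring.markConsumed(b);                // entry a is never marked consumed
        System.out.println(ring.publish());  // prints -1: queue reports full
        ring.markConsumed(a);
        System.out.println(ring.publish());  // prints 2
    }
}
```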

Batch API

int batchSize = 100;
List<BufferQueueEntry> entries = bufferQueue.consume(batchSize);
if (!entries.isEmpty()) {
    for (BufferQueueEntry entry : entries) {
        try {
            byte[] data = entry.get();
            System.out.println(new String(data));
        }
        finally {
            entry.markConsumed();
        }
    }
}
else {
    System.out.println("Nothing to consume");
}

It is important that the markConsumed() call is done within a finally block to ensure that it is always made.

Documentation

TODO: Point to detailed design, usage and API docs.

Contribution, Bugs and Feedback

For bugs, questions and discussions, please use GitHub Issues.

Please follow the contribution guidelines when submitting pull requests.

LICENSE

Copyright 2014 Flipkart Internet Pvt. Ltd.

Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.


Contributors

sids


iris-bufferqueue's Issues

consume() and consume(n) can return entries which are yet to be published

There appear to be three states of a BufferQueueEntry: UnPublished, Published and Consumed. In addition, BufferQueueEntry.isPublished() and BufferQueueEntry.isConsumed() can both return true for the same entry. I believe this is by design.

isPublished   isConsumed   Effective state   Possible?
false         false        UnPublished       Yes
false         true         -                 No
true          false        Published         Yes
true          true         Consumed          Yes
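The table can be encoded as a small state mapping. This is a hypothetical helper (EntryState, of and readyToConsume are illustrative names, not the library's API); readyToConsume shows the stricter check the issue implies, accepting only entries that are published and not yet consumed.

```java
// Hypothetical helper, NOT part of the Iris-BufferQueue API: maps the two
// flags from the table to the effective state described in the issue.
enum EntryState {
    UNPUBLISHED, PUBLISHED, CONSUMED;

    static EntryState of(boolean isPublished, boolean isConsumed) {
        if (isConsumed && !isPublished) {
            // The "Not possible" row of the table.
            throw new IllegalStateException("consumed but not published is not a valid combination");
        }
        if (isConsumed) {
            return CONSUMED;
        }
        return isPublished ? PUBLISHED : UNPUBLISHED;
    }

    // Only a PUBLISHED entry should be handed to a consumer; checking
    // isPublished() alone would also accept CONSUMED entries.
    static boolean readyToConsume(boolean isPublished, boolean isConsumed) {
        return isPublished && !isConsumed;
    }
}
```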

In consume(), the check is entry.isPublished(), which is also true for an entry in the Consumed state.
In consume(n), the loop exits on !entry.isPublished(), which means already-consumed entries can be added to the batch as well.
Consider the following case:

Position  1 2 3 4 5 6 7 8 9 10
State     C C C P P P C C C C

(C = Consumed, P = Published)

Suppose consume(6) is called while readCursor is at 3 and writeCursor is at 8, with a publisher inside next() that has not yet reached the mappedEntries.makeEntry(n = 8) call. forwardReadCursor() will move readCursor to 4, and the for-loop will add entries 4, 5, 6 and 7 to the list (note that BufferQueueEntry[7].isPublished() returns true).

If the consumer code then chooses to skip entries based on isConsumed(), an entry that will eventually be published is lost.
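The scenario can be replayed with a short, self-contained model. It is a hypothetical sketch, not the library's code: the positions and states come from the table above, while the exclusive upper bound of 8 for the scan is an assumption chosen to reproduce the entries 4, 5, 6 and 7 mentioned in the report. scanStoppingAtConsumed is one possible fix, not necessarily the one the maintainers would choose.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical replay of the scenario in the issue, NOT the library's code.
// Positions are 1-based as in the issue; index 0 is unused.
final class BatchScan {
    // 'C' = Consumed (isPublished() and isConsumed() both true),
    // 'P' = Published (isPublished() true, isConsumed() false).
    static final char[] STATES = {' ', 'C', 'C', 'C', 'P', 'P', 'P', 'C', 'C', 'C', 'C'};

    static boolean isPublished(int pos) { return STATES[pos] == 'C' || STATES[pos] == 'P'; }
    static boolean isConsumed(int pos)  { return STATES[pos] == 'C'; }

    // Exit condition as described in the issue: stop only on !isPublished().
    static List<Integer> scanAsReported(int from, int until, int batchSize) {
        List<Integer> batch = new ArrayList<>();
        for (int pos = from; pos < until && batch.size() < batchSize; pos++) {
            if (!isPublished(pos)) break;
            batch.add(pos);
        }
        return batch;
    }

    // Possible fix: also stop at an entry that is already consumed.
    static List<Integer> scanStoppingAtConsumed(int from, int until, int batchSize) {
        List<Integer> batch = new ArrayList<>();
        for (int pos = from; pos < until && batch.size() < batchSize; pos++) {
            if (!isPublished(pos) || isConsumed(pos)) break;
            batch.add(pos);
        }
        return batch;
    }

    public static void main(String[] args) {
        // readCursor forwarded to 4; the scan is assumed to stop before position 8.
        System.out.println(scanAsReported(4, 8, 6));         // prints [4, 5, 6, 7]
        System.out.println(scanStoppingAtConsumed(4, 8, 6)); // prints [4, 5, 6]
    }
}
```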

Thread safety issues in forwardReadCursor() when used in next()

forwardReadCursor() can return the same value for two concurrent calls. Given that, if next() relies on it when the queue has only one vacancy, it can overwrite an already-published entry.

public Optional<BufferQueueEntry> next() {
    if (writeCursor.get() - readCursor.get() >= capacity()) {
        forwardReadCursor(); //<---------- #(1)
        if (writeCursor.get() - readCursor.get() >= capacity()) {
            return Optional.absent();
        }
    }

    long n = writeCursor.getAndIncrement(); //<---------- #(2)
    return Optional.of(mappedEntries.makeEntry(n)); //<---------- #(3)
}

If two threads interleave at #(1), both of them proceed to #(2) and #(3), thereby overwriting the previous value at (n % capacity()).
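One common way to close this check-then-act race is to make the capacity check and the cursor increment a single atomic step with a compareAndSet loop. The sketch below is hypothetical (SlotReserver, tryReserve and release are illustrative names); it is not the project's fix, and it replaces forwardReadCursor() with an explicit release() called by the single consumer.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch, NOT the Iris-BufferQueue fix: reserve a sequence
// number with compareAndSet so no two threads can pass the capacity check
// for the same slot.
final class SlotReserver {
    private final AtomicLong writeCursor = new AtomicLong();
    private final AtomicLong readCursor = new AtomicLong();
    private final long capacity;

    SlotReserver(long capacity) {
        this.capacity = capacity;
    }

    // Returns the reserved sequence number, or -1 if the queue is full.
    long tryReserve() {
        while (true) {
            long w = writeCursor.get();
            if (w - readCursor.get() >= capacity) {
                return -1;
            }
            // Only one thread can move writeCursor from w to w + 1;
            // a losing thread re-reads the cursor and checks capacity again.
            if (writeCursor.compareAndSet(w, w + 1)) {
                return w;
            }
        }
    }

    // Called by the single consumer after an entry has been consumed.
    void release() {
        readCursor.incrementAndGet();
    }

    public static void main(String[] args) throws InterruptedException {
        SlotReserver reserver = new SlotReserver(1000);
        AtomicLong granted = new AtomicLong();
        Thread[] threads = new Thread[8];
        for (int t = 0; t < threads.length; t++) {
            threads[t] = new Thread(() -> {
                for (int i = 0; i < 200; i++) {
                    if (reserver.tryReserve() >= 0) granted.incrementAndGet();
                }
            });
            threads[t].start();
        }
        for (Thread thread : threads) thread.join();
        System.out.println(granted.get()); // prints 1000: never more than capacity
    }
}
```

Because readCursor only ever increases, a stale read of it can only make tryReserve() more conservative; it can never let more than capacity entries be outstanding.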

size() method returns wrong value

If 5 messages are published and then, in a tight loop, BufferQueue.consume() is called exactly 5 times, each call followed by BufferQueueEntry.markConsumed(), size() returns 1 instead of 0.

See the test case testSizeConsistencyBug() in com.flipkart.iris.bufferqueue.mmapped.MappedBufferQueueTest.
