lfqueue's Introduction

lfqueue

A minimal lock-free queue that is easy to use and easy to read. It is only about 150 lines of code, so it is easy to understand and well suited for education.

Do not use this code in production.

Supports multiple consumers and multiple producers at the same time.

Arch               | Build status | Test
Linux              | (badge)      | On testing
Windows (msbuild)  | (badge)      | Not tested (Issue)
Windows (MinGW)    | (badge)      | Not tested
Windows (MinGW64)  | (badge)      | Not tested
Windows (Cygwin)   | (badge)      | Not tested
Windows (Cygwin64) | (badge)      | Not tested

Build Guide

With any GNU toolchain, just run make.

With any Visual Studio build toolchain, run the 32-bit build:

    msbuild "Visual Stdio\lfqueue.sln" /verbosity:minimal /property:Configuration=Release /property:Platform=x86

or the 64-bit build:

    msbuild "Visual Stdio\lfqueue.sln" /verbosity:minimal /property:Configuration=Release /property:Platform=x64

How to use

Just copy the code into your project and use it.

Next Milestone

  • Compile on macOS (I do not have a Mac and need somebody's help!)
  • Compile as a kernel module.
  • Use a lock-free memory manager (free is very slow on Windows).

Example

Simple example

This is minimal sample code showing how to use lfq.

Although an int or long value is valid input data, it makes an empty queue hard to distinguish from a stored value or an error return.

We do not recommend using int or long as the queue data type.

#include <stdio.h>
#include <stdlib.h>
#include "lfq.h"

int main() {
	long ret;
	struct lfq_ctx ctx;
	lfq_init(&ctx, 0);
	lfq_enqueue(&ctx,(void *)1);
	lfq_enqueue(&ctx,(void *)3);
	lfq_enqueue(&ctx,(void *)5);
	lfq_enqueue(&ctx,(void *)8);
	lfq_enqueue(&ctx,(void *)4);
	lfq_enqueue(&ctx,(void *)6);

	while ( (ret = (long)lfq_dequeue(&ctx)) != 0 )
		printf("lfq_dequeue %ld\n", ret);

	lfq_clean(&ctx);
	return 0;
}
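
The sample above stores integers by casting them to pointers, so a stored 0 cannot be told apart from an empty queue. One way around this, sketched here under the assumption that the caller owns the allocation, is to box each value on the heap so the queue always holds a real, non-NULL pointer. box_long and unbox_long are hypothetical helpers for illustration; they are not part of lfq.h.

```c
#include <stdlib.h>

/* Hypothetical helpers, not part of lfq.h. */

/* Copy a long onto the heap so the queue stores a non-NULL pointer,
 * even when the value itself is 0. Returns NULL only if malloc fails. */
void *box_long(long value) {
    long *box = malloc(sizeof *box);
    if (box != NULL)
        *box = value;
    return box;
}

/* Read the value back and free the box.
 * Returns 0 on success, -1 if ptr is NULL (nothing was dequeued). */
int unbox_long(void *ptr, long *out) {
    if (ptr == NULL)
        return -1;
    *out = *(long *)ptr;
    free(ptr);
    return 0;
}
```

A producer would pass the result of box_long() to lfq_enqueue, and a consumer would pass the result of lfq_dequeue to unbox_long().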

Advanced example

If you want the best performance at the cost of development speed, you can manage thread_id yourself.

This API is only implemented on the HP branch.

#include <stdio.h>
#include <stdlib.h>
#include "lfq.h"

#define MAX_CONSUMER_THREAD 4

int main() {
	long ret;
	struct lfq_ctx ctx;
	lfq_init(&ctx, MAX_CONSUMER_THREAD);
	lfq_enqueue(&ctx,(void *)1);
	
	// The second argument is the thread id, which must be unique among threads
	// and less than MAX_CONSUMER_THREAD.
	// In this sample code, the tid must be 0, 1, 2, or 3.
	ret = (long)lfq_dequeue_tid(&ctx, 1);

	lfq_clean(&ctx);
	return 0;
}
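
The thread id has to be unique per thread and smaller than the value passed to lfq_init. A minimal sketch of one way to hand out such ids, assuming C11 atomics are available, is an atomic counter. claim_consumer_tid is a hypothetical helper, not part of lfq.h.

```c
#include <stdatomic.h>

#define MAX_CONSUMER_THREAD 4

/* Hypothetical helper state, not part of lfq.h: next unused consumer id. */
static atomic_int next_tid = 0;

/* Returns a unique id in [0, MAX_CONSUMER_THREAD), or -1 once all ids
 * have been handed out. Safe to call from several threads at once. */
int claim_consumer_tid(void) {
    int tid = atomic_fetch_add(&next_tid, 1);
    return tid < MAX_CONSUMER_THREAD ? tid : -1;
}
```

Each consumer thread would call claim_consumer_tid() once at startup and pass the result to lfq_dequeue_tid for the rest of its lifetime.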

API

lfq_init(struct lfq_ctx *ctx, int max_consume_thread)

Initializes the lock-free queue.

Arguments:

  • ctx : Lock-free queue handle.
  • max_consume_thread : Maximum number of consumer threads. If set to zero, the default value (16) is used.

Return: lfq_init() returns zero on success. On error, it returns a negative errno.

lfq_clean(struct lfq_ctx *ctx)

Cleans up the lock-free queue held in ctx.

Arguments:

  • ctx : Lock-free queue handle.

Return: lfq_clean() returns zero on success. On error, it returns -1.

lfq_enqueue(struct lfq_ctx *ctx, void * data)

Pushes data into the queue.

Arguments:

  • ctx : Lock-free queue handle.
  • data : User data.

Return: lfq_enqueue() returns zero on success. On error, it returns a negative errno.

lfq_dequeue(struct lfq_ctx *ctx)

Pops data from the queue.

Arguments:

  • ctx : Lock-free queue handle.

Return: lfq_dequeue() returns zero if the queue is empty and a positive pointer otherwise. On error, it returns a negative errno.

lfq_dequeue_tid(struct lfq_ctx *ctx, int tid)

Pops data from the queue, using a caller-supplied thread id.

Arguments:

  • ctx : Lock-free queue handle.
  • tid : Unique thread id.

Return: lfq_dequeue_tid() returns zero if the queue is empty and a positive pointer otherwise. On error, it returns a negative errno.
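
The dequeue functions fold three outcomes into one pointer-sized return value. The text above does not say how errors are encoded, so the following is only a sketch under an assumption: that errors are small negative numbers cast to a pointer, in the style of the Linux kernel's ERR_PTR, with 4095 as the largest errno. classify_dequeue, enum deq_result and SKETCH_MAX_ERRNO are hypothetical names, not part of lfq.h.

```c
#include <stddef.h>
#include <stdint.h>

enum deq_result { DEQ_EMPTY, DEQ_ERROR, DEQ_DATA };

/* Assumption: error codes lie in [-4095, -1] when viewed as an integer. */
#define SKETCH_MAX_ERRNO 4095

/* Sort a dequeue return value into empty, error, or user data. */
enum deq_result classify_dequeue(void *ret) {
    if (ret == NULL)
        return DEQ_EMPTY;
    /* Negative errno values map to the very top of the address range. */
    if ((uintptr_t)ret >= (uintptr_t)-SKETCH_MAX_ERRNO)
        return DEQ_ERROR;
    return DEQ_DATA;
}
```

This also shows why small integers cast to pointers are fragile as queue data: they share the value space with the empty and error results.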

Issues

ENOMEM

This lfqueue has no size limit, so keep count of your queued items yourself.
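
A minimal sketch of such a count, assuming C11 atomics: the application reserves a slot before every enqueue and releases it after every dequeue, so the queue can never hold more than a chosen limit. struct bounded_counter, reserve_slot and release_slot are hypothetical names, not part of lfq.h.

```c
#include <stdatomic.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical wrapper state kept next to the queue, not part of lfq.h. */
struct bounded_counter {
    atomic_size_t in_flight;   /* items currently in the queue */
    size_t limit;              /* cap chosen by the application */
};

/* Call before lfq_enqueue(); returns false if the cap is already reached. */
bool reserve_slot(struct bounded_counter *c) {
    size_t cur = atomic_load(&c->in_flight);
    while (cur < c->limit) {
        /* On failure, cur is refreshed with the current value and we retry. */
        if (atomic_compare_exchange_weak(&c->in_flight, &cur, cur + 1))
            return true;
    }
    return false;
}

/* Call after a successful dequeue, or if the enqueue itself failed. */
void release_slot(struct bounded_counter *c) {
    atomic_fetch_sub(&c->in_flight, 1);
}
```

A producer that gets false from reserve_slot() can drop the item, retry later, or report back pressure, instead of allocating until malloc fails with ENOMEM.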

Can I iterate over lfqueue's inner struct?

No, iterating over the inner struct is not thread-safe.

If you do not get a segmentation fault, it is because you are iterating over lfqueue in a single thread.

Always iterate over lfqueue with the lfq_enqueue and lfq_dequeue methods.

CPU time slice waste? Other threads blocking and waiting for the swap?

Enqueue

No, a CAS operation is not a lock. The losers do not block and wait for the winner.

When a thread wins a CAS race, the other threads get false and try to retrieve the next pointer. Because the winner has already swapped the tail, the losers can start the next race.

Example:

4 threads race to push
1 wins and pushes A, 2 lose, 1 does not have a CPU time slice
1 winner leaves the queue, 2 losers race with A as tail, 1 races with initnode as tail
1 winner leaves the queue, 1 wins and pushes B, 1 loses, 1 fails its race because the tail is no longer initnode

So a lock-free queue can perform better than a lock-based queue.
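
The race described above can be sketched with C11 atomics. This is a minimal illustration of a tail-swap CAS loop, not the code in lfq.c: the sketch_ names and struct layout are assumptions, and it covers enqueue only.

```c
#include <stdatomic.h>
#include <stdlib.h>

struct sketch_node {
    void *data;
    _Atomic(struct sketch_node *) next;
};

struct sketch_queue {
    struct sketch_node *head;             /* dummy node (the "initnode") */
    _Atomic(struct sketch_node *) tail;
};

/* Start with a single dummy node that is both head and tail. */
int sketch_init(struct sketch_queue *q) {
    struct sketch_node *dummy = malloc(sizeof *dummy);
    if (dummy == NULL)
        return -1;
    dummy->data = NULL;
    atomic_init(&dummy->next, NULL);
    q->head = dummy;
    atomic_init(&q->tail, dummy);
    return 0;
}

int sketch_enqueue(struct sketch_queue *q, void *data) {
    struct sketch_node *node = malloc(sizeof *node);
    if (node == NULL)
        return -1;
    node->data = data;
    atomic_init(&node->next, NULL);

    struct sketch_node *old_tail = atomic_load(&q->tail);
    /* Race to swing tail to our node. A loser does not block: the failed
     * CAS writes the current tail into old_tail and the loop tries again. */
    while (!atomic_compare_exchange_weak(&q->tail, &old_tail, node))
        ;
    /* Exactly one thread wins for a given old_tail, so only that thread
     * links the previous tail to the new node. */
    atomic_store(&old_tail->next, node);
    return 0;
}
```

No thread ever holds a lock here, so a thread that loses its time slice in the middle of the loop cannot stop the others from swapping the tail.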

Dequeue

We chose hazard pointers to resolve the ABA problem, so dequeue is as fast as enqueue.
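
The ABA problem arises when a node is freed and its address is reused while another thread still holds the old pointer, so a later CAS succeeds on a stale value. Hazard pointers avoid this by delaying the free until no thread has published the node as in use. The following is a minimal sketch of that publish-and-scan idea, assuming one slot per thread and C11 atomics. It is not the HP branch's implementation, and the hp_ names are hypothetical.

```c
#include <stdatomic.h>
#include <stdbool.h>
#include <stddef.h>

#define SKETCH_MAX_THREADS 4

/* One hazard slot per consumer thread, indexed by tid. */
static _Atomic(void *) hazard[SKETCH_MAX_THREADS];

/* Publish the pointer currently stored in *src as "in use by tid".
 * Re-read after publishing: if *src changed in between, the published
 * value may already have been retired, so try again. */
void *hp_protect(int tid, _Atomic(void *) *src) {
    void *p;
    do {
        p = atomic_load(src);
        atomic_store(&hazard[tid], p);
    } while (p != atomic_load(src));
    return p;
}

/* Drop the protection once the thread is done with the node. */
void hp_clear(int tid) {
    atomic_store(&hazard[tid], NULL);
}

/* A retired node may be freed only when no thread has it published. */
bool hp_can_free(void *node) {
    for (int i = 0; i < SKETCH_MAX_THREADS; i++)
        if (atomic_load(&hazard[i]) == node)
            return false;
    return true;
}
```

The slot index plays the same role as the tid passed to lfq_dequeue_tid: it must be unique per thread and below the configured maximum.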

There are many papers on resolving this problem:

Freeing memory is very slow in Visual Studio

Sorry, I have no idea why. Still looking into the problems on Windows.

Contributions

pcordes

License

WTFPL

lfqueue's People

Contributors

darkautism, pcordes

lfqueue's Issues

Memory crash with multiple dequeue threads

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <pthread.h>
#include <assert.h>
#include <sys/time.h>
#include "lfq.h"


void one_enq_and_multi_deq(pthread_t *threads);
void*  worker_s(void *);
void*  worker_c(void *);

struct timeval  tv1, tv2;
#define total_put 50000
int nthreads = 4; //sysconf(_SC_NPROCESSORS_ONLN); // Linux
int one_thread = 1;
int nthreads_exited = 0;
//lfqueue_t *myq;
struct lfq_ctx *myq;

void*  worker_c(void *arg) {
	int i = 0;
	int *int_data;
	int total_loop = total_put * (*(int*)arg);
	while (i++ < total_loop) {
		/*Dequeue*/
		while ((int_data = lfq_dequeue(myq)) == 0) {

		}
		//	printf("%d\n", *int_data);

		free(int_data);
	}
	__sync_add_and_fetch(&nthreads_exited, 1);
	return 0;
}



/** Worker keeps sending at the same time; do not try intensively **/
void*  worker_s(void *arg)
{
	int i = 0, *int_data;
	int total_loop = total_put * (*(int*)arg);
	while (i++ < total_loop) {
		int_data = (int*)malloc(sizeof(int));
		assert(int_data != NULL);
		*int_data = i;
		/*Enqueue*/

		while (lfq_enqueue(myq, int_data)) {
			// printf("ENQ FULL?\n");
		}
	}
	// __sync_add_and_fetch(&nthreads_exited, 1);
	return 0;
}

#define detach_thread_and_loop \
for (i = 0; i < nthreads; i++)\
pthread_detach(threads[i]);\
while ( nthreads_exited < nthreads ) \
	usleep(2000);\
if(myq->count != 0){\
usleep(2000);\
printf("current size= %d\n", myq->count);\
}

void one_enq_and_multi_deq(pthread_t *threads) {
	printf("-----------%s---------------\n", "one_enq_and_multi_deq");
	int i;
	for (i = 0; i < nthreads; i++)
		pthread_create(threads + i, NULL, worker_c, &one_thread);

	worker_s(&nthreads);

#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wimplicit-function-declaration"
	detach_thread_and_loop;
#pragma GCC diagnostic pop
}

int main(void)
{
	int n;

	myq = malloc(sizeof	(struct lfq_ctx));
	lfq_init(myq, 4);

	for (n = 0; n < 1000; n++) {
		printf("Current running at =%d, ", n);
		nthreads_exited = 0;

		/* Spawn threads. */
		pthread_t threads[nthreads];
		printf("Using %d thread%s.\n", nthreads, nthreads == 1 ? "" : "s");
		printf("Total requests %d \n", total_put);
		gettimeofday(&tv1, NULL);

		one_enq_and_multi_deq(threads);
		//one_deq_and_multi_enq(threads);
		// multi_enq_deq(threads);

		gettimeofday(&tv2, NULL);
		printf ("Total time = %f seconds\n",
		        (double) (tv2.tv_usec - tv1.tv_usec) / 1000000 +
		        (double) (tv2.tv_sec - tv1.tv_sec));

		//getchar();
		usleep(1000);
		assert ( 0 == myq->count && "Error, all queue should be consumed but not");
	}
	lfq_clean(myq);
	free(myq);
	return 0;
}

Why are you using a node counter in the queue?

Why are you using a node counter in the queue? The queue is empty if head == tail right? Mutatis mutandis it is not empty if head != tail. So why count if the only thing you're interested in is if the queue is empty or not? (I.e. count == 0 or count != 0)

underlying algorithm

Hi
Thanks for sharing this. What is the underlying algorithm for this code? Is there any formal proof?

regards

Memory Leak while using valgrind

==14613== Memcheck, a memory error detector
==14613== Copyright (C) 2002-2017, and GNU GPL'd, by Julian Seward et al.
==14613== Using Valgrind-3.13.0 and LibVEX; rerun with -h for copyright info
==14613== Command: ./a.out
==14613==
Using 16 threads.
Total time = 90.838795 seconds
==14613==
==14613== HEAP SUMMARY:
==14613== in use at exit: 64,000,000 bytes in 16,000,000 blocks
==14613== total heap usage: 32,000,020 allocs, 16,000,020 frees, 576,010,048 bytes allocated
==14613==
==14613== 12 bytes in 3 blocks are possibly lost in loss record 1 of 2
==14613== at 0x4C29C23: malloc (vg_replace_malloc.c:299)
==14613== by 0x40091F: worker (in /home/booking/cground/lfqueue/a.out)
==14613== by 0x4E3DDD4: start_thread (in /usr/lib64/libpthread-2.17.so)
==14613== by 0x5150B3C: clone (in /usr/lib64/libc-2.17.so)
==14613==
==14613== 63,999,988 bytes in 15,999,997 blocks are definitely lost in loss record 2 of 2
==14613== at 0x4C29C23: malloc (vg_replace_malloc.c:299)
==14613== by 0x40091F: worker (in /home/booking/cground/lfqueue/a.out)
==14613== by 0x4E3DDD4: start_thread (in /usr/lib64/libpthread-2.17.so)
==14613== by 0x5150B3C: clone (in /usr/lib64/libc-2.17.so)
==14613==
==14613== LEAK SUMMARY:
==14613== definitely lost: 63,999,988 bytes in 15,999,997 blocks
==14613== indirectly lost: 0 bytes in 0 blocks
==14613== possibly lost: 12 bytes in 3 blocks
==14613== still reachable: 0 bytes in 0 blocks
==14613== suppressed: 0 bytes in 0 blocks
==14613==
==14613== For counts of detected and suppressed errors, rerun with: -v
==14613== ERROR SUMMARY: 2 errors from 2 contexts (suppressed: 0 from 0)

Test program

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <pthread.h>
#include <assert.h>
#include <sys/time.h>
#include "lfq.h"

struct lfq_ctx ctx;
struct timeval  tv1, tv2;
// lfstack_t results;

void *worker(void *);
void *worker(void *arg)
{
    long long i = 0;
    int *int_data;
    while (i < 1000000) {
        int_data = (int*) malloc(sizeof(int));
        assert(int_data != NULL);
        *int_data = i++;

        /*Enqueue*/
        while (lfq_enqueue(&ctx, int_data) != 0);

        /*Dequeue*/
        while ( (int_data = lfq_dequeue(&ctx)) != 0);
        
        // printf("%d\n", *int_data);
        free(int_data);

    }

    return NULL;
}


int main(void)
{   
    int nthreads = sysconf(_SC_NPROCESSORS_ONLN); // Linux
    int i;

    lfq_init(&ctx, nthreads);

    /* Spawn threads. */
    pthread_t threads[nthreads];
    printf("Using %d thread%s.\n", nthreads, nthreads == 1 ? "" : "s");
    gettimeofday(&tv1, NULL);
    for (i = 0; i < nthreads; i++)
        pthread_create(threads + i, NULL, worker, NULL);


    for (i = 0; i < nthreads; i++)
        pthread_join(threads[i], NULL);
     gettimeofday(&tv2, NULL);
    printf ("Total time = %f seconds\n",
         (double) (tv2.tv_usec - tv1.tv_usec) / 1000000 +
         (double) (tv2.tv_sec - tv1.tv_sec));

    lfq_clean(&ctx);
    return 0;
}
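
A note on the test program above: its dequeue loop runs while the result is non-zero, so it appears to discard every item it dequeues and to exit only once the queue reports empty. free() then receives NULL, which would be consistent with the "definitely lost" blocks in the valgrind report. A minimal sketch of the opposite loop, which retries only while the queue is empty, follows. dequeue_until_item, struct fake_queue and fake_dequeue are hypothetical names used for illustration; the stand-in queue is there only so the sketch compiles without lfq.h.

```c
#include <stddef.h>

/* Hypothetical helper: retry until dequeue(ctx) yields an item.
 * `dequeue` stands in for a call such as lfq_dequeue. */
void *dequeue_until_item(void *(*dequeue)(void *), void *ctx) {
    void *item;
    while ((item = dequeue(ctx)) == NULL)
        ;   /* empty queue: try again */
    return item;
}

/* Stand-in queue for illustration: reports empty a few times,
 * then yields its item. */
struct fake_queue {
    int empty_polls_left;
    void *item;
};

void *fake_dequeue(void *ctx) {
    struct fake_queue *q = ctx;
    if (q->empty_polls_left > 0) {
        q->empty_polls_left--;
        return NULL;
    }
    return q->item;
}
```

With this shape, the pointer handed to free() is the item that was actually dequeued.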
