yet another queue
It's just for fun.
I want a queue kind of like what you would get with Redis's LPUSH and BRPOP commands, and using an append-only file for persistence.
However, I want the queues to persist as directories full of files. That is, I
want to use directory entries as the core storage data structure for a persistent
message queue. I want to be able to cd into a queue and inspect its message
files. Not for any particular reason.
It will speak HTTP, handle a zillion concurrent connections, and be as simple as I can manage.
yaq is an HTTP server, written in Go, that provides persistent message queues
backed by the file system.
Each queue has a name that is a URL path, e.g. /profile-updates/v2/user12345
is a queue, and so are /profile-updates/v2/user43554, /, and /snazzy%20party.
- GET {queue-path}
- GET {queue-path}?quantity={integer}
- GET {queue-path}?quantity=unlimited
- GET {queue-path}?timeout={time-spec}
- Dequeue the optionally specified quantity of messages from the specified
queue. If quantity is not specified, dequeue one message. If quantity is
"unlimited," then keep dequeuing messages until the client disconnects.
If timeout is specified, then its value is a time duration as accepted by
Go's time.ParseDuration function. If timeout is not specified, then it is
infinite. The timeout begins whenever the client is waiting for a message
but the queue is empty. The timeout applies separately to each message
requested (i.e. it resets when a message is received). If only one message
is being dequeued, then the body of the response is the message. If multiple
messages are being dequeued, then the body of the response is a chunked
multipart/mixed MIME message as described in RFC 2046 Section 5.1.1 "Common
Syntax". If fewer than the specified quantity of messages are dequeued, such
as when a timeout or some other error occurs, then the response will end
with a trailer containing an X-Next-Message-Status header whose value is the
effective HTTP response status code of the message that failed to dequeue.
Some clients might prefer to avoid the "multipart" complexity by dequeuing
messages one at a time using separate requests instead.
- The response will have one of the following status codes:
- 200 OK
- One message was requested and successfully dequeued. The response body is the message.
- 202 Accepted
- Two or more messages were requested and will be streamed in the response. The response uses the chunked Transfer-Encoding and is a multipart/mixed MIME message containing the dequeued messages.
- 504 Gateway Timeout
- The timeout that was specified as a query parameter in the request expired before a message could be dequeued.
- 500 Internal Server Error
- An error occurred. The response body will be a description of the error.
- POST {queue-path}
- POST {queue-path}?timeout={time-spec}
- Enqueue a message onto the specified queue. The body of the request is the
message. The request's Content-Type and Content-Encoding will remain
associated with the message. If timeout is specified, then its value is a
time duration as accepted by Go's time.ParseDuration function. If timeout is
not specified, then it is infinite. If a queue has reached its capacity, then
attempts to enqueue messages will block until enough messages are dequeued to
bring the queue back to below capacity.
- The response will have one of the following status codes:
- 200 OK
- The message was successfully enqueued.
- 504 Gateway Timeout
- The timeout that was specified as a query parameter in the request expired before the message could be enqueued.
- 500 Internal Server Error
- An error occurred. The response body will be a description of the error.
$ go build
$ mkdir /tmp/queues
$ ./yaq /tmp/queues 127.0.0.1:1337 &
Listening on 127.0.0.1:1337. Storing queues under "/tmp/queues".
$ curl --request POST --data 'tuna' http://localhost:1337/fish/saltwater
$ curl --request POST --data 'swordfish' http://localhost:1337/fish/saltwater
$ find /tmp/queues
/tmp/queues
/tmp/queues/fish
/tmp/queues/fish/saltwater/
/tmp/queues/fish/saltwater/@0000000000
/tmp/queues/fish/saltwater/@0000000001
/tmp/queues/fish/saltwater/@next
/tmp/queues/fish/saltwater/@oldest
$ cat /tmp/queues/fish/saltwater/@0000000001
Content-Type: text/plain; charset=UTF-8
swordfish
$ cat /tmp/queues/fish/saltwater/@next
@0000000002
$ cat /tmp/queues/fish/saltwater/@oldest
@0000000000
$ curl http://localhost:1337/fish/saltwater
tuna
$ cat /tmp/queues/fish/saltwater/@oldest
@0000000001
$ curl http://localhost:1337/fish/saltwater
swordfish
$ curl http://localhost:1337/fish/saltwater
^C
$ curl http://localhost:1337/fish/saltwater?timeout=2s
$ curl --write-out '%{http_code}' http://localhost:1337/fish/saltwater?timeout=2s
504
$ curl http://localhost:1337/fish/saltwater | sed 's/^/from background consumer: /' &
$ curl --request POST --data 'wrasse' http://localhost:1337/fish/saltwater
from background consumer: wrasse
[2]+ Done curl http://localhost:1337/fish/saltwater | sed 's/^/from background consumer: /'
$ curl --request POST --data 'shark' http://localhost:1337/fish/saltwater
$ find /tmp/queues/
/tmp/queues/
/tmp/queues/fish/
/tmp/queues/fish/saltwater/
/tmp/queues/fish/saltwater/@0000000003
/tmp/queues/fish/saltwater/@next
$ cat /tmp/queues/fish/saltwater/@0000000003
Content-Type: text/plain; charset=UTF-8
shark
$ cat /tmp/queues/fish/saltwater/@next
@0000000004
$ kill %1
[1]+ Terminated ./yaq /tmp/queues 127.0.0.1:1337
$ find /tmp/queues/
/tmp/queues/
/tmp/queues/fish/
/tmp/queues/fish/saltwater/
/tmp/queues/fish/saltwater/@0000000003
/tmp/queues/fish/saltwater/@next
$