-- Type-safe AMQP workers. Compatible with RabbitMQ.
{-# LANGUAGE DeriveGeneric #-}
{-# LANGUAGE OverloadedStrings #-}
module Main where
import Control.Concurrent (forkIO)
import Control.Monad.Catch (SomeException)
import Data.Aeson (FromJSON, ToJSON)
import Data.Function ((&))
import Data.Text (Text, pack)
import GHC.Generics (Generic)
import Network.AMQP.Worker (Connection, Message (..),
WorkerException, def, fromURI)
import qualified Network.AMQP.Worker as Worker
import Network.AMQP.Worker.Key
import System.IO (BufferMode (..), hSetBuffering,
stderr, stdout)
-- | Payload exchanged by the example: a single piece of greeting text.
data TestMessage = TestMessage
  { greeting :: Text
    -- ^ The text carried by the message.
  }
  deriving (Eq, Show, Generic)

-- JSON decoding and encoding both use the class defaults (no methods are
-- overridden here); the 'Generic' instance derived above is in scope for them.
instance FromJSON TestMessage

instance ToJSON TestMessage
-- | Routing key for 'TestMessage' payloads, built from the word @"messages"@
-- followed by the word @"new"@.
newMessages :: Key Routing TestMessage
newMessages = word "new" (key "messages")
-- | Routing key for 'Text' payloads, consisting of the single word @"results"@.
results :: Key Routing Text
results = key "results"
-- | Binding key for 'TestMessage' payloads: the word @"messages"@ followed by
-- the 'star' wildcard.
anyMessages :: Key Binding TestMessage
anyMessages = star (key "messages")
-- | Interactive end-to-end demo.
--
-- Connects to the broker at @amqp://guest:guest\@localhost:5672@, binds the
-- queues used below, reads one line from stdin and publishes it as a
-- 'TestMessage' under 'newMessages', then starts two workers on forked
-- threads and waits for another line of input before returning.
--
-- NOTE(review): the connection is never closed explicitly in this function;
-- confirm that relying on process exit is intended.
example :: IO ()
example = do
  -- connect
  conn <- Worker.connect (fromURI "amqp://guest:guest@localhost:5672")

  let handleAnyMessages = Worker.topic anyMessages "handleAnyMessage"

  -- initialize the queues
  Worker.bindQueue conn (Worker.direct newMessages)
  Worker.bindQueue conn (Worker.direct results)

  -- topic queue!
  Worker.bindQueue conn handleAnyMessages

  putStrLn "Enter a message"
  msg <- getLine

  -- publish a message
  putStrLn "Publishing a message"
  Worker.publish conn newMessages (TestMessage $ pack msg)

  -- Start one worker per queue. Each runs on its own forked thread, so this
  -- thread does not block here; it carries on to the prompt below.
  _ <- forkIO $ Worker.worker conn def (Worker.direct newMessages) onError (onMessage conn)
  _ <- forkIO $ Worker.worker conn def handleAnyMessages onError (onMessage conn)

  -- getLine only returns once a whole line has been entered, so the prompt
  -- asks for Enter rather than "any key".
  putStrLn "Press enter to exit"
  () <$ getLine
-- | Message handler shared by both workers: logs the received 'TestMessage'
-- to stdout, then publishes its 'greeting' text under the 'results' key on
-- the given connection.
onMessage :: Connection -> Message TestMessage -> IO ()
onMessage conn m = do
  putStrLn "Got Message"
  print payload
  Worker.publish conn results (greeting payload)
  where
    -- The decoded body carried by the incoming message.
    payload = value m
-- | Error callback passed to the workers: prints a placeholder line followed
-- by the exception itself. It neither rethrows nor recovers.
onError :: WorkerException SomeException -> IO ()
onError e = putStrLn "Do something with errors" >> print e
-- | Program entry point: switches stdout and stderr (in that order) to line
-- buffering, then runs 'example'.
main :: IO ()
main = do
  mapM_ (`hSetBuffering` LineBuffering) [stdout, stderr]
  example