achilleasa / dart_amqp Goto Github PK
View Code? Open in Web Editor NEWDart AMQP client implementing protocol version 0.9.1
License: MIT License
Dart AMQP client implementing protocol version 0.9.1
License: MIT License
Hello,
I have a RabbitMQ broker in localhost in the port 15672 and I am trying to connect using your library to be able to send messages through AMQP.
I could do it with python but in flutter I am not be able to connect.
void main() async {
ConnectionSettings connectionSettings = ConnectionSettings(
authProvider: PlainAuthenticator("user","password"),
host: 'localhost',
port: 15672
);
Client client = Client(settings: connectionSettings);
await client.connect();
Channel channel = await client.channel();
Queue queue = await channel.queue("hello");
queue.publish("Hello World!");
print(" [x] Sent 'Hello World!'");
await client.close();
}
All the credentials are ok and the broker is up. if I write "localhost" the system throws "ConnectionFailedException: Could not connect to 127.0.0.1:15672 after 1 attempts. Giving up" and if I put my ip address, it throws "FatalException: Lost connection to the server".
Code:
ConnectionSettings settings = new ConnectionSettings(
host: "192.168.50.83",
port: 5672,
virtualHost: '/',
authProvider: new PlainAuthenticator("arx7", "arbalest"));
Client client = new Client(settings: settings);
Channel channel = await client.channel();
Exchange exchange =
await channel.exchange("luwak_topic", ExchangeType.TOPIC);
Consumer consumer = await exchange.bindPrivateQueueConsumer(['#']);
consumer.listen((message) {
print(
"[Exchange: ${message.exchangeName}] [${message.routingKey}] ${message.payloadAsString}");
});
Question:
If i publish a message to a queue, where the message is a string containing Ø, Æ or Å, an error is thrown:
FormatException: Bad UTF-8 encoding 0xf8 (at offset 21)
Most likely caused by this line, not sure how to fix it thought, any thoughts on a workaround?
Code to replicate:
import 'dart:convert';
import 'package:dart_amqp/dart_amqp.dart';
void main() async {
var client = await Client(
settings: ConnectionSettings(
host: "localhost",
port: 5672,
authProvider: PlainAuthenticator(
"rabbitmq",
"rabbitmq",
),
),
);
var queueName = "example_queue";
// Setup a listener for a queue
client
.channel()
.then((channel) => channel.queue(queueName))
.then((Queue queue) => queue.consume())
.then((Consumer consumer) {
consumer.listen((AmqpMessage message) {
// Both will fail with the following exception:
// FormatException: Bad UTF-8 encoding 0xf8 (at offset 21)
print(message.payloadAsString);
print(message.payloadAsJson);
});
});
// Publish a message to the queue
client
.channel()
.then((channel) => channel.queue(queueName))
.then((Queue queue) {
// Publish the JSON string
queue.publish("This string contains ø, æ or å");
});
}
DartPad with example:
https://dartpad.dartlang.org/94b3febd063256dd8b0445a698fb0c22
Dart sdk 1.14 introduced DateTime.microsecond
which seems to break the unit tests that use whole second accuracy
For example: https://github.com/achilleasa/dart_amqp/blob/master/test/lib/queue_test.dart#L167
I have a patch for it but I think drone.io needs to be updated to use the latest version of dart.
master...rajmaniar:unittests_1.16
When we have a connection to rabbit with very low consumer traffic or a connection with not consumers (only used for publishing) we get an occasion close with the message too many missed heartbeats
I'll add more details to this ticket as I investigate this a bit more so it's a placeholder for now.
Hey there,
Are there any expectations to convert the code for dart2? Or to have a specific branch of it.
I've tested it with Flutter (Dart 1) and RabbitMQ and it's working nicely. But I do think to use dart2 instead.
I would like to help if possible.
Cheers,
Sometimes after connecting to my RabbitMQ instance I get the following error.
Jan 05 22:22:02 a41a608c3bea clever_bardeen: The null object does not have a getter 'msgClassId'.
Jan 05 22:22:02 a41a608c3bea clever_bardeen: NoSuchMethodError: method not found: 'msgClassId'
Jan 05 22:22:02 a41a608c3bea clever_bardeen: Receiver: null
Jan 05 22:22:02 a41a608c3bea clever_bardeen: Arguments: []
Which ends up eventually throwing
FatalException: Lost connection to the server.
Even though the server is fine.
Hi,
is there any plan to support Sound Null Safety?
when i add heartbeat to client
after connection lost i get this error
flutter: #0 _ChannelImpl.writeHeartbeat package:dart_amqp/…/impl/channel_impl.dart:66 #1 _ChannelImpl._processHandshake.<anonymous closure> package:dart_amqp/…/impl/channel_impl.dart:191 #2 _rootRunUnary (dart:async/zone.dart:1399:47) #3 _CustomZone.runUnary (dart:async/zone.dart:1300:19) #4 _CustomZone.runUnaryGuarded (dart:async/zone.dart:1209:7) #5 _CustomZone.bindUnaryCallbackGuarded.<anonymous closure> (dart:async/zone.dart:1246:26) #6 _rootRunUnary (dart:async/zone.dart:1407:13) #7 _CustomZone.runUnary (dart:async/zone.dart:1300:19) #8 _CustomZone.bindUnaryCallback.<anonymous closure> (dart:async/zone.dart:1230:26) #9 _Timer._runTimers (dart:isolate-patch/timer_impl.dart:398:19) #10 _Timer._handleMessage (dart:isolate-patch/timer_impl.dart:429:5) #11 _RawReceivePortImpl._handleMessage (dart:isolate-patch/isolate_patch.dart:192:12)
hello,i was trying this package on mobile platforms (android /ios) and it's work fine.
but what about web. I think add web support it's will be a big contribution
// test.dart
var client = new amqp.Client(settings: amqp.ConnectionSettings(
host : 'localhost',
port : 5672,
maxConnectionAttempts: 99999, // -- "infinite" retries because setting this to 0 or -1 isn't supported
authProvider : amqp.PlainAuthenticator('username', 'password'),
));
await client.connect();
With the rabbit instance stopped, start the above dart script. Let it make a couple unsuccessful attempts / retries, and then start the rabbit instance. As soon as it comes online, notice the FatalException
thrown by the AMQP client.
dart run ./test.dart
INFO: 2022-01-04 11:07:16.902302: dart_amqp.Connection: Trying to connect to localhost:5672 [attempt 1/99999]
INFO: 2022-01-04 11:07:20.509093: dart_amqp.Connection: Trying to connect to localhost:5672 [attempt 2/99999]
INFO: 2022-01-04 11:07:24.066724: dart_amqp.Connection: Trying to connect to localhost:5672 [attempt 3/99999]
Unhandled exception:
FatalException: Lost connection to the server
SEVERE : 2022-01-04 11:07:24.588279: dart_amqp.Connection: FatalException: Lost connection to the server
Process finished with exit code 255
Pass through onBadCertificate handler from client connection settings to SecureSocket constructor.
there isn't an option to set the auto-delete flags for exchange.
I was digging the source and you kinda use some source generator, right?
Can you explain a little bit about it?
So then I can add the flag :)
is there any event for disconnection? if not please add one.
Hi,
As, "the driver does not currently support recovering client topologies when re-establishing connections",
Could it be possible to add an example demonstrating how to do this properly ?
There are a lot of hints in the comments on how it should be done, but I think it would help newcomers like me in order to have a fast grab on the library.
First of all, thank you so very much for this project.
Do you have any plans &/ capacity for implementing the AMQP 1.0? ActiveMQ Artemis (first release 2015) requires it.
When a channel exception is thrown or channel is closed the Consumer's error or done callbacks are not invoked.
Consider:
var c = Client();
var chan = await c.channel();
var queue = await chan.queue("test123", autoDelete: true);
var consumer = await queue.consume(noAck: true);
consumer.listen((event) {print("queue: $event");}, onDone: (){print("consumer done");}, onError: (e){print("consumer error: $e");}, cancelOnError: true);
try {
await chan.queue("test456", passive: true);
} on QueueNotFoundException {
print("QueueNotFoundException will break the channel");
}
await Future.delayed(Duration(seconds: 60));
In the above scenario the consumer done
or consumer error
are never emitted and the stream isn't closed.
It seems to me the channel exception handler should loop over all the consumer stream controllers, pass the error down and/or close the streams.
It looks like the channelId
inserted into the _ClientImpl._channel
map here: https://github.com/achilleasa/dart_amqp/blob/master/lib/src/client/impl/client_impl.dart#L268
Isn't removed when the channel is closed here:
https://github.com/achilleasa/dart_amqp/blob/master/lib/src/client/impl/channel_impl.dart#L200
For long running processes the instance of Client eventually hits the 65536 max and closes the client.
We could add .then((_)=>_client._channels.remove(channelId));
but it's probably better to expose an api in _ClientImpl
or bind an onCancel
to the _basicReturnStream
or something.
I'm happy to issue a PR with a fix, just let me know how you'd prefer to tackle it.
I face the case that I setup RabbitMQ as channel A1, A2, ..., A12 which expected 12 of Android to be able to connect by my simple flutter code as below:
ConnectionSettings settings = ConnectionSettings(
host: _rabbitMQServer,
port: 5672,
authProvider: const PlainAuthenticator(_rabbitMQUser, _rabbitMQPass));
Client client = Client(settings: settings);
Channel channel = await client
.channel();
Queue queue = await channel.queue("A" + android.toString());
Consumer consumer = await queue.consume();
consumer.listen((AmqpMessage message) {
// ... do things ...
This code is working fine in multiple emulators, but it's not working when I upload into multiple android devices.
At Android number 1 for channel A1, we can connect and receive the message.
But at Android number 2, 3, ..., 12 I got exception "ConnectionFailedException: Cloud not connect to 192.168.1.63:5672 after 1 attempts. Giving up"
At first I suspected relate with RabbitMQ configuration to allow connection but since emulator can connect normally then it's not the case.
In this picture, first line (192.168.1.43 is Android number 1) and second line (192.168.1.47 is the emulator)
Please help to suggest if I missed anything.
I create a Client singleton object and open/close a single connection on that Client only whenever I need to consume from RabbitMQ. I can open and close a connection just fine the first time, but when I open a connection a second time on that same Client object, that connection doesn't close.
This is because of the check on line 237 in client_impl.dart:
if (_clientClosed != null) { return _clientClosed.future; }
_clientClosed is set to null when the Client object is first created, but after the connection itself is closed, _clientClosed isn't set back to null. This means the check above always returns a Future, rather than proceeding to the code to close the connection.
It could be that the library doesn't intend for Clients to be re-used like this, but my assumption was it's okay to re-use a Client. Right now as a work-around I just create a new Client object each time I need to establish a new connection to RabbitMQ.
I could submit a fix for this, but wanted to ensure that you all agree this is a bug and how to write unit tests for this.
Thanks!
How can i yield a message inside the listen or data part (which both are void)?
Par example:
Consumer consumer = await queue.consume(consumerTag: queueName);
consumer.listen((data) {
yield data.payloadAsString;
});
Consumer consumer = await queue.consume(consumerTag: queueName);
var c = consumer.listen((AmqpMessage msg) {});
c.onData((data) {
yield data.payloadAsString;
})
I just did a quick test with this package (version 0.0.1) and the following code. After the first message (or bunch of messages) no more messages are received and the rabbitmq queue grows up.
import "package:dart_amqp/dart_amqp.dart";
main() async {
// auto-connect to localhost:5672 using guest credentials
Client client = new Client();
final channel = await client.channel();
final queue = await channel.queue("test", durable: true);
final consumer = await queue.consume();
consumer.listen((message) {
// Get the payload as a string
print(" [x] Received string: ${message.payloadAsString}");
// Or unserialize to json
print(" [x] Received json: ${message.payloadAsJson}");
// Or just get the raw data as a Uint8List
print(" [x] Received raw: ${message.payload}");
message.ack();
});
}
E/flutter (25971): [ERROR:flutter/lib/ui/ui_dart_state.cc(209)] Unhandled Exception: Unhandled error ConnectionFailedException: Could not connect to mustang.rmq.cloudamqp.com:5672 after 1 attempts. Giving up occurred in Instance of 'OneToOneMatchLiveScreenBloc'.
E/flutter (25971):
E/flutter (25971): #0 BlocBase.onError.<anonymous closure> (package:bloc/src/bloc.dart:743:7)
E/flutter (25971): #1 BlocBase.onError (package:bloc/src/bloc.dart:744:6)
E/flutter (25971): #2 _rootRunBinary (dart:async/zone.dart:1452:47)
Hi,
Any example of handling the client timeout?? and also can we set the timeout length manually??
Thanks.
Hi,
Is it possible to get deliveryTag
from the received AmqpMessage
?
Are you guys having any plans to implement heartbeats?
This is pretty necessary while we keep having lots of zombie connections on the server side!
Is there any plans of adding TLS support?
If so, is there a timeline?
FIrst off, thanks for all your work on dart_amqp. You have saved me countless hours!
To achieve high reliability, I want to confirm i.e. acknowledge both message send and message consume. My question is how do you confirm that a message has been successfully sent/published?
I have read the RabbitMQ docs at https://www.rabbitmq.com/confirms.html#publisher-confirms and have been unable to make this work. I have tried using [channel.ack()] and see no properties as recommended in your docs to instead use [MessageProperties], but I have been unsuccessful. Additionally, there is no doc on the queue [noWait] flag, which if I set to true
"hangs" my app. What is the purpose of [noWait]?
Thanks in advance for any assistance with these questions.
i want to perform a simple task
run consumer in background for a queue, and pick 5 messages at a time and perform operation on each. how to achieve this?
Hi!
RabbitMQ requires separate "qos" settings for each new consumer. (global = false)
https://www.rabbitmq.com/consumer-prefetch.html#overview
The abstract class "Channel" in the "qos" function is missing the "global" parameter.
This will fix everything.
Thanks!
Hi. I tried to use your library to put a message on the default exchange (which by convention has the empty string as a name) of my server. However, your code does not allow for this (line 547 of client/impl/channel_impl.dart deliberately throws an exception).
Is there a reason the library does not allow this, and is there any other way to publish on the default exchange?
Thanks.
I want listen a topic from my app. Can i do that?
I am using RabbitMq as broker service. the issue is when my app is closed or terminated i am not receiving any message in queue.
I don't know the how to reproduce this but we just saw this in our logs:
type '_CastError' is not a subtype of type 'Exception'
package:dart_amqp/src/client/impl/client_impl.dart 225 _ClientImpl._handleException
package:dart_amqp/src/client/impl/client_impl.dart 189 _ClientImpl._handleMessage
dart:async _EventSinkWrapper.add
package:dart_amqp/src/protocol/io/amqp_message_decoder.dart 42 AmqpMessageDecoder.handleData
dart:async _EventSinkWrapper.add
package:dart_amqp/src/protocol/io/raw_frame_parser.dart 79 RawFrameParser.handleData
It seems like there might be two issues here:
_handleException
accepts a dynamic error but is trying to add it to the error stream which is typed Exception
_handleMessage
is throwing the CastError
Shouldn't it be dart_amqp?
I got this error when i try to use ActiveMQ
E/flutter (29276): [ERROR:flutter/lib/ui/ui_dart_state.cc(177)] Unhandled Exception: FatalException: Could not negotiate a valid AMQP protocol version. Server supports AMQP 1.0.0
Hi,
Is it possible to expose the socket in ClientImpl to handle errors? At present, if the client disconnects after establishing a connection, there is no way of knowing. If the socket could be exposed, we could add on socket events to handle such events and re-establish the connection.
Is this possible? If not, are there better ways of doing this? Thanks!
Hello everyone!
I am trying to develop an app using dart_amqp and it is working fine when in debug mode.
But when I run the app on release mode, it can't connect to my Rabbit server:
ectionFailedException: Could not connect to MYSERVERADDRESS:5672 after 1 attempts. Giving up
Anyone have a clue about that could be causing this?
After a weekend without traffic on a rabbitmq connection the first publication leads to the following message in rabbitmq logs:
=INFO REPORT==== 22-Mar-2019::08:04:05 ===
connection <0.580.0> (10.233.77.108:54878 -> 10.233.81.213:5672): user 'guest' authenticated and granted access to vhost '/'
=WARNING REPORT==== 25-Mar-2019::07:54:25 ===
closing AMQP connection <0.580.0> (10.233.77.108:54878 -> 10.233.81.213:5672, vhost: '/', user: 'guest'):
client unexpectedly closed TCP connection
I don't really know why this happens because my dart code doesn't display anything in its logs (and it doesn't crash) although I set a errorListener:
rabbitmqClient = amqp.Client(
settings: amqp.ConnectionSettings(
host: config.rabbitmq.host,
port: config.rabbitmq.port,
virtualHost: config.rabbitmq.virtualHost,
authProvider: amqp.PlainAuthenticator(
config.rabbitmq.user,
config.rabbitmq.password,
),
),
)..errorListener((e) {
_log.warning('rabbit client error: $e');
if (e is amqp.FatalException) {
_log.severe('shutting down because of $e');
exit(1);
}
});
Hi,
I'm using dart_ampq to listen a queue like in your example. I am testing the application on an android emulator.
After a while, the application no longer listens to the queue and does so without invoking any error or without executing any print (see code below)
ConnectionSettings settings = new ConnectionSettings (
port: port,
host: host,
authProvider: new PlainAuthenticator (userName, password))
_client = Client(settings: settings);
try {
_client
.channel ()
.then ((Channel channel) => channel.queue (queueName, durable: true))
.then ((Queue queue) => queue.consume ())
.then (
(Consumer consumer) => consumer.listen ((AmqpMessage message) {
// do stuff
}, onDone: () {
print ("onDone");
}, onError: (e, k) {
print ("onError");
}),
)
.whenComplete (() {
print ("whenComplete");
}). then ((t) {
print ("then");
});
} on ConnectionException catch (e) {
print ("ConnectionException $e");
}
Am I doing something wrong ?
I have been used this lib, but after a while the connection auto close and when start it again it connect with another consumer in queue,
I can publish in another queues but it stop consuming. in order to finger out what is happen the solution is complete delete the queue and restart it again. I read in pub.dev that
the driver does not currently support recovering client topologies when re-establishing connections. This feature may be implemented in a future version.
is that what is happening ?
how can I try implement this an check is there is an stable connection or if the queue has consumers?
how can I persist the consumer to avoid this?
thanks
class Amqp {
late bool isRunning;
late Client client;
late SoundsService soundsService;
Amqp() {
isRunning = false;
ConnectionSettings settings = ConnectionSettings(
virtualHost:"/",
host: "rabbitmqhost",
port: rabbitmqport,
authProvider: const PlainAuthenticator(
"rabbitmqusers",
"rabbitmqpass"
),
);
client = Client(settings: settings);
soundsService = SoundsService();
}
getClient() {
return client;
}
start() async {
print("AMQP::starting");
//consume();
client.connect().then( (defaultChannel) => {
consume()
}).catchError((onError) => {
print("ERROR: $onError"),
});
return true;
}
close() async {
isRunning = false;
print("isRunning: $isRunning");
client.channel().then((channel) => {
print("amqp::channel::closed"),
channel.queue(CurrentInfo.queue, durable: true).then((Queue queue) => {
queue.delete().then((_) => {
print("amqp::queue::closed"),
client.close().then((_) => {
print("amqp::client::closed"),
})
})
}),
channel.close()
});
client.close();
return true;
}
void deleteQueue() {
client.channel().then((channel) => {
channel.queue(CurrentInfo.queue, durable: true).then((Queue queue) => {
queue.delete().then((_) => {
print("FORCE::amqp::queue::deleted"),
})
}),
});
}
/// *************************************
/// consumer
consume() async {
isRunning = true;
print("AMQP::connected::starting-consume");
print("isRunning: $isRunning");
Channel channel = await client.channel();
Exchange exchange = await channel.exchange("amq.direct", ExchangeType.DIRECT, durable: true);
Queue queue = await channel.queue(CurrentInfo.queue, durable: true);
queue.bind(exchange, CurrentInfo.queue);
Consumer consumer = await queue.consume();
consumer.listen( (AmqpMessage message) {
var payload = Payload.fromJson(json.decode(message.payloadAsString));
if ( payload.action == "notification" ) {
print("AMQP::consume::action::notification");
print(payload.toJson());
}
if ( payload.action == 'answer-order' ) {
print("AMQP::consume::action::answer-order");
print(payload.toJson());
Vibration.vibrate(duration: 10000, intensities: [100, 200, 100, 200, 100, 200, 100, 200, 100, 200]);
showActivityBottomSheet(AnswerOrderDetails.fromJson(payload.data));
});
}
/**
print(" [x] Received:: payloadString :: ${message.payloadAsString}");
print(" [x] Received:: payloadJson :: ${message.payloadAsJson}");
print(" [x] Received:: routingKey :: ${message.routingKey}");
*/
});
}
/// *************************************
/// publisher functions
///
/// users/track
publishUpdateLocation(double lat,double lng, String usersId, String status) async {
Channel channel = await client.channel();
Exchange exchange = await channel.exchange("amq.direct", ExchangeType.DIRECT, durable: true);
var position = {
'usersId': usersId,
"lat": lat,
"lng": lng,
"status": status
};
print("POSITION::DATA:: $position");
exchange.publish(json.encode(position), 'position-listener');
}
/// users/track/orders
publishUpdateLocationTrackOrder(double lat,double lng, String usersId, String ordersId, String status) async {
Channel channel = await client.channel();
Exchange exchange = await channel.exchange("amq.direct", ExchangeType.DIRECT, durable: true);
var position = {
'usersId': usersId,
'ordersId': ordersId,
"lat": lat,
"lng": lng,
"status": status
};
exchange.publish(json.encode(position), 'position-orders-listener');
}
/// answer-order
publishAnswerOrder(String answer, String usersId, String ordersId) async {
Channel channel = await client.channel();
Exchange exchange = await channel.exchange("amq.direct", ExchangeType.DIRECT, durable: true);
var data = {
'usersId': usersId,
"ordersId": ordersId,
"answer": answer
};
exchange.publish(json.encode(data), 'answer-orders-listener');
print("ANSWER::ORDER:: $data");
}
///
publishInteractOrder(String type, String usersId, String ordersId) async {
Channel channel = await client.channel();
Exchange exchange = await channel.exchange("amq.direct", ExchangeType.DIRECT, durable: true);
var data = {
'usersId' : usersId,
"ordersId" : ordersId,
"type" : type // collected, delivered,
};
exchange.publish(json.encode(data), 'interact-orders-listener');
print("INTERACT::ORDER:: $data");
}
}
Hello. When will you do heartbeat in your amqp client?
So after the network lost, the the client can connect to mq automatically?
I am testing the performance of the dart_amqp client. I am publishing 100,000 messages in my test, and then receiving them in a separate client. I am basically using the code from your readme example, except that I added a loop to publish the messages:
`Client publisher = new Client(settings : settings);
publisher
.channel()
.then((Channel channel) {
channel.queue('logs').then((Queue q) {
channel.exchange("logs", ExchangeType.FANOUT)
.then((Exchange exchange) {
q.bind(exchange,'logs');
int x = 0;
while(x < 100000) {
exchange.publish("Testing 1-2-3", 'logs');
x++;
}
return publisher.close();
});`
My client code is exactly the same as your readme example.
When I start the publisher and then start the server, I begin to receive messages, but then, after processing 4500 messages, I get a stack overflow exception:
Unhandled exception:
Stack Overflow
#0 _startMicrotaskLoop (dart:async/schedule_microtask.dart:51)
#1 _runPendingImmediateCallback (dart:isolate-patch/isolate_patch.dart:96)
#2 _RawReceivePortImpl._handleMessage (dart:isolate-patch/isolate_patch.dart:149)
#3 _runPendingImmediateCallback (dart:isolate-patch/isolate_patch.dart:96)
#4 _RawReceivePortImpl._handleMessage (dart:isolate-patch/isolate_patch.dart:149)
#5 RawFrameParser.handleData (package:dart_amqp/src/protocol/io/raw_frame_parser.dart:79:9)
#6 RawFrameParser.handleData (package:dart_amqp/src/protocol/io/raw_frame_parser.dart:79:9)
#7 RawFrameParser.handleData (package:dart_amqp/src/protocol/io/raw_frame_parser.dart:79:9)
#8 RawFrameParser.handleData (package:dart_amqp/src/protocol/io/raw_frame_parser.dart:79:9)
#9 RawFrameParser.handleData (package:dart_amqp/src/protocol/io/raw_frame_parser.dart:79:9)
...
...
#10 _BufferingStreamSubscription._sendData (dart:async/stream_impl.dart:341)
#11 _BufferingStreamSubscription._add (dart:async/stream_impl.dart:270)
#12 _StreamController&&_SyncStreamControllerDispatch._sendData (dart:async/stream_controller.dart:744)
#13 _StreamController._add (dart:async/stream_controller.dart:616)
#14 _StreamController.add (dart:async/stream_controller.dart:562)
#15 _RawSocket._RawSocket. (dart:io-patch/socket_patch.dart:1215)
#16 _NativeSocket.issueReadEvent.issue (dart:io-patch/socket_patch.dart:749)
#17 _microtaskLoop (dart:async/schedule_microtask.dart:41)
#18 _startMicrotaskLoop (dart:async/schedule_microtask.dart:50)
#19 _runPendingImmediateCallback (dart:isolate-patch/isolate_patch.dart:96)
#20 _RawReceivePortImpl._handleMessage (dart:isolate-patch/isolate_patch.dart:149)
Am I doing something wrong here?
In my use case, the user has read access to the queue but not write or config (RabbitMQ hosted by Amazon) and should not need config access to read from a queue. Trying to access the queue with channel.queue(name)
results in:
Caught error: ChannelException(ACCESS_REFUSED): ACCESS_REFUSED - access to queue 'queue' in vhost 'vhost' refused for user 'user'
I want to bind a fanout exchange to a direct exchange.
In dart_amqp-0.1.1
I can see a TuningSettings.heartbeatPeriod
but this property doesn't seem to be used.
Is it supposed to work?
As in the topic, i need to bind a queue to exchange.
A declarative, efficient, and flexible JavaScript library for building user interfaces.
🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.
TypeScript is a superset of JavaScript that compiles to clean JavaScript output.
An Open Source Machine Learning Framework for Everyone
The Web framework for perfectionists with deadlines.
A PHP framework for web artisans
Bring data to life with SVG, Canvas and HTML. 📊📈🎉
JavaScript (JS) is a lightweight interpreted programming language with first-class functions.
Some thing interesting about web. New door for the world.
A server is a program made to process requests and deliver data to clients.
Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.
Some thing interesting about visualization, use data art
Some thing interesting about game, make everyone happy.
We are working to build community through open source technology. NB: members must have two-factor auth.
Open source projects and samples from Microsoft.
Google ❤️ Open Source for everyone.
Alibaba Open Source for everyone
Data-Driven Documents codes.
China tencent open source team.