
dart_amqp's People

Contributors

a14n, achilleasa, andre-alck, boaz-amit, brizaldi, christian-thiele, faisalabid, hoylen, jafarili, jrobindb, kiithnabaal, rajmaniar

dart_amqp's Issues

Lost connection to the server

Hello,

I have a RabbitMQ broker on localhost on port 15672, and I am trying to connect using your library to send messages through AMQP.

I could do it with Python, but in Flutter I am not able to connect.

void main() async {
  ConnectionSettings connectionSettings = ConnectionSettings(
    authProvider: PlainAuthenticator("user","password"),
    host: 'localhost',
    port: 15672
  );
  Client client = Client(settings: connectionSettings);
  await client.connect();
  Channel channel = await client.channel();
  Queue queue = await channel.queue("hello");
  queue.publish("Hello World!");
  print(" [x] Sent 'Hello World!'");
  await client.close();
}

All the credentials are correct and the broker is up. If I use "localhost", the system throws "ConnectionFailedException: Could not connect to 127.0.0.1:15672 after 1 attempts. Giving up", and if I use my IP address, it throws "FatalException: Lost connection to the server".

Help

Code:
ConnectionSettings settings = new ConnectionSettings(
    host: "192.168.50.83",
    port: 5672,
    virtualHost: '/',
    authProvider: new PlainAuthenticator("arx7", "arbalest"));
Client client = new Client(settings: settings);

Channel channel = await client.channel();
Exchange exchange =
    await channel.exchange("luwak_topic", ExchangeType.TOPIC);
Consumer consumer = await exchange.bindPrivateQueueConsumer(['#']);

consumer.listen((message) {
  print(
      "[Exchange: ${message.exchangeName}] [${message.routingKey}] ${message.payloadAsString}");
});

Question:

  • inequivalent arg 'durable' for exchange 'luwak_topic' in vhost '/': received 'false' but current is 'true'

Issues decoding strings with ØÆÅ

If I publish a message to a queue, where the message is a string containing Ø, Æ or Å, an error is thrown:

FormatException: Bad UTF-8 encoding 0xf8 (at offset 21)

This is most likely caused by the line below. I'm not sure how to fix it, though. Any thoughts on a workaround?

https://github.com/achilleasa/dart_amqp/blob/master/lib/src/protocol/frame/impl/decoded_message_impl.dart#L51

Code to replicate:

import 'dart:convert';

import 'package:dart_amqp/dart_amqp.dart';

void main() async {
  var client = Client(
    settings: ConnectionSettings(
      host: "localhost",
      port: 5672,
      authProvider: PlainAuthenticator(
        "rabbitmq",
        "rabbitmq",
      ),
    ),
  );

  var queueName = "example_queue";

  // Setup a listener for a queue
  client
      .channel()
      .then((channel) => channel.queue(queueName))
      .then((Queue queue) => queue.consume())
      .then((Consumer consumer) {
    consumer.listen((AmqpMessage message) {
      // Both will fail with the following exception:
      // FormatException: Bad UTF-8 encoding 0xf8 (at offset 21)
      print(message.payloadAsString);
      print(message.payloadAsJson);
    });
  });

  // Publish a message to the queue
  client
      .channel()
      .then((channel) => channel.queue(queueName))
      .then((Queue queue) {
    // Publish the JSON string
    queue.publish("This string contains ø, æ or å");
  });
}

DartPad with example:
https://dartpad.dartlang.org/94b3febd063256dd8b0445a698fb0c22
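
One possible workaround, sketched here under assumptions, is to decode the raw payload bytes in the consumer instead of relying on payloadAsString. The byte 0xf8 is the Latin-1 encoding of ø, and offset 21 is exactly where ø sits in the test string, which suggests the bytes reaching the decoder were not UTF-8. The helper `decodePayload` is hypothetical and not part of dart_amqp; it assumes the raw bytes are available (another issue on this page reads them from message.payload) and uses only dart:convert.

```dart
import 'dart:convert';

/// Hypothetical helper: tries UTF-8 first and falls back to Latin-1
/// when the bytes are not valid UTF-8.
String decodePayload(List<int> bytes) {
  try {
    return utf8.decode(bytes);
  } on FormatException {
    // Latin-1 maps every byte 0x00-0xFF to a code point, so this cannot fail.
    return latin1.decode(bytes);
  }
}

void main() {
  const text = 'This string contains ø, æ or å';

  // Correctly encoded UTF-8 round-trips unchanged.
  print(decodePayload(utf8.encode(text)) == text);

  // Latin-1 bytes (ø is 0xf8) make utf8.decode throw, so the fallback is used.
  print(decodePayload(latin1.encode(text)) == text);
}
```

The fallback only recovers text whose characters fit in Latin-1, so it hides the symptom rather than fixing the encoding on the publishing side.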

Dart 2 support

Hey there,

Are there any plans to convert the code to Dart 2, or to have a dedicated branch for it?
I've tested it with Flutter (Dart 1) and RabbitMQ and it's working nicely, but I would like to use Dart 2 instead.

I would like to help if possible.

Cheers,

After connection I get The null object does not have a getter 'msgClassId'.

Sometimes after connecting to my RabbitMQ instance I get the following error.

Jan 05 22:22:02 a41a608c3bea clever_bardeen: The null object does not have a getter 'msgClassId'.
Jan 05 22:22:02 a41a608c3bea clever_bardeen: NoSuchMethodError: method not found: 'msgClassId'
Jan 05 22:22:02 a41a608c3bea clever_bardeen: Receiver: null
Jan 05 22:22:02 a41a608c3bea clever_bardeen: Arguments: []

Which ends up eventually throwing

FatalException: Lost connection to the server.

Even though the server is fine.

Client dispose after connection lost error

When I add a heartbeat to the client, I get this error after the connection is lost:
flutter: #0 _ChannelImpl.writeHeartbeat package:dart_amqp/…/impl/channel_impl.dart:66
#1 _ChannelImpl._processHandshake.<anonymous closure> package:dart_amqp/…/impl/channel_impl.dart:191
#2 _rootRunUnary (dart:async/zone.dart:1399:47)
#3 _CustomZone.runUnary (dart:async/zone.dart:1300:19)
#4 _CustomZone.runUnaryGuarded (dart:async/zone.dart:1209:7)
#5 _CustomZone.bindUnaryCallbackGuarded.<anonymous closure> (dart:async/zone.dart:1246:26)
#6 _rootRunUnary (dart:async/zone.dart:1407:13)
#7 _CustomZone.runUnary (dart:async/zone.dart:1300:19)
#8 _CustomZone.bindUnaryCallback.<anonymous closure> (dart:async/zone.dart:1230:26)
#9 _Timer._runTimers (dart:isolate-patch/timer_impl.dart:398:19)
#10 _Timer._handleMessage (dart:isolate-patch/timer_impl.dart:429:5)
#11 _RawReceivePortImpl._handleMessage (dart:isolate-patch/isolate_patch.dart:192:12)

Web platform

Hello, I was trying this package on mobile platforms (Android/iOS) and it works fine.
But what about web? I think adding web support would be a big contribution.

FatalException thrown when multiple connection attempts made to an offline server that suddenly comes online.

// test.dart
var client = new amqp.Client(settings: amqp.ConnectionSettings(
  host                 : 'localhost',
  port                 : 5672,
  maxConnectionAttempts: 99999, // -- "infinite" retries because setting this to 0 or -1 isn't supported
  authProvider         : amqp.PlainAuthenticator('username', 'password'),
));
await client.connect();

With the rabbit instance stopped, start the above dart script. Let it make a couple unsuccessful attempts / retries, and then start the rabbit instance. As soon as it comes online, notice the FatalException thrown by the AMQP client.

dart run ./test.dart
INFO: 2022-01-04 11:07:16.902302: dart_amqp.Connection: Trying to connect to localhost:5672 [attempt 1/99999]
INFO: 2022-01-04 11:07:20.509093: dart_amqp.Connection: Trying to connect to localhost:5672 [attempt 2/99999]
INFO: 2022-01-04 11:07:24.066724: dart_amqp.Connection: Trying to connect to localhost:5672 [attempt 3/99999]
Unhandled exception:
FatalException: Lost connection to the server
SEVERE : 2022-01-04 11:07:24.588279: dart_amqp.Connection: FatalException: Lost connection to the server

Process finished with exit code 255

auto-delete for exchanges

There isn't an option to set the auto-delete flag for exchanges.
I was digging through the source, and it looks like you use some kind of source generator, right?
Can you explain a little bit about it?
Then I can add the flag :)

Example request : re-creating client topology when re-establishing connection

Hi,

Since "the driver does not currently support recovering client topologies when re-establishing connections", would it be possible to add an example demonstrating how to do this properly?

There are a lot of hints in the comments on how it should be done, but I think an example would help newcomers like me get up to speed with the library quickly.

Any plans on implementing AMQP 1.0?

First of all, thank you so very much for this project.

Do you have any plans and/or capacity to implement AMQP 1.0? ActiveMQ Artemis (first release 2015) requires it.

Channel Exceptions, Channel Closes, etc don't inform Consumers

When a channel exception is thrown or channel is closed the Consumer's error or done callbacks are not invoked.

Consider:

var c = Client();
var chan = await c.channel();
var queue = await chan.queue("test123", autoDelete: true);
var consumer = await queue.consume(noAck: true);
consumer.listen((event) {print("queue: $event");}, onDone: (){print("consumer done");}, onError: (e){print("consumer error: $e");}, cancelOnError: true);

try {
    await chan.queue("test456", passive: true);
} on QueueNotFoundException {
    print("QueueNotFoundException will break the channel");
}
await Future.delayed(Duration(seconds: 60));

In the above scenario the consumer done or consumer error are never emitted and the stream isn't closed.

It seems to me the channel exception handler should loop over all the consumer stream controllers, pass the error down and/or close the streams.
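
The fan-out suggested above can be sketched in isolation with plain dart:async. `ConsumerRegistry`, `register`, and `failAll` are hypothetical names used only for illustration; they are not dart_amqp's internals, and the sketch assumes each consumer is backed by its own StreamController.

```dart
import 'dart:async';

/// Hypothetical stand-in for a channel that tracks one StreamController
/// per consumer tag.
class ConsumerRegistry<T> {
  final _controllers = <String, StreamController<T>>{};

  /// Registers a consumer under [tag] and returns the stream it listens to.
  Stream<T> register(String tag) {
    final controller = StreamController<T>();
    _controllers[tag] = controller;
    return controller.stream;
  }

  /// Forwards [error] to every consumer, then closes every stream.
  void failAll(Object error) {
    for (final controller in _controllers.values) {
      controller.addError(error);
      // Not awaited: the returned future only completes once a listener
      // has received the done event.
      controller.close();
    }
    _controllers.clear();
  }
}

Future<void> main() async {
  final registry = ConsumerRegistry<String>();
  final events = <String>[];
  final done = Completer<void>();

  registry.register('consumer-1').listen(
    (message) => events.add('data: $message'),
    onError: (Object e) => events.add('error: $e'),
    onDone: () {
      events.add('done');
      done.complete();
    },
  );

  // Simulate a channel-level failure.
  registry.failAll(StateError('channel closed'));
  await done.future;
  print(events); // [error: Bad state: channel closed, done]
}
```

With this shape, both the onError and onDone callbacks from the example above would fire when the channel breaks.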

Channel.close does not appear to release resource

It looks like the channelId inserted into the _ClientImpl._channel map here: https://github.com/achilleasa/dart_amqp/blob/master/lib/src/client/impl/client_impl.dart#L268

Isn't removed when the channel is closed here:
https://github.com/achilleasa/dart_amqp/blob/master/lib/src/client/impl/channel_impl.dart#L200

For long running processes the instance of Client eventually hits the 65536 max and closes the client.

We could add .then((_)=>_client._channels.remove(channelId)); but it's probably better to expose an api in _ClientImpl or bind an onCancel to the _basicReturnStream or something.

I'm happy to issue a PR with a fix, just let me know how you'd prefer to tackle it.
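
To illustrate the leak and the effect of removing the entry on close, here is a minimal, self-contained sketch. `ChannelTable` is a hypothetical stand-in for the client's channel bookkeeping, not the actual _ClientImpl code, and its lowest-free-id allocation is an assumption chosen for simplicity.

```dart
/// Hypothetical stand-in for the client's channel map.
class ChannelTable {
  ChannelTable(this.maxChannels);

  final int maxChannels;
  final _channels = <int, String>{};

  /// Reserves the lowest free channel id, or throws when all ids are in use.
  int open(String name) {
    for (var id = 1; id <= maxChannels; id++) {
      if (!_channels.containsKey(id)) {
        _channels[id] = name;
        return id;
      }
    }
    throw StateError('No free channel ids');
  }

  /// Frees [id] so a later open() can reuse it.
  void close(int id) {
    _channels.remove(id);
  }

  int get openCount => _channels.length;
}

void main() {
  // With only 3 ids available, 10 open/close cycles succeed because
  // every close() releases its id.
  final table = ChannelTable(3);
  for (var i = 0; i < 10; i++) {
    final id = table.open('worker-$i');
    table.close(id);
  }
  print(table.openCount); // 0
}
```

Without the removal in close(), the fourth open() in this loop would throw, which mirrors the long-running process eventually hitting the channel limit.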

Multiple android (flutter) cannot connect RabbitMQ but multiple emulator can

I have a case where I set up RabbitMQ with channels A1, A2, ..., A12, expecting 12 Android devices to be able to connect using my simple Flutter code below:

ConnectionSettings settings = ConnectionSettings(
    host: _rabbitMQServer,
    port: 5672,
    authProvider: const PlainAuthenticator(_rabbitMQUser, _rabbitMQPass));
Client client = Client(settings: settings);
Channel channel = await client.channel();
Queue queue = await channel.queue("A" + android.toString());
Consumer consumer = await queue.consume();
consumer.listen((AmqpMessage message) {
  // ... do things ...

This code works fine on multiple emulators, but it does not work when I install it on multiple Android devices.

On Android device number 1, for channel A1, we can connect and receive messages.
But on Android devices number 2, 3, ..., 12 I get the exception "ConnectionFailedException: Could not connect to 192.168.1.63:5672 after 1 attempts. Giving up".

At first I suspected this was related to the RabbitMQ configuration for allowing connections, but since the emulators can connect normally, that is not the case.

image
In this picture, the first line (192.168.1.43) is Android number 1 and the second line (192.168.1.47) is the emulator.

Please let me know if I missed anything.

Cannot close new connections opened by Client after closing once

I create a Client singleton object and open/close a single connection on that Client only whenever I need to consume from RabbitMQ. I can open and close a connection just fine the first time, but when I open a connection a second time on that same Client object, that connection doesn't close.

This is because of the check on line 237 in client_impl.dart:

if (_clientClosed != null) { return _clientClosed.future; }

_clientClosed is set to null when the Client object is first created, but after the connection itself is closed, _clientClosed isn't set back to null. This means the check above always returns a Future, rather than proceeding to the code to close the connection.

It could be that the library doesn't intend for Clients to be re-used like this, but my assumption was it's okay to re-use a Client. Right now as a work-around I just create a new Client object each time I need to establish a new connection to RabbitMQ.

I could submit a fix for this, but I wanted to confirm that you agree this is a bug and to ask how the unit tests for it should be written.

Thanks!

yield inside listen / on.data()

How can I yield a message inside the listen or onData callback (both of which return void)?

For example:

 Consumer consumer = await queue.consume(consumerTag: queueName);
 consumer.listen((data) {
      yield data.payloadAsString;
 });

 Consumer consumer = await queue.consume(consumerTag: queueName);
 var c = consumer.listen((AmqpMessage msg) {});
 c.onData((data) {
     yield data.payloadAsString;
 });
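
In Dart, `yield` is only valid directly inside a `sync*` or `async*` function body, so it cannot appear in a plain callback. One common pattern, sketched below under assumptions, is to have the callback push into a StreamController and to yield from an `async*` function that reads the controller's stream. `FakeMessage` and `fakeListen` are hypothetical stand-ins for AmqpMessage and consumer.listen so that the example runs without a broker.

```dart
import 'dart:async';

/// Hypothetical stand-in for AmqpMessage; only payloadAsString is modelled.
class FakeMessage {
  FakeMessage(this.payloadAsString);
  final String payloadAsString;
}

/// Hypothetical listen-style source: calls [onData] once per message.
void fakeListen(void Function(FakeMessage) onData) {
  onData(FakeMessage('first'));
  onData(FakeMessage('second'));
}

/// Bridges the callback into a stream, then yields from an async* generator.
Stream<String> payloads() async* {
  final controller = StreamController<FakeMessage>();

  // The callback only adds to the controller; events are buffered until
  // the controller's stream gets a listener.
  fakeListen(controller.add);

  // A real consumer would close the controller from its onDone callback.
  controller.close();

  await for (final message in controller.stream) {
    yield message.payloadAsString;
  }
}

Future<void> main() async {
  print(await payloads().toList()); // [first, second]
}
```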

After the first messages read no more messages are received

I just did a quick test with this package (version 0.0.1) and the following code. After the first message (or batch of messages), no more messages are received and the RabbitMQ queue keeps growing.

import "package:dart_amqp/dart_amqp.dart";

main() async {
  // auto-connect to localhost:5672 using guest credentials
  Client client = new Client();
  final channel = await client.channel();
  final queue = await channel.queue("test", durable: true);
  final consumer = await queue.consume();
  consumer.listen((message) {
    // Get the payload as a string
    print(" [x] Received string: ${message.payloadAsString}");

    // Or unserialize to json
    print(" [x] Received json: ${message.payloadAsJson}");

    // Or just get the raw data as a Uint8List
    print(" [x] Received raw: ${message.payload}");

    message.ack();
  });
}

An error is shown when I use dart_amqp with bloc

E/flutter (25971): [ERROR:flutter/lib/ui/ui_dart_state.cc(209)] Unhandled Exception: Unhandled error ConnectionFailedException: Could not connect to mustang.rmq.cloudamqp.com:5672 after 1 attempts. Giving up occurred in Instance of 'OneToOneMatchLiveScreenBloc'.
E/flutter (25971): 
E/flutter (25971): #0      BlocBase.onError.<anonymous closure> (package:bloc/src/bloc.dart:743:7)
E/flutter (25971): #1      BlocBase.onError (package:bloc/src/bloc.dart:744:6)
E/flutter (25971): #2      _rootRunBinary (dart:async/zone.dart:1452:47)

Client timeout handling

Hi,

Is there an example of handling the client timeout? Also, can we set the timeout length manually?

Thanks.

Any plans to implement heartbeats?

Do you have any plans to implement heartbeats?

This is pretty necessary, since we keep getting lots of zombie connections on the server side!

TLS support

Are there any plans to add TLS support?

If so, is there a timeline?

How to Confirm Publish e.g. Message Sent Successfully?

First off, thanks for all your work on dart_amqp. You have saved me countless hours!

To achieve high reliability, I want to confirm i.e. acknowledge both message send and message consume. My question is how do you confirm that a message has been successfully sent/published?

I have read the RabbitMQ docs at https://www.rabbitmq.com/confirms.html#publisher-confirms and have been unable to make this work. I have tried using [channel.ack()] and see no properties as recommended in your docs to instead use [MessageProperties], but I have been unsuccessful. Additionally, there is no doc on the queue [noWait] flag, which if I set to true "hangs" my app. What is the purpose of [noWait]?

Thanks in advance for any assistance with these questions.

HELP: executing messages only one time

I want to perform a simple task:

Run a consumer in the background for a queue, pick up 5 messages at a time, and perform an operation on each. How can I achieve this?
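
As a client-side illustration only, messages arriving on a stream can be grouped into batches of five with an `async*` helper. This is a sketch using plain dart:async; `inBatches` is a hypothetical helper, and the message source is simulated. In AMQP 0-9-1 the broker-side way to limit how many unacknowledged messages a consumer holds is a prefetch limit (basic.qos), which this sketch does not cover.

```dart
import 'dart:async';

/// Groups [source] into lists of at most [size] items.
/// The final batch may be shorter and is only emitted when [source] ends.
Stream<List<T>> inBatches<T>(Stream<T> source, int size) async* {
  var batch = <T>[];
  await for (final item in source) {
    batch.add(item);
    if (batch.length == size) {
      yield batch;
      batch = <T>[];
    }
  }
  if (batch.isNotEmpty) yield batch;
}

Future<void> main() async {
  // Simulated messages; a real consumer would feed this stream instead.
  final messages = Stream.fromIterable(List.generate(12, (i) => 'msg-$i'));

  await for (final batch in inBatches(messages, 5)) {
    print('processing ${batch.length} messages: $batch');
  }
}
```

Because a partial batch is held until the source ends, a long-lived consumer would likely also need a timer to flush incomplete batches.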

Publishing on default exchange?

Hi. I tried to use your library to put a message on the default exchange (which by convention has the empty string as a name) of my server. However, your code does not allow for this (line 547 of client/impl/channel_impl.dart deliberately throws an exception).

Is there a reason the library does not allow this, and is there any other way to publish on the default exchange?

Thanks.

`TypeError` thrown in `_ClientImpl._handleException`

I don't know how to reproduce this, but we just saw this in our logs:

type '_CastError' is not a subtype of type 'Exception'


package:dart_amqp/src/client/impl/client_impl.dart 225          _ClientImpl._handleException
package:dart_amqp/src/client/impl/client_impl.dart 189          _ClientImpl._handleMessage
dart:async                                                      _EventSinkWrapper.add
package:dart_amqp/src/protocol/io/amqp_message_decoder.dart 42  AmqpMessageDecoder.handleData
dart:async                                                      _EventSinkWrapper.add
package:dart_amqp/src/protocol/io/raw_frame_parser.dart 79      RawFrameParser.handleData

It seems like there might be two issues here:

  • _handleException accepts a dynamic error but is trying to add it to the error stream which is typed Exception
  • What in the _handleMessage is throwing the CastError
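
The first point can be shown in isolation: a failed `as` cast throws a TypeError, which extends Error rather than Exception, so it cannot be passed to anything typed as Exception. The sketch below shows one way to normalise such errors. `toException` is a hypothetical helper and not part of dart_amqp; whether wrapping or widening the stream's type is the better fix is left open.

```dart
/// Hypothetical helper: wraps any thrown object so that it satisfies
/// an API typed for Exception.
Exception toException(Object error) =>
    error is Exception ? error : Exception('Unexpected error: $error');

void main() {
  Object? caught;
  try {
    // A failed cast throws a TypeError, which is an Error, not an Exception.
    final Object value = 'not an int';
    value as int;
  } catch (e) {
    caught = e;
  }

  print(caught is Exception); // false
  print(toException(caught!) is Exception); // true

  // Real exceptions pass through unchanged.
  final original = FormatException('bad frame');
  print(identical(toException(original), original)); // true
}
```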

Not Support AMQP 1.0.0

I get this error when I try to use ActiveMQ:

E/flutter (29276): [ERROR:flutter/lib/ui/ui_dart_state.cc(177)] Unhandled Exception: FatalException: Could not negotiate a valid AMQP protocol version. Server supports AMQP 1.0.0

Expose client socket to handle errors

Hi,

Is it possible to expose the socket in ClientImpl to handle errors? At present, if the client disconnects after establishing a connection, there is no way of knowing. If the socket could be exposed, we could add on socket events to handle such events and re-establish the connection.

Is this possible? If not, are there better ways of doing this? Thanks!

Not working on release mode (flutter)

Hello everyone!

I am trying to develop an app using dart_amqp and it is working fine when in debug mode.
But when I run the app on release mode, it can't connect to my Rabbit server:

ConnectionFailedException: Could not connect to MYSERVERADDRESS:5672 after 1 attempts. Giving up

Does anyone have a clue about what could be causing this?

client unexpectedly closed TCP connection

After a weekend without traffic on a rabbitmq connection the first publication leads to the following message in rabbitmq logs:

=INFO REPORT==== 22-Mar-2019::08:04:05 ===
connection <0.580.0> (10.233.77.108:54878 -> 10.233.81.213:5672): user 'guest' authenticated and granted access to vhost '/'

=WARNING REPORT==== 25-Mar-2019::07:54:25 ===
closing AMQP connection <0.580.0> (10.233.77.108:54878 -> 10.233.81.213:5672, vhost: '/', user: 'guest'):
client unexpectedly closed TCP connection

I don't really know why this happens, because my Dart code doesn't display anything in its logs (and it doesn't crash) even though I set an errorListener:

      rabbitmqClient = amqp.Client(
        settings: amqp.ConnectionSettings(
          host: config.rabbitmq.host,
          port: config.rabbitmq.port,
          virtualHost: config.rabbitmq.virtualHost,
          authProvider: amqp.PlainAuthenticator(
            config.rabbitmq.user,
            config.rabbitmq.password,
          ),
        ),
      )..errorListener((e) {
          _log.warning('rabbit client error: $e');
          if (e is amqp.FatalException) {
            _log.severe('shutting down because of $e');
            exit(1);
          }
        });

Unknown disconnection from a queue

Hi,

I'm using dart_amqp to listen to a queue like in your example. I am testing the application on an Android emulator.
After a while, the application no longer listens to the queue, and it stops without raising any error or executing any print (see code below).

ConnectionSettings settings = new ConnectionSettings(
    port: port,
    host: host,
    authProvider: new PlainAuthenticator(userName, password));

_client = Client(settings: settings);

try {
  _client
      .channel()
      .then((Channel channel) => channel.queue(queueName, durable: true))
      .then((Queue queue) => queue.consume())
      .then(
        (Consumer consumer) => consumer.listen((AmqpMessage message) {
          // do stuff
        }, onDone: () {
          print("onDone");
        }, onError: (e, k) {
          print("onError");
        }),
      )
      .whenComplete(() {
    print("whenComplete");
  }).then((t) {
    print("then");
  });
} on ConnectionException catch (e) {
  print("ConnectionException $e");
}

Am I doing something wrong?

amqp connection lost, and recreate multiples consumers in same queue

I have been using this lib, but after a while the connection closes by itself, and when I start it again it connects as another consumer on the queue.
I can publish to other queues, but it stops consuming. I can't figure out what is happening; the only solution is to completely delete the queue and start it again. I read on pub.dev that
the driver does not currently support recovering client topologies when re-establishing connections. This feature may be implemented in a future version.

Is that what is happening?
How can I implement this and check whether there is a stable connection or whether the queue has consumers?
How can I persist the consumer to avoid this?
Thanks

class Amqp {

  late bool isRunning;
  late Client client;
  late SoundsService soundsService;

  Amqp() {

    isRunning = false;
    ConnectionSettings settings = ConnectionSettings(
      virtualHost:"/",
      host: "rabbitmqhost",
      port: rabbitmqport,
      authProvider: const PlainAuthenticator(
          "rabbitmqusers",
          "rabbitmqpass"
      ),
    );
    client        = Client(settings: settings);
    soundsService = SoundsService();
  }

  getClient() {
    return client;
  }

  start() async {
    print("AMQP::starting");

    //consume();

    client.connect().then( (defaultChannel) => {
      consume()
    }).catchError((onError) => {
      print("ERROR: $onError"),
    });

    return true;
  }

  close() async {
    isRunning = false;
    print("isRunning: $isRunning");
    client.channel().then((channel) => {
      print("amqp::channel::closed"),
      channel.queue(CurrentInfo.queue, durable: true).then((Queue queue) => {
        queue.delete().then((_) => {
          print("amqp::queue::closed"),
          client.close().then((_) => {
            print("amqp::client::closed"),
          })
        })
      }),
      channel.close()
    });
    client.close();
    return true;
  }

  void deleteQueue() {
    client.channel().then((channel) => {
      channel.queue(CurrentInfo.queue, durable: true).then((Queue queue) => {
        queue.delete().then((_) => {
          print("FORCE::amqp::queue::deleted"),
        })
      }),
    });
  }

  /// *************************************
  /// consumer
  consume() async {

    isRunning = true;

    print("AMQP::connected::starting-consume");
    print("isRunning: $isRunning");

    Channel channel   = await client.channel();

    Exchange exchange = await channel.exchange("amq.direct", ExchangeType.DIRECT, durable: true);

    Queue queue       = await channel.queue(CurrentInfo.queue, durable: true);
    queue.bind(exchange, CurrentInfo.queue);

    Consumer consumer = await queue.consume();
    consumer.listen( (AmqpMessage message) {

      var payload = Payload.fromJson(json.decode(message.payloadAsString));
      if ( payload.action == "notification" ) {
        print("AMQP::consume::action::notification");
        print(payload.toJson());
      }

      if ( payload.action == 'answer-order' ) {
        print("AMQP::consume::action::answer-order");
        print(payload.toJson());

        Vibration.vibrate(duration: 10000, intensities: [100, 200, 100, 200, 100, 200, 100, 200, 100, 200]);
        showActivityBottomSheet(AnswerOrderDetails.fromJson(payload.data));

      }

      /**
          print(" [x] Received:: payloadString :: ${message.payloadAsString}");
          print(" [x] Received:: payloadJson   :: ${message.payloadAsJson}");
          print(" [x] Received:: routingKey    :: ${message.routingKey}");
       */

    });
  }


  /// *************************************
  /// publisher functions
  ///
  /// users/track
  publishUpdateLocation(double lat,double lng, String usersId,  String status) async {
    Channel channel = await client.channel();
    Exchange exchange = await channel.exchange("amq.direct", ExchangeType.DIRECT, durable: true);
    var position = {
      'usersId': usersId,
      "lat": lat,
      "lng": lng,
      "status": status
    };
    print("POSITION::DATA:: $position");
    exchange.publish(json.encode(position), 'position-listener');
    
  }

  /// users/track/orders
  publishUpdateLocationTrackOrder(double lat,double lng, String usersId, String ordersId, String status) async {
    Channel channel = await client.channel();
    Exchange exchange = await channel.exchange("amq.direct", ExchangeType.DIRECT, durable: true);
    var position = {
      'usersId':  usersId,
      'ordersId': ordersId,
      "lat": lat,
      "lng": lng,
      "status": status
    };
    exchange.publish(json.encode(position), 'position-orders-listener');
   
  }

  /// answer-order
  publishAnswerOrder(String answer, String usersId, String ordersId) async {
    Channel channel = await client.channel();
    Exchange exchange = await channel.exchange("amq.direct", ExchangeType.DIRECT, durable: true);
    var data = {
      'usersId':  usersId,
      "ordersId": ordersId,
      "answer":   answer
    };
    exchange.publish(json.encode(data), 'answer-orders-listener');
    print("ANSWER::ORDER:: $data");
   
  }

  ///
  publishInteractOrder(String type, String usersId, String ordersId) async {
    Channel channel = await client.channel();
    Exchange exchange = await channel.exchange("amq.direct", ExchangeType.DIRECT, durable: true);
    var data = {
      'usersId'  : usersId,
      "ordersId" : ordersId,
      "type"     : type // collected, delivered,
    };
    exchange.publish(json.encode(data), 'interact-orders-listener');
    print("INTERACT::ORDER:: $data");
   
  }



}

heartbeat

Hello. When will you add heartbeat support to your AMQP client?

Stack Overflow in Client

I am testing the performance of the dart_amqp client. I am publishing 100,000 messages in my test, and then receiving them in a separate client. I am basically using the code from your readme example, except that I added a loop to publish the messages:

Client publisher = new Client(settings: settings);

publisher.channel().then((Channel channel) {
  channel.queue('logs').then((Queue q) {
    channel.exchange("logs", ExchangeType.FANOUT).then((Exchange exchange) {
      q.bind(exchange, 'logs');
      int x = 0;
      // Publish 100,000 messages in a tight loop.
      while (x < 100000) {
        exchange.publish("Testing 1-2-3", 'logs');
        x++;
      }
      return publisher.close();
    });
  });
});

My client code is exactly the same as your readme example.

When I start the publisher and then start the server, I begin to receive messages, but then, after processing 4500 messages, I get a stack overflow exception:

Unhandled exception:
Stack Overflow
#0 _startMicrotaskLoop (dart:async/schedule_microtask.dart:51)
#1 _runPendingImmediateCallback (dart:isolate-patch/isolate_patch.dart:96)
#2 _RawReceivePortImpl._handleMessage (dart:isolate-patch/isolate_patch.dart:149)
#3 _runPendingImmediateCallback (dart:isolate-patch/isolate_patch.dart:96)
#4 _RawReceivePortImpl._handleMessage (dart:isolate-patch/isolate_patch.dart:149)
#5 RawFrameParser.handleData (package:dart_amqp/src/protocol/io/raw_frame_parser.dart:79:9)
#6 RawFrameParser.handleData (package:dart_amqp/src/protocol/io/raw_frame_parser.dart:79:9)
#7 RawFrameParser.handleData (package:dart_amqp/src/protocol/io/raw_frame_parser.dart:79:9)
#8 RawFrameParser.handleData (package:dart_amqp/src/protocol/io/raw_frame_parser.dart:79:9)
#9 RawFrameParser.handleData (package:dart_amqp/src/protocol/io/raw_frame_parser.dart:79:9)

...
...
#10 _BufferingStreamSubscription._sendData (dart:async/stream_impl.dart:341)
#11 _BufferingStreamSubscription._add (dart:async/stream_impl.dart:270)
#12 _StreamController&&_SyncStreamControllerDispatch._sendData (dart:async/stream_controller.dart:744)
#13 _StreamController._add (dart:async/stream_controller.dart:616)
#14 _StreamController.add (dart:async/stream_controller.dart:562)
#15 _RawSocket._RawSocket. (dart:io-patch/socket_patch.dart:1215)
#16 _NativeSocket.issueReadEvent.issue (dart:io-patch/socket_patch.dart:749)
#17 _microtaskLoop (dart:async/schedule_microtask.dart:41)
#18 _startMicrotaskLoop (dart:async/schedule_microtask.dart:50)
#19 _runPendingImmediateCallback (dart:isolate-patch/isolate_patch.dart:96)
#20 _RawReceivePortImpl._handleMessage (dart:isolate-patch/isolate_patch.dart:149)

Am I doing something wrong here?

Add option not to declare queue before use

In my use case, the user has read access to the queue but not write or config (RabbitMQ hosted by Amazon) and should not need config access to read from a queue. Trying to access the queue with channel.queue(name) results in:

Caught error: ChannelException(ACCESS_REFUSED): ACCESS_REFUSED - access to queue 'queue' in vhost 'vhost' refused for user 'user'

heartbeat support

In dart_amqp-0.1.1 I can see a TuningSettings.heartbeatPeriod but this property doesn't seem to be used.

Is it supposed to work?
