Git Product home page Git Product logo

webman-rabbitmq's Introduction

trolley

webman-rabbitmq's People

Contributors

chaz6chez avatar

Stargazers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

Watchers

 avatar

webman-rabbitmq's Issues

Event-loop:Workerman\Events\Revolt不可用

报错如下:
Error: Call to undefined method Workerman\Events\Revolt::add() in D:\EServer-3.5.3-win\core\www\ld-game-webman-api\vendor\workerman\rabbitmq\src\Client.php:158
Stack trace:
#0 D:\EServer-3.5.3-win\core\www\ld-game-webman-api\vendor\workbunny\webman-rabbitmq\src\Connection.php(93): Workerman\RabbitMQ\Client->connect()
#1 D:\EServer-3.5.3-win\core\www\ld-game-webman-api\vendor\workbunny\webman-rabbitmq\src\Builders\QueueBuilder.php(64): Workbunny\WebmanRabbitMQ\Connection->consume(Object(Workbunny\WebmanRabbitMQ\BuilderConfig))
#2 D:\EServer-3.5.3-win\core\www\ld-game-webman-api\support\helpers.php(432): Workbunny\WebmanRabbitMQ\Builders\QueueBuilder->onWorkerStart(Object(Workerman\Worker))
#3 D:\EServer-3.5.3-win\core\www\ld-game-webman-api\support\helpers.php(470): worker_bind(Object(Workerman\Worker), Object(process\workbunny\rabbitmq\RestyBuilder))
#4 D:\EServer-3.5.3-win\core\www\ld-game-webman-api\vendor\workerman\workerman\src\Worker.php(2443): {closure}(Object(Workerman\Worker))
#5 D:\EServer-3.5.3-win\core\www\ld-game-webman-api\vendor\workerman\workerman\src\Worker.php(1451): Workerman\Worker->run()
#6 D:\EServer-3.5.3-win\core\www\ld-game-webman-api\vendor\workerman\workerman\src\Worker.php(1370): Workerman\Worker::forkWorkersForWindows()
#7 D:\EServer-3.5.3-win\core\www\ld-game-webman-api\vendor\workerman\workerman\src\Worker.php(572): Workerman\Worker::forkWorkers()
#8 D:\EServer-3.5.3-win\core\www\ld-game-webman-api\runtime\windows\start_RestyBuilder.php(23): Workerman\Worker::runAll()
#9 {main}

Bug: Call to a member function publish() on null

问题描述

服务首次启动调用MQ,异步发送多次任务的时候会报错, 此时控制台只会输出一个信息,第一次请求就不会出问题了

程序代码

        for ($i=1; $i <= 3; $i++){
            async_publish(TestBuilder::instance(), date('Y-m-d H:i:s').'_______'.$i);
        }

报错信息

{
    "code": 0,
    "msg": "Call to a member function publish() on null",
    "data": {
        "request_url": "GET //127.0.0.1:8666/index/mq",
        "timestamp": "2023-02-24 11:17:05",
        "client_ip": "127.0.0.1",
        "request_param": [],
        "error_message": "Call to a member function publish() on null",
        "error_trace": [
            "#0 D:\\phpStudy20188\\PHPTutorial\\WWW\\cinemaadmin\\vendor\\workbunny\\webman-rabbitmq\\src\\helpers.php(56): Workbunny\\WebmanRabbitMQ\\Connection->publish()",
            "#1 D:\\phpStudy20188\\PHPTutorial\\WWW\\cinemaadmin\\app\\index\\controller\\Index.php(43): Workbunny\\WebmanRabbitMQ\\async_publish()",
            "#2 D:\\phpStudy20188\\PHPTutorial\\WWW\\cinemaadmin\\vendor\\workerman\\webman-framework\\src\\App.php(318): app\\index\\controller\\Index->mq()",
            "#3 D:\\phpStudy20188\\PHPTutorial\\WWW\\cinemaadmin\\vendor\\workerman\\webman-framework\\src\\App.php(348): Webman\\App::Webman\\{closure}()",
            "#4 D:\\phpStudy20188\\PHPTutorial\\WWW\\cinemaadmin\\vendor\\webman\\log\\src\\Middleware.php(58): Webman\\App::Webman\\{closure}()",
            "#5 D:\\phpStudy20188\\PHPTutorial\\WWW\\cinemaadmin\\vendor\\workerman\\webman-framework\\src\\App.php(340): Webman\\Log\\Middleware->process()",
            "#6 D:\\phpStudy20188\\PHPTutorial\\WWW\\cinemaadmin\\vendor\\webman\\cors\\src\\CORS.php(12): Webman\\App::Webman\\{closure}()",
            "#7 D:\\phpStudy20188\\PHPTutorial\\WWW\\cinemaadmin\\vendor\\workerman\\webman-framework\\src\\App.php(340): Webman\\Cors\\CORS->process()",
            "#8 D:\\phpStudy20188\\PHPTutorial\\WWW\\cinemaadmin\\app\\common\\middleware\\RequestMonitoring.php(30): Webman\\App::Webman\\{closure}()",
            "#9 D:\\phpStudy20188\\PHPTutorial\\WWW\\cinemaadmin\\vendor\\workerman\\webman-framework\\src\\App.php(340): app\\common\\middleware\\RequestMonitoring->process()",
            "#10 D:\\phpStudy20188\\PHPTutorial\\WWW\\cinemaadmin\\vendor\\workerman\\webman-framework\\src\\App.php(167): Webman\\App::Webman\\{closure}()",
            "#11 D:\\phpStudy20188\\PHPTutorial\\WWW\\cinemaadmin\\vendor\\workerman\\workerman\\Connection\\TcpConnection.php(646): Webman\\App->onMessage()",
            "#12 D:\\phpStudy20188\\PHPTutorial\\WWW\\cinemaadmin\\vendor\\workerman\\workerman\\Events\\Select.php(311): Workerman\\Connection\\TcpConnection->baseRead()",
            "#13 D:\\phpStudy20188\\PHPTutorial\\WWW\\cinemaadmin\\vendor\\workerman\\workerman\\Worker.php(1479): Workerman\\Events\\Select->loop()",
            "#14 D:\\phpStudy20188\\PHPTutorial\\WWW\\cinemaadmin\\vendor\\workerman\\workerman\\Worker.php(1399): Workerman\\Worker::forkWorkersForWindows()",
            "#15 D:\\phpStudy20188\\PHPTutorial\\WWW\\cinemaadmin\\vendor\\workerman\\workerman\\Worker.php(560): Workerman\\Worker::forkWorkers()",
            "#16 D:\\phpStudy20188\\PHPTutorial\\WWW\\cinemaadmin\\vendor\\workerman\\webman-framework\\src\\support\\App.php(131): Workerman\\Worker::runAll()",
            "#17 D:\\phpStudy20188\\PHPTutorial\\WWW\\cinemaadmin\\start.php(4): support\\App::run()",
            "#18 {main}"
        ],
        "file": "D:\\phpStudy20188\\PHPTutorial\\WWW\\cinemaadmin\\vendor\\workbunny\\webman-rabbitmq\\src\\Connection.php",
        "line": 224
    }
}

操作系统及workerman/webman等框架组件具体版本

Windows:8
CentOS:6.*
PHP: 8.0.2
webman-rabbitmq:1.0.9
webman-framework:1.4.3

关于消费队列退出优雅关闭连接

在本地启了一个docker运行rabbitmq,通过实时跟踪日志查看到,当webman退出时,调用消费队列进程的onWorkerStop方法时,虽然FasterBuilder中针对连接处理$this->_connection->close();,但发现rabbitmq这边有warning日志:

2022-09-04 02:55:53.458 [warning] <0.20366.1> closing AMQP connection <0.20366.1> (172.19.0.1:60280 -> 172.19.0.2:5672, vhost: '/', user: 'rabbitmq'):
client unexpectedly closed TCP connection

同时发现如果在发布消息时,通过调用helpers的async_publish方法,设置close参数为true时,连接是正常关闭的:

2022-09-04 03:02:36.367 [info] <0.20824.1> connection <0.20824.1> (172.19.0.1:60316 -> 172.19.0.2:5672): user 'rabbitmq' authenticated and granted access to vhost '/'
******
2022-09-04 03:02:36.421 [info] <0.20824.1> closing AMQP connection <0.20824.1> (172.19.0.1:60316 -> 172.19.0.2:5672, vhost: '/', user: 'rabbitmq')

请教如何实现topic模式,exchange对应多个队列对应各自消费者

感谢作者提供简单好用的插件!

当前使用1.0版本

我想实现的效果是:

  • 实现topic模式, 比如exchange = mqTopic ,
  • 这个交换机有两个队列mqTopic_1mqTopic_2,
  • 两个队列分别通过routing_key = task.1 (mqTopic_1)routing_key = task.2 (mqTopic_2)绑定
  • 每个队列有各自的消费者进程去消费

我的做法:

  • 通过php webman workbunny:rabbitmq-builder mqTopic 2创建了builder
  • 投递消息使用sync_publish() 目前情况是投递消息可以到达,但是无法消费
$b = MqTopicBuilder::instance();
$b->setMessage(new Message([
         'exchange_name'     => 'mqTopic',
          'exchange_type'     => Constants::TOPIC,
          'queue_name'        => 'mqTopic_'.$request->get('queue_name', 0), //对应mqTopic_1 mqTopic_2
          'routing_key'       => $request->get('routing_key',get_called_class()), //对应 task.1 task.2
          'consumer_tag'      => 'mqTopic',
          'prefetch_size'     => 0,
          'prefetch_count'    => 30,
          'is_global'         => false,
      ]));
      sync_publish($b, json_encode($data));

请问我该如何实现?如果有demo最好!

在次感谢作者无私奉献!

进程停止后导致event-loop内还有未执行的事件

由于与rabbitmq-server的通讯工作是异步的,由workerman event-loop接管所有事件的执行工作;
当进程杀死时,event-loop也被销毁,event-loop内可能存在还未执行的事件,从而导致一些问题,如:重复消费、发送失败。

相似问题: #11

2.x rc版本同步发布队列任务返回异常

2.0beta-rc2版本
sync_publish方法执行后提示错误:
Error: Call to a member function then() on bool in ~/x/vendor/workbunny/webman-rabbitmq/src/Connection.php:204

同步方法应该返回的是布尔值不是PromiseInterface,所以不可以继续链式执行->then等promise相关操作了。

如果是用法不对请有空指正,感谢

关于消息发送和接收的稳定性优化

推送:sync_publish(Builder::instance(), $params, null, true); 说明文档中,最后的close 参数,最好默认是true。 如果该方法应用在 controller 部分,这个会因为没有固定的 心跳监听而造成大量的发布失败。 虽然每次都会重新创建连接,那也是稳定和高效的。

process 里的消息接收。建议加入重连检查。有时候网络断开造成的消费者丢失没有重连。 或者搞个重连开关。

`
public function onWorkerStart(Worker $worker): void
{
parent::onWorkerStart($worker);

    Timer::add(10, function () {
        $this->checkConnection();
    });

}

private function checkConnection() {
    try {
        if($this->connection()->client()->isConnected() == false) {
            $this->logger->debug('Reconnect');
            $this->connection()->consume($this->getMessage());
        }else {
            $this->logger->debug('Connection is OK');
        }
    } catch (\Exception $e) {
        $this->logger->error($e->getMessage());
    }
}

`

【workbunny/webman-rabbitmq】消费者超时导致重复消费

问题描述

首先感谢 workbunny 提供这么一款很好用的插件。使用【workbunny】RabbitMQ客户端,插件地址:https://www.workerman.net/plugin/67, 在消费者中如果有阻塞并超过一定时间,就会导致重复消费。

框架及插件版本

"workerman/webman-framework": "^1.5.0"
"workbunny/webman-rabbitmq": "^1.0"

复现代码

生产者:IndexController.php

image

workbunny rabbitmq配置:app.php
image

消费者:TestBuilder.php
image

关键日志打印

image

关键日志说明

通过日志可以看到,在出现错误前,是有消息消费成功并返回了 ACK。在出现错误后,相关进程重启,导致消息开始重新消费,以前返回的 ACK 似乎无效?RabbitMQ的控制台,始终显示有对应数量的信息处于 Unacked 状态。

image

只发送消息,不消费

我们项目的消费者在另一个系统已经建好,现在我只想发送消息不消费,应该怎么操作呢?

2.x版本存在问题

2.x版本存在很多问题,问题如下:

  1. 文档有误:
    (1)在创建消费者的延迟QueueBuilder时,如 "./webman workbunny:rabbitmq-builder test --delayed--mode=queue" 缺少空格
    (2)移除QueueBuilder时,如 "./webman workbunny:rabbitmq-remove test --mode=queue",提示没有 --mode 选项

  2. 创建延时QueueBuilder,投递信息有误,以下为复现步骤:
    (1)版本如下:

"workerman/webman-framework": "1.4.7",
"workbunny/webman-rabbitmq": "^2.1"

(2)创建延时QueueBuilder:
/webman workbunny:rabbitmq-builder test --delayed --mode=queue
(3)投递消息

$taskDelayedBuilder = TestBuilderDelayed::instance();
$taskDelayedMsg = json_encode([ 'id'=>1111 ], JSON_UNESCAPED_UNICODE);
$taskDelayedHeader['x-delay'] = 3000;
$taskDelayedResult = sync_publish($taskDelayedBuilder, $taskDelayedMsg, null, $taskDelayedHeader);

提示:
Invalid publish.
(4)定位问题:vendor/workbunny/webman-rabbitmq/src/helpers.php
image
(5)尝试解决:
将 TestBuilderDelayed 里的 $exchangeType 值修改为 "Constants::DELAYED" ,重启后发现不会创建对应的消费者,且投递时第一次投递失败,后面可以投递成功,但不会消费

关于helpers提供的消息发布方法

最近在看rabbitmq相关的组件,在尝试使用这个组件时,关于helpers下的两个方法有个疑问,如下

 * 同步生产
 * @param FastBuilder $builder
 * @param string $body
 * @param array|null $headers
 * @param bool $close
 * @return bool
 */
function sync_publish(FastBuilder $builder, string $body, ?array $headers = null, bool $close = false) : bool
{
    $message = $builder->getMessage();
    if(
        ($message->getExchangeType() !== Constants::DELAYED and $headers['x-delay'] ?? 1) or
        ($message->getExchangeType() === Constants::DELAYED and !($headers['x-delay'] ?? 0))
    ){
        throw new WebmanRabbitMQException('Invalid publish. ');
    }
    $message->setBody($body);
    if($headers !== null){
        $message->setHeaders(array_merge($message->getHeaders(), $headers));
    }
    return $builder->syncConnection()->publish($message, $close);
}

若这里$builder是普通队列,则$message的ExchangeType是Constants::DIRECT,当$headers不传递时,默认为null,那这里不是始终抛WebmanRabbitMQException了?

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. 📊📈🎉

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❤️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.