workbunny / webman-rabbitmq Goto Github PK
View Code? Open in Web Editor NEW🚀🐇 A PHP implementation of RabbitMQ Client for webman plugin.
Home Page: https://www.workerman.net/plugin/67
License: MIT License
🚀🐇 A PHP implementation of RabbitMQ Client for webman plugin.
Home Page: https://www.workerman.net/plugin/67
License: MIT License
应该是自动生成的build类代码中\$queueConfigs
应该是\$queueConfig
报错如下:
Error: Call to undefined method Workerman\Events\Revolt::add() in D:\EServer-3.5.3-win\core\www\ld-game-webman-api\vendor\workerman\rabbitmq\src\Client.php:158
Stack trace:
#0 D:\EServer-3.5.3-win\core\www\ld-game-webman-api\vendor\workbunny\webman-rabbitmq\src\Connection.php(93): Workerman\RabbitMQ\Client->connect()
#1 D:\EServer-3.5.3-win\core\www\ld-game-webman-api\vendor\workbunny\webman-rabbitmq\src\Builders\QueueBuilder.php(64): Workbunny\WebmanRabbitMQ\Connection->consume(Object(Workbunny\WebmanRabbitMQ\BuilderConfig))
#2 D:\EServer-3.5.3-win\core\www\ld-game-webman-api\support\helpers.php(432): Workbunny\WebmanRabbitMQ\Builders\QueueBuilder->onWorkerStart(Object(Workerman\Worker))
#3 D:\EServer-3.5.3-win\core\www\ld-game-webman-api\support\helpers.php(470): worker_bind(Object(Workerman\Worker), Object(process\workbunny\rabbitmq\RestyBuilder))
#4 D:\EServer-3.5.3-win\core\www\ld-game-webman-api\vendor\workerman\workerman\src\Worker.php(2443): {closure}(Object(Workerman\Worker))
#5 D:\EServer-3.5.3-win\core\www\ld-game-webman-api\vendor\workerman\workerman\src\Worker.php(1451): Workerman\Worker->run()
#6 D:\EServer-3.5.3-win\core\www\ld-game-webman-api\vendor\workerman\workerman\src\Worker.php(1370): Workerman\Worker::forkWorkersForWindows()
#7 D:\EServer-3.5.3-win\core\www\ld-game-webman-api\vendor\workerman\workerman\src\Worker.php(572): Workerman\Worker::forkWorkers()
#8 D:\EServer-3.5.3-win\core\www\ld-game-webman-api\runtime\windows\start_RestyBuilder.php(23): Workerman\Worker::runAll()
#9 {main}
服务首次启动调用MQ,异步发送多次任务的时候会报错, 此时控制台只会输出一个信息,第一次请求就不会出问题了
for ($i=1; $i <= 3; $i++){
async_publish(TestBuilder::instance(), date('Y-m-d H:i:s').'_______'.$i);
}
{
"code": 0,
"msg": "Call to a member function publish() on null",
"data": {
"request_url": "GET //127.0.0.1:8666/index/mq",
"timestamp": "2023-02-24 11:17:05",
"client_ip": "127.0.0.1",
"request_param": [],
"error_message": "Call to a member function publish() on null",
"error_trace": [
"#0 D:\\phpStudy20188\\PHPTutorial\\WWW\\cinemaadmin\\vendor\\workbunny\\webman-rabbitmq\\src\\helpers.php(56): Workbunny\\WebmanRabbitMQ\\Connection->publish()",
"#1 D:\\phpStudy20188\\PHPTutorial\\WWW\\cinemaadmin\\app\\index\\controller\\Index.php(43): Workbunny\\WebmanRabbitMQ\\async_publish()",
"#2 D:\\phpStudy20188\\PHPTutorial\\WWW\\cinemaadmin\\vendor\\workerman\\webman-framework\\src\\App.php(318): app\\index\\controller\\Index->mq()",
"#3 D:\\phpStudy20188\\PHPTutorial\\WWW\\cinemaadmin\\vendor\\workerman\\webman-framework\\src\\App.php(348): Webman\\App::Webman\\{closure}()",
"#4 D:\\phpStudy20188\\PHPTutorial\\WWW\\cinemaadmin\\vendor\\webman\\log\\src\\Middleware.php(58): Webman\\App::Webman\\{closure}()",
"#5 D:\\phpStudy20188\\PHPTutorial\\WWW\\cinemaadmin\\vendor\\workerman\\webman-framework\\src\\App.php(340): Webman\\Log\\Middleware->process()",
"#6 D:\\phpStudy20188\\PHPTutorial\\WWW\\cinemaadmin\\vendor\\webman\\cors\\src\\CORS.php(12): Webman\\App::Webman\\{closure}()",
"#7 D:\\phpStudy20188\\PHPTutorial\\WWW\\cinemaadmin\\vendor\\workerman\\webman-framework\\src\\App.php(340): Webman\\Cors\\CORS->process()",
"#8 D:\\phpStudy20188\\PHPTutorial\\WWW\\cinemaadmin\\app\\common\\middleware\\RequestMonitoring.php(30): Webman\\App::Webman\\{closure}()",
"#9 D:\\phpStudy20188\\PHPTutorial\\WWW\\cinemaadmin\\vendor\\workerman\\webman-framework\\src\\App.php(340): app\\common\\middleware\\RequestMonitoring->process()",
"#10 D:\\phpStudy20188\\PHPTutorial\\WWW\\cinemaadmin\\vendor\\workerman\\webman-framework\\src\\App.php(167): Webman\\App::Webman\\{closure}()",
"#11 D:\\phpStudy20188\\PHPTutorial\\WWW\\cinemaadmin\\vendor\\workerman\\workerman\\Connection\\TcpConnection.php(646): Webman\\App->onMessage()",
"#12 D:\\phpStudy20188\\PHPTutorial\\WWW\\cinemaadmin\\vendor\\workerman\\workerman\\Events\\Select.php(311): Workerman\\Connection\\TcpConnection->baseRead()",
"#13 D:\\phpStudy20188\\PHPTutorial\\WWW\\cinemaadmin\\vendor\\workerman\\workerman\\Worker.php(1479): Workerman\\Events\\Select->loop()",
"#14 D:\\phpStudy20188\\PHPTutorial\\WWW\\cinemaadmin\\vendor\\workerman\\workerman\\Worker.php(1399): Workerman\\Worker::forkWorkersForWindows()",
"#15 D:\\phpStudy20188\\PHPTutorial\\WWW\\cinemaadmin\\vendor\\workerman\\workerman\\Worker.php(560): Workerman\\Worker::forkWorkers()",
"#16 D:\\phpStudy20188\\PHPTutorial\\WWW\\cinemaadmin\\vendor\\workerman\\webman-framework\\src\\support\\App.php(131): Workerman\\Worker::runAll()",
"#17 D:\\phpStudy20188\\PHPTutorial\\WWW\\cinemaadmin\\start.php(4): support\\App::run()",
"#18 {main}"
],
"file": "D:\\phpStudy20188\\PHPTutorial\\WWW\\cinemaadmin\\vendor\\workbunny\\webman-rabbitmq\\src\\Connection.php",
"line": 224
}
}
Windows:8
CentOS:6.*
PHP: 8.0.2
webman-rabbitmq:1.0.9
webman-framework:1.4.3
在本地启了一个docker运行rabbitmq,通过实时跟踪日志查看到,当webman退出时,调用消费队列进程的onWorkerStop方法时,虽然FasterBuilder中针对连接处理$this->_connection->close();
,但发现rabbitmq这边有warning日志:
2022-09-04 02:55:53.458 [warning] <0.20366.1> closing AMQP connection <0.20366.1> (172.19.0.1:60280 -> 172.19.0.2:5672, vhost: '/', user: 'rabbitmq'):
client unexpectedly closed TCP connection
同时发现如果在发布消息时,通过调用helpers的async_publish方法,设置close参数为true时,连接是正常关闭的:
2022-09-04 03:02:36.367 [info] <0.20824.1> connection <0.20824.1> (172.19.0.1:60316 -> 172.19.0.2:5672): user 'rabbitmq' authenticated and granted access to vhost '/'
******
2022-09-04 03:02:36.421 [info] <0.20824.1> closing AMQP connection <0.20824.1> (172.19.0.1:60316 -> 172.19.0.2:5672, vhost: '/', user: 'rabbitmq')
感谢作者提供简单好用的插件!
当前使用1.0版本
我想实现的效果是:
exchange = mqTopic
,mqTopic_1
和mqTopic_2
,routing_key = task.1 (mqTopic_1)
和routing_key = task.2 (mqTopic_2)
绑定我的做法:
php webman workbunny:rabbitmq-builder mqTopic 2
创建了builder
sync_publish()
目前情况是投递消息可以到达,但是无法消费$b = MqTopicBuilder::instance();
$b->setMessage(new Message([
'exchange_name' => 'mqTopic',
'exchange_type' => Constants::TOPIC,
'queue_name' => 'mqTopic_'.$request->get('queue_name', 0), //对应mqTopic_1 mqTopic_2
'routing_key' => $request->get('routing_key',get_called_class()), //对应 task.1 task.2
'consumer_tag' => 'mqTopic',
'prefetch_size' => 0,
'prefetch_count' => 30,
'is_global' => false,
]));
sync_publish($b, json_encode($data));
请问我该如何实现?如果有demo最好!
在次感谢作者无私奉献!
由于与rabbitmq-server的通讯工作是异步的,由workerman event-loop接管所有事件的执行工作;
当进程杀死时,event-loop也被销毁,event-loop内可能存在还未执行的事件,从而导致一些问题,如:重复消费、发送失败。
相似问题: #11
2.0beta-rc2版本
sync_publish方法执行后提示错误:
Error: Call to a member function then() on bool in ~/x/vendor/workbunny/webman-rabbitmq/src/Connection.php:204
同步方法应该返回的是布尔值不是PromiseInterface,所以不可以继续链式执行->then等promise相关操作了。
如果是用法不对请有空指正,感谢
PHP:8.1
workerman:4.1.14
推送:sync_publish(Builder::instance(), $params, null, true); 说明文档中,最后的close 参数,最好默认是true。 如果该方法应用在 controller 部分,这个会因为没有固定的 心跳监听而造成大量的发布失败。 虽然每次都会重新创建连接,那也是稳定和高效的。
process 里的消息接收。建议加入重连检查。有时候网络断开造成的消费者丢失没有重连。 或者搞个重连开关。
`
public function onWorkerStart(Worker $worker): void
{
parent::onWorkerStart($worker);
Timer::add(10, function () {
$this->checkConnection();
});
}
private function checkConnection() {
try {
if($this->connection()->client()->isConnected() == false) {
$this->logger->debug('Reconnect');
$this->connection()->consume($this->getMessage());
}else {
$this->logger->debug('Connection is OK');
}
} catch (\Exception $e) {
$this->logger->error($e->getMessage());
}
}
`
for ($i=1;$i<=50000;$i++){ sync_publish(TestBuilder::instance(),$msg,'',[],true); }
调用时,发现channel一直涨,如何在发布时释放掉channel
MQ消费者只开了2个进程,接口DemoController.php里demo()每访问一次,Connections、Channels就一直增长,详细见附件
MQ.zip
首先感谢 workbunny 提供这么一款很好用的插件。使用【workbunny】RabbitMQ客户端,插件地址:https://www.workerman.net/plugin/67, 在消费者中如果有阻塞并超过一定时间,就会导致重复消费。
"workerman/webman-framework": "^1.5.0"
"workbunny/webman-rabbitmq": "^1.0"
生产者:IndexController.php
通过日志可以看到,在出现错误前,是有消息消费成功并返回了 ACK。在出现错误后,相关进程重启,导致消息开始重新消费,以前返回的 ACK 似乎无效?RabbitMQ的控制台,始终显示有对应数量的信息处于 Unacked 状态。
看了一下sync_publish,async_publish 都需要创建builder,里面都有handler消费回调。如果只做投递,自己不消费,该如何调用?
我们项目的消费者在另一个系统已经建好,现在我只想发送消息不消费,应该怎么操作呢?
安装执行 ./webman workbunny:rabbitmq-list报错 There are no commands defined in the "workbunny" namespace.
php7.4.3,workbunny/webman-rabbitmq 1.0.9,Workerman version:4.1.0 ,请问如何解决?
How to use Publish/Subscribe provided by RabbitMQ?
2.X版本什么时候更新?
2.x版本存在很多问题,问题如下:
文档有误:
(1)在创建消费者的延迟QueueBuilder时,如 "./webman workbunny:rabbitmq-builder test --delayed--mode=queue" 缺少空格
(2)移除QueueBuilder时,如 "./webman workbunny:rabbitmq-remove test --mode=queue",提示没有 --mode 选项
创建延时QueueBuilder,投递信息有误,以下为复现步骤:
(1)版本如下:
"workerman/webman-framework": "1.4.7",
"workbunny/webman-rabbitmq": "^2.1"
(2)创建延时QueueBuilder:
/webman workbunny:rabbitmq-builder test --delayed --mode=queue
(3)投递消息
$taskDelayedBuilder = TestBuilderDelayed::instance();
$taskDelayedMsg = json_encode([ 'id'=>1111 ], JSON_UNESCAPED_UNICODE);
$taskDelayedHeader['x-delay'] = 3000;
$taskDelayedResult = sync_publish($taskDelayedBuilder, $taskDelayedMsg, null, $taskDelayedHeader);
提示:
Invalid publish.
(4)定位问题:vendor/workbunny/webman-rabbitmq/src/helpers.php
(5)尝试解决:
将 TestBuilderDelayed 里的 $exchangeType 值修改为 "Constants::DELAYED" ,重启后发现不会创建对应的消费者,且投递时第一次投递失败,后面可以投递成功,但不会消费
最近在看rabbitmq相关的组件,在尝试使用这个组件时,关于helpers下的两个方法有个疑问,如下
* 同步生产
* @param FastBuilder $builder
* @param string $body
* @param array|null $headers
* @param bool $close
* @return bool
*/
function sync_publish(FastBuilder $builder, string $body, ?array $headers = null, bool $close = false) : bool
{
$message = $builder->getMessage();
if(
($message->getExchangeType() !== Constants::DELAYED and $headers['x-delay'] ?? 1) or
($message->getExchangeType() === Constants::DELAYED and !($headers['x-delay'] ?? 0))
){
throw new WebmanRabbitMQException('Invalid publish. ');
}
$message->setBody($body);
if($headers !== null){
$message->setHeaders(array_merge($message->getHeaders(), $headers));
}
return $builder->syncConnection()->publish($message, $close);
}
若这里$builder
是普通队列,则$message
的ExchangeType是Constants::DIRECT,当$headers
不传递时,默认为null
,那这里不是始终抛WebmanRabbitMQException
了?
A declarative, efficient, and flexible JavaScript library for building user interfaces.
🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.
TypeScript is a superset of JavaScript that compiles to clean JavaScript output.
An Open Source Machine Learning Framework for Everyone
The Web framework for perfectionists with deadlines.
A PHP framework for web artisans
Bring data to life with SVG, Canvas and HTML. 📊📈🎉
JavaScript (JS) is a lightweight interpreted programming language with first-class functions.
Some thing interesting about web. New door for the world.
A server is a program made to process requests and deliver data to clients.
Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.
Some thing interesting about visualization, use data art
Some thing interesting about game, make everyone happy.
We are working to build community through open source technology. NB: members must have two-factor auth.
Open source projects and samples from Microsoft.
Google ❤️ Open Source for everyone.
Alibaba Open Source for everyone
Data-Driven Documents codes.
China tencent open source team.