hyperf / gotask Goto Github PK
View Code? Open in Web Editor NEW⚡️ A lightning speed replacement for Swoole TaskWorker in Go
License: MIT License
⚡️ A lightning speed replacement for Swoole TaskWorker in Go
License: MIT License
版本:hyperf2.0
php bin/hyperf.php vendor:publish -vvv
In Input.php line 76:
[Symfony\Component\Console\Exception\RuntimeException]
Not enough arguments (missing: "package").
Exception trace:
at /home/aa/project/vvapi/vendor/symfony/console/Input/Input.php:76
Symfony\Component\Console\Input\Input->validate() at /home/aa/project/vvapi/vendor/symfony/console/Command/Command.php:253
Symfony\Component\Console\Command\Command->run() at /home/aa/project/vvapi/vendor/symfony/console/Application.php:929
Symfony\Component\Console\Application->doRunCommand() at /home/aa/project/vvapi/vendor/symfony/console/Application.php:264
Symfony\Component\Console\Application->doRun() at /home/aa/project/vvapi/vendor/symfony/console/Application.php:140
Symfony\Component\Console\Application->run() at /home/aa/project/vvapi/bin/hyperf.php:21
{closure}() at /home/aa/project/vvapi/bin/hyperf.php:22
I'm exploring the usage of the gotask package in my project and I'm encountering a scenario where I need to execute heavy processes using Go tasks. However, I'm looking for a feature that allows asynchronous communication between the code triggering the Go tasks and the tasks themselves. This feature would enable me to receive partial results or updates from the process as it progresses, similar to a "yield" mechanism in asynchronous languages.
If possible, I would appreciate seeing examples of how this functionality can be implemented, whether by using existing features within the gotask package or through suggestions on extending the code to achieve this goal. Below is an example of what I have in mind:
package main
import (
"log"
"time"
"github.com/hyperf/gotask/v2/pkg/gotask"
)
// App sample
type App struct{}
// ProcessBatch executes batch processing and sends asynchronous updates.
func (a *App) ProcessBatch(input interface{}, r *string) error {
for i := 1; i <= 10; i++ {
// Simulating heavy processing
time.Sleep(1 * time.Second)
// Sending asynchronous update
gotask.AsyncUpdate(fmt.Sprintf("Processed %d items", i))
}
*r = "Processing completed"
return nil
}
func main() {
if err := gotask.Register(new(App)); err != nil {
log.Fatalln(err)
}
if err := gotask.Run(); err != nil {
log.Fatalln(err)
}
}
Implementing this asynchronous communication feature between Go tasks and the triggering code would open up new possibilities for utilizing the gotask package. Furthermore, this feature would enhance the utility of the library for heavy processing scenarios.
Thank you in advance for your attention and assistance!
gotask2.1.1版,报错:unable to establish connection unix:///tmp/vvapi_5f586fa5958b8.sock
Hi
Can gotask support mongodb transactions? Has anyone done that before in hyperf?
db version v3.0.15-1.10
$col->find(['gender' => 'male'], ['skip' => 1, 'limit' => 1]);
当 limit
等于 1 的时候,正常返回一笔,其他值的时候,返回全部记录,limit 失效。
gotask 3.0
gotask/src/Wrapper/ConfigWrapper.php 34行
gotask/src/Wrapper/LoggerWrapper.php 24行
两处语法错误,希望能尽快修复
Problem 1
- hyperf/gotask[dev-dependabot/composer/spiral/goridge-tw-2.4or-tw-4.0, dev-master, v3.0.0] require symfony/event-dispatcher ^6.3 -> found symfony/event-dispatcher[v6.3.0-BETA1, ..., 6.4.x-dev] but the package is fixed to v7.0.0 (lock file version) by a partial update and that version does not match. Make sure you list it as an argument for the update command.
- hyperf/gotask[v1.0.0, ..., v1.0.1] require hyperf/command ^1.1 -> found hyperf/command[v1.1.0, ..., 1.1.x-dev] but it conflicts with your root composer.json require (~3.1.0).
- hyperf/gotask[0.0.3, ..., v0.3.0, v1.0.2, v2.0.0, ..., v2.1.1] require hyperf/pool ^1.1 -> found hyperf/pool[v1.1.0, ..., 1.1.x-dev] but the package is fixed to v3.1.0 (lock file version) by a partial update and that version does not match. Make sure you list it as an argument for the update command.
- hyperf/gotask[v2.2.0, ..., v2.2.3] require hyperf/pool ~2.0.0 -> found hyperf/pool[v2.0.0, v2.0.3, v2.0.12, 2.0.x-dev] but the package is fixed to v3.1.0 (lock file version) by a partial update and that version does not match. Make sure you list it as an argument for the update command.
- hyperf/gotask[dev-dependabot/composer/friendsofphp/php-cs-fixer-tw-2.14or-tw-3.0, dev-perm, v2.1.x-dev, ..., v2.2.10] require hyperf/pool ^2.0.0 -> found hyperf/pool[v2.0.0, ..., 2.2.x-dev] but the package is fixed to v3.1.0 (lock file version) by a partial update and that version does not match. Make sure you list it as an argument for the update command.
- hyperf/gotask v3.0.0-alpha requires symfony/event-dispatcher ^5.1 -> found symfony/event-dispatcher[v5.1.0-BETA1, ..., 5.4.x-dev] but the package is fixed to v7.0.0 (lock file version) by a partial update and that version does not match. Make sure you list it as an argument for the update command.
- Root composer.json requires hyperf/gotask * -> satisfiable by hyperf/gotask[dev-dependabot/composer/spiral/goridge-tw-2.4or-tw-4.0, dev-dependabot/composer/friendsofphp/php-cs-fixer-tw-2.14or-tw-3.0, dev-master, dev-perm, 0.0.3, ..., v0.3.0, v1.0.0, v1.0.1, v1.0.2, v2.0.0, ..., v2.2.10, v3.0.0-alpha, v3.0.0, 9999999-dev].
Use the option --with-all-dependencies (-W) to allow upgrades, downgrades and removals for packages currently locked to specific versions.
You can also try re-running composer require with an explicit version constraint, e.g. "composer require hyperf/gotask:*" to figure out if any version is installable, or "composer require hyperf/gotask:^2.1" if you know which you need.
想通过 fastmongo 这个组件包来操作mongo 安装和配置gotask 按照这个文档来的: https://github.com/Hyperf/gotask/wiki/Installation-&-Configuration
然后写demo的时候出现:
unable to establish connection unix:///xxx_61093a9f7fb39.sock: Connection refused (61)[137] in /xxx/vendor/hyperf/gotask/src/Relay/CoroutineSocketRelay.php
trace 信息如下:
#0 /xxx/vendor/hyperf/gotask/src/Relay/SocketTransporter.php(35): Hyperf\GoTask\Relay\CoroutineSocketRelay->connect()
#1 /xxx/vendor/spiral/goridge/src/RPC.php(48): Hyperf\GoTask\Relay\CoroutineSocketRelay->send('MongoProxy.Inse...', 20)
#2 /xxx/vendor/hyperf/gotask/src/IPC/SocketIPCSender.php(55): Spiral\Goridge\RPC->call('MongoProxy.Inse...', '.\v\x00\x00\x03Records\x00\xEB\n...', 4)
#3 /xxx/vendor/hyperf/gotask/src/GoTaskConnection.php(89): Hyperf\GoTask\IPC\SocketIPCSender->call('MongoProxy.Inse...', '.\v\x00\x00\x03Records\x00\xEB\n...', 4)
#4 /xxx/vendor/hyperf/gotask/src/GoTaskConnection.php(50): Hyperf\GoTask\GoTaskConnection->retry('call', Array, Object(Spiral\Goridge\Exceptions\PrefixException))
#5 /xxx/vendor/hyperf/gotask/src/SocketGoTask.php(41): Hyperf\GoTask\GoTaskConnection->__call('call', Array)
#6 /xxx/vendor/hyperf/gotask/src/GoTaskProxy.php(39): Hyperf\GoTask\SocketGoTask->call('MongoProxy.Inse...', '.\v\x00\x00\x03Records\x00\xEB\n...', 4)
#7 /xxx/vendor/hyperf/gotask/src/MongoClient/MongoProxy.php(101): Hyperf\GoTask\GoTaskProxy->call('MongoProxy.Inse...', '.\v\x00\x00\x03Records\x00\xEB\n...', 4)
#8 /xxx/vendor/hyperf/gotask/src/MongoClient/Collection.php(75): Hyperf\GoTask\MongoClient\MongoProxy->insertMany('.\v\x00\x00\x03Records\x00\xEB\n...')
#9 /xxx/runtime/container/proxy/App_Controller_Ab_MongoController.proxy.php(38): Hyperf\GoTask\MongoClient\Collection->insertMany(Array)
#10 /xxx/vendor/hyperf/http-server/src/CoreMiddleware.php(161): App\Controller\Ab\MongoController->index()
#11 /xxx/vendor/hyperf/http-server/src/CoreMiddleware.php(113): Hyperf\HttpServer\CoreMiddleware->handleFound(Object(Hyperf\HttpServer\Router\Dispatched), Object(Hyperf\HttpMessage\Server\Request))
#12 /xxx/vendor/hyperf/dispatcher/src/AbstractRequestHandler.php(65): Hyperf\HttpServer\CoreMiddleware->process(Object(Hyperf\HttpMessage\Server\Request), Object(Hyperf\Dispatcher\HttpRequestHandler))
#13 /xxx/vendor/hyperf/dispatcher/src/HttpRequestHandler.php(26): Hyperf\Dispatcher\AbstractRequestHandler->handleRequest(Object(Hyperf\HttpMessage\Server\Request))
#14 /xxx/vendor/hyperf/validation/src/Middleware/ValidationMiddleware.php(81): Hyperf\Dispatcher\HttpRequestHandler->handle(Object(Hyperf\HttpMessage\Server\Request))
#15 /xxx/vendor/hyperf/dispatcher/src/AbstractRequestHandler.php(65): Hyperf\Validation\Middleware\ValidationMiddleware->process(Object(Hyperf\HttpMessage\Server\Request), Object(Hyperf\Dispatcher\HttpRequestHandler))
#16 /xxx/vendor/hyperf/dispatcher/src/HttpRequestHandler.php(26): Hyperf\Dispatcher\AbstractRequestHandler->handleRequest(Object(Hyperf\HttpMessage\Server\Request))
#17 /xxx/vendor/hyperf/dispatcher/src/HttpDispatcher.php(40): Hyperf\Dispatcher\HttpRequestHandler->handle(Object(Hyperf\HttpMessage\Server\Request))
#18 /xxx/vendor/hyperf/http-server/src/Server.php(118): Hyperf\Dispatcher\HttpDispatcher->dispatch(Object(Hyperf\HttpMessage\Server\Request), Array, Object(Hyperf\HttpServer\CoreMiddleware))
看到issues 里面也有同样的错误,但是说试试新版本,2.2.7应该是目前最新的版本了。还请大佬帮忙解答下
运行一段时间后,报错 ,hyperf 版本是2.2最新版本
log.ERROR: Insert Document hybrid error 'Hyperf\Di\Exception\NotFoundException: No entry or class found for 'MongoProxy.InsertOne'(0) in /app/web/accept/vendor/hyperf/di/src/Container.php:79
Stack trace:
#0 /app/web/accept/vendor/hyperf/di/src/Container.php(127): Hyperf\Di\Container->make()
#1 /app/web/accept/vendor/hyperf/gotask/src/IPC/SocketIPCReceiver.php(130): Hyperf\Di\Container->get()
#2 /app/web/accept/vendor/hyperf/gotask/src/IPC/SocketIPCReceiver.php(85): Hyperf\GoTask\IPC\SocketIPCReceiver->dispatch()
#3 {main}' on 'Hyperf\GoTask\Relay\CoroutineSocketRelay'
public function __construct(ContainerInterface $container,MongoClient $client)
{
$this->container = $container;
$this->logger =$container->get(LoggerFactory::class)->get("log");
$this->database = $client->database('test');
}
public function insertOne(string $collect,array $data)
{
try{
return $this->database->collection($collect)->insertOne($data);
}catch (\Throwable $e){
$this->logger->error("Insert Document {$collect} ".$e->getMessage());
}
return null;
}
G:\go\pkg\mod\github.com\hyperf\gotask\[email protected]\pkg\gotask\server.go:182:11: undefined: syscall.Stat_t
G:\go\pkg\mod\github.com\hyperf\gotask\[email protected]\pkg\gotask\server.go:183:11: undefined: syscall.Stat
偶尔出现报错:unable to establish connection tcp://127.0.0.1:6012: Connection refused (111)
PHP版本:7.4.28
swoole:
Swoole => enabled
Author => Swoole Team <[email protected]>
Version => 4.8.5
Built => May 9 2022 18:04:04
coroutine => enabled with boost asm context
epoll => enabled
eventfd => enabled
signalfd => enabled
cpu_affinity => enabled
spinlock => enabled
rwlock => enabled
openssl => OpenSSL 1.0.2k-fips 26 Jan 2017
http2 => enabled
curl-native => enabled
pcre => enabled
zlib => 1.2.7
mutex_timedlock => enabled
pthread_barrier => enabled
futex => enabled
mysqlnd => enabled
async_redis => enabled
Directive => Local Value => Master Value
swoole.enable_coroutine => On => On
swoole.enable_library => On => On
swoole.enable_preemptive_scheduler => Off => Off
swoole.display_errors => On => On
swoole.use_shortname => Off => Off
swoole.unixsock_buffer_size => 8388608 => 8388608
gotask版本:v2.2.9
[WARNING] RemoteGoTask::__call failed, because unable to establish connection unix:///tmp/govmids_6020d03d7c92c.sock: No such file or directory (2)
Fatal error: Uncaught Spiral\Goridge\Exceptions\RelayException: No such file or directory (2) in /var/wwwroot/hyperf-gotask/vendor/hyperf/gotask/src/Relay/CoroutineSocketRelay.php:134
hyperf 2.0, gotask v2.2.3, 運行報錯:error 'rpc: can't find service MongoProxy.FindOne' on 'Hyperf\GoTask\Relay\CoroutineSocketRelay'
hyperf2.0,用的example里的代码:
$task = new SocketIPCSender('127.0.0.1:6001');
$rs = $task->call('MongoProxy.Find', ['Database' => 'test', 'Collection' => 'test', 'Filter' => ['a'=>2], 'Opts' => [['Skip' => 0, 'Limit' => 2]]]);
错误信息:
error 'base64Codec: invalid input, error found in #0 byte of ...|{"Database|..., bigger context ...|{"Database":"test","Collection":"test","Filter":{"|...' on 'Hyperf\GoTask\Relay\CoroutineSocketRelay'
错误信息:
unable to establish connection unix:///tmp/baohong_60113f552bdcf.sock: No such file or directory (2)
调用位置:
'socket_address' => \Hyperf\GoTask\ConfigProvider::address(),
'go2php' => [
'enable' => false,
'address' => \Hyperf\GoTask\ConfigProvider::address(),
],
报错来源于go这边的main函数中的
if err := gotask.Run(); err != nil {
log.Fatalln(err)
}
php: 8.0.20
swoole: 4.8.10
hyperf:2.2
新建一个Service
<?php
declare(strict_types=1);
namespace App\Service;
use App\Utils\Tools;
use Hyperf\Logger\LoggerFactory;
use Psr\Container\ContainerInterface;
use Hyperf\GoTask\IPC\SocketIPCSender;
use Hyperf\GoTask\MongoClient\MongoClient;
use Hyperf\GoTask\MongoClient\MongoProxy;
use Hyperf\Config\Config;
class MongoService
{
protected $container;
protected $logger;
protected $database;
public function __construct(ContainerInterface $container)
{
$this->container = $container;
$this->logger =$container->get(LoggerFactory::class)->get("log");
$task = new SocketIPCSender(Tools::Yaml('app.sideCar.mongo'));
$client = new MongoClient(new MongoProxy($task), new Config([]));
$this->database = $client->database('robot');
}
//写入一条数据
public function insertOne(string $collect,array $data)
{
try{
return $this->database->collection($collect)->insertOne($data);
}catch (\Throwable $e){
$this->logger->error("Insert Document error ".$e->getMessage());
}
}
}
新建立一个Controller
<?php
declare(strict_types=1);
namespace App\Controller;
use Hyperf\HttpServer\Contract\RequestInterface;
use Hyperf\HttpServer\Contract\ResponseInterface;
use App\Service\MongoService;
class CallBackController {
public function index(RequestInterface $request, ResponseInterface $response,MongoService $client){
//记录所有请求的数据
$client->insertOne("collection",$request->all());
}
}
go的代码使用example的代码
报错 WARNING Server::check_worker_exit_status(): worker(pid=4018, id=240) abnormal exit, status=2, signal=0
请问下hyperf1.1对应的gotask版本是什么
github.com/hyperf/gotask/v2/pkg/gotask
找不到这个依赖项 v2是哪来的?
调用mongo时,报错
[WARNING] RemoteGoTask::__call failed, because unable to establish connection tcp://127.0.0.1:6001: Connection refused (111)
也不是端口的原因
hyperf/gotask
doens't have support to hyperf/process v3
Problem 1
- hyperf/gotask[v2.2.0, ..., v2.2.3] require hyperf/process ~2.0.0 -> found hyperf/process[v2.0.0, ..., 2.0.x-dev] but it conflicts with your root composer.json require (^3.0).
- hyperf/gotask[v2.2.4, ..., v2.2.10] require hyperf/process ^2.0.0 -> found hyperf/process[v2.0.0, ..., 2.2.x-dev] but it conflicts with your root composer.json require (^3.0).
unit test 中不使用 gotask
估計是沒有相關的Listener.
希望可以加入,
和
請問有沒有其他方法可以使用gotask
伪代码:
<?php
$insertMany = [
{ _id: 1, qty: 20 },
{ _id: 2, qty: 55 },
{ _id: 2, qty: 30 },
{ _id: 3, qty: 100},
{ _id: 3, qty: 120},
{ _id: 4, qty: 20},
{ _id: 5, qty: 30}
];
try {
$insertResult = $mongoTable->insertMany($insertMany, ['ordered' => false]);
printf("Inserted %d document(s)\n", count($insertResult->getInsertedIDs()));
} catch (ServiceException $e) {
print_r($e->getMessage());
//output like: error 'error while executing mongo command: bulk write error: [{[{E11000 duplicate key error collection: db_aaa.t_table index: _id_ dup key: { _id: "2"}} {E11000 duplicate key error collection: db_aaa.t_table index: _id_ dup key: { _id: "3" }}]}, {<nil>}]' on 'Hyperf\GoTask\Relay\CoroutineSocketRelay'
} catch (GoridgeException $e) {
//other logic
}
ordered是false的时候,会继续插入非重复的数据,但是ServiceException没有提供实际写入的数据,可靠性不好控制。
// Hyperf\GoTask\ConfigProvider
// line 72
public static function address()
{
// fix here to 'BASE_PATH'
if (defined('BASE_PATH')) {
$root = BASE_PATH . '/runtime';
} else {
$root = '/tmp';
}
$appName = env('APP_NAME');
$socketName = $appName . '_' . uniqid();
return $root . "/{$socketName}.sock";
}
A declarative, efficient, and flexible JavaScript library for building user interfaces.
🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.
TypeScript is a superset of JavaScript that compiles to clean JavaScript output.
An Open Source Machine Learning Framework for Everyone
The Web framework for perfectionists with deadlines.
A PHP framework for web artisans
Bring data to life with SVG, Canvas and HTML. 📊📈🎉
JavaScript (JS) is a lightweight interpreted programming language with first-class functions.
Some thing interesting about web. New door for the world.
A server is a program made to process requests and deliver data to clients.
Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.
Some thing interesting about visualization, use data art
Some thing interesting about game, make everyone happy.
We are working to build community through open source technology. NB: members must have two-factor auth.
Open source projects and samples from Microsoft.
Google ❤️ Open Source for everyone.
Alibaba Open Source for everyone
Data-Driven Documents codes.
China tencent open source team.