Git Product home page Git Product logo

gotask's Introduction

GoTask

English | 中文

Build Status

GoTask spawns a go process as a Swoole sidecar and establishes a bi-directional IPC to offload heavy-duties to Go. Think of it as a Swoole Taskworker in Go.

composer require hyperf/gotask

Feature

  • High performance with low footprint.
  • Based on Swoole 4 coroutine socket API.
  • Support Unix Socket, TCP and stdin/stdout pipes.
  • Support both PHP-to-Go and Go-to-PHP calls.
  • Automatic sidecar lifecycle management.
  • Correctly handle remote error.
  • Support both structural payload and binary payload.
  • Sidecar API compatible with net/rpc.
  • Baked-in connection pool.
  • Optionally integrated with Hyperf framework.

Perfect For

  • Blocking operations in Swoole, such as MongoDB queries.
  • CPU Intensive operations, such as encoding and decoding.
  • Leveraging Go eco-system, such as Kubernetes clients.

Requirement

  • PHP 7.2+
  • Go 1.13+
  • Swoole 4.4LTS+
  • Hyperf 1.1+ (optional)

Task Delivery Demo

package main

import (
    "github.com/hyperf/gotask/v2/pkg/gotask"
)

type App struct{}

func (a *App) Hi(name string, r *interface{}) error {
    *r = map[string]string{
        "hello": name,
    }
    return nil
}

func main() {
    gotask.SetAddress("127.0.0.1:6001")
    gotask.Register(new(App))
    gotask.Run()
}
<?php

use Hyperf\GoTask\IPC\SocketIPCSender;
use function Swoole\Coroutine\run;

require_once "../vendor/autoload.php";

run(function(){
    $task = new SocketIPCSender('127.0.0.1:6001');
    var_dump($task->call("App.Hi", "Hyperf"));
    // [ "hello" => "Hyperf" ]
});

Resources

English documentation is not yet complete! Please see examples first.

Benchmark

https://github.com/reasno/gotask-benchmark

Credit

gotask's People

Contributors

axpwx avatar dependabot-preview[bot] avatar huanghantao avatar leocavalcante avatar limingxinleo avatar niexiawei avatar reasno avatar slairmy avatar sy-records avatar

Stargazers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

Watchers

 avatar  avatar  avatar  avatar

gotask's Issues

调用mongo报错

hyperf2.0,用的example里的代码:

$task = new SocketIPCSender('127.0.0.1:6001');
$rs = $task->call('MongoProxy.Find', ['Database' => 'test', 'Collection' => 'test', 'Filter' => ['a'=>2], 'Opts' => [['Skip' => 0, 'Limit' => 2]]]);

错误信息:
error 'base64Codec: invalid input, error found in #0 byte of ...|{"Database|..., bigger context ...|{"Database":"test","Collection":"test","Filter":{"|...' on 'Hyperf\GoTask\Relay\CoroutineSocketRelay'

2.2.7 版本下 socket 连接拒绝是为什么?

想通过 fastmongo 这个组件包来操作mongo 安装和配置gotask 按照这个文档来的: https://github.com/Hyperf/gotask/wiki/Installation-&-Configuration

然后写demo的时候出现:

unable to establish connection unix:///xxx_61093a9f7fb39.sock: Connection refused (61)[137] in /xxx/vendor/hyperf/gotask/src/Relay/CoroutineSocketRelay.php

trace 信息如下:

#0 /xxx/vendor/hyperf/gotask/src/Relay/SocketTransporter.php(35): Hyperf\GoTask\Relay\CoroutineSocketRelay->connect()
#1 /xxx/vendor/spiral/goridge/src/RPC.php(48): Hyperf\GoTask\Relay\CoroutineSocketRelay->send('MongoProxy.Inse...', 20)
#2 /xxx/vendor/hyperf/gotask/src/IPC/SocketIPCSender.php(55): Spiral\Goridge\RPC->call('MongoProxy.Inse...', '.\v\x00\x00\x03Records\x00\xEB\n...', 4)
#3 /xxx/vendor/hyperf/gotask/src/GoTaskConnection.php(89): Hyperf\GoTask\IPC\SocketIPCSender->call('MongoProxy.Inse...', '.\v\x00\x00\x03Records\x00\xEB\n...', 4)
#4 /xxx/vendor/hyperf/gotask/src/GoTaskConnection.php(50): Hyperf\GoTask\GoTaskConnection->retry('call', Array, Object(Spiral\Goridge\Exceptions\PrefixException))
#5 /xxx/vendor/hyperf/gotask/src/SocketGoTask.php(41): Hyperf\GoTask\GoTaskConnection->__call('call', Array)
#6 /xxx/vendor/hyperf/gotask/src/GoTaskProxy.php(39): Hyperf\GoTask\SocketGoTask->call('MongoProxy.Inse...', '.\v\x00\x00\x03Records\x00\xEB\n...', 4)
#7 /xxx/vendor/hyperf/gotask/src/MongoClient/MongoProxy.php(101): Hyperf\GoTask\GoTaskProxy->call('MongoProxy.Inse...', '.\v\x00\x00\x03Records\x00\xEB\n...', 4)
#8 /xxx/vendor/hyperf/gotask/src/MongoClient/Collection.php(75): Hyperf\GoTask\MongoClient\MongoProxy->insertMany('.\v\x00\x00\x03Records\x00\xEB\n...')
#9 /xxx/runtime/container/proxy/App_Controller_Ab_MongoController.proxy.php(38): Hyperf\GoTask\MongoClient\Collection->insertMany(Array)
#10 /xxx/vendor/hyperf/http-server/src/CoreMiddleware.php(161): App\Controller\Ab\MongoController->index()
#11 /xxx/vendor/hyperf/http-server/src/CoreMiddleware.php(113): Hyperf\HttpServer\CoreMiddleware->handleFound(Object(Hyperf\HttpServer\Router\Dispatched), Object(Hyperf\HttpMessage\Server\Request))
#12 /xxx/vendor/hyperf/dispatcher/src/AbstractRequestHandler.php(65): Hyperf\HttpServer\CoreMiddleware->process(Object(Hyperf\HttpMessage\Server\Request), Object(Hyperf\Dispatcher\HttpRequestHandler))
#13 /xxx/vendor/hyperf/dispatcher/src/HttpRequestHandler.php(26): Hyperf\Dispatcher\AbstractRequestHandler->handleRequest(Object(Hyperf\HttpMessage\Server\Request))
#14 /xxx/vendor/hyperf/validation/src/Middleware/ValidationMiddleware.php(81): Hyperf\Dispatcher\HttpRequestHandler->handle(Object(Hyperf\HttpMessage\Server\Request))
#15 /xxx/vendor/hyperf/dispatcher/src/AbstractRequestHandler.php(65): Hyperf\Validation\Middleware\ValidationMiddleware->process(Object(Hyperf\HttpMessage\Server\Request), Object(Hyperf\Dispatcher\HttpRequestHandler))
#16 /xxx/vendor/hyperf/dispatcher/src/HttpRequestHandler.php(26): Hyperf\Dispatcher\AbstractRequestHandler->handleRequest(Object(Hyperf\HttpMessage\Server\Request))
#17 /xxx/vendor/hyperf/dispatcher/src/HttpDispatcher.php(40): Hyperf\Dispatcher\HttpRequestHandler->handle(Object(Hyperf\HttpMessage\Server\Request))
#18 /xxx/vendor/hyperf/http-server/src/Server.php(118): Hyperf\Dispatcher\HttpDispatcher->dispatch(Object(Hyperf\HttpMessage\Server\Request), Array, Object(Hyperf\HttpServer\CoreMiddleware))

看到issues 里面也有同样的错误,但是说试试新版本,2.2.7应该是目前最新的版本了。还请大佬帮忙解答下

启动时gotask.args参数为空时

image
image
启动时没设置gotask.args参数,会报错,这里默认gotask.args空数组,下面的array_push($argArr, ...$args)会默认没有第二个参数的

ConfigProvider.php bug

// Hyperf\GoTask\ConfigProvider

// line 72
public static function address()
    {
       // fix here to 'BASE_PATH'
        if (defined('BASE_PATH')) {
            $root = BASE_PATH . '/runtime';
        } else {
            $root = '/tmp';
        }

        $appName = env('APP_NAME');
        $socketName = $appName . '_' . uniqid();
        return $root . "/{$socketName}.sock";
    }

Doens't have support to hyperf/process v3

hyperf/gotask doens't have support to hyperf/process v3

  Problem 1
    - hyperf/gotask[v2.2.0, ..., v2.2.3] require hyperf/process ~2.0.0 -> found hyperf/process[v2.0.0, ..., 2.0.x-dev] but it conflicts with your root composer.json require (^3.0).
    - hyperf/gotask[v2.2.4, ..., v2.2.10] require hyperf/process ^2.0.0 -> found hyperf/process[v2.0.0, ..., 2.2.x-dev] but it conflicts with your root composer.json require (^3.0).

php bin/hyperf.php vendor:publish报错

版本:hyperf2.0

php bin/hyperf.php vendor:publish -vvv

In Input.php line 76:

[Symfony\Component\Console\Exception\RuntimeException]
Not enough arguments (missing: "package").

Exception trace:
at /home/aa/project/vvapi/vendor/symfony/console/Input/Input.php:76
Symfony\Component\Console\Input\Input->validate() at /home/aa/project/vvapi/vendor/symfony/console/Command/Command.php:253
Symfony\Component\Console\Command\Command->run() at /home/aa/project/vvapi/vendor/symfony/console/Application.php:929
Symfony\Component\Console\Application->doRunCommand() at /home/aa/project/vvapi/vendor/symfony/console/Application.php:264
Symfony\Component\Console\Application->doRun() at /home/aa/project/vvapi/vendor/symfony/console/Application.php:140
Symfony\Component\Console\Application->run() at /home/aa/project/vvapi/bin/hyperf.php:21
{closure}() at /home/aa/project/vvapi/bin/hyperf.php:22

mongodb偶尔会出现Connection refused

偶尔出现报错:unable to establish connection tcp://127.0.0.1:6012: Connection refused (111)

PHP版本:7.4.28

swoole:

Swoole => enabled
Author => Swoole Team <[email protected]>
Version => 4.8.5
Built => May  9 2022 18:04:04
coroutine => enabled with boost asm context
epoll => enabled
eventfd => enabled
signalfd => enabled
cpu_affinity => enabled
spinlock => enabled
rwlock => enabled
openssl => OpenSSL 1.0.2k-fips  26 Jan 2017
http2 => enabled
curl-native => enabled
pcre => enabled
zlib => 1.2.7
mutex_timedlock => enabled
pthread_barrier => enabled
futex => enabled
mysqlnd => enabled
async_redis => enabled

Directive => Local Value => Master Value
swoole.enable_coroutine => On => On
swoole.enable_library => On => On
swoole.enable_preemptive_scheduler => Off => Off
swoole.display_errors => On => On
swoole.use_shortname => Off => Off
swoole.unixsock_buffer_size => 8388608 => 8388608

gotask版本:v2.2.9

No entry or class found for 'MongoProxy.InsertOne'

运行一段时间后,报错 ,hyperf 版本是2.2最新版本
log.ERROR: Insert Document hybrid error 'Hyperf\Di\Exception\NotFoundException: No entry or class found for 'MongoProxy.InsertOne'(0) in /app/web/accept/vendor/hyperf/di/src/Container.php:79
Stack trace:
#0 /app/web/accept/vendor/hyperf/di/src/Container.php(127): Hyperf\Di\Container->make()
#1 /app/web/accept/vendor/hyperf/gotask/src/IPC/SocketIPCReceiver.php(130): Hyperf\Di\Container->get()
#2 /app/web/accept/vendor/hyperf/gotask/src/IPC/SocketIPCReceiver.php(85): Hyperf\GoTask\IPC\SocketIPCReceiver->dispatch()
#3 {main}' on 'Hyperf\GoTask\Relay\CoroutineSocketRelay'

    public function __construct(ContainerInterface $container,MongoClient $client)
    {
        $this->container = $container;
        $this->logger =$container->get(LoggerFactory::class)->get("log");
        $this->database = $client->database('test');
    }

    public function insertOne(string $collect,array $data)
    {
        try{
            return $this->database->collection($collect)->insertOne($data);
        }catch (\Throwable $e){
            $this->logger->error("Insert Document {$collect} ".$e->getMessage());
        }
        return null;
    }

Limit 不生效

  • MongoDB 版本

db version v3.0.15-1.10

  • 代码
$col->find(['gender' => 'male'], ['skip' => 1, 'limit' => 1]);
  • 问题

limit 等于 1 的时候,正常返回一笔,其他值的时候,返回全部记录,limit 失效。

找不到sock是什么原因呀?如果没有启动该怎么启动呢

[WARNING] RemoteGoTask::__call failed, because unable to establish connection unix:///tmp/govmids_6020d03d7c92c.sock: No such file or directory (2)

Fatal error: Uncaught Spiral\Goridge\Exceptions\RelayException: No such file or directory (2) in /var/wwwroot/hyperf-gotask/vendor/hyperf/gotask/src/Relay/CoroutineSocketRelay.php:134

image

gotask是不支持协程风格吗?

image
服务启动之后一直报 on null 的错误,我看了一下这部分process的源码。
只有当服务不是协程的时候,才会new SwooleProcess,不然process就肯定是null。
image
但因为 while(ture)
image
就会一直去跑handle(),然后一直拿不到process,然后一直log错误。
如果在server那边取消协程风格就能正常运行了。

所以是hyperf/gotask是不支持协程风格吗?

MongoDB transactions

Hi

Can gotask support mongodb transactions? Has anyone done that before in hyperf?

unit test 中不使用 gotask

unit test 中不使用 gotask
估計是沒有相關的Listener.
希望可以加入,

請問有沒有其他方法可以使用gotask

sock 文件没生成

错误信息:
unable to establish connection unix:///tmp/baohong_60113f552bdcf.sock: No such file or directory (2)
调用位置:
'socket_address' => \Hyperf\GoTask\ConfigProvider::address(),
'go2php' => [
'enable' => false,
'address' => \Hyperf\GoTask\ConfigProvider::address(),
],

请求MongoClient频繁出现 WARNING Server::check_worker_exit_status(): worker(pid=47106, id=156) abnormal exit, status=0, signal=11

php: 8.0.20
swoole: 4.8.10
hyperf:2.2

新建一个Service

<?php
declare(strict_types=1);
namespace App\Service;
use App\Utils\Tools;
use Hyperf\Logger\LoggerFactory;
use Psr\Container\ContainerInterface;
use Hyperf\GoTask\IPC\SocketIPCSender;
use Hyperf\GoTask\MongoClient\MongoClient;
use Hyperf\GoTask\MongoClient\MongoProxy;
use Hyperf\Config\Config;
class MongoService
{

    protected $container;
    protected $logger;
    protected $database;

    public function __construct(ContainerInterface $container)
    {
        $this->container = $container;
        $this->logger =$container->get(LoggerFactory::class)->get("log");

        $task = new SocketIPCSender(Tools::Yaml('app.sideCar.mongo'));
        $client = new MongoClient(new MongoProxy($task), new Config([]));
        $this->database = $client->database('robot');
    }

    //写入一条数据
    public function insertOne(string $collect,array $data)
    {
        try{

            return $this->database->collection($collect)->insertOne($data);

        }catch (\Throwable $e){
            $this->logger->error("Insert Document error ".$e->getMessage());
        }
    }
}

新建立一个Controller

<?php
declare(strict_types=1);
namespace App\Controller;

use Hyperf\HttpServer\Contract\RequestInterface;
use Hyperf\HttpServer\Contract\ResponseInterface;
use App\Service\MongoService;

class CallBackController {

    public function index(RequestInterface $request, ResponseInterface $response,MongoService $client){
        //记录所有请求的数据
        $client->insertOne("collection",$request->all());
    }

}

go的代码使用example的代码

报错 WARNING Server::check_worker_exit_status(): worker(pid=4018, id=240) abnormal exit, status=2, signal=0

hyperf3.1不支持

Problem 1
- hyperf/gotask[dev-dependabot/composer/spiral/goridge-tw-2.4or-tw-4.0, dev-master, v3.0.0] require symfony/event-dispatcher ^6.3 -> found symfony/event-dispatcher[v6.3.0-BETA1, ..., 6.4.x-dev] but the package is fixed to v7.0.0 (lock file version) by a partial update and that version does not match. Make sure you list it as an argument for the update command.
- hyperf/gotask[v1.0.0, ..., v1.0.1] require hyperf/command ^1.1 -> found hyperf/command[v1.1.0, ..., 1.1.x-dev] but it conflicts with your root composer.json require (~3.1.0).
- hyperf/gotask[0.0.3, ..., v0.3.0, v1.0.2, v2.0.0, ..., v2.1.1] require hyperf/pool ^1.1 -> found hyperf/pool[v1.1.0, ..., 1.1.x-dev] but the package is fixed to v3.1.0 (lock file version) by a partial update and that version does not match. Make sure you list it as an argument for the update command.
- hyperf/gotask[v2.2.0, ..., v2.2.3] require hyperf/pool ~2.0.0 -> found hyperf/pool[v2.0.0, v2.0.3, v2.0.12, 2.0.x-dev] but the package is fixed to v3.1.0 (lock file version) by a partial update and that version does not match. Make sure you list it as an argument for the update command.
- hyperf/gotask[dev-dependabot/composer/friendsofphp/php-cs-fixer-tw-2.14or-tw-3.0, dev-perm, v2.1.x-dev, ..., v2.2.10] require hyperf/pool ^2.0.0 -> found hyperf/pool[v2.0.0, ..., 2.2.x-dev] but the package is fixed to v3.1.0 (lock file version) by a partial update and that version does not match. Make sure you list it as an argument for the update command.
- hyperf/gotask v3.0.0-alpha requires symfony/event-dispatcher ^5.1 -> found symfony/event-dispatcher[v5.1.0-BETA1, ..., 5.4.x-dev] but the package is fixed to v7.0.0 (lock file version) by a partial update and that version does not match. Make sure you list it as an argument for the update command.
- Root composer.json requires hyperf/gotask * -> satisfiable by hyperf/gotask[dev-dependabot/composer/spiral/goridge-tw-2.4or-tw-4.0, dev-dependabot/composer/friendsofphp/php-cs-fixer-tw-2.14or-tw-3.0, dev-master, dev-perm, 0.0.3, ..., v0.3.0, v1.0.0, v1.0.1, v1.0.2, v2.0.0, ..., v2.2.10, v3.0.0-alpha, v3.0.0, 9999999-dev].

Use the option --with-all-dependencies (-W) to allow upgrades, downgrades and removals for packages currently locked to specific versions.
You can also try re-running composer require with an explicit version constraint, e.g. "composer require hyperf/gotask:*" to figure out if any version is installable, or "composer require hyperf/gotask:^2.1" if you know which you need.

Questions about Asynchronous Communication Feature in GoTask

Description

Problem Description:

I'm exploring the usage of the gotask package in my project and I'm encountering a scenario where I need to execute heavy processes using Go tasks. However, I'm looking for a feature that allows asynchronous communication between the code triggering the Go tasks and the tasks themselves. This feature would enable me to receive partial results or updates from the process as it progresses, similar to a "yield" mechanism in asynchronous languages.

Question

  1. Does the gotask package already include the functionality for asynchronous communication?
  2. If yes, how can I use this functionality to receive updates or partial results from Go tasks while they are being processed?
  3. If the functionality doesn't exist, do you have any suggestions or tips on how I can implement this type of asynchronous communication in conjunction with the gotask package?

Examples

If possible, I would appreciate seeing examples of how this functionality can be implemented, whether by using existing features within the gotask package or through suggestions on extending the code to achieve this goal. Below is an example of what I have in mind:

package main

import (
	"log"
	"time"

	"github.com/hyperf/gotask/v2/pkg/gotask"
)

// App sample
type App struct{}

// ProcessBatch executes batch processing and sends asynchronous updates.
func (a *App) ProcessBatch(input interface{}, r *string) error {
	for i := 1; i <= 10; i++ {
		// Simulating heavy processing
		time.Sleep(1 * time.Second)
		
		// Sending asynchronous update
		gotask.AsyncUpdate(fmt.Sprintf("Processed %d items", i))
	}

	*r = "Processing completed"
	return nil
}

func main() {
	if err := gotask.Register(new(App)); err != nil {
		log.Fatalln(err)
	}
	if err := gotask.Run(); err != nil {
		log.Fatalln(err)
	}
}

Benefits

Implementing this asynchronous communication feature between Go tasks and the triggering code would open up new possibilities for utilizing the gotask package. Furthermore, this feature would enhance the utility of the library for heavy processing scenarios.

Thank you in advance for your attention and assistance!

mongo_client批量插入的时候部分_id重复无法获取实际插入的数量

伪代码:

<?php
$insertMany = [
  { _id: 1, qty: 20 },
  { _id: 2, qty: 55 },
  { _id: 2, qty: 30 },
  { _id: 3, qty: 100},
  { _id: 3, qty: 120},
  { _id: 4, qty: 20},
  { _id: 5, qty: 30}
];
try {
  $insertResult = $mongoTable->insertMany($insertMany, ['ordered' => false]);
  printf("Inserted %d document(s)\n", count($insertResult->getInsertedIDs()));
} catch (ServiceException $e) {
  print_r($e->getMessage());
  //output like: error 'error while executing mongo command: bulk write error: [{[{E11000 duplicate key error collection: db_aaa.t_table index: _id_ dup key: { _id: "2"}} {E11000 duplicate key error collection: db_aaa.t_table index: _id_ dup key: { _id: "3" }}]}, {<nil>}]' on 'Hyperf\GoTask\Relay\CoroutineSocketRelay'
} catch (GoridgeException $e) {
  //other logic
}

ordered是false的时候,会继续插入非重复的数据,但是ServiceException没有提供实际写入的数据,可靠性不好控制。

找不到依赖包

github.com/hyperf/gotask/v2/pkg/gotask

找不到这个依赖项 v2是哪来的?

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. 📊📈🎉

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❤️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.