
websocket-client's Introduction

websocket-client

AMPHP is a collection of event-driven libraries for PHP designed with fibers and concurrency in mind. amphp/websocket-client provides an asynchronous WebSocket client for PHP based on Amp. WebSockets are full-duplex communication channels, which are mostly used for realtime communication where the HTTP request / response cycle has too much overhead. They're also used if the server should be able to push data to the client without an explicit request.

There are various use cases for a WebSocket client in PHP, such as consuming realtime APIs, writing tests for a WebSocket server, or controlling web browsers via their remote debugging APIs, which are based on WebSockets.

Installation

This package can be installed as a Composer dependency.

composer require amphp/websocket-client

Requirements

  • PHP 8.1+

Usage

Connecting

You can create new WebSocket connections using Amp\Websocket\Client\connect() or by calling connect() on an instance of WebsocketConnector. The connect() function accepts a string, a PSR-7 UriInterface instance, or a WebsocketHandshake as its first argument. URIs must use the ws or wss (WebSocket over TLS) scheme.

Custom connection parameters can be specified by passing a WebsocketHandshake object instead of a string as first argument, which can also be used to pass additional headers with the initial handshake. The second argument is an optional Cancellation which may be used to cancel the connection attempt.

<?php

require 'vendor/autoload.php';

use function Amp\Websocket\Client\connect;

// Amp\Websocket\Client\connect() uses the WebsocketConnector instance
// defined by Amp\Websocket\Client\websocketConnector()
$connection = connect('ws://localhost:1337/websocket');

foreach ($connection as $message) {
    // $message is an instance of Amp\Websocket\WebsocketMessage
}

Custom Connection Parameters

If necessary, a variety of connection parameters and behaviors may be altered by providing a customized instance of WebsocketConnectionFactory to the WebsocketConnector used to establish a WebSocket connection.

use Amp\Websocket\Client\Rfc6455ConnectionFactory;
use Amp\Websocket\Client\Rfc6455Connector;
use Amp\Websocket\Client\WebsocketHandshake;
use Amp\Websocket\ConstantRateLimit;
use Amp\Websocket\Parser\Rfc6455ParserFactory;
use Amp\Websocket\PeriodicHeartbeatQueue;

$connectionFactory = new Rfc6455ConnectionFactory(
    heartbeatQueue: new PeriodicHeartbeatQueue(
        heartbeatPeriod: 5, // 5 seconds
    ),
    rateLimit: new ConstantRateLimit(
        bytesPerSecondLimit: 2 ** 17, // 128 KiB
        framesPerSecondLimit: 10,
    ),
    parserFactory: new Rfc6455ParserFactory(
        messageSizeLimit: 2 ** 20, // 1 MiB
    ),
    frameSplitThreshold: 2 ** 14, // 16 KiB
    closePeriod: 0.5, // 0.5 seconds
);

$connector = new Rfc6455Connector($connectionFactory);

$handshake = new WebsocketHandshake('wss://example.com/websocket');
$connection = $connector->connect($handshake);

Sending Data

WebSocket messages can be sent using the Connection::sendText() and Connection::sendBinary() methods. Text messages sent with Connection::sendText() must be valid UTF-8. Binary messages sent with Connection::sendBinary() can be arbitrary data.

Both methods return as soon as the message has been fully written to the send buffer. This does not mean that the message is guaranteed to have been received by the other party.
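
As a minimal sketch of the text/binary distinction above, the hypothetical helper below picks sendText() for valid UTF-8 payloads and sendBinary() for everything else. sendAuto() and the RecordingConnection stand-in are illustrative assumptions and not part of the library; only the sendText() and sendBinary() method names come from this document. With a real connection you would pass the object returned by connect() instead of the stand-in.

```php
<?php

/**
 * Hypothetical helper: sends $payload as a text message when it is valid
 * UTF-8 and as a binary message otherwise.
 *
 * $connection is any object exposing sendText() and sendBinary().
 */
function sendAuto(object $connection, string $payload): string
{
    // preg_match() with the "u" modifier returns 1 only for valid UTF-8.
    if (preg_match('//u', $payload) === 1) {
        $connection->sendText($payload);
        return 'text';
    }

    $connection->sendBinary($payload);
    return 'binary';
}

/** Stand-in for a real connection so the sketch runs without a server. */
final class RecordingConnection
{
    /** @var list<array{string, string}> */
    public array $sent = [];

    public function sendText(string $data): void
    {
        $this->sent[] = ['text', $data];
    }

    public function sendBinary(string $data): void
    {
        $this->sent[] = ['binary', $data];
    }
}

$connection = new RecordingConnection();
sendAuto($connection, json_encode(['event' => 'hello'], JSON_THROW_ON_ERROR));
sendAuto($connection, "\xFF\xFE\x00\x01"); // not valid UTF-8, so sent as binary
```

Checking the encoding up front is a design choice for the sketch: it keeps invalid text from ever reaching sendText(), which the section above says must receive valid UTF-8.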

Receiving Data

WebSocket messages can be received using the Connection::receive() method. Connection::receive() returns a WebsocketMessage instance once the client has started to receive a message. This allows streaming WebSocket messages, which might be pretty large. In practice, most messages are rather small, and it is fine to buffer them completely by either calling WebsocketMessage::buffer() or casting the object to a string. The maximum length of a message is defined by the option given to the WebsocketParserFactory instance provided to the WebsocketConnectionFactory (10 MiB by default).

use Amp\Websocket\Client\WebsocketHandshake;
use Amp\Websocket\WebsocketCloseCode;
use function Amp\Websocket\Client\connect;

// Connects to the websocket endpoint at libwebsockets.org
// which sends a message every 50ms.
$handshake = (new WebsocketHandshake('wss://libwebsockets.org'))
    ->withHeader('Sec-WebSocket-Protocol', 'dumb-increment-protocol');

$connection = connect($handshake);

foreach ($connection as $message) {
    $payload = $message->buffer();

    printf("Received: %s\n", $payload);

    if ($payload === '100') {
        $connection->close();
        break;
    }
}
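
For large messages, the streaming mentioned in this section can be sketched as a chunked read instead of a call to buffer(). This assumes the message object exposes a read() method that returns the next chunk, or null once the message is complete, as Amp's readable streams do. consumeInChunks() and the ChunkedMessage stand-in are hypothetical names used only so the sketch runs without a server.

```php
<?php

/**
 * Hypothetical helper: processes a message chunk by chunk and returns the
 * total number of bytes seen, without holding the whole payload in memory.
 */
function consumeInChunks(object $message, callable $onChunk): int
{
    $total = 0;

    // read() is assumed to return null once the message has ended.
    while (($chunk = $message->read()) !== null) {
        $total += strlen($chunk);
        $onChunk($chunk);
    }

    return $total;
}

/** Stand-in for a streamed message. */
final class ChunkedMessage
{
    /** @param list<string> $chunks */
    public function __construct(private array $chunks)
    {
    }

    public function read(): ?string
    {
        // array_shift() returns null when no chunks are left.
        return array_shift($this->chunks);
    }
}

$seen = [];
$bytes = consumeInChunks(
    new ChunkedMessage(['first ', 'second ', 'third']),
    function (string $chunk) use (&$seen): void {
        $seen[] = $chunk;
    },
);
```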

Versioning

amphp/websocket-client follows the semver semantic versioning specification like all other amphp packages.

Security

If you discover any security related issues, please use the private security issue reporter instead of using the public issue tracker.

License

The MIT License (MIT). Please see LICENSE for more information.

websocket-client's People

Contributors

bwoebi, danog, iggyvolz, kelunik, lt, nimah79, staabm, trowski


websocket-client's Issues

Exception not reaching outer try catch

I wrapped the whole Loop that runs the websocket client in a try { ... } catch (Exception $e) { ... } block.

If there is a problem, for example when I disconnect from the internet, I get:

   Amp\Websocket\ClosedException  : Connection closed abnormally while awaiting message; Code 1008 (POLICY_VIOLATION); Reason: "Exceeded unanswered PING limit"

  at .../vendor/amphp/websocket/src/Rfc6455Client.php:659
    655|                             $deferred->resolve();
    656|                             break;
    657| 
    658|                         default:
  > 659|                             $deferred->fail(new ClosedException(
    660|                                 'Connection closed abnormally while awaiting message',
    661|                                 $code,
    662|                                 $reason
    663|                             ));

  Exception trace:

  1   Amp\Websocket\Rfc6455Client::Amp\Websocket\{closure}()
      [internal]:0

  2   Generator::current()
      .../vendor/amphp/amp/lib/Coroutine.php:67

How can I catch the error and retry, since try/catch does not work?

Error while installing in a Laravel 8 Project

Setup

  • Laravel Framework 8.58.0
  • Windows 10 Pro build 19055
 19/21 [=========================>--]  90%    Failed to extract amphp/websocket-client: (2) "C:\Program Files\7-Zip\7z.EXE" x -bb0 -y C:\wamp64\www\etradexchange-api\vendor\composer\tmp-6af2ca5d53f3e85f2ecd39c2fd21b507 -oC:\wamp64\www\etradexchange-api\vendor\composer\792c342b

ERROR: Cannot create symbolic link : A required privilege is not held by the client. : C:\wamp64\www\etradexchange-api\vendor\composer\792c342b\amphp-websocket-client-173b541\docs\asset

    The archive may contain identical file names with different capitalization (which fails on case insensitive filesystems)
    Unzip with 7z command failed, falling back to ZipArchive class
    

Question on connection breaks: errors 1006, 1008

Periodically, 5-10 times per hour, my script crashes with a fatal error. Here is the log:

  • Connection closed abnormally while awaiting message; Code 1006 (ABNORMAL_CLOSE); Reason: "TCP connection closed unexpectedly"
  • Connection closed abnormally while awaiting message; Code 1008 (POLICY_VIOLATION); Reason: "Exceeded unanswered PING limit"

How can I rewrite the code so that the script does not die with a fatal error but instead, say, sleeps for 30 seconds? Or is there another way to keep this error from occurring, so that the script runs forever? I've attached the code. Can you help me write a correct workaround for this error?

I will be very grateful!


My code:

<?php

//require_once ...

use Amp\ByteStream\StreamException;
use Amp\Loop;
use Amp\Websocket;
use Amp\Websocket\Client;
use Amp\Websocket\Client\Connection;
use Amp\Websocket\Client\Handshake;
use Amp\Websocket\ClosedException;
use Amp\Websocket\Message;
use Amp\Websocket\Options;
use Amp\Delayed;
use function Amp\Websocket\Client\connect;
	
Amp\Loop::run(function () {
    $handshake = (new Handshake('wss://mg-s1.site.ru/api/bot/v1/ws?events=message_new'))->withHeader('x-bot-token', '0000000000000000000000000000000000000000000000000');

    $connection = yield connect($handshake);
    yield $connection->send('Hello!');

    try {
        while ($message = yield $connection->receive()) {
            $payload = yield $message->buffer();

            $result = json_decode($payload, true);

            //handler code
        }
    } catch (ClosedException $e) {
        logFileEvent('Connection break. sleep 30 sec');
        logFile('Errors: ' . $e->getMessage());
        sleep(30);
    } catch (AssertionError $e) {
        logFile('Errors: ' . $e->getMessage());
        $connection->close();
    } catch (Error $e) {
        logFile('Errors: ' . $e->getMessage());
        $connection->close();
    } catch (StreamException $e) {
        logFile('StreamException: ' . $e->getMessage());
        $connection->close();
    }
});

?>

Missing `$closeTimeout` property

I'm not sure whether the socket is being closed from the remote end or from the local end due to a timeout, but I'm getting spammed with the following notice.

Notice: Undefined property: Amp\Websocket\Rfc6455Endpoint::$closeTimeout in vendor/amphp/websocket/lib/Rfc6455Endpoint.php on line 428

WebsocketHandshake does not support cloning

Hello, since the last commit, I've not been able to use parallel + websocket-client.
I get the error:

Error thrown in context with message "Amp\Websocket\Client\WebsocketHandshake does not support cloning" and code "0" in /home/custodio/Development/Enjin/open-platform-core/vendor/amphp/amp/src/ForbidCloning.php:9; call Amp\Parallel\Context\ContextPanicError::getOriginalTrace() for the stack trace in the context as an array; if the Xdebug extension is enabled, set "xdebug.mode" to "debug" to include the exception stack trace in the context in the exception message

I've updated all dependencies to the latest amphp commit on every package, but still no luck. Is there something else we should do, or should we just wait for future releases?

How to end the loop from outside?

Hi there,

Thanks for this great package!
I have a simple question. When the loop is running and the client is waiting for some data from the server:

while ($message = yield $this->connection->receive()) {
.....
}

I can only do something (inside the while loop) when the server sends something.
But let's say I want to send something to the server even though the server didn't send anything.
How can I do this?

In my example I am catching CTRL+C to close the connection and end the loop. But for now, my code
only gets executed whenever the server sends something, not right when the signal arrives.

What I am doing:

pcntl_async_signals(true);
pcntl_signal(SIGINT, [$this, 'shutdown']);
pcntl_signal(SIGTERM, [$this, 'shutdown']);

My shutdown function only sets $this->stop = true, and inside the while loop I send something to the server, close the connection, and call Loop::stop. But all this only happens if the server sends something; I cannot run any of it without some message from the server.

Thanks for any help!

Activate travis hook

Builds don't run on Travis right now.

While at it, is the packagist hook activated?

Question: Documentation on available `\Amp\Websocket\Options`

Is there a documentation page somewhere that documents the nature of the various \Amp\Websocket\Options properties?

    private $streamThreshold = 32768; // 32KB
    private $frameSplitThreshold = 32768; // 32KB
    private $bytesPerSecondLimit = 1048576; // 1MB
    private $framesPerSecondLimit = 100;
    private $frameSizeLimit = 2097152; // 2MB
    private $messageSizeLimit = 10485760; // 10MB
    private $textOnly = false;
    private $validateUtf8 = true;
    private $closePeriod = 3;
    private $compressionEnabled = false;
    private $heartbeatEnabled = true;
    private $heartbeatPeriod = 10;
    private $queuedPingLimit = 3;

Some of these are rather self-explanatory given their name, but it would be nice to see a page with thorough documentation on them (if one exists).

Thank you!

Fatal Error in Rfc6455Connection due to not implementing method

Error:

Fatal error: Class Amp\Websocket\Client\Rfc6455Connection contains 1 abstract method and must therefore be declared abstract or implement the remaining methods (Amp\Websocket\Client::didPeerInitiateClose) in vendor/amphp/websocket-client/src/Rfc6455Connection.php on line 15

Call Stack (amphp code starts at line 8, paths abbreviated to root of project):

0.0055     465472   1. {main}() /bin/gemini:0
0.0737    5438352   2. Symfony\Component\Console\Application->run(???, ???) /bin/gemini:340
0.0794    5754736   3. Symfony\Component\Console\Application->doRun(???, ???) /vendor/symfony/console/Application.php:149
0.0807    5758496   4. Symfony\Component\Console\Application->doRunCommand(???, ???, ???) /vendor/symfony/console/Application.php:273
0.0808    5758496   5. Kobens\Gemini\Command\Command\Market\BookKeeper->run(???, ???) /vendor/symfony/console/Application.php:1009
0.0816    5759952   6. Kobens\Gemini\Command\Command\Market\BookKeeper->execute(???, ???) /vendor/symfony/console/Command/Command.php:255
0.0823    5799112   7. Kobens\Gemini\Api\WebSocket\MarketData\BookKeeper->openBook() /src/Command/Command/Market/BookKeeper.php:45
0.0854    6161264   8. Amp\Loop::run(???) /src/Api/WebSocket/MarketData/BookKeeper.php:30
0.0856    6171784   9. Amp\Loop\NativeDriver->run() /vendor/amphp/amp/lib/Loop.php:84
0.3095    8750800  10. Amp\Loop\NativeDriver->tick() /vendor/amphp/amp/lib/Loop/Driver.php:72
0.3095    8750880  11. Amp\Loop\NativeDriver->dispatch(???) /vendor/amphp/amp/lib/Loop/Driver.php:134
0.3096    8750880  12. Amp\Loop\NativeDriver->selectStreams(???, ???, ???) /vendor/amphp/amp/lib/Loop/NativeDriver.php:97
0.3621    8751408  13. Amp\ByteStream\ResourceInputStream::Amp\ByteStream\{closure:/vendor/amphp/byte-stream/lib/ResourceInputStream.php:70-102}(???, ???, ???) /vendor/amphp/amp/lib/Loop/NativeDriver.php:192
0.3623    8751632  14. Amp\Deferred->resolve(???) /vendor/amphp/byte-stream/lib/ResourceInputStream.php:101
0.3623    8751632  15. {anonymous-class:/vendor/amphp/amp/lib/Deferred.php:20-25}->resolve(???) /vendor/amphp/amp/lib/Deferred.php:45
0.3623    8751632  16. Amp\Coroutine->Amp\{closure:/vendor/amphp/amp/lib/Coroutine.php:79-135}(???, ???) /vendor/amphp/amp/lib/Internal/Placeholder.php:130
0.3623    8751632  17. Generator->send(???) /vendor/amphp/amp/lib/Coroutine.php:105
0.3623    8751632  18. Amp\Http\Client\Connection\Http1Connection->readResponse(???, ???, ???, ???) /vendor/amphp/amp/lib/Coroutine.php:105
0.3638    8804184  19. Amp\Http\Client\Connection\Http1Connection->handleUpgradeResponse(???, ???, ???) /vendor/amphp/http-client/src/Connection/Http1Connection.php:322
0.3640    8879880  20. Amp\asyncCall(???, ...variadic()) /vendor/amphp/http-client/src/Connection/Http1Connection.php:474
0.3640    8879880  21. Amp\call(???, ...variadic()) /vendor/amphp/amp/lib/functions.php:91
0.3641    8880568  22. Amp\Coroutine->__construct(???) /vendor/amphp/amp/lib/functions.php:66
0.3641    8880568  23. Generator->current() /vendor/amphp/amp/lib/Coroutine.php:60
0.3641    8880568  24. Amp\Http\Client\Connection\Http1Connection::Amp\Http\Client\Connection\{closure:/vendor/amphp/http-client/src/Connection/Http1Connection.php:466-474}() /vendor/amphp/amp/lib/Coroutine.php:60
0.3641    8880568  25. Amp\call(???, ...variadic(???, ???, ???)) /vendor/amphp/http-client/src/Connection/Http1Connection.php:468
0.3641    8880944  26. Amp\Websocket\Client\Rfc6455Connector::Amp\Websocket\Client\{closure:/vendor/amphp/websocket-client/src/Rfc6455Connector.php:55-77}(???, ???, ???) /vendor/amphp/amp/lib/functions.php:60
0.3641    8880944  27. Amp\Websocket\Client\Rfc6455ConnectionFactory->createConnection(???, ???, ???, ???) /vendor/amphp/websocket-client/src/Rfc6455Connector.php:76
0.3657    9040408  28. spl_autoload_call(???) /vendor/amphp/websocket-client/src/Rfc6455ConnectionFactory.php:20
0.3657    9040472  29. Composer\Autoload\ClassLoader->loadClass(???) /vendor/amphp/websocket-client/src/Rfc6455ConnectionFactory.php:20
0.3658    9040600  30. Composer\Autoload\includeFile(???) /vendor/composer/ClassLoader.php:322
0.3659    9054808  31. include('/vendor/amphp/websocket-client/src/Rfc6455Connection.php') /vendor/composer/ClassLoader.php:444

Closure being passed to \Amp\Loop::run():

private function getRunClosure(): \Closure
{
    $websocketUrl = $this->getWebSocketUrl();
    return function () use ($websocketUrl) {
        $connection = yield \Amp\Websocket\Client\connect($websocketUrl);
        while ($message = yield $connection->receive()) {
            $payload = yield $message->buffer();
            // code not relevant to issue...
        }
    };
}

Any assistance would be greatly appreciated. My application has version 173b541 installed (release 1.0.0).

Thank you in advance.

Problem determining connection was closed

I'm having trouble detecting that the connection was closed.
Below is the code example I'm using for testing.
It connects to the endpoint, sends a ping message to the server every 5 seconds, and sends a pong message to the server when it gets a ping message from the server.
It also closes the connection on the client side after 30 seconds, just for testing purposes.
What I am expecting:

  • to get a WebsocketClosedException when attempting to send a ping message after the connection was closed
  • the EventLoop to keep working after the connection was closed, so that trying to send a ping message results in a WebsocketClosedException
  • the onClose callback to be called when the connection is closed (either by the client or by the server)

What I am getting:

  • onClose never gets called, no matter who closed the connection
  • no exception at all after the connection is closed: the script just hangs in the while loop, and the EventLoop stops trying to send ping messages. This covers both cases, the connection being closed on the client and on the server.
  • calling $connection->close() doesn't result in the connection actually disappearing from the server. The server still 'sees' this connection

In the end I need a reliable way to tell that the connection was closed, on the server in particular.
Any advice is much appreciated.

<?php

require dirname(__DIR__) . '/vendor/autoload.php';

use Revolt\EventLoop;
use function Amp\weakClosure;
use function Amp\Websocket\Client\connect;

$serverUrl = 'ws://dev.sportlevel.com/highway';

$connection = connect($serverUrl);

$connection->onClose(function () {
    echo "connection is closed!" . PHP_EOL;
});

EventLoop::repeat(5, weakClosure(function () use ($connection) {
    $pingMessage = json_encode([
        'msg_type' => 'ping',
        'timestamp' => microtime(true),
    ]);

    echo "sending ping: {$pingMessage}" . PHP_EOL;
    try {
        $connection->sendText($pingMessage);
    } catch (\Exception $e) {
        echo "send ping failed! " . $e->getMessage() . PHP_EOL;
    }

}));

EventLoop::delay(30, function () use ($connection) {
    $connection->close();
});

while (1) {
    if ($message = $connection->receive()) {
        $payload = $message->buffer();

        echo "Received: {$payload}" . PHP_EOL;

        $message = json_decode($payload, true);

        if ($message['msg_type'] == 'ping') {
            $pongMessage = json_encode([
                'msg_type' => 'pong',
                'timestamp' => $message['timestamp'],
            ]);

            echo "send pong: {$pongMessage}" . PHP_EOL;
            try {
                $connection->sendText($pongMessage);
            } catch (\Exception $e) {
                echo "send pong failed! " . $e->getMessage() . PHP_EOL;
            }
        }
    }
}

Reconnecting upon disconnection

There is no example nor can I work out how to reconnect to a socket server upon disconnection.
I assume the loop doesn't need to be restarted, but rather the connection needs to be re-established. When the loop starts it only yields the handshake and connection once.

Can someone please provide an example of how I could leverage the $connection->isClosed() method to force another connection to be established?
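
One way to approach this is to wrap connecting and receiving in an outer loop, so that a closed connection leads to a fresh connect call rather than to the end of the script. The sketch below is written against stand-ins so it runs without a server. It assumes receive() returns null when the connection closes cleanly and throws when it closes abnormally. runWithReconnect(), ScriptedConnection, and the $connect, $handle and $wait callables are all hypothetical. With the real client, $connect would wrap Amp\Websocket\Client\connect(), and $wait would use a non-blocking delay rather than sleep().

```php
<?php

/**
 * Hypothetical reconnect loop.
 *
 * @param callable(): object  $connect returns a connection exposing receive()
 * @param callable(mixed): void $handle processes one received message
 * @param callable(int): void $wait    pauses after attempt number N
 *
 * @return int number of connection attempts made
 */
function runWithReconnect(callable $connect, callable $handle, callable $wait, int $maxAttempts): int
{
    $attempts = 0;

    while ($attempts < $maxAttempts) {
        $attempts++;

        try {
            $connection = $connect();

            // receive() is assumed to return null once the connection closes.
            while (($message = $connection->receive()) !== null) {
                $handle($message);
            }
        } catch (\Exception $e) {
            // Abnormal close or failed connect: fall through and retry.
        }

        if ($attempts < $maxAttempts) {
            $wait($attempts);
        }
    }

    return $attempts;
}

/** Stand-in connection that yields its messages, then fails or closes. */
final class ScriptedConnection
{
    public function __construct(private array $messages, private bool $failAtEnd)
    {
    }

    public function receive(): ?string
    {
        if ($this->messages !== []) {
            return array_shift($this->messages);
        }
        if ($this->failAtEnd) {
            throw new \RuntimeException('Connection closed abnormally');
        }
        return null;
    }
}

$received = [];
$waits = [];
$scripts = [
    new ScriptedConnection(['a', 'b'], true), // drops abnormally after two messages
    new ScriptedConnection(['c'], false),     // closes cleanly after one message
];

$opened = runWithReconnect(
    function () use (&$scripts): object { return array_shift($scripts); },
    function (string $message) use (&$received): void { $received[] = $message; },
    function (int $attempt) use (&$waits): void { $waits[] = $attempt; },
    2,
);
```

The attempt limit exists only to keep the sketch finite. A long-running client would more likely loop until told to stop and increase the wait between attempts.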

Issue with handshake regex

I'm having an issue getting a legitimate WebSocket handshake to be recognised. Initially I thought it was the AWS ALB implementation and how it handles headers; however, debugging suggests it's the library. I have looked at the regex in Handshake.php to try to work out the problem. After running the regex through a regex tool, I noticed there were some errors in it.

The HTTP/1.1 response looks like this:

HTTP/1.1 101 Switching Protocols
upgrade: websocket
connection: upgrade
sec-websocket-accept: iekWUM343JKJJKY/SRcjycD9l8UsRX2iU=
keep-alive: timeout=60, max=999
date: Wed, 30 Jan 2019 04:48:08 GMT

The regex I am referring to is here: https://github.com/amphp/websocket-client/blob/master/lib/Handshake.php#L84

I was getting the error 'Missing "Upgrade: websocket" header.'. I changed it to the following and it now works as expected. Can someone please let me know why the additional slashes were in the original regex, as they are not valid? I'm happy to change it back if required, but it would be awesome if anyone could identify any problems with my response.

\preg_match_all("((?P<field>[^()<>@,;:\"\/[\]?={}\x01-\x20\x7F]+):[\x20\x09]*(?P<value>[^\x01-\x08\x0A-\x1F\x7F]*)\x0D?[\x20\x09]*\r?\n)", $headerBuffer, $responseHeaders);
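
HTTP header field names are case-insensitive, so a lowercase "upgrade: websocket" line like the one in the response above is valid. The sketch below is not the library's parser. It is a simplified illustration that parses a header block of that shape and looks fields up by lowercased name. parseHeaders() is a hypothetical helper, and its pattern is a deliberately simplified assumption, not the regex from Handshake.php.

```php
<?php

/**
 * Hypothetical helper: parses "name: value" header lines into a map
 * keyed by lowercase field name.
 *
 * @return array<string, list<string>>
 */
function parseHeaders(string $headerBuffer): array
{
    $headers = [];

    // Simplified pattern: a field name without whitespace or colons, a colon,
    // optional whitespace, then the value up to the end of the line.
    preg_match_all(
        '/^([^\s:]+):[ \t]*(.*?)[ \t]*\r?$/m',
        $headerBuffer,
        $matches,
        PREG_SET_ORDER
    );

    foreach ($matches as [, $name, $value]) {
        $headers[strtolower($name)][] = $value;
    }

    return $headers;
}

$response = "HTTP/1.1 101 Switching Protocols\r\n"
    . "upgrade: websocket\r\n"
    . "connection: upgrade\r\n"
    . "keep-alive: timeout=60, max=999\r\n"
    . "\r\n";

$headers = parseHeaders($response);

// Compare the value case-insensitively as well, since servers vary in casing.
$isWebsocketUpgrade = isset($headers['upgrade'])
    && strcasecmp($headers['upgrade'][0], 'websocket') === 0;
```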

Getting error after making a subsequent request in the while condition

PHP Notice:  Undefined offset: 1 in C:\wamp64\www\etradexchange-api\vendor\amphp\websocket\src\Rfc6455Client.php on line 274

Code:

use Amp\Delayed;
use Amp\Websocket;
use Amp\Websocket\Client;

class SomeService implements \App\Interfaces\SomeInterface
{
  private $connection;
  private $transaction;
  public $error_message;
  public $user_message;

  protected static $instance = null;

  public static function run($transaction)
  {
      if ( is_null ( self::$instance ) ) {
          self::$instance = new self($transaction);
      }

      return self::$instance;
  }


  public function __construct($transaction) {

    $this->transaction = $transaction;
  }

  public function authorize(callable $callback, $must_authorize = true) {
      Amp\Loop::run(function () use($callback, $must_authorize) {
        try {

            $connection = yield Client\connect(config('~.url') . '?app_id=' . config('~.app_id'));

          if ($must_authorize) {
            $connection->send(json_encode([
              'authorize' => config('~.btc.token')
            ]));
            
          }
          

        while ($message = yield $connection->receive()) {


          call_user_func($callback, $connection);


          $payload = yield $message->buffer();

          $response = json_decode($payload, false);

          if (isset($response->error)) {

            $connection->close();
            throw new \App\Exceptions\SomeException($response->error->message);
            break;

          }

          ...
          
          if ($response->msg_type === 'exchange_rates') {
              $amount = $response->exchange_rates->rate * $this->transaction->transactable->amount_in_usd;

              call_user_func($this->transfer($amount), $connection); // culprit call that causes the error
          }

          if ($response->msg_type === 'paymentagent_transfer') {
            $connection->close();
            dd(response);


          }



        }
        } catch(Amp\Websocket\Client\ConnectionException $exception) {
          $error_json = yield $exception->getResponse()->getBody()->buffer();
          $error_object = json_decode($error_json, false);
          throw new \App\Exceptions\SomeException($error_object->error);
        }
  	});

  }
                 

  public function transfer() {
    return $this->authorize(function($connection) use($amount){
          $connection->send(json_encode([
            'transfer' => 1,
            'amount' => $amount,
            'currency' => '',
            'transfer_to' => $this->transaction->account_id,
            'description' => '',
          ]));

        }, true);

  }


  
  public function getResult() {
    $temp = $this->user_message;
    $this->user_message = null;
    return $temp . "\r\n";
    
  }
  
  public function getExchangeRates() {
      return $this->authorize(function($connection) {
        $connection->send(json_encode([
          'exchange_rates' => 1,
          'base_currency' => 'USD',
        ]));
    });
  }



}
$t = Transaction::with('transactable')->find(7);  
$h = SomeService::run($t);

  
$h->getExchangeRates();

echo $h->getResult();
  

About keeping the connection alive

I subscribed to the channel by sending a message to the WebSocket server. If I want to send another message, like 'ping', every 30 seconds, where should I add the code in the while loop?

Laravel integration

It's a great library, the best one among the libraries I have tried. Are there any plans to create an installation walkthrough for Laravel?

Call to undefined method Amp\Coroutine::receive()

I tried the example from the README and I'm getting an error on the receive() call:

Call to undefined method Amp\Coroutine::receive()

<?php

require 'vendor/autoload.php';

use Amp\Websocket\Client;

$connection = Client\connect('ws://localhost:1337/ws');

while ($message = $connection->receive()) {
    $payload = $message->buffer();

    printf("Received: %s\n", $payload);

    if ($payload === '100') {
        $connection->close();
        break;
    }
}

Am I missing something obvious?

How to get pings from an open connection?

Hello,

I am using the readme example here: https://github.com/amphp/websocket-client/blob/master/README.md

Issue:
The while loop will only run when a message is sent through the stream, no message, nothing ever can happen since it's in idle mode waiting.

Solution:
How can I receive the ping or have the while loop run on the ping, and still collect the messages?

For instance, I would like to have control on checking some information, (such as the socket should remain open) however, it can only check that when a message is coming through the stream, this limits the script as it would only ever execute when there's an activity, thus waiting forever if no information is ever sent.

Pings are standard in web sockets based on the RFC: https://tools.ietf.org/html/rfc6455

In the Rfc6455Connection class there are pings, but there is no documentation on how to access or use them directly.

It would be cool to run the while loop on the ping and check if there is a message at the same time, is this possible?

Add tests

Currently there aren't any tests, we should definitely add some. At least some integration tests with Aerys.

Invalid Websocket URI provided

I am getting the error "Invalid Websocket URI provided".

I am fairly certain the websocket URI is just fine, as it works in JavaScript.

js:
var mySock = new WebSocket("wss://" + streamerSocketUrl + "/ws");

php:

\Amp\Loop::run(function () {
        /** @var Client\Connection $connection */
          $connection = yield Client\connect("wss://" . $streamerSocketUrl . "/ws");

I have echoed it out, so I know the problem is not the URI itself; it must be something else. Is there anything I could be missing?
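
One possible cause, assuming $streamerSocketUrl is defined outside the closure passed to Loop::run(): PHP closures do not see variables from the enclosing scope unless they are imported with use, so the host part of the URI would be empty inside the closure. The sketch below shows the difference in plain PHP. The host name is a made-up placeholder.

```php
<?php

$streamerSocketUrl = 'streamer.example.com'; // placeholder host

// Without "use", $streamerSocketUrl is undefined inside the closure.
// The null-coalescing fallback only avoids the warning for this demo.
$withoutUse = function (): string {
    return 'wss://' . ($streamerSocketUrl ?? '') . '/ws';
};

// Importing the variable makes the outer value available.
$withUse = function () use ($streamerSocketUrl): string {
    return 'wss://' . $streamerSocketUrl . '/ws';
};

$broken = $withoutUse(); // "wss:///ws", which has no host
$fixed = $withUse();     // "wss://streamer.example.com/ws"
```

If this is what is happening, echoing the URI outside the closure would look correct while the value built inside the closure would not.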

RC is not compatible with amphp/http-server:^2

Thanks for your great work once again, guys! In my project I use both the HTTP server and the WebSocket client/server libraries, and the latest amphp/http-server that is installable is RC2 because of a league/uri version conflict. I hope a compatible release of the websocket libs will be tagged soon.

P.S.: I use it in a semi-production environment and I'm ready to deal with RC risks, but I feel it is too dangerous to switch to dev-master, so I need at least an RC.

Amp\Dns\TimeoutException

This may be an issue with my setup and not a bug, but I am not sure where I can ask for help.

I am following the basic example here: https://amphp.org/websocket-client/

Running on my Windows 10 Dev PC.

I am getting Uncaught Amp\MultiReasonException: Multiple errors encountered; use Amp\MultiReasonException::getReasons() to retrieve the array of exceptions thrown in \vendor\amphp\amp\lib\functions.php:540

Looking into the error, I found some issue with my DNS.
Amp\Dns\TimeoutException No response for 'ftx.com' (A) from any nameserver within 3000 ms after 2 attempts, tried udp://192.168.1.1:53, udp://192.168.1.1:53

I'm not sure what I can do to resolve this. I have no problems running a websocket client in JavaScript.

Error when I add a condition in the while loop?

When I use the client as shown below, it runs fine if I comment out the code below the "// hhhhhhhhhhhhhhhhhhh" comment, but it fails once I add the conditions below that comment. Can you give me some advice? Thank you!

$connection = yield Websocket\connect("wss://real.okex.com:10440/websocket/okexapi");   
yield $connection->send("[{'event':'addChannel','channel':'ok_sub_futureusd_btc_ticker_quarter'},{'event':'addChannel','channel':'ok_sub_futureusd_ltc_ticker_quarter'},{'event':'addChannel','channel':'ok_sub_futureusd_eth_ticker_quarter'},{'event':'addChannel','channel':'ok_sub_futureusd_etc_ticker_quarter'},{'event':'addChannel','channel':'ok_sub_futureusd_bch_ticker_quarter'},{'event':'addChannel','channel':'ok_sub_futureusd_eos_ticker_quarter'}]");

    while ($message = yield $connection->receive()) {
        $payload = yield $message->buffer();
        printf("Received: %s\n", $payload);

        // Decode the JSON payload into an associative array.
        $dic = json_decode($payload, true);
        print_r($dic);
        yield new Delayed(100);

        if ($dic["channel"] != "addChannel") {
            print("---------------\n");
            $priceLast = (float) $dic["data"]["last"];
            printf("priceLast: %s\n", $priceLast);

            // Assumes channel names like "ok_sub_futureusd_btc_ticker_quarter",
            // where the fourth segment is the coin.
            $coin = explode('_', $dic["channel"])[3];
            printf("coin: %s\n", $coin);

            $symbol = $coin . '_usd';
            printf("symbol: %s\n", $symbol);

            $signals = Redis::keys("future_" . $symbol . "_*");
            printf("signals: %s\n", implode(', ', $signals));

            foreach ($signals as $signal) {
                // Keys are built as "future_<coin>_usd_<type>_<price>" (see $key below).
                $parts = explode('_', $signal);
                $type = (int) $parts[3];
                $price = (float) $parts[4];

                printf("type: %s\n", $type);
                printf("price: %s\n", $price);

                if ($type == 3 && $priceLast <= $price) {
                    $key = "future_" . $symbol . "_" . $type . "_" . $price;
                    $this->future_stop_loss($key);
                }
                if ($type == 4 && $priceLast >= $price) {
                    $key = "future_" . $symbol . "_" . $type . "_" . $price;
                    $this->future_stop_loss($key);
                }
            }
        }
    }

Segfault for test case 13.1.17

Reproducible locally.

thread #1, queue = 'com.apple.main-thread', stop reason = EXC_BAD_ACCESS (code=1, address=0x1407a1f6e)
    frame #0: 0x00000001003c650c php`ZEND_HANDLE_EXCEPTION_SPEC_HANDLER + 340
    frame #1: 0x0000000100380258 php`execute_ex + 48
    frame #2: 0x00000001003f4468 php`zend_generator_resume + 260
    frame #3: 0x00000001003f4d5c php`zim_Generator_send + 128
    frame #4: 0x00000001003a3dc0 php`ZEND_DO_FCALL_SPEC_RETVAL_USED_HANDLER + 408
    frame #5: 0x0000000100380258 php`execute_ex + 48
    frame #6: 0x000000010034f800 php`zend_call_function + 1416
    frame #7: 0x00000001004100f0 php`zend_fiber_execute + 316
    frame #8: 0x000000010040fb4c php`zend_fiber_trampoline + 152
    frame #9: 0x00000001002f2cf8 php`make_fcontext + 24

Listening to thousands of messages/sec

I am listening to a Binance socket, and 100 pairs produce about 14k messages/s. When I do blocking operations while receiving messages (inserting into a database, calculating indicators, etc.), after 10-20 minutes of listening the queue of socket messages has a huge backlog and the server can't go any faster.
Then the socket client crashes without any errors.

How do I deal with this?
How do I know how many messages are pending to be processed?
I need to listen to much more than 14k/s, about 10x more. I don't think better hardware solves the problem, because the CPU isn't at 100% when the client crashes.

It's not working

Hello, I tried connecting using the example from the README, but I get an exception:
Fatal error: Uncaught Amp\MultiReasonException: Multiple errors encountered; use Amp\MultiReasonException::getReasons() to retrieve the array of exceptions thrown in
My code:

<?php

require_once __DIR__ . '/vendor/autoload.php';

use Amp\Websocket\Client\Connection;
use Amp\Websocket\Message;
use function Amp\delay;
use function Amp\Websocket\Client\connect;

Amp\Loop::run(function () {
    /** @var Connection $connection */
    $connection = yield connect('ws://demos.kaazing.com/echo');
    yield $connection->send("Hello!");

    $i = 0;

    while ($message = yield $connection->receive()) {
        /** @var Message $message */
        $payload = yield $message->buffer();
        printf("Received: %s\n", $payload);

        if ($payload === "Goodbye!") {
            $connection->close();
            break;
        }

        yield delay(1000); // Pause the coroutine for 1 second.

        if ($i < 3) {
            yield $connection->send("Ping: " . ++$i);
        } else {
            yield $connection->send("Goodbye!");
        }
    }
});

Creating connection in thread context keeps thread from finishing

Hello! My application needs to talk to a WebSocket server while its main thread is blocked, so I used amphp/websocket:0.2.2 combined with amphp/parallel:0.2.5 and krakjoe/pthreads:3.1.7dev to do it from a separate thread. Everything works fine (successful communication with the server, I mean) except for closing the thread. I used the following minimal script to investigate the problem:

<?php

require_once __DIR__ . '/../vendor/autoload.php';

ob_start();
phpinfo(INFO_MODULES);
$phpinfo = ob_get_clean();
if (1 !== preg_match('#^pthreads(?:.+?)^Version\s+=>\s+(.*?)$#sm', $phpinfo, $matches)) {
    die("Failed to detect pthreads version");
}
echo "Pthreads version: {$matches[1]}\n";

$delay = 2;

echo "Program started\n";
sleep($delay);

\Amp\Loop::run(
    function () use ($delay) {
        echo "Loop started\n";
        sleep($delay);
        $thread = \Amp\Parallel\Context\Thread::run(
            function (\Amp\Parallel\Sync\Channel $channel, int $delay) {
                echo "Thread started\n";

                $ticker = \Amp\Loop::repeat(
                    1000,
                    function () {
                        static $counter = 0;
                        echo "Tick from thread: ", $counter++, "\n";
                    }
                );

                sleep($delay);

                echo "Thread loop started\n";
                \Amp\Loop::run(
                    function () use ($channel, $ticker) {
                        /** @var Amp\Websocket\Connection $connection */
                        $connection = yield Amp\Websocket\connect("ws://local.websocket:8080");
                        $data = yield $channel->receive();

                        if ('stop' == $data) {
                            \Amp\Loop::cancel($ticker);
                            echo "Ticker stopped\n";

                            echo "Connection is closed: ", $connection->isClosed() ? 'YES' : 'NO', "\n";
                            $connection->close();
                            echo "Connection is closed: ", $connection->isClosed() ? 'YES' : 'NO', "\n";
                        }
                    }
                );
                echo "Out of thread loop\n";
            },
            $delay
        );

        sleep($delay * 2);
        yield $thread->send('stop');

        yield $thread->join();
        echo "Thread joined\n";
    }
);

echo "Out of loop\n";

I also used the following Dockerfile to run the script:

FROM php:7.2-zts

RUN apt-get update -q && apt-get install -qy --no-install-recommends \
    git \
    procps \
    && rm -r /var/lib/apt/lists/*

RUN pecl install xdebug \
    && git clone https://github.com/krakjoe/pthreads.git \
        && ( \
            cd pthreads \
            && phpize \
            && ./configure --enable-pthreads \
            && make -j$(nproc) \
            && make install \
        ) \
    && rm -r pthreads \
    && docker-php-ext-enable \
        pthreads \
        xdebug

It produces the following output:

Pthreads version: 3.1.7dev
Program started
Loop started
Thread started
Thread loop started
Tick from thread: 0
Tick from thread: 1
Ticker stopped
Connection is closed: NO
Connection is closed: YES
Out of thread loop
Thread joined
Out of loop

But the script hangs forever. If I comment out all the $connection code, the program terminates fine. Further investigation with top -H revealed that opening a connection starts one more thread that doesn't finish after calling $connection->close(); that "third" thread probably keeps the "second" thread from finishing.
Is there a workaround for this situation? Is this a bug, or am I just doing something wrong?

Specify timeout

Is it possible to specify a connection timeout?
Currently, when the target cannot be reached, "Uncaught Amp\Socket\ConnectException: Connecting to tcp://localhost:10000 failed: timeout exceeded (10000 ms)" occurs.
But the 10 second timeout is too long.
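
One possible approach, sketched here under the assumption that the 2.x API described in the README is in use: connect() accepts an optional Cancellation as its second argument, so a shorter limit can be imposed on the connection attempt by passing Amp\TimeoutCancellation (from amphp/amp 3.x). The URL is taken from the error message above and the 3 second value is only illustrative.

```php
<?php

require 'vendor/autoload.php';

use Amp\CancelledException;
use Amp\TimeoutCancellation;
use function Amp\Websocket\Client\connect;

try {
    // Give up on the connection attempt after 3 seconds (illustrative value).
    $connection = connect('ws://localhost:10000', new TimeoutCancellation(3));
} catch (CancelledException $exception) {
    // The attempt was cancelled because the timeout elapsed first.
    printf("Connection attempt cancelled: %s\n", $exception->getMessage());
    exit(1);
}

printf("Connected\n");
$connection->close();
```

Other failures, such as a refused connection or a rejected handshake, are not caught by this sketch and still surface as their own exceptions.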

Ping-Pong Error - Connection Close

Hello, I am using the published stable version and wrote my code based on the samples page. There is one server and one client, both written with AMPHP. However, after the server and client output shown below, the connection ends.

I can't see the PING/PONG answers. I read that they are handled automatically,
but that does not seem to be the case.

Client Response:
Connection closed abnormally while awaiting message; Code 1008 (POLICY_VIOLATION); Reason: "Exceeded unanswered PING limit

Server Response:
server.notice: Client initiated websocket close reporting error (code: 1008): Exceeded unanswered PING limit [] []

Adding/switching to an EventEmitter interface?

I planned to use this library as a wrapper around Slack's API. As I'm writing a public library, the raw websockets client needs to be hidden and decorated by my own interface.

The problem is that the current interface leads to horrible gymnastics (only onOpen is shown here, but the same applies to onData and onClose):

class RtmClient extends WebClient
{
    public function start() : Promise
    {
        return pipe($this->callAsync('rtm.start'), function(array $rtmInfo) {
            $onOpen = \Closure::fromCallable([$this, 'handleOpen']);

            $repeater = new class($onOpen) implements Websocket {
                private $onOpen;

                public function __construct(callable $onOpen)
                {
                    $this->onOpen = $onOpen;
                }

                public function onOpen(Websocket\Endpoint $endpoint, array $headers)
                {
                    call_user_func($this->onOpen, $endpoint, $headers);
                }
            };

            return websocket($repeater, new Websocket\Handshake($rtmInfo['url']));
        });
    }

    private function handleOpen(Websocket\Endpoint $endpoint, array $headers) : void
    {
        // ...
    }
}

To avoid that (which is especially hideous and non-performant), I could also make RtmClient an implementor of the Amp\Websocket interface, renaming handleOpen to onOpen (and likewise for onData, etc.) and making these methods public. But that makes them part of my public API.

Since this library is somewhat low level, I don't think the current API, although neat, is well suited; in most cases, you'll want to decorate this websocket client and emit higher-level objects. IMO, a simple EventEmitter would make the component more easily reusable, because the current Amp\Websocket interface makes that task too hard.

Your thoughts?

Regards,

The request was not processed and can be safely retried

Hi,

All of a sudden we're receiving the error "The request was not processed and can be safely retried.". Until now, everything worked perfectly.

Anybody got an idea of what the error means?

Info

  • PHP 8.1.10
  • Apache/2.4.54
  • OpenSSL 1.1.1s

Code:

\Amp\Loop::run(
    function () {
        $handshake = new \Amp\Websocket\Client\Handshake('wss://...');

        while (true) {
            try {
                $connection = yield connect($handshake);

                yield $connection->send(json_encode([...subscription_info...]));

                while ($message = yield $connection->receive()) {
                    // ...
                }
            } catch (Exception $e) {
                // ...
            }
        }
    }
);

Stack trace:

Error reads: The request was not processed and can be safely retried.
 
Trace: #0 [internal function]: Amp\Http\Client\Connection\DefaultConnectionFactory->Amp\Http\Client\Connection\{closure}()
#1 ../vendor/amphp/amp/lib/Coroutine.php(115): Generator->throw(Object(Amp\Socket\TlsException))
#2 ../vendor/amphp/amp/lib/Failure.php(33): Amp\Coroutine->Amp\{closure}(Object(Amp\Socket\TlsException), NULL)
#3 ../vendor/amphp/amp/lib/Internal/Placeholder.php(143): Amp\Failure->onResolve(Object(Closure))
#4 ../vendor/amphp/amp/lib/Internal/Placeholder.php(177): Amp\Coroutine->resolve(Object(Amp\Failure))
#5 ../vendor/amphp/amp/lib/Coroutine.php(137): Amp\Coroutine->fail(Object(Amp\Socket\TlsException))
#6 ../vendor/amphp/amp/lib/Failure.php(33): Amp\Coroutine->Amp\{closure}(Object(Amp\Socket\TlsException), NULL)
#7 ../vendor/amphp/amp/lib/Internal/Placeholder.php(143): Amp\Failure->onResolve(Object(Closure))
#8 ../vendor/amphp/amp/lib/Internal/Placeholder.php(177): Amp\Promise@anonymous->resolve(Object(Amp\Failure))
#9 ../vendor/amphp/amp/lib/Deferred.php(66): Amp\Promise@anonymous->fail(Object(Amp\Socket\TlsException))
#10 ../vendor/amphp/socket/src/Internal/functions.php(148): Amp\Deferred->fail(Object(Amp\Socket\TlsException))
#11 ../vendor/amphp/amp/lib/Loop/NativeDriver.php(327): Amp\Socket\Internal\{closure}('ac', Resource id #1219, Object(Amp\Deferred))
#12 ../vendor/amphp/amp/lib/Loop/NativeDriver.php(127): Amp\Loop\NativeDriver->selectStreams(Array, Array, 4.625)
#13 ../vendor/amphp/amp/lib/Loop/Driver.php(138): Amp\Loop\NativeDriver->dispatch(true)
#14 ../vendor/amphp/amp/lib/Loop/Driver.php(72): Amp\Loop\Driver->tick()
#15 ../vendor/amphp/amp/lib/Loop.php(95): Amp\Loop\Driver->run()
...

Invalid request phase transition from ServerProcessing to Connect

An invalid request phase transition happens if the connection isn't properly established, here because dum-increment-protocol is passed instead of dumb-increment-protocol.

<?php

require \dirname(__DIR__) . '/vendor/autoload.php';

use Amp\Websocket\Client\WebsocketHandshake;
use function Amp\Websocket\Client\connect;

// Connects to the websocket endpoint at libwebsockets.org which sends a message every 50ms.
$handshake = (new WebsocketHandshake('wss://libwebsockets.org'))
    ->withHeader('Sec-WebSocket-Protocol', 'dum-increment-protocol');

$connection = connect($handshake);

while ($message = $connection->receive()) {
    $payload = $message->buffer();

    \printf("Received: %s\n", $payload);

    if ($payload === '100') {
        $connection->close();
        break;
    }
}

➜ php examples/libwebsockets.org.php

Fatal error: Uncaught Error: Invalid request phase transition from ServerProcessing to Connect in /Users/kelunik/PHP/amphp/websocket-client/vendor/amphp/http-client/src/Internal/EventInvoker.php on line 85

Error: Invalid request phase transition from ServerProcessing to Connect in /Users/kelunik/PHP/amphp/websocket-client/vendor/amphp/http-client/src/Internal/EventInvoker.php on line 85

Call Stack:
    0.0406    5097232   1. {fiber:1053903C0}() /Users/kelunik/PHP/amphp/websocket-client/vendor/revolt/event-loop/src/EventLoop/Internal/AbstractDriver.php:498
    0.0406    5117712   2. Revolt\EventLoop\Driver\StreamSelectDriver->Revolt\EventLoop\Internal\{closure:/Users/kelunik/PHP/amphp/websocket-client/vendor/revolt/event-loop/src/EventLoop/Internal/AbstractDriver.php:560-622}() /Users/kelunik/PHP/amphp/websocket-client/vendor/revolt/event-loop/src/EventLoop/Internal/AbstractDriver.php:498
    0.2997    6165592   3. Revolt\EventLoop\Driver\StreamSelectDriver->invokeMicrotasks() /Users/kelunik/PHP/amphp/websocket-client/vendor/revolt/event-loop/src/EventLoop/Internal/AbstractDriver.php:616
    0.2999    6171544   4. Amp\{closure:/Users/kelunik/PHP/amphp/websocket-client/vendor/amphp/amp/src/functions.php:23-37}() /Users/kelunik/PHP/amphp/websocket-client/vendor/revolt/event-loop/src/EventLoop/Internal/AbstractDriver.php:425
    0.2999    6171704   5. Amp\Http\Client\Connection\DefaultConnectionFactory->create() /Users/kelunik/PHP/amphp/websocket-client/vendor/amphp/amp/src/functions.php:33
    0.3000    6171704   6. Amp\Http\Client\Internal\EventInvoker->connectStart() /Users/kelunik/PHP/amphp/websocket-client/vendor/amphp/http-client/src/Connection/DefaultConnectionFactory.php:33

Are Ping Pong Control frames supported?

I would like to know whether the amphp/websocket client responds with a pong control frame to a ping control frame from the WebSocket server, as per RFC 6455 (https://tools.ietf.org/html/rfc6455#section-5.5).

This library is really helpful for sending WebSocket requests alongside the amphp/redis library, within the scope of the Redis pub/sub subscribe method. However, the WebSocket connection gets disconnected a few seconds after it is initiated (if I put the WebSocket client connection code above and outside the while loop of the amphp/redis subscribe code), even though my WebSocket server is configured to support ping/pong control frames.

I did not use the while loop from the amphp websocket client example, since it blocks the code that follows (i.e. the amphp/redis channel subscription code). Just to test, I put the amphp/websocket client connection code inside the while loop of the amphp/redis subscribe code, in which case the WebSocket connection is established and then closed with the close command.

Is this approach scalable?

Please share your thoughts so that I can run the WebSocket client connection code once and reuse that connection to send messages to the WebSocket server. @kelunik

Thank you

Pings regularly, not in blocking receive wait?

This is a very handy library... but what if I want to listen, as in the example

while ($message = yield $connection->receive()) { ...

but also ping every x seconds even if there was nothing received?
A websocket might need regular pings to stay active.
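
A minimal sketch of one way to do this, assuming the 2.x fiber-based API described in the README (not the yield-based API quoted above): a repeating timer registered with Revolt\EventLoop::repeat() sends a ping on a fixed interval while the foreach loop keeps waiting for messages. The URL and the 10 second interval are placeholders chosen for illustration.

```php
<?php

require 'vendor/autoload.php';

use Revolt\EventLoop;
use function Amp\Websocket\Client\connect;

$connection = connect('ws://localhost:1337/websocket');

// Send a ping every 10 seconds, independent of incoming messages.
$pinger = EventLoop::repeat(10, function (string $callbackId) use ($connection): void {
    if ($connection->isClosed()) {
        // Stop pinging once the connection is gone.
        EventLoop::cancel($callbackId);
        return;
    }

    $connection->ping();
});

try {
    // Receiving suspends only this fiber, so the timer above keeps firing.
    foreach ($connection as $message) {
        printf("Received: %s\n", $message->buffer());
    }
} finally {
    // Make sure the timer does not outlive the receive loop.
    EventLoop::cancel($pinger);
}
```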
