
postgres's Introduction

amphp/postgres

AMPHP is a collection of event-driven libraries for PHP designed with fibers and concurrency in mind. amphp/postgres is an asynchronous Postgres client. The library implements concurrent querying by transparently distributing queries across a scalable pool of available connections. Either ext-pgsql (bundled with PHP) or pecl-pq is required.

Features

  • Exposes a non-blocking API for issuing multiple Postgres queries concurrently
  • Transparent connection pooling to overcome Postgres' fundamentally synchronous connection protocol
  • Support for parameterized prepared statements
  • Nested transactions with commit and rollback event hooks
  • Unbuffered results to reduce memory usage for large result sets
  • Support for sending and receiving notifications

Installation

This package can be installed as a Composer dependency.

composer require amphp/postgres

Requirements

Note: pecl-ev is not compatible with ext-pgsql. If you wish to use pecl-ev for the event loop backend, you must use pecl-pq.

Documentation & Examples

Prepared statements and parameterized queries support named placeholders, as well as ? and standard numeric (i.e. $1) placeholders.

Row values are cast to their corresponding PHP types. For example, integer columns will be an int in the result row array.

More examples can be found in the examples directory.

use Amp\Postgres\PostgresConfig;
use Amp\Postgres\PostgresConnectionPool;

$config = PostgresConfig::fromString("host=localhost user=postgres db=test");

$pool = new PostgresConnectionPool($config);

$statement = $pool->prepare("SELECT * FROM test WHERE id = :id");

$result = $statement->execute(['id' => 1337]);
foreach ($result as $row) {
    // $row is an associative-array of column values, e.g.: $row['column_name']
}

Versioning

amphp/postgres follows the semver semantic versioning specification like all other amphp packages.

Security

If you discover any security related issues, please use the private security issue reporter instead of using the public issue tracker.

License

The MIT License (MIT). Please see LICENSE for more information.

postgres's People

Contributors

bilge, bwoebi, daverandom, enumag, kelunik, peehaa, prolic, trowski, uasan

postgres's Issues

Amp\SQL implementation not ready

Fatal error: Class Amp\Postgres\PgSqlConnection contains 2 abstract methods and must therefore be declared abstract or implement the remaining methods (Amp\Sql\Link::beginTransaction, Amp\Sql\TransientResource::getLastUsedAt)

/cc @trowski @kelunik

PqHandle: another command is already in progress

After 2-6 hours of high load I get this error:

another command is already in progress: /project/vendor/amphp/postgres/src/PqHandle.php:202

After this error, no further requests are answered. I have to restart amphp-cluster to restore correct operation.

Installed packages, channel pecl.php.net:
=========================================
Package Version State
ev      1.0.6   stable
pq      2.1.8   stable
raphf   2.0.1   stable

Improper getCurrent implementation for end of ResultSet

if ($this->position > \pg_num_rows($this->handle)) {
    throw new \Error("No more rows remain in the result set");
}

I'm not sure this is a good way to implement getCurrent(). It is the role of advance() to move the cursor forward. If it's at the end, I think it should just continue emitting the same row, rather than throwing an Error.

Moreover, since this is a generic type \Error, it's difficult to catch this without the risk of including false positives. I think we should either avoid throwing errors in this situation, or double down and throw a specific error type from the library's namespace.
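
To make the second option concrete, here is a minimal sketch of a cursor that throws a dedicated error type instead of a bare \Error. Both ResultCursorError and ArrayResultCursor are hypothetical names invented for this illustration (they are not part of amphp/postgres), and the cursor is an in-memory stand-in rather than the library's result set. It assumes PHP 8.0 or newer.

```php
<?php
declare(strict_types=1);

// Hypothetical error type; a library would declare this in its own namespace.
final class ResultCursorError extends \Error
{
}

// Hypothetical in-memory cursor, used only to illustrate the error handling.
final class ArrayResultCursor
{
    private int $position = -1;

    /** @param list<array<string, mixed>> $rows */
    public function __construct(private array $rows)
    {
    }

    // Moves the cursor forward; returns false once the rows are exhausted.
    public function advance(): bool
    {
        if ($this->position + 1 >= \count($this->rows)) {
            $this->position = \count($this->rows); // park the cursor past the end
            return false;
        }
        $this->position++;
        return true;
    }

    // Returns the current row, or throws a specific, catchable error type.
    public function getCurrent(): array
    {
        if ($this->position < 0 || $this->position >= \count($this->rows)) {
            throw new ResultCursorError('No row is available at the current position');
        }
        return $this->rows[$this->position];
    }
}
```

A caller can then catch ResultCursorError alone, without also swallowing unrelated \Error instances such as TypeError.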

Prepared statements

I submitted an issue at crate/crate#6785 and they mentioned switching to the protocol level prepared statement functionality.

Are you not using the "protocol level prepared statement functionality" because of how this library asynchronously interacts with postgresql?

How to keep reference to a connection?

I need to keep a reference to a specific connection, how can I do this?

Problem is, I want to make use of advisory locks, but this kind of lock is bound to the connection that started the lock, so I need to query the same connection from that point on until I release that lock. Releasing that lock must be done through the same connection again of course.

$pool->beginTransaction() and $pool->execute() just "hang" forever

More code around:

var_dump(__LINE__); // this is the last I see in output
try {
    $pool->query('DROP TABLE IF EXISTS test');
    var_dump(__LINE__); // this I don't see anymore
    $transaction = $pool->beginTransaction();
    var_dump(__LINE__);
    $transaction->query('CREATE TABLE test (domain VARCHAR(63), tld VARCHAR(63), PRIMARY KEY (domain, tld))');
    var_dump(__LINE__);
    $statement = $transaction->prepare('INSERT INTO test VALUES (?, ?)');
    var_dump(__LINE__);
    $statement->execute(['amphp', 'org']);
    $statement->execute(['google', 'com']);
    $statement->execute(['github', 'com']);
} catch (Throwable $e) {
    // we never see this
    var_dump(get_class($e));
    var_dump($e->getMessage());
    var_dump($e->getTraceAsString());
}

Error codes (sqlstate) missing in QueryError

To handle a query error properly, it is necessary to know what kind of error it is. Parsing the error message is not viable. Postgres provides the SQLSTATE code for that purpose. QueryError could be extended to contain the SQLSTATE and even more diagnostic information.

PQ offers that information in the 'diag' field of the result, pgsql offers this through pg_result_error_field. QueryError should provide a common interface for that.
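
As a rough sketch of what such a common interface could look like: DiagnosticQueryError below is a hypothetical class, not the library's QueryError, and the shape of the diagnostics array (a 'sqlstate' key) is an assumption. With ext-pgsql the value could come from pg_result_error_field($result, PGSQL_DIAG_SQLSTATE); with pq it would be read from the result's diag field. It assumes PHP 8.0 or newer.

```php
<?php
declare(strict_types=1);

// Hypothetical exception carrying driver diagnostics in a driver-neutral array.
final class DiagnosticQueryError extends \Exception
{
    /** @param array<string, string|null> $diagnostics assumed shape, e.g. ['sqlstate' => '23505'] */
    public function __construct(string $message, private array $diagnostics)
    {
        parent::__construct($message);
    }

    // Five-character SQLSTATE code, or null if the driver did not report one.
    public function getSqlState(): ?string
    {
        return $this->diagnostics['sqlstate'] ?? null;
    }

    // 23505 is the SQLSTATE for unique_violation in Postgres.
    public function isUniqueViolation(): bool
    {
        return $this->getSqlState() === '23505';
    }
}
```

Callers could then branch on getSqlState() instead of matching on the human-readable message.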

Usage example with Model classes

Hi there!

In a non-amp context I am used to having Model classes for database access. I can get the examples to work, but I have a hard time mapping the "amp" way onto a model class approach.

So, something simple like this:

$dsn = 'postgres://amp:amp@localhost:1234/amp';

Amp\Loop::run(function () use ($dsn) {
    $pool = Postgres\pool($dsn);
    $model = new SomeModel($pool);
    var_dump($model->fetchAll());
   // Array of values
});
class SomeModel
{
    protected $pool;

    public function __construct(Pool $pool)
    {
        $this->pool = $pool;
    }
    
    public function fetchAll()
    {
    }
}

Is there another preferred way of doing this in an amp context?

Is there maybe an example project where this library is used in a "real world" application?

Thanks!
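
One possible shape for such a model, as a sketch only: it uses the fiber-based style of the README example (where a result is iterated with foreach) rather than the generator style of the snippet above. RowSource and FakeRowSource are hypothetical names introduced here so the model can be exercised without a database; a thin adapter around the pool's query() method is assumed to be able to implement RowSource. The table name some_table is also made up. It assumes PHP 8.0 or newer.

```php
<?php
declare(strict_types=1);

// Hypothetical minimal interface; an adapter around the connection pool
// could implement it by delegating to the pool's query() method.
interface RowSource
{
    /** @return iterable<array<string, mixed>> */
    public function query(string $sql): iterable;
}

final class SomeModel
{
    public function __construct(private RowSource $source)
    {
    }

    /** @return list<array<string, mixed>> */
    public function fetchAll(): array
    {
        $rows = [];
        // Iterating the result collects every row into a plain array.
        foreach ($this->source->query('SELECT * FROM some_table') as $row) {
            $rows[] = $row;
        }
        return $rows;
    }
}

// In-memory fake, used only to run the model without a live connection.
final class FakeRowSource implements RowSource
{
    /** @param list<array<string, mixed>> $rows */
    public function __construct(private array $rows)
    {
    }

    public function query(string $sql): iterable
    {
        yield from $this->rows;
    }
}

$model = new SomeModel(new FakeRowSource([['id' => 1], ['id' => 2]]));
var_dump($model->fetchAll());
```

Depending on an interface rather than on the pool class directly keeps the model testable and leaves the choice of connection handling to the caller.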

another command is already in progress

Good evening, I have a question about the pq driver.

Here is the code: https://paste.ofcode.org/JT4euUCF55dAE282nXwcLB (the id field is the primary key).
If I run it for the first time, everything is fine.

On the second run, the first query fails with a unique constraint error, which is expected.
However, the second query then also fails, with the error "another command is already in progress".
The same error occurs after any failure during query execution.

Installed extensions: sockets, event, raphf, pq

Throw when prepare fails

AFAICS currently preparing a statement fails silently.

You will only find out it failed when trying to execute the statement resulting in the somewhat less useful message:

error Amp\Postgres\QueryError: ERROR: prepared statement "amp_46cfa66905e07b1e3b52558541a4bcf486056883" does not exist

It would be nice if errors on prepare threw instead. Then we would:

  1. get the error at the location where it actually fails
  2. have access to the actual error message.

[V2] Pending transaction rollbackTo in PHPUnit

Example file:

<?php

namespace Amp\Postgres\Test;

use Amp\PHPUnit\AsyncTestCase;
use Amp\Postgres\PostgresConfig;
use Amp\Postgres\PostgresConnectionPool;
use Amp\Postgres\PostgresTransaction;
use function Amp\async;
use function Amp\Future\await;

class PgSqlAwaitTest extends AsyncTestCase
{
    protected PostgresConnectionPool $connection;
    protected PostgresTransaction $transaction;

    protected function setUp(): void
    {
        parent::setUp();
        $this->connection = new PostgresConnectionPool(PostgresConfig::fromString("host=localhost user=postgres password=postgres"));
        $this->transaction = $this->connection->beginTransaction();
        $this->transaction->createSavepoint('phpunit_savepoint');
    }

    protected function tearDown(): void
    {
        $this->transaction->rollbackTo('phpunit_savepoint');
        $this->transaction->rollback();
        $this->connection->close();
    }

    public function testAwait(): void
    {
        $this->expectExceptionMessage('First');
        await([
            async(function () {
                $this->transaction->query('SELECT 1');
                throw new \Exception('First');
            }),
        ]);
    }

    public function testAwaitMultiple(): void
    {
        $this->expectExceptionMessage('First');
        await([
            async(function () {
                $this->transaction->query('SELECT 1');
                throw new \Exception('First');
            }),
            async(function () {
                $this->transaction->query('SELECT 2');
            }),
        ]);
    }
}

testAwait completes successfully, but testAwaitMultiple hangs indefinitely on the line $this->transaction->rollbackTo('phpunit_savepoint'); in the tearDown method.

I think the await function needs to be modified to:

function await(iterable $futures, ?Cancellation $cancellation = null): array
{
    $values = [];
    $errors = [];

    // Future::iterate() to throw the first error based on completion order instead of argument order
    foreach (Future::iterate($futures, $cancellation) as $index => $future) {
        try {
            $values[$index] = $future->await();
        } catch (\Throwable $throwable) {
            $errors[$index] = $throwable;
        }
    }
    foreach ($errors as $error) {
        throw $error;
    }

    /** @var array<Tk, Tv> */
    return $values;
}

Unwanted PHP Notices

PHP Notice:  pg_send_query(): There are results on this connection. Call pg_get_result() until it returns FALSE in /var/www/vendor/amphp/postgres/src/PgSqlHandle.php on line 228

Could you please mute them with @? They occur especially when working with a connection pool.

How to UPDATE .... FROM

I'm trying to build a query like this. Any help is appreciated.

update tc_positions as t set
    speedlimit = c.speedlimit
from (values
    (1011827, 45),
    (1011831, 65)  
) as c(positionid, speedlimit) 
where c.positionid = t.id;
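
One way to parameterize a statement like this is to generate the VALUES tuples with numeric placeholders and pass the values as a flat list. The helper below is a sketch, not library code: buildValuesList is a name made up for this illustration, and the ::int casts are added on the assumption that both columns are integers (untyped parameters inside VALUES would otherwise be treated as text). The final execute() call is left commented out because it needs a live connection.

```php
<?php
declare(strict_types=1);

/**
 * Builds "($1::int, $2::int), ($3::int, $4::int)" plus the matching flat parameter list.
 *
 * @param list<array{int, int}> $pairs [positionid, speedlimit] pairs
 * @return array{string, list<int>}
 */
function buildValuesList(array $pairs): array
{
    if ($pairs === []) {
        throw new \InvalidArgumentException('At least one pair is required');
    }

    $tuples = [];
    $params = [];
    foreach ($pairs as [$positionId, $speedLimit]) {
        $params[] = $positionId;
        $first = \count($params);   // 1-based index of the value just added
        $params[] = $speedLimit;
        $second = \count($params);
        $tuples[] = \sprintf('($%d::int, $%d::int)', $first, $second);
    }

    return [\implode(', ', $tuples), $params];
}

[$values, $params] = buildValuesList([[1011827, 45], [1011831, 65]]);

$sql = 'UPDATE tc_positions AS t SET speedlimit = c.speedlimit '
    . "FROM (VALUES $values) AS c(positionid, speedlimit) "
    . 'WHERE c.positionid = t.id';

// Assumed usage with a pool or connection: $pool->execute($sql, $params);
```

Only the placeholder list is interpolated into the SQL string; the actual values travel as parameters.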

Errors of statements at PqHandle

When using the pq extension, I sometimes get the following errors:

  • at PqHandle::statementExecute: Named statement not found when executing
  • at PqHandle::prepare: prepared statement "amp_c1e68597d8be2371adc09335382ad9f18ab407dd" already exists

These errors occur only at a high request rate, above 150 requests per second. The following results were obtained (5000-6000 queries in total, using a pool with max_connections = 90 and timeout = 30 sec):

  • failed=19, rps=150-200
  • failed=0, rps=250-300
  • failed=7, rps=250-300
  • failed=11, rps=350-400
  • failed=25, rps=350-400
  • failed=40, rps=400-450

Also, in PqHandle::statementExecute, after the assertion succeeded, the pq\Connection state looked like this:
"status": 0, "transactionStatus": 0, "errorMessage": "ERROR: prepared statement "amp_c1e68597d8be2371adc09335382ad9f18ab407dd" does not exist"

ArrayParser Error

ArrayParser::parse returns [''] for the string {} instead of []. Could you fix it, please?
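
For reference, the expected behaviour can be sketched with a small standalone parser. This is not the library's ArrayParser: parseFlatPgArray is a hypothetical function that only handles one-dimensional literals with the default comma delimiter, with no nesting and no type casting.

```php
<?php
declare(strict_types=1);

/**
 * Parses a one-dimensional Postgres array literal such as {a,"b c",NULL}.
 *
 * @return list<string|null>
 */
function parseFlatPgArray(string $literal): array
{
    $literal = \trim($literal);
    $length = \strlen($literal);
    if ($length < 2 || $literal[0] !== '{' || $literal[$length - 1] !== '}') {
        throw new \InvalidArgumentException('Not a Postgres array literal');
    }
    if ($length === 2) {
        return []; // "{}" is the empty array, not an array containing ''
    }

    $result = [];
    $end = $length - 1; // index of the closing brace
    $i = 1;
    while ($i < $end) {
        if ($literal[$i] === '"') {
            // Quoted element: read up to the closing quote, honouring backslash escapes.
            $value = '';
            $i++;
            while ($i < $end && $literal[$i] !== '"') {
                if ($literal[$i] === '\\') {
                    $i++; // skip the backslash, keep the escaped character
                }
                $value .= $literal[$i];
                $i++;
            }
            $i++; // step over the closing quote
            $result[] = $value;
        } else {
            // Unquoted element: runs until the next comma or the closing brace.
            $next = \strpos($literal, ',', $i);
            if ($next === false) {
                $next = $end;
            }
            $raw = \substr($literal, $i, $next - $i);
            $result[] = $raw === 'NULL' ? null : $raw;
            $i = $next;
        }
        $i++; // step over the delimiter
    }

    return $result;
}
```

The early return for a two-character literal is the case this issue is about.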

Closing connection during a long running query blocks the loop

If we have a long-running query and try to close the connection while it is executing, the loop will be blocked.

Script:

<?php

use Amp\Loop;
use Amp\Postgres\ConnectionConfig;
use function Amp\Postgres\pool;

require __DIR__ . '/../vendor/autoload.php';

Loop::run(function () {
    echo 'Start' . PHP_EOL;

    $config = ConnectionConfig::fromString("host=localhost user=test password=1234 db=test");
    $pool = pool($config);

    Loop::repeat(1000, static function () {
        echo 'Loop heartbeat: ' . time() . PHP_EOL;
    });

    Loop::delay(3000, static function () use ($pool) {
        echo 'Close connection' . PHP_EOL;
        $pool->close();
    });

    try {
        echo 'Query' . PHP_EOL;
        $result = yield $pool->execute('SELECT pg_sleep(10)');
        while (yield $result->advance()) {
            $row = $result->getCurrent();
            var_dump($row);
        }
    } catch (Throwable $e) {
        echo $e->getMessage() . PHP_EOL;
    }
});

Result:

Start
Query
Loop heartbeat: 1661195555
Loop heartbeat: 1661195556
Close connection
The connection was closed
Loop heartbeat: 1661195564
Loop heartbeat: 1661195565
Loop heartbeat: 1661195566
Loop heartbeat: 1661195567

The loop was blocked for 8 seconds, from 1661195556 to 1661195564, until the query ended. Is this expected behavior?

It seems the loop should not be blocked in this case.

[bug]: unexpected exception when pool limit is reached.

given the following code:

<?php

use Amp\Postgres;
use Amp\Postgres\Link;
use Amp\Future;

require __DIR__ . '/../vendor/autoload.php';

$operate = static function (Link $pgsql, string $table, int $identifier): void {
    $pgsql->query('CREATE TABLE IF NOT EXISTS ' . $table . ' (id SERIAL PRIMARY KEY, content TEXT NOT NULL)');

    Future\all([
        Amp\async(static fn() => $pgsql->query('INSERT INTO ' . $table . ' ( content ) VALUES (\'Hello, You!\')')),
        Amp\async(static fn() => $pgsql->query('INSERT INTO ' . $table . ' ( content ) VALUES (\'Hello, City!\')')),
        Amp\async(static fn() => $pgsql->query('INSERT INTO ' . $table . ' ( content ) VALUES (\'Hello, Country!\')')),
        Amp\async(static fn() => $pgsql->query('INSERT INTO ' . $table . ' ( content ) VALUES (\'Hello, World!\')')),
        Amp\async(static fn() => $pgsql->query('INSERT INTO ' . $table . ' ( content ) VALUES (\'Hello, Galaxy!\')')),
        Amp\async(static fn() => $pgsql->query('INSERT INTO ' . $table . ' ( content ) VALUES (\'Hello, Universe!\')')),
    ]);

    $result = $pgsql->query('SELECT * FROM ' . $table . '; DROP TABLE IF EXISTS ' . $table);

    printf('[%*d] successfully operated on "%s" table, row count: %d%s', 3, $identifier, $table, $result->getRowCount(), "\n");
};

$config = Postgres\ConnectionConfig::fromString('host=127.0.0.1 port=32770 user=main password=main');
$link = Postgres\pool($config, maxConnections: 40);

$futures = [];
for ($i = 1; $i <= 40; $i++) {
    $table = uniqid('tmp_table_', false);
    $futures[] = Amp\async(fn() => $operate($link, $table, $i));
}

Future\all($futures);

I receive the following error:

> php example/database.php
[  1] successfully operated on "tmp_table_61f1e7eae9517" table, row count: 6
[  4] successfully operated on "tmp_table_61f1e7eae96a7" table, row count: 6
PHP Fatal error:  Uncaught Error: Operation is no longer pending in /home/azjezz/Projects/neutomic/neu/vendor/amphp/amp/src/Internal/FutureState.php:88
Stack trace:
#0 /home/azjezz/Projects/neutomic/neu/vendor/amphp/amp/src/DeferredFuture.php(29): Amp\Internal\FutureState->complete(Object(Amp\Postgres\PgSqlConnection))
#1 /home/azjezz/Projects/neutomic/neu/vendor/amphp/sql-common/src/ConnectionPool.php(311): Amp\DeferredFuture->complete(Object(Amp\Postgres\PgSqlConnection))
#2 /home/azjezz/Projects/neutomic/neu/vendor/amphp/sql-common/src/ConnectionPool.php(329): Amp\Sql\Common\ConnectionPool->push(Object(Amp\Postgres\PgSqlConnection))
#3 /home/azjezz/Projects/neutomic/neu/vendor/revolt/event-loop/src/EventLoop/Internal/AbstractDriver.php(585): Amp\Sql\Common\ConnectionPool->Amp\Sql\Common\{closure}()
#4 [internal function]: Revolt\EventLoop\Internal\AbstractDriver::Revolt\EventLoop\Internal\{closure}()
#5 /home/azjezz/Projects/neutomic/neu/vendor/revolt/event-loop/src/EventLoop/Internal/AbstractDriver.php(503): Fiber->resume(Array)
#6 /home/azjezz/Projects/neutomic/neu/vendor/revolt/event-loop/src/EventLoop/Internal/AbstractDriver.php(414): Revolt\EventLoop\Internal\AbstractDriver->invokeMicrotasks()
#7 /home/azjezz/Projects/neutomic/neu/vendor/revolt/event-loop/src/EventLoop/Driver/StreamSelectDriver.php(285): Revolt\EventLoop\Internal\AbstractDriver->invokeCallback(Object(Revolt\EventLoop\Internal\StreamReadableCallback))
#8 /home/azjezz/Projects/neutomic/neu/vendor/revolt/event-loop/src/EventLoop/Driver/StreamSelectDriver.php(122): Revolt\EventLoop\Driver\StreamSelectDriver->selectStreams(Array, Array, 0.5556615100013)
#9 /home/azjezz/Projects/neutomic/neu/vendor/revolt/event-loop/src/EventLoop/Internal/AbstractDriver.php(488): Revolt\EventLoop\Driver\StreamSelectDriver->dispatch(true)
#10 /home/azjezz/Projects/neutomic/neu/vendor/revolt/event-loop/src/EventLoop/Internal/AbstractDriver.php(551): Revolt\EventLoop\Internal\AbstractDriver->tick()
#11 [internal function]: Revolt\EventLoop\Internal\AbstractDriver->Revolt\EventLoop\Internal\{closure}()
#12 /home/azjezz/Projects/neutomic/neu/vendor/revolt/event-loop/src/EventLoop/Internal/AbstractDriver.php(100): Fiber->resume()
#13 /home/azjezz/Projects/neutomic/neu/vendor/revolt/event-loop/src/EventLoop/Internal/DriverSuspension.php(80): Revolt\EventLoop\Internal\AbstractDriver->Revolt\EventLoop\Internal\{closure}()
#14 /home/azjezz/Projects/neutomic/neu/vendor/amphp/amp/src/Internal/FutureIterator.php(128): Revolt\EventLoop\Internal\DriverSuspension->suspend()
#15 /home/azjezz/Projects/neutomic/neu/vendor/amphp/amp/src/Future.php(56): Amp\Internal\FutureIterator->consume()
#16 /home/azjezz/Projects/neutomic/neu/vendor/amphp/amp/src/Future/functions.php(136): Amp\Future::iterate(Array, NULL)
#17 /home/azjezz/Projects/neutomic/neu/example/database.php(38): Amp\Future\all(Array)
#18 {main}
  thrown in /home/azjezz/Projects/neutomic/neu/vendor/amphp/amp/src/Internal/FutureState.php on line 88

Changing maxConnections to a higher number, or lowering the number of operations (e.g. $i <= 10), results in a successful run.

It appears that the locking mechanism used in the connection pool is incorrect, but I haven't taken a deep look into it yet.

Row count reported by driver hidden behind private API

One of the most useful things to know when performing a query is how many results are in the results set. This information is provided by the driver and exposed by a public method in this library, but by an object that is inaccessible to the consumer.

PgSqlResultSet exposes the method numRows(), but PgSqlResultSet is wrapped by PooledResultSet, which is what the consumer actually receives and which does not provide any public access to the underlying PgSqlResultSet. Due to this, the row count is inaccessible to the consumer.

Also, and this is a side issue, even if it were possible to access numRows() via PooledResultSet, the library only declares interfaces, and the method is not present on the generic SQL interface so declared. The consumer would have to know the underlying object's exact type and override the type system to access it.

Array parameters for IN clause

In some cases I want to use IN (...) with a dynamic number of values. For instance:

// $ids is int[]
$result = $link->execute('SELECT * FROM articles WHERE id IN (:ids)', ['ids' => $ids]);

At the moment this returns nothing: no error, but no rows either, even when matching rows exist.

I know that this might be non-trivial to implement, however Doctrine does have this feature. See PARAM_INT_ARRAY / PARAM_STR_ARRAY in Doctrine documentation. It is done through kind of a hack though - the database receives WHERE id IN (?, ?, ?, ?, ?, ?) with several parameters instead.

I'm not quite sure how this library works internally. Do you think this would be possible to implement somehow? I kinda dislike using a dynamic number of question marks in the SQL query...
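
Until something like this is supported natively, the Doctrine-style expansion can be hidden behind a small helper so the question marks never appear in hand-written SQL. This is a sketch: inPlaceholders is a made-up helper name, and it relies only on the ? placeholder support mentioned in the README. The execute() call is left commented out because it needs a live connection.

```php
<?php
declare(strict_types=1);

/**
 * Returns one "?" placeholder per value, e.g. "?, ?, ?" for three values.
 *
 * @param list<int|string> $values
 */
function inPlaceholders(array $values): string
{
    if ($values === []) {
        // "IN ()" is a syntax error in Postgres, so fail early instead.
        throw new \InvalidArgumentException('IN requires at least one value');
    }

    return \implode(', ', \array_fill(0, \count($values), '?'));
}

$ids = [3, 5, 8];
$sql = 'SELECT * FROM articles WHERE id IN (' . inPlaceholders($ids) . ')';

// Assumed usage: $result = $link->execute($sql, $ids);
```

The values themselves are still sent as parameters, so this does not reintroduce string interpolation of user data.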

Amp\Loop\InvalidWatcherError : Cannot reference an invalid watcher identifier: 'e'

I am trying to use PHPUnit with the Postgres adapter, but I get an error.
Version: amphp/postgres v1.4.3
Example code:

<?php
use Amp\PHPUnit\AsyncTestCase;
use Amp\Postgres\ConnectionConfig;

$connection = \Amp\Postgres\pool(ConnectionConfig::fromString("host=localhost port=5432 dbname=rees46_test user=rails password=rails"), 5);

class BarTest extends AsyncTestCase {

	public function test() {
		global $connection;

		$result = yield $connection->execute('SELECT 1 AS c');
		if( yield $result->advance() ) {
			$this->assertSame(1, $result->getCurrent()['c']);
		}
	}
	public function test2() {
		global $connection;

		$result = yield $connection->execute('SELECT 1 AS c');
		if( yield $result->advance() ) {
			$this->assertSame(1, $result->getCurrent()['c']);
		}
	}
}

Test log:

Testing started at 23:04 ...
PHPUnit 9.5.11 by Sebastian Bergmann and contributors.

Runtime:       PHP 8.0.12
Configuration: /project/phpunit.xml

Amp\Loop\InvalidWatcherError : Cannot reference an invalid watcher identifier: 'e'
 /project/vendor/amphp/amp/lib/Loop/Driver.php:516
 /project/vendor/amphp/amp/lib/Loop.php:322
 /project/vendor/amphp/postgres/src/PgSqlHandle.php:269
 /project/vendor/amphp/postgres/src/PgSqlHandle.php:381
 /project/vendor/amphp/amp/lib/Coroutine.php:67
 /project/vendor/amphp/amp/lib/functions.php:96
 /project/vendor/amphp/postgres/src/PgSqlHandle.php:382
 /project/vendor/amphp/postgres/src/Connection.php:79
 /project/vendor/amphp/postgres/src/Connection.php:109
 /project/vendor/amphp/postgres/src/Pool.php:83
 /project/vendor/amphp/sql-common/src/ConnectionPool.php:363
 /project/vendor/amphp/amp/lib/Coroutine.php:67
 /project/vendor/amphp/amp/lib/functions.php:96
 /project/vendor/amphp/sql-common/src/ConnectionPool.php:382
 /project/test/BarTest.php:21
 /project/vendor/amphp/amp/lib/Coroutine.php:67
 /project/vendor/amphp/phpunit-util/src/AsyncTestCase.php:278
 /project/vendor/amphp/phpunit-util/src/AsyncTestCase.php:137
 /project/vendor/amphp/amp/lib/Coroutine.php:118
 /project/vendor/amphp/amp/lib/Success.php:41
 /project/vendor/amphp/amp/lib/Coroutine.php:151
 /project/vendor/amphp/phpunit-util/src/AsyncTestCase.php:68
 /project/vendor/amphp/amp/lib/Loop/Driver.php:119
 /project/vendor/amphp/amp/lib/Loop/Driver.php:72
 /project/vendor/amphp/amp/lib/Loop/EventDriver.php:211
 /project/vendor/amphp/amp/lib/Loop.php:95
 /project/vendor/amphp/phpunit-util/src/AsyncTestCase.php:96
 /project/vendor/amphp/phpunit-util/src/AsyncTestCase.php:46

Time: 00:00.110, Memory: 10.00 MB

ERRORS!
Tests: 2, Assertions: 1, Errors: 1.

With the following example, however, the tests work correctly:

<?php
use Amp\PHPUnit\AsyncTestCase;
use Amp\Postgres\ConnectionConfig;

class BarTest extends AsyncTestCase {

	public function test() {
		$connection = \Amp\Postgres\pool(ConnectionConfig::fromString("host=localhost port=5432 dbname=rees46_test user=rails password=rails"), 5);

		$result = yield $connection->execute('SELECT 1 AS c');
		if( yield $result->advance() ) {
			$this->assertSame(1, $result->getCurrent()['c']);
		}
	}
	public function test2() {
		$connection = \Amp\Postgres\pool(ConnectionConfig::fromString("host=localhost port=5432 dbname=rees46_test user=rails password=rails"), 5);

		$result = yield $connection->execute('SELECT 1 AS c');
		if( yield $result->advance() ) {
			$this->assertSame(1, $result->getCurrent()['c']);
		}
	}
}

Log:

Testing started at 23:06 ...
PHPUnit 9.5.11 by Sebastian Bergmann and contributors.

Runtime:       PHP 8.0.12
Configuration: /Users/nixx/Projects/rees46-core/phpunit.xml

Time: 00:00.113, Memory: 8.00 MB

OK (2 tests, 2 assertions)

Set default time zone for pool

The SQL query SET TIMEZONE TO 'UTC' has no effect: I still receive times in my local time zone, e.g. 2021-03-22 14:57:49.834961+03.
But it works when connecting with psql:

# select now();
              now
-------------------------------
 2021-03-22 15:02:39.242053+03
(1 row)
# SET TIMEZONE TO 'UTC';
SET
# select now();
              now
-------------------------------
 2021-03-22 12:03:10.843069+00
(1 row)

Add a new example

Currently, you have the basic example, which only shows one query being executed. Can you include another example where one could execute multiple queries in parallel asynchronously?

Currently we're working with the below example and I'm wondering if this is the correct way to implement this using amphp.

$queries = [
    [
        'key' => 'totalRows',
        'statement' => 'SELECT COUNT(*) as numRows FROM mytable WHERE x = ?',
        'where' => [
            'vals' => [ 'abc' ]
        ],
        'params' => []
    ]
];
use Amp\Loop;
use Amp\Postgres;

$returnData = [];

// This section below is in a try catch and will only throw an error if it doesn't refer to the deallocate function
$pool = Postgres\pool('my dsn string');

Loop::run(function() use ($queries, $pool, &$returnData) {
    $promises = [];
    $results = [];

    foreach ($queries as $query) {
        $key = $query['key'];
        $promises[$key] = yield $pool->prepare($query['statement'], $query['params']);
        $results[$key] = yield $promises[$key]->execute($query['where']['vals']);
    }

    foreach($results as $key => $result) {
        $returnData[$key] = [];

        while (yield $result->advance()) {
            $item = $result->getCurrent();
            $returnData[$key][] = $item;
        }
    }
});

print_r($returnData);

Segmentation fault with ext-event

This library is causing a segfault for me when I try to use ext-event as the loop driver. I reported it here and Ruslan Osmanov fixed it just now. But he is wondering about how the situation could occur in the first place which I unfortunately can't answer since I'm not familiar with the internals of \Amp\Loop\EventDriver.

@kelunik @trowski Perhaps you might have an idea what's going on?

https://bitbucket.org/osmanov/pecl-event/issues/60/segmentation-fault-on-alpine#comment-57357616

Need fix after amphp/sql update

After commit e257237b61da7cb25a59a95299e232e9b536d93c (Improve naming) in the amphp/sql package, the Amp\Postgres\Connection class needs to be fixed.

Fatal error: Class Amp\Postgres\PgSqlConnection contains 2 abstract methods and must therefore be declared abstract or implement the remaining methods (Amp\Sql\Link::beginTransaction, Amp\Sql\TransientResource::getLastUsedAt) in vendor/amphp/postgres/src/PgSqlConnection.php on line 14

Prepared statement already exists

> Amp\Postgres\QueryExecutionError: ERROR:  prepared statement "amp_fca13c924dc545addd9f67807241cb1d7b83f685" already exists
>  in /home/mtm/vendor/amphp/postgres/src/PgSqlHandle.php:397
> 

Hi! I'm getting this error, how can I fix it? Do you need other logs?

Array of enum values is parsed as string instead of array

CREATE TYPE "Foo" AS ENUM ('F1', 'F2', 'F3');
ALTER TABLE "myFoo" ADD COLUMN foos "Foo"[] NOT NULL DEFAULT '{}';
$connection->execute('SELECT foos FROM "myFoo"');
  • When foos is empty, I get the empty string ('{}') instead of an empty array ([]).
  • When foos is non-empty, I get a string ('{F1}') instead of an array of strings (['F1']).

Cannot set connection to nonblocking mode

I stumbled upon this problem when trying out this library.
The connection is established and fully functional, and queries are executed without problems, but after some time it seems I lose the connection:

[2018-06-08 12:09:38] server.debug: Accept ::1:58915 on ::1:1337 #151 [] []
[2018-06-08 12:09:38] server.debug: GET http://localhost:1337/ HTTP/1.1 @ ::1:58914 [] []
[2018-06-08 12:09:38] server.debug: GET http://localhost:1337/favicon.ico HTTP/1.1 @ ::1:58914 [] []
[2018-06-08 12:09:52] server.debug: Close ::1:58915 #151 [] []
[2018-06-08 12:09:53] server.debug: Close ::1:58914 #150 [] []
[2018-06-08 14:40:47] server.debug: Accept ::1:57156 on ::1:1337 #156 [] []
[2018-06-08 14:40:47] server.debug: Accept ::1:57157 on ::1:1337 #157 [] []
[2018-06-08 14:40:47] server.debug: GET http://localhost:1337/ HTTP/1.1 @ ::1:57156 [] []
PHP Notice:  pg_connection_busy(): Cannot set connection to nonblocking mode in /amp/vendor/amphp/postgres/lib/PgSqlHandle.php on line 107

Notice: pg_connection_busy(): Cannot set connection to nonblocking mode in /amp/vendor/amphp/postgres/lib/PgSqlHandle.php on line 107
[2018-06-08 14:40:47] server.error: Amp\Postgres\QueryExecutionError: socket not open  in /amp/vendor/amphp/postgres/lib/PgSqlHandle.php:320 Stack trace: #0 [internal function]: Amp\Postgres\PgSqlHandle->Amp\Postgres\{closure}() #1 /amp/vendor/amphp/amp/lib/Coroutine.php(74): Generator->send(Resource id #158) #2 /amp/vendor/amphp/amp/lib/Internal/Placeholder.php(127): Amp\Coroutine->Amp\{closure}(NULL, Resource id #158) #3 /amp/vendor/amphp/amp/lib/Deferred.php(41): class@anonymous->resolve(Resource id #158) #4 /amp/vendor/amphp/postgres/lib/PgSqlHandle.php(111): Amp\Deferred->resolve(Resource id #158) #5 /amp/vendor/amphp/amp/lib/Loop/NativeDriver.php(172): Amp\Postgres\PgSqlHandle::Amp\Postgres\{closure}('o', Resource id #125, NULL) #6 /amp/vendor/amphp/amp/lib/Loop/NativeDriver.php(68): Amp\Loop\NativeDriver->selectStreams(Array, Array, 0.94) #7 /amp/vendor/amphp/amp/lib/Loop/Driver.php(130): Amp\Loop\NativeDriver->dispatch(true) #8 /amp/vendor/amphp/amp/lib/Loop/Driver.php(70): Amp\Loop\Driver->tick() #9 /amp/vendor/amphp/amp/lib/Loop.php(76): Amp\Loop\Driver->run() #10 /amp/run.php(111): Amp\Loop::run(Object(Closure)) #11 {main} [] []
[2018-06-08 14:40:47] server.debug: GET http://localhost:1337/favicon.ico HTTP/1.1 @ ::1:57156 [] []
[2018-06-08 14:41:02] server.debug: Close ::1:57156 #156 [] []
[2018-06-08 14:41:02] server.debug: Close ::1:57157 #157 [] []

As you can see, at 12:09:38 this runs with no problem: the query gets executed and the result is returned. After being idle, at 14:40:47 (this actually happened to me after ±5 minutes of idling as well), this error occurs.

Should I be "pinging" the database every minute or so to keep the connection alive, or should I reestablish it after some time?

Here's the code (but it actually is a bunch of amp examples clumped together)

Amp\Loop::run(function () {
    $sockets = [
        Socket\listen("0.0.0.0:1337"),
        Socket\listen("[::]:1337"),
    ];

    $logHandler = new StreamHandler(new ResourceOutputStream(\STDOUT));
    $logHandler->setFormatter(new ConsoleFormatter);
    $logger = new Logger('server');
    $logger->pushHandler($logHandler);


    /** @var \Amp\Postgres\Pool $pool */
    $pool = Amp\Postgres\pool("host=xxx user=xxx dbname=xxx password=xxx");

    $router = new Router;

    $router->addRoute('GET', '/', new CallableRequestHandler(function () use ($pool) {
        $statement = yield $pool->prepare("SELECT * FROM users ORDER BY user_id ASC LIMIT 100");

        /** @var \Amp\Postgres\ResultSet $result */
        $result = yield $statement->execute();

        return new Response(Status::OK, [
            "content-type" => "text/plain; charset=utf-8",
        ], new IteratorStream(new Producer(function (callable $emit) use ($result) {
            while (yield $result->advance()) {
                $row = $result->getCurrent();

                yield $emit($row['user_id'].': '.$row['email']."\r\n");
            }
        })));
    }));

Possible allocation and deallocation of the same statement at the same time

Hello! I'm inspired by your great work and ported this client from AmPHP to Swoole.
I ran into strange behavior. There is no synchronization between statement allocation and deallocation, and under high concurrency the same statement may be allocated and deallocated at the same time.

I can't reproduce this behavior in tests, but it can be reproduced under a real workload, e.g. ab -n1000 -c100 http://your-endpoint.

If it helps, I fixed this issue in my port PR.

Incompatibility with ext-event

Ruslan Osmanov has fixed the segfault I was getting when using this package with ext-event, but said that it won't work anyway and that he would need to add pgsql support there - read his comment here.

Can you share some insight if this is really necessary? Does ext-uv also have some custom pgsql support which he could use for inspiration? The native loop driver also works just fine...

The readme of this package says that it's incompatible with ext-ev. Is that true or was it actually meant to say that it's incompatible with ext-event? Or is it with both?

@kelunik @trowski @bwoebi

Make new release?

v0.2.1 is pretty buggy and lots of changes are in master - can you make a new dev release (maybe v0.3)?

Also: Are there any plans to add an interface behind amphp/postgres & amphp/mysql, so I can choose to be compatible with both?

Transactions won't work if \pg_get_result returns multiple results

Code to reproduce:

use Amp\Postgres;

Amp\Loop::run(function () {
    $config = Postgres\ConnectionConfig::fromString('host=localhost user=postgres');

    $pool = Postgres\pool($config);

    yield $pool->query('DROP TABLE IF EXISTS test');
    yield $pool->query('DROP TABLE IF EXISTS test2');

    /** @var \Amp\Postgres\Transaction $transaction */
    $transaction = yield $pool->beginTransaction();

    yield $transaction->query('
        CREATE TABLE test (domain VARCHAR(63), tld VARCHAR(63), PRIMARY KEY (domain, tld));
        CREATE TABLE test2 (domain VARCHAR(63), tld VARCHAR(63), PRIMARY KEY (domain, tld));
    ');

    yield $transaction->commit();
});

It fails with reason: Amp\Sql\FailureException: another command is already in progress

This happens because the asynchronous \pg_send_query() accepts several SQL queries separated by ; in a single call:
https://www.php.net/manual/en/function.pg-send-query.php
https://www.php.net/manual/en/function.pg-get-result.php

Unlike pg_query(), it can send multiple queries at once to PostgreSQL and get the results one by one using pg_get_result().

To fix this bug, the library should call \pg_get_result until it returns false.
https://github.com/amphp/postgres/blob/master/src/PgSqlHandle.php#L113
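
The draining loop suggested above can be sketched as follows. This is a minimal illustration, not the library's code: drainResults is a hypothetical helper, and the result source is passed in as a callable so the loop can be run without a database. With ext-pgsql the callable would wrap \pg_get_result($connection); real non-blocking code would also have to wait until the connection is readable between calls, which this sketch leaves out.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical helper: keep asking for the next result until the source
 * reports false, and collect everything that was returned.
 *
 * With ext-pgsql the callable could be: fn () => \pg_get_result($connection)
 */
function drainResults(callable $getResult): array
{
    $results = [];

    // \pg_get_result() returns false once no more results are pending.
    while (($result = $getResult()) !== false) {
        $results[] = $result;
    }

    return $results;
}

// Stand-in for a connection that has two pending results (one per statement).
$pending = ['result of CREATE TABLE test', 'result of CREATE TABLE test2'];
$fakeGetResult = function () use (&$pending) {
    return $pending ? array_shift($pending) : false;
};

$all = drainResults($fakeGetResult);
```

After the call, $all holds both results and the fake connection has nothing pending, which is the state a following query needs.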

[feature request]: last insert oid

The pg driver offers pg_last_oid function that can be used to retrieve the last oid from a result, however, amphp doesn't offer a high level access to this function ( similar to the getLastInsertId in amphp/mysql ), or access to the underlying result handle.

I think adding getLastInsertId would be a good start. Offering access to the underlying result handle (or possibly all types of handles, e.g. pg handle, socket connection, etc.) might be dangerous, but it's a good feature to have that would allow people who know what they are doing to play around.

Making multiple queries repeatedly, cleaner code

I'm trying to make my code cleaner. I have tried adding functions and calling them when I need to update or insert values, but this does not work because yield seems to need to be inside the first function passed to the loop.

Any ideas on how to accomplish something similar that works?

working.php

// Get all emails pending
Amp\Loop::run(function () {

    $date = date('Y-m-d H:i:s');
    //echo "Date: $date";
    $conn = "host=" . DB_HOST . " user=" . DB_USER . " password=" . DB_PASS . " dbname=" . DB_DATA;
    $config = ConnectionConfig::fromString($conn);
    $pool = Postgres\pool($config);

    // Get all mails not sent yet
    $statement = yield $pool->prepare("SELECT * FROM tc_mail WHERE status = 0 and \"dateSent\" is null;");
    $result = yield $statement->execute();

    while (yield $result->advance()) {

        // Get id and mail info
        $row = $result->getCurrent();
        $id = $row["id"];
        $mailSubject = $row["mailSubject"];
        $mailType = $row["mailType"];
        $body = $row["body"];
        $dateCreated = $row["dateCreated"];
        $extras = $row["extras"];
        $mailAddress = $row["mailAddress"];
        
        $mailAddress = explode(",", $mailAddress);

        // convert all emails to array
        $mailArray = array();
        foreach ($mailAddress as $address) {
            array_push($mailArray, new To($address));
        }

        // Send content to HTML template
        $file = __DIR__ . '/templates/general.php';
        $emailContent = array('content' => $body);
        $body = '';
        $body .= template($file, $emailContent);

        // Create new Email
        $email = new \SendGrid\Mail\Mail();
        $email->setFrom("[email protected]", "TEST");
        $email->setSubject($mailSubject);
        $email->addTos($mailArray);
        $email->addContent(
            "text/html", $body
        );

        $sendgrid = new \SendGrid(SENDGRID_API_KEY);

        // Check for email Type
        if (isset($extras) && !empty($extras) && $mailType === "alert") { // compare, don't assign

            // Check if deviceid has emails from 30 minutes ago
            $statement = yield $pool->prepare("SELECT * from tc_mail WHERE extras = ? and status = 1 and \"dateSent\" > current_timestamp - interval '30 minutes' limit 1");
            $result2 = yield $statement->execute([$extras]);

            if (!yield $result2->advance()) {
                // Send email for alert
                $response = $sendgrid->send($email);

                if ($response->statusCode() == 202) {
                    // if correct update status=1
                    $statement = yield $pool->prepare('UPDATE "public"."tc_mail" SET "dateSent" = ?, "status" = ? WHERE "id" = ?');
                    yield $statement->execute([$date, 1, $id]);
                } else {

                    // if error update status=3
                    $statement = yield $pool->prepare('UPDATE "public"."tc_mail" SET "dateSent" = ?, "status" = ? WHERE "id" = ?');
                    yield $statement->execute([$date, 3, $id]);
                }

            } else {
                // Set status = 2 and dont send email
                $statement = yield $pool->prepare('UPDATE "public"."tc_mail" SET "dateSent" = ?, "status" = ? WHERE "id" = ?');
                yield $statement->execute([$date, 2, $id]);

            }
        } else {

            // Just send the email for other notifications
            // Send email for alert
            $response = $sendgrid->send($email);

            if ($response->statusCode() == 202) {
                // if correct update status=1
                $statement = yield $pool->prepare('UPDATE "public"."tc_mail" SET "dateSent" = ?, "status" = ? WHERE "id" = ?');
                yield $statement->execute([$date, 1, $id]);
            } else {

                // if error update status=3
                $statement = yield $pool->prepare('UPDATE "public"."tc_mail" SET "dateSent" = ?, "status" = ? WHERE "id" = ?');
                yield $statement->execute([$date, 3, $id]);
            }
        }
    }

    if (isset($pool)) {
        $pool->close();
    }

});

function template($file, $args)
{
    // ensure the file exists
    if (!file_exists($file)) {
        return "File $file does not exists.";
    }

    // Make values in the associative array easier to access by extracting them
    if (is_array($args)) {
        extract($args);
    }

    // buffer the output (including the file is "output")
    ob_start();
    include $file;
    return ob_get_clean();
}

cleaner_not_working.php

use Amp\Postgres;
use Amp\Postgres\ConnectionConfig;
use SendGrid\Mail\To;

// Get all emails pending

Amp\Loop::run(function () {

    $date = date('Y-m-d H:i:s');
    //echo "Date: $date";
    $conn = "host=" . DB_HOST . " user=" . DB_USER . " password=" . DB_PASS . " dbname=" . DB_DATA;
    $config = ConnectionConfig::fromString($conn);
    $pool = Postgres\pool($config);

    // Get all mails not sent yet
    $statement = yield $pool->prepare("SELECT * FROM tc_mail WHERE status = 0 and \"dateSent\" is null;");
    $result = yield $statement->execute();

    while (yield $result->advance()) {

        // Get id and mail info
        $row = $result->getCurrent();
        $id = $row["id"];
        $mailSubject = $row["mailSubject"];
        $mailType = $row["mailType"];
        $body = $row["body"];
        $dateCreated = $row["dateCreated"];
        $extras = $row["extras"];
        $mailAddress = $row["mailAddress"];
        $mailAddress = explode(",", $mailAddress);

        // convert all emails to array
        $mailArray = array();
        foreach ($mailAddress as $address) {
            array_push($mailArray, new To($address));
        }

        // Send content to HTML template
        $file = __DIR__ . '/templates/general.php';
        $emailContent = array('content' => $body);
        $body = '';
        $body .= template($file, $emailContent);

        // Create new Email
        $email = new \SendGrid\Mail\Mail();
        $email->setFrom("[email protected]", "TEST");
        $email->setSubject($mailSubject);
        $email->addTos($mailArray);
        $email->addContent(
            "text/html", $body
        );

        $sendgrid = new \SendGrid(SENDGRID_API_KEY);

        // Check for email Type
        if (isset($extras) && !empty($extras) && $mailType === "alert") { // compare, don't assign

            // Check if deviceid has emails from 30 minutes ago
            if (check_previous_email_sent_minutes_ago($pool, $extras)) {

                // Set status = 2 and dont send email
                update_mail($pool, $date, 2, $id);

            } else {

                sendEmail($pool, $date, $id, $sendgrid, $email);
            }
        } else {

            sendEmail($pool, $date, $id, $sendgrid, $email);
        }
    }

    if (isset($pool)) {
        $pool->close();
    }
});

function update_mail($pool, $date, $status, $id)
{
    try {
        $statement = yield $pool->prepare('UPDATE "public"."tc_mail" SET "dateSent" = ?, "status" = ? WHERE "id" = ?');
        yield $statement->execute([$date, $status, $id]);

    } catch (Exception $e) {
        echo "ERROR UPDATING tc_mail: " . $e . "\n";
    }

}

function template($file, $args)
{
    // ensure the file exists
    if (!file_exists($file)) {
        return "File $file does not exists.";
    }

    // Make values in the associative array easier to access by extracting them
    if (is_array($args)) {
        extract($args);
    }

    // buffer the output (including the file is "output")
    ob_start();
    include $file;
    return ob_get_clean();
}

function check_previous_email_sent_minutes_ago($pool, $extras)
{
    //SELECT * FROM tc_mail tm where extras = ? and id not in (?) and dateSent > NOW() - INTERVAL '30 minutes' and status = 1 limit 1;
    $statement = yield $pool->prepare("SELECT * from tc_mail WHERE extras = ? and status = 1 and \"dateSent\" > current_timestamp - interval '30 minutes' limit 1");
    $result = yield $statement->execute([$extras]);

    return yield $result->advance();
}

function sendEmail($pool, $date, $id, $sendgrid, $email)
{
    $response =  $sendgrid->send($email);

    // check email sent correctly
    if ($response->statusCode() == 202) {

        // if correct update status=1
        $this->update_mail($pool, $date, 1, $id);
    } else {

        // if error update status=3
        $this->update_mail($pool, $date, 3, $id);
    }
}
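
A likely reason the cleaner version does nothing: in PHP, any function whose body contains yield is a generator function, so calling update_mail(...) only creates a Generator object and runs none of its body. The sketch below shows this in plain PHP without amphp; updateMail and the $log array are hypothetical stand-ins. In amphp v2 the usual fix is to wrap the helper body in Amp\call(function () { ... }) so it returns a promise, and to yield that promise at the call site.

```php
<?php
declare(strict_types=1);

$log = [];

// Because the body contains `yield`, this is a generator function.
function updateMail(array &$log, int $id): Generator
{
    $log[] = "updating $id";
    yield; // stands in for `yield $statement->execute(...)`
    $log[] = "updated $id";
}

// A plain call only creates the Generator; nothing has been logged yet.
$generator = updateMail($log, 7);
$entriesAfterPlainCall = count($log);

// Something must drive the generator; a coroutine runner does this
// and waits for each yielded promise before resuming.
foreach ($generator as $ignored) {
    // nothing to wait for in this sketch
}
$entriesAfterDriving = count($log);
```

Here $entriesAfterPlainCall is 0 and $entriesAfterDriving is 2, which mirrors why the helper functions above never reach the database unless their result is yielded to the event loop.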

[v2.0.0-beta.5] Transaction suspend on the second request

amphp/sql-common: v2.0.0-beta.7
amphp/postgres: v2.0.0-beta.5
amphp/sql: v2.0.0-beta.6

Example:

use Amp\Postgres;
use Amp\Postgres\PostgresConfig;

require 'vendor/autoload.php';

$config = new PostgresConfig('localhost', 5432, 'user', 'pass', 'dbtest');
$connection = Postgres\connect($config);

$transaction = $connection->beginTransaction();
$result = $transaction->execute('INSERT INTO customers (email,time_zone) VALUES (:email,:time_zone) RETURNING *', [
	'email'   => '[email protected]',
	'time_zone' => 'UTC',
]);
print_r($result->fetchRow());
$result = $transaction->execute('INSERT INTO customers (email) VALUES (:email) RETURNING *', [
	'email'   => '[email protected]',
]); // <---- suspend here
print_r($result->fetchRow());
$transaction->rollback();

STDOUT:

Array
(
    [email] => [email protected]
    [time_zone] => UTC
)

It works correctly if both requests use the same SQL query:

use Amp\Postgres;
use Amp\Postgres\PostgresConfig;

require 'vendor/autoload.php';

$config = new PostgresConfig('localhost', 5432, 'user', 'pass', 'dbtest');
$connection = Postgres\connect($config);

$transaction = $connection->beginTransaction();
$result = $transaction->execute('INSERT INTO customers (email,time_zone) VALUES (:email,:time_zone) RETURNING *', [
	'email'   => '[email protected]',
	'time_zone' => 'UTC',
]);
print_r($result->fetchRow());
$result = $transaction->execute('INSERT INTO customers (email,time_zone) VALUES (:email,:time_zone) RETURNING *', [
	'email'   => '[email protected]',
	'time_zone' => 'UTC',
]);
print_r($result->fetchRow());
$transaction->rollback();

STDOUT:

Array
(
    [email] => [email protected]
    [time_zone] => UTC
)
Array
(
    [email] => [email protected]
    [time_zone] => UTC
)

This works correctly in v2.0.0-beta.4.

Why just "RESET ALL" is used to reset the state of a pooled connection?

Hi all,

While looking at how the connection pool is implemented, I was surprised that the current implementation uses only the RESET ALL [1] statement to do the job. According to the PostgreSQL documentation [2], it resets only run-time parameters.

Sorry for the very newbie question about the project. I just got curious about what advanced things we can do with PHP nowadays (I used PHP for a long time in the past); these days I help with PostgreSQL development.

So if the real intention here is to restore the connection to its initial state in PostgreSQL, it should use DISCARD ALL [3] instead, because otherwise other temporary resources can be left behind and leak, for example:

  • cached plans and sequences
  • prepared statements
  • temporary tables
  • advisory locks

Another example is pgbouncer [4], the best-known connection pooler for PostgreSQL, which uses DISCARD ALL as the default server_reset_query.

[1] https://github.com/amphp/postgres/blob/master/src/Pool.php#L82
[2] https://www.postgresql.org/docs/current/sql-reset.html
[3] https://www.postgresql.org/docs/current/sql-discard.html
[4] http://www.pgbouncer.org/config.html#connection-sanity-checks-timeouts

[2.0.0 Beta 4] ErrorException: pg_connection_busy(): Cannot set connection to nonblocking mode

#0 [internal function]: PC\Worker\AbstractWorker->errorHandler()
#1 vendor/amphp/postgres/src/Internal/PgSqlHandle.php(106): pg_connection_busy()
#2 vendor/revolt/event-loop/src/EventLoop/Internal/AbstractDriver.php(571): Amp\Postgres\Internal\PgSqlHandle::Amp\Postgres\Internal\{closure}()
#3 [internal function]: Revolt\EventLoop\Internal\AbstractDriver->Revolt\EventLoop\Internal\{closure}()
#4 vendor/revolt/event-loop/src/EventLoop/Internal/AbstractDriver.php(478): Fiber->resume()
#5 vendor/revolt/event-loop/src/EventLoop/Internal/AbstractDriver.php(533): Revolt\EventLoop\Internal\AbstractDriver->invokeCallbacks()
#6 [internal function]: Revolt\EventLoop\Internal\AbstractDriver->Revolt\EventLoop\Internal\{closure}()
#7 vendor/revolt/event-loop/src/EventLoop/Internal/AbstractDriver.php(105): Fiber->resume()
#8 vendor/revolt/event-loop/src/EventLoop/Driver/EventDriver.php(119): Revolt\EventLoop\Internal\AbstractDriver->run()
#9 vendor/workerman/workerman/src/Events/Revolt.php(88): Revolt\EventLoop\Driver\EventDriver->run()

The connection to the database has been closed (pgsql)

pg_get_result

I'm getting the following exception:

pg_send_query(): There are results on this connection. Call pg_get_result() until it returns FALSE

File: lib/PgSqlHandle.php
Line: 216

Pool hangs for unknown reason

I have a method like this (simplified here):

    protected function doRead(): Generator
    {
        try {
            $this->logger->debug('do request events from ' . $this->fromSequenceNumber); // stops after the log entry

            $sql = <<<SQL
SELECT
    COALESCE(e1.event_id, e2.event_id) as event_id,
    e1.event_number as event_number,
    COALESCE(e1.event_type, e2.event_type) as event_type,
    COALESCE(e1.data, e2.data) as data,
    COALESCE(e1.meta_data, e2.meta_data) as meta_data,
    COALESCE(e1.is_json, e2.is_json) as is_json,
    COALESCE(e1.updated, e2.updated) as updated
FROM
    events e1
LEFT JOIN events e2
    ON (e1.link_to = e2.event_id)
WHERE e1.stream_id = ?
AND e1.event_number >= ?
ORDER BY e1.event_number ASC
LIMIT ?
SQL;

            /** @var Statement $statement */
            $statement = yield $this->pool->prepare($sql);
            $this->logger->debug('executing fetch query'); // never gets logged
            /** @var ResultSet $result */
            $result = yield $statement->execute([$this->streamId, $this->fromSequenceNumber, self::MaxReads]);

            $readEvents = 0;

            while (yield $result->advance(ResultSet::FETCH_OBJECT)) {
                $this->logger->debug('found event, enqueue');
                $row = $result->getCurrent();
                $this->logger->debug(json_encode($row));
                ++$readEvents;
                $this->fromSequenceNumber = $row->event_number + 1;
            }

            if (0 === $readEvents && $this->stopOnEof) {
                $this->logger->debug('stopped');
                $this->stopped = true;
            }
        } catch (\Throwable $e) {
            $this->logger->error($e->getMessage());
            $this->logger->error($e->getTraceAsString());
        }
    }

When I first call this method, it works. On a second call, it stops at [2018-05-19 19:08:00] PROJECTOR-testlinkto.DEBUG: do request events from 401 [] [] (see comments in code above). No error is logged, so the try-catch made no difference. I debugged the connection pool: I have 100 max connections, 2 are used and both are idle, so that's not the problem either. I can't tell what's going on at the moment.

Any help is greatly appreciated.

Question mark operators interpreted as unnamed parameters

In the following query, the @? JSONB operator will be erroneously interpreted by this library as an unnamed parameter.

SELECT *
FROM foo
WHERE bar::jsonb @? '$[*].baz' 

This library treats the ? component of the @? operator as though it were an unnamed parameter by making this erroneous transformation:

SELECT *
FROM foo
WHERE bar::jsonb @$1 '$[*].baz' 

Note that even though I want to use @?, there are also ?|, ?& and even bareword ? JSONB operators. Therefore it may not be sufficient to solve this problem with regex. Perhaps the library should be smarter about doing question mark substitution such that if the statement is not executed with any unnamed parameters, unnamed substitution does not occur. That is, if one wants to use JSONB operators and parameter substitution, they should use named substitution only.

Note further this does not just apply to JSONB. Geometric functions also include a whole other class of operators, many of which contain the question mark. Moreover, extensions are free to implement their own operators that may include question marks too; the same applies to custom operators.

Solutions

  1. Improve the regex to exclude most operators
  2. Implicitly ignore question mark placeholders when no numeric keys specified
  3. Explicitly ignore question mark placeholders
  4. Add an escape sequence for question marks

Improve the regex to exclude most operators

Considering operators may include the following characters, + - * / < > = ~ ! @ # % ^ & | ` ?, the regex can be extended to exclude question marks surrounded by these other characters. This does not interfere with normal usage, such as INSERT INTO foo VALUES (?, ?), where question marks are typically only surrounded by braces, commas and whitespace, which are not valid operator characters. However, this solution would never be able to handle the bare question mark case (?).
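
A minimal sketch of this first option, assuming a simplified substitution step: replacePositionalPlaceholders is a hypothetical function, not the library's implementation, and it ignores quoted strings and comments. It rewrites a ? only when neither neighbour is one of the operator characters listed above.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical helper: turn standalone "?" into $1, $2, ... but leave a "?"
 * alone when it touches another operator character (as in @?, ?| or ?&).
 */
function replacePositionalPlaceholders(string $sql): string
{
    // Operator characters from the list above, escaped for a character class.
    $operatorChars = '+\-*\/<>=~!@#%^&|`?';
    $pattern = '/(?<![' . $operatorChars . '])\?(?![' . $operatorChars . '])/';

    $index = 0;
    $replaced = preg_replace_callback(
        $pattern,
        function (array $match) use (&$index): string {
            return '$' . ++$index;
        },
        $sql
    );

    // preg_replace_callback() returns null on a regex error; fall back to the input.
    return $replaced ?? $sql;
}

$insert = replacePositionalPlaceholders('INSERT INTO foo VALUES (?, ?)');
$jsonb  = replacePositionalPlaceholders('SELECT * FROM foo WHERE bar::jsonb @? \'$[*].baz\'');
$bare   = replacePositionalPlaceholders('SELECT a ? \'b\'');
```

The insert statement gets $1 and $2, the @? query is left untouched, and the bare ? operator in the last query is still rewritten to $1, which is the limitation noted above.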

Implicitly ignore question mark placeholders when no numeric keys specified

If we assume that question mark substitution can safely be disabled when no parameters are specified with numeric keys, this could be an elegant and implicit way to avoid the problem in some cases.

Explicitly ignore question mark placeholders

Similar to above, we could add an explicit option to disable question mark placeholders. This is the approach taken by some other client libraries such as DBD::Pg.

Add an escape sequence for question marks

Allowing question marks to be escaped makes it clear they are not to be used as placeholders.

WHERE bar::jsonb @\? '$[*].baz' 

This approach also supports the bare question mark case. However, a weakness of this approach is that it makes the query less portable (e.g. pasting it into another program to test).
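
The escape idea could look roughly like this. It is only a sketch under the assumption that \? is the chosen escape; replaceWithEscapes is a hypothetical name, and quoted strings and comments are again not handled. An escaped \? becomes a literal ?, and every other ? becomes a numbered placeholder.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical helper: "\?" is emitted as a literal "?", any other "?"
 * is replaced by the next numbered placeholder ($1, $2, ...).
 */
function replaceWithEscapes(string $sql): string
{
    $index = 0;
    $replaced = preg_replace_callback(
        '/\\\\\?|\?/', // either backslash + "?" or a plain "?"
        function (array $match) use (&$index): string {
            return $match[0] === '\\?' ? '?' : '$' . ++$index;
        },
        $sql
    );

    return $replaced ?? $sql;
}

$escaped = replaceWithEscapes('SELECT * FROM foo WHERE bar::jsonb @\? \'$[*].baz\' AND id = ?');
$bareOp  = replaceWithEscapes('SELECT a \? \'b\' WHERE id = ?');
```

Both the @? operator and the bare ? operator survive when escaped, while the trailing id = ? still becomes id = $1.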

A combination of these solutions may also be feasible. For example, improving the regex and also implementing the escape just for the bare question mark case.

rfc: better transaction disposing mechanism

While working on #50, I noticed a weird behavior within the library.

A query cannot be executed (on the same connection) unless all handles related to a specific transaction are released; however, the library doesn't offer a way to force the release of all handles.

Let's take the following example:

/**
 * @template T
 * @param Closure(Executor): T $operation
 * @return T
 */
function transactional(Link $link, Closure $operation): mixed {
  $trans = $link->beginTransaction();
  try {
    $result = $operation($trans);
    $trans->commit();
    return $result;
  } catch(Throwable $e) {
     $trans->rollback();
     throw $e;
  }
}

$link = ...;
$users = transactional($link, static fn($executor) => $executor->query('SELECT * FROM users'));

So far so good: we have a function that, given a database link and an operation, will execute the operation within a transaction and return its results.

However, let's take this a step further, simulating a more real-world usage of the transactional function.

$users = transactional(
  $link,
  static fn($executor) => $executor->execute('SELECT * FROM forum_users WHERE role = ?', [User::ROLE_ADMIN]),
);

foreach($users as $user) {
  if (admin_has_been_active($user)) {
    continue;
  }

  if (!admin_warned_about_demotion($user)) {
    warn_admin_about_demotion($user);
    continue;
  }

  if (!admin_warned_about_demotion_over_a_week_ago($user)) {
    // hasn't been a week
    continue;
  }
  
  $link->execute('UPDATE forum_users SET role = ? WHERE id = ?', [User::ROLE_SUPER, $user['id']]);
}

Here, we get the list of our forum admins and iterate over them. If an admin has not been active, we warn them that their account is going to be demoted next week; if they were warned over a week ago, we demote their account to super user.

However, this currently fails.

Since $users is a result set, it still holds a reference to a handle that is associated with the transaction used to retrieve the users. Arguably the initial query shouldn't have been made within a transaction, but remember that this is just an example.

What I suggest is the following:

If a transaction has been rolled back or committed, all handles should be force released. This can be done in one of the following ways:

  1. Force release and mark objects with a reference to any handle as invalid (e.g. iterating over users will fail).
  2. Do all the work immediately (e.g. $users needs a reference to the handle in order to fetch users while iterating; however, when a commit() or a rollback() is executed, we can force the result to fetch all users immediately, making it drop the reference to the handle while still being valid).

I personally prefer the second solution, while it might be bad for memory when dealing with large queries, it is the most sane behavior in my opinion, as if the user is holding a reference to that object, they are probably still going to use it, if not, they can manually call dispose() on the result.
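
The second option can be approximated in userland today by buffering inside the helper before committing. The sketch below is self-contained and uses a hypothetical FakeTransaction class in place of a real amphp transaction; transactionalBuffered is likewise a made-up name. It only illustrates the ordering: fetch every row first, then release the handle.

```php
<?php
declare(strict_types=1);

/** Hypothetical stand-in for a transaction; not an amphp class. */
final class FakeTransaction
{
    public bool $open = true;

    /** Lazily yields rows, but only while the transaction is still open. */
    public function query(): Generator
    {
        foreach ([['id' => 1], ['id' => 2]] as $row) {
            if (!$this->open) {
                throw new RuntimeException('handle already released');
            }
            yield $row;
        }
    }

    public function commit(): void
    {
        $this->open = false;
    }
}

/** Runs the operation, buffers any lazy result, then commits. */
function transactionalBuffered(FakeTransaction $trans, Closure $operation): mixed
{
    $result = $operation($trans);

    // Fetch all rows now so the result no longer depends on the handle.
    if ($result instanceof Traversable) {
        $result = iterator_to_array($result, false);
    }

    $trans->commit();

    return $result;
}

$users = transactionalBuffered(
    new FakeTransaction(),
    static fn (FakeTransaction $t) => $t->query()
);
```

The trade-off is the one mentioned above: $users is a plain array that stays valid after commit, at the cost of holding every row in memory.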

Cannot set connection to nonblocking mode

I see the same error on v1.3.3. I set up a pool of 10 connections and receive the error after 1-2 minutes:

pg_connection_busy(): Cannot set connection to nonblocking mode, /project/vendor/amphp/postgres/src/PgSqlHandle.php:109

When I used pool 5, I did not observe such a problem.

Similar issue: #13
