Git Product home page Git Product logo

Comments (8)

will avatar will commented on May 22, 2024 1

I went with the short term solution, it is in v0.11.0

from crystal-pg.

will avatar will commented on May 22, 2024

You can set the on_notify callback on the connection to process listen notify events. While it is async, there has to be some network traffic going for the events to make it in, such as just an empty select

require "./src/pg"

DB = PG.connect("postgres:///")

DB.on_notification do |note|
  p note
end

DB.exec "listen a"
DB.exec "notify a, 'hello'"

loop do
  sleep 1
  DB.exec "select"
end

# output
# PQ::Notification(@pid=54318, @channel="a", @payload="hello")

from crystal-pg.

Svenskunganka avatar Svenskunganka commented on May 22, 2024

I missed that completely, thank you for clarifying!

I have a couple of follow-up questions; You said that network traffic has to be active for the events to come in. Is there any particular reason to this? In your example, are notifications received at most every second or are they "instant", even though the empty select statement is only executed once per second?

Thanks!

from crystal-pg.

will avatar will commented on May 22, 2024

Right now the driver does not read from the socket unless it is expecting a response from the server, and if it happens to be an unexpected notice, error, or parameter message it process them before going back to what it expects.

It might be possible to make it instant if there is a way to know when the socket has data, or near instant by spawning something that polls the socket rather than making a complete query, but that would need to make sure not to read a byte in the middle of a query/response flow and get everything out of sync.

This driver is the only serious thing I've done with networking, so I'm not yet sure of the correct approach.

from crystal-pg.

jhass avatar jhass commented on May 22, 2024

I guess the protocol specifies some kind of message/packet format, so that you could continuously read those from a socket? Then you could try to spawn a fiber that continuously reads and distributes to various various channels:

def poll
  spawn do
    loop do
      data = @socket.read(buffer) # read length header and right length instead
      message = Message.new(data)
      if message.kind.notification?
        @notifications.send message
      else
        @query_results.send message
      end
    end
  end
end

def query
  @socket.write data
  message = @query_results.receive
  # Perhaps data has some tag we can check here to see whether the message is for us
  # and if not return it to the channel
  @query_results.send message unless message.tag == tag

  case message.kind
  when &.notice?
  when &.error?
  when &.result?
  end
end

def notifications_poll
  spawn do
    loop do
      notification = @notifications.receive
      @handlers.each &.call(notification)
    end
  end
end

clear downside is that it makes IO streaming almost impossible, except when using an unbuffered channel and only using it to signal IO ownership (when done the receiver would send the IO back through another channel).

from crystal-pg.

will avatar will commented on May 22, 2024

Thanks for the suggestion, but I'm concerned with the overhead of putting all the data through channels.

I'm thinking of using something like https://github.com/will/crystal-pg/pull/31/files?diff=unified to wait on data becoming available. I think LibC.select would be more appropriate here, except that it currently blocks the entire thread.

The problem though, when we wake up from a read event, we can't be sure the buffer was empty from the last query. So after the query is over but before releasing the lock, we need to check if the buffer is empty. Something to peek the buffer would be useful.

# IO::Buffered
  def peek_byte : UInt8?
    check_open

    fill_buffer if @in_buffer_rem.empty?
    if @in_buffer_rem.empty?
      nil
    else
      @in_buffer_rem[0]
    end
  end

at the end of a query, the only thing that could be in the buffer would be one of the 3 unsolicited frames. so keep processing them until the buffer is empty, then release the lock.

With that in place, we know then any time the coroutine is woken due from a read event is that the buffer is empty and it will be the start of a frame, and the frame is an unsolicited notice/etc frame.

It might be possible to be woken up during a query. Simply trying to grab the connection's mutex though could cause a deadlock, so try and grab it a new attempt_lock method that returns false if it can't get the lock rather than wait for the lock. If we can't grab, just ignore and wait until the next read event.

from crystal-pg.

jhass avatar jhass commented on May 22, 2024

select(2) is also quite hard to use on OpenSSL sockets.

from crystal-pg.

will avatar will commented on May 22, 2024

Good point.

I think for the short term, the best tradeoff would be to open a separate connection that can't make queries, and is only used for listening in a loop. The downside is burning a second connection, but the huge upside is none of these changes has to happen. Later on, it can be optimized to fold both into the same connection.

from crystal-pg.

Related Issues (20)

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. 📊📈🎉

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❤️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.