extreme's Introduction

Extreme

Erlang/Elixir TCP client for Event Store.

This version is tested with EventStore 3.9.3 - 4.1.1, Elixir 1.5, 1.6 and 1.7, and Erlang/OTP 19.3, 20.3 and 21.0.

INSTALL

Add Extreme as a dependency in your mix.exs file.

def deps do
  [{:extreme, "~> 0.13.2"}]
end

In order to deploy your app using exrm you should also update your application list to include :extreme:

def application do
  [applications: [:extreme]]
end

Extreme includes all its dependencies so you don't have to name them separately.

After you are done, run mix deps.get in your shell to fetch and compile Extreme and its dependencies.

EventStore v4 and later note

Starting with EventStore 4.0, the communication protocol changed: the event number is now 64 bits wide, and there are two new messages, IdentifyClient and ClientIdentified. To stay backward compatible with the older v3 protocol, the :extreme application has a :protocol_version setting. Set it to 4 to use the new protocol; the default is 3. Add the following line to your application config file to activate the new protocol:

config :extreme, :protocol_version, 4

USAGE

The best way to understand how the adapter should be used is to read the test/extreme_test.exs file, but we'll explain some details here as well.

Extreme is implemented using GenServer and is OTP compatible. If the client is disconnected from the server, Extreme does not try to reconnect; you should rely on your supervisor instead. For example:

defmodule MyApp.Supervisor do
  use Supervisor

  def start_link do
    Supervisor.start_link __MODULE__, :ok
  end

  @event_store MyApp.EventStore

  def init(:ok) do
    event_store_settings = Application.get_env :my_app, :event_store

    children = [
      worker(Extreme, [event_store_settings, [name: @event_store]]),
      # ... other workers / supervisors
    ]
    supervise children, strategy: :one_for_one
  end
end

You can also start the adapter manually (as you can see in the test file):

{:ok, server} = Application.get_env(:extreme, :event_store) |> Extreme.start_link

From now on, the server pid is used for further communication. Since we rely on the supervisor to reconnect, it is wise to name the server as we did in the example above.

MODES

Extreme can connect to single ES node or to cluster specified with node IPs and ports.

Example for connecting to single node:

config :extreme, :event_store,
  db_type: :node,
  host: "localhost",
  port: 1113,
  username: "admin",
  password: "changeit",
  reconnect_delay: 2_000,
  connection_name: :my_app,
  max_attempts: :infinity

  • db_type - defaults to :node, so it can be omitted
  • host - check the EXT IP setting of your EventStore
  • port - check the EXT TCP PORT setting of your EventStore
  • reconnect_delay - in ms. Defaults to 1_000. If the TCP connection fails, this is how long Extreme waits before trying to reconnect.
  • connection_name - optional parameter introduced in EventStore 4. The connection can be identified by this name in the ES UI.
  • max_attempts - defaults to :infinity. Specifies how many times Extreme tries to connect to EventStore.
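
To make the documented defaults concrete, here is a minimal sketch of how partial settings combine with them. NodeSettingsSketch and effective/1 are hypothetical names used only for illustration; Extreme applies its own defaults internally, so this module is not part of its API.

```elixir
defmodule NodeSettingsSketch do
  # Defaults as stated in the option list above. This is an illustration only;
  # Extreme fills in its defaults internally.
  @defaults [db_type: :node, reconnect_delay: 1_000, max_attempts: :infinity]

  # Returns the given settings with the documented defaults added
  # for any key the caller left out. Explicit settings always win.
  def effective(settings) when is_list(settings) do
    Keyword.merge(@defaults, settings)
  end
end
```

Calling NodeSettingsSketch.effective(host: "localhost", port: 1113) yields a keyword list in which db_type, reconnect_delay and max_attempts carry the default values listed above.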

Example for connecting to cluster:

config :extreme, :event_store,
  db_type: :cluster,
  gossip_timeout: 300,
  mode: :read,
  nodes: [
    %{host: "10.10.10.29", port: 2113},
    %{host: "10.10.10.28", port: 2113},
    %{host: "10.10.10.30", port: 2113}
  ],
  connection_name: :my_app,
  username: "admin",
  password: "changeit"

  • gossip_timeout - in ms. Defaults to 1_000. Extreme iterates through the nodes list, asking each node for cluster member details. This setting is the timeout for a gossip response before the next node in the list is asked.
  • nodes - mandatory for a cluster connection. The list of cluster nodes as we know it.
    • host - should be the EXT IP setting of your EventStore node
    • port - should be the EXT HTTP PORT setting of your EventStore node
  • mode - defaults to :write, where a Master node is preferred over a Slave; otherwise a Slave is preferred over a Master

Example for connecting to cluster via DNS lookup:

config :extreme, :event_store,
 db_type: :cluster_dns,
 gossip_timeout: 300,
 host: "es-cluster.example.com", # accepts a char list too; this should be a multi-A-record host entry in your nameserver
 port: 2113, # the external gossip port
 connection_name: :my_app,
 username: "admin",
 password: "changeit",
 mode: :write,
 max_attempts: :infinity

When cluster mode is used, the adapter goes through the nodes list and tries to gossip with one node after another until it gets a response about the cluster's nodes. Based on the node information in that response, it ranks their states and chooses the best candidate to connect to. In :write mode (the default) a Master node is preferred over a Slave; in :read mode it is the opposite. To see how ranking is done, take a look at lib/cluster_connection.ex:

defp rank_state("Master", :write),    do: 1
defp rank_state("Master", _),         do: 2
defp rank_state("PreMaster", :write), do: 2
defp rank_state("PreMaster", _),      do: 3
defp rank_state("Slave", :write),     do: 3
defp rank_state("Slave", _),          do: 1

Note that the same procedure applies in :cluster_dns mode, since internally the DNS lookup yields IP addresses on which the same connection procedure is used.

Once the client is disconnected from EventStore, the supervisor should respawn it and the connection procedure starts over.
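
The ranking above can be turned into a candidate choice roughly as follows. This is a sketch under stated assumptions, not the code from lib/cluster_connection.ex: the RankingSketch module, the best_candidate/2 function, the shape of the member maps and the catch-all clause for other states are all hypothetical.

```elixir
defmodule RankingSketch do
  # Picks the member with the lowest rank for the given mode.
  # `members` is assumed to be a non-empty list of maps with a :state key;
  # Enum.min_by/2 raises Enum.EmptyError on an empty list.
  def best_candidate(members, mode) do
    Enum.min_by(members, fn member -> rank_state(member.state, mode) end)
  end

  # Same ranking table as quoted above.
  defp rank_state("Master", :write), do: 1
  defp rank_state("Master", _), do: 2
  defp rank_state("PreMaster", :write), do: 2
  defp rank_state("PreMaster", _), do: 3
  defp rank_state("Slave", :write), do: 3
  defp rank_state("Slave", _), do: 1
  # Assumption for this sketch only: any other state is ranked last.
  defp rank_state(_, _), do: 4
end
```

With one Master, one PreMaster and one Slave in the list, :write mode selects the Master and any other mode selects the Slave, which matches the preference described above.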

Communication

EventStore uses ProtoBuf for taking requests and sending responses back. We are using exprotobuf to deal with them. List and specification of supported protobuf messages can be found in include/event_store.proto file.

Instead of wrapping each and every request in an Elixir function, we use the execute/2 function, which takes the server pid and a request message:

{:ok, response} = Extreme.execute server, write_events()

where write_events can be helper function like:

alias Extreme.Msg, as: ExMsg

defp write_events(stream \\ "people", events \\ [%PersonCreated{name: "Pera Peric"}, %PersonChangedName{name: "Zika"}]) do
  proto_events = Enum.map(events, fn event ->
    ExMsg.NewEvent.new(
      event_id: Extreme.Tools.gen_uuid(),
      event_type: to_string(event.__struct__),
      data_content_type: 0,
      metadata_content_type: 0,
      data: :erlang.term_to_binary(event),
      metadata: ""
    ) end)
  ExMsg.WriteEvents.new(
    event_stream_id: stream,
    expected_version: -2,
    events: proto_events,
    require_master: false
  )
end

This way you can fine-tune your requests, e.g. choose your serialization. We are using Erlang serialization in this case (data: :erlang.term_to_binary(event)), but you can do whatever suits you. For more information about the protobuf messages EventStore uses, take a look at their documentation; for common use cases you can check the test/extreme_test.exs file.
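
As a small illustration of the Erlang serialization mentioned above, the following sketch shows the round trip a writer and a reader would perform on the data field, and how the event_type string is derived from the struct. SerializationSketch and its nested PersonCreated struct are hypothetical stand-ins; no Extreme calls are involved.

```elixir
defmodule SerializationSketch do
  defmodule PersonCreated do
    # Stand-in for an application event struct.
    defstruct [:name]
  end

  # What the writer puts into the `data` field of a new event.
  def encode(event), do: :erlang.term_to_binary(event)

  # What a reader does with `push.event.data`. Only decode data you trust:
  # :erlang.binary_to_term/1 will rebuild any term found in the binary.
  def decode(binary), do: :erlang.binary_to_term(binary)

  # The event type is the struct's module name as a string,
  # so it always starts with "Elixir.".
  def event_type(event), do: to_string(event.__struct__)
end
```

This is why handlers that match on the event type compare against strings with the "Elixir." prefix, such as "Elixir.MyApp.Events.PersonCreated".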

Subscriptions

The Extreme.subscribe_to/3 function is used to get notified of new events on a particular stream. The subscriber, self() in the next example, will get a {:on_event, push_message} message when a new event is added to the stream people.

def subscribe(server, stream \\ "people"), do: Extreme.subscribe_to(server, self(), stream)

def handle_info({:on_event, event}, state) do
  Logger.debug "New event added to stream 'people': #{inspect event}"
  {:noreply, state}
end

Extreme.read_and_stay_subscribed/7 reads all events that follow a specified event number, and subscribes to future events.

defmodule MyApp.StreamSubscriber do
  use GenServer
  require Logger

  def start_link(extreme, last_processed_event), do: GenServer.start_link __MODULE__, {extreme, last_processed_event}

  def init({extreme, last_processed_event}) do
    stream = "people"
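    # subscription_ref must exist up front, because %{state | ...} only updates existing keys
    state = %{ event_store: extreme, stream: stream, last_event: last_processed_event, subscription_ref: nil }
    GenServer.cast self(), :subscribe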
    {:ok, state}
  end

  def handle_cast(:subscribe, state) do
    # read only unprocessed events and stay subscribed
    {:ok, subscription} = Extreme.read_and_stay_subscribed state.event_store, self(), state.stream, state.last_event + 1
    # we want to monitor when subscription is crashed so we can resubscribe
    ref = Process.monitor subscription
    {:noreply, %{state|subscription_ref: ref}}
  end

  def handle_info({:DOWN, ref, :process, _pid, _reason}, %{subscription_ref: ref} = state) do
    GenServer.cast self(), :subscribe
    {:noreply, state}
  end
  def handle_info({:on_event, push}, state) do
    push.event.data
    |> :erlang.binary_to_term
    |> process_event
    event_number = push.link.event_number
    :ok = update_last_event state.stream, event_number
    {:noreply, %{state|last_event: event_number}}
  end
  def handle_info(:caught_up, state) do
    Logger.debug "We are up to date!"
    {:noreply, state}
  end
  def handle_info(_msg, state), do: {:noreply, state}

  defp process_event(event), do: IO.puts("Do something with #{inspect event}")
  defp update_last_event(_stream, _event_number), do: IO.puts("Persist last processed event_number for stream")
end

This way unprocessed events are sent by Extreme using the {:on_event, push} message. After all persisted events are sent, a :caught_up message is sent, and then new events are delivered the same way as they arrive on the stream.
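
The message order described above can be modelled without a running EventStore. In this sketch the CatchUpSketch module, its state shape and the hand-built push maps are all hypothetical; only the {:on_event, push} and :caught_up message shapes come from the text.

```elixir
defmodule CatchUpSketch do
  # Hypothetical subscriber state: the last processed event number and
  # whether the :caught_up marker has been seen yet.
  def new(last_event), do: %{last_event: last_event, caught_up: false}

  # Stored and live events arrive in the same message shape.
  def handle({:on_event, push}, state), do: %{state | last_event: push.link.event_number}
  # Sent once, after all persisted events have been delivered.
  def handle(:caught_up, state), do: %{state | caught_up: true}
  # Anything else is ignored, like the catch-all handle_info clause.
  def handle(_other, state), do: state

  # Feeds a list of messages through handle/2 in arrival order.
  def replay(messages, state), do: Enum.reduce(messages, state, &handle/2)
end
```

Replaying two stored events, the :caught_up marker and one live event leaves the state at the live event's number with caught_up set to true.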

If you subscribe to a non-existing stream you'll receive the message {:extreme, severity, problem, stream}, where severity is either :error (for a subscription to a hard-deleted stream) or :warn (for a subscription to a non-existing or soft-deleted stream). problem describes the issue (e.g. :stream_hard_deleted). So in your receiver you can either have a catch-all handle_info(_message, _state) or handle such a message explicitly:

def handle_info({:extreme, _, problem, stream}=message, state) do
  Logger.warn "Stream #{stream} issue: #{to_string problem}"
  {:noreply, state}
end

Extreme.Listener

Since it is common on the read side of a system to read events and denormalize them, there is an Extreme.Listener macro that hides the noise from the listener:

defmodule MyApp.MyListener do
  use Extreme.Listener
  import MyApp.MyProcessor

  # returns last processed event by MyListener on stream_name, -1 if none has been processed so far
  defp get_last_event(stream_name), do: DB.get_last_event MyListener, stream_name

  defp process_push(push, stream_name) do
    #for indexed stream we need to follow push.link.event_number, otherwise push.event.event_number
    event_number = push.link.event_number
    DB.in_transaction fn ->
      Logger.info "Do some processing of event #{inspect push.event.event_type}"
      :ok = push.event.data
             |> :erlang.binary_to_term
             |> process_event(push.event.event_type)
      DB.ack_event(MyListener, stream_name, event_number)  
    end
    {:ok, event_number}
  end

  # This override is optional
  defp caught_up, do: Logger.debug("We are up to date. YEEEY!!!")
end

defmodule MyApp.MyProcessor do
  def process_event(data, "Elixir.MyApp.Events.PersonCreated") do
    Logger.debug "Doing something with #{inspect data}"
    :ok
  end
  def process_event(_, _), do: :ok # Just acknowledge events we are not interested in
end

The listener can be started manually, but it is most common to place it in a supervisor AFTER specifying Extreme:

defmodule MyApp.Supervisor do
  use Supervisor

  def start_link, do: Supervisor.start_link __MODULE__, :ok

  @event_store MyApp.EventStore

  def init(:ok) do
    event_store_settings = Application.get_env :my_app, :event_store

    children = [
      worker(Extreme, [event_store_settings, [name: @event_store]]),
      worker(MyApp.MyListener, [@event_store, "my_indexed_stream", [name: MyListener]]),
      # ... other workers / supervisors
    ]
    supervise children, strategy: :one_for_one
  end
end

Subscription can be paused:

{:ok, last_event_number} = MyApp.MyListener.pause MyListener

and resumed

:ok = MyApp.MyListener.resume MyListener

Extreme.FanoutListener

It is not uncommon to listen to live events and propagate them (for example over web sockets). For that situation there is an Extreme.FanoutListener macro that hides the noise from the listener:

defmodule MyApp.MyFanoutListener do
  use Extreme.FanoutListener
  import MyApp.MyPusher

  defp process_push(push) do
    Logger.info "Forward to web socket event #{inspect push.event.event_type}"
    :ok = push.event.data
           |> :erlang.binary_to_term
           |> process_event(push.event.event_type)
  end
end

defmodule MyApp.MyPusher do
  def process_event(data, "Elixir.MyApp.Events.PersonCreated") do
    Logger.debug "Transform and push event with data: #{inspect data}"
    :ok
  end
  def process_event(_, _), do: :ok # Just acknowledge events we are not interested in
end

The listener can be started manually, but it is most common to place it in a supervisor AFTER specifying Extreme:

defmodule MyApp.Supervisor do
  use Supervisor

  def start_link, do: Supervisor.start_link __MODULE__, :ok

  @event_store MyApp.EventStore

  def init(:ok) do
    event_store_settings = Application.get_env :my_app, :event_store

    children = [
      worker(Extreme, [event_store_settings, [name: @event_store]]),
      worker(MyApp.MyFanoutListener, [@event_store, "my_indexed_stream", [name: MyFanoutListener]]),
      # ... other workers / supervisors
    ]
    supervise children, strategy: :one_for_one
  end
end

Persistent subscriptions

The Event Store provides an alternate event subscription model, from version 3.2.0, known as competing consumers. Instead of the client holding the state of the subscription, the server remembers it.

Create a persistent subscription

The first step in using persistent subscriptions is to create a new subscription. This can be done using the Event Store admin website or in your application code, as shown below. You must provide a unique subscription group name and the stream to receive events from.

alias Extreme.Msg, as: ExMsg

{:ok, _} = Extreme.execute(server, ExMsg.CreatePersistentSubscription.new(
  subscription_group_name: "person-subscription",
  event_stream_id: "people",
  resolve_link_tos: false,
  start_from: 0,
  message_timeout_milliseconds: 10_000,
  record_statistics: false,
  live_buffer_size: 500,
  read_batch_size: 20,
  buffer_size: 500,
  max_retry_count: 10,
  prefer_round_robin: true,
  checkpoint_after_time: 1_000,
  checkpoint_max_count: 500,
  checkpoint_min_count: 1,
  subscriber_max_count: 1
))

Connect to a persistent subscription

The Extreme.connect_to_persistent_subscription/5 function is used to subscribe to an existing persistent subscription. The subscriber, self() in this example, will receive a {:on_event, event, correlation_id} message when each new event is added to the stream people.

{:ok, subscription} = Extreme.connect_to_persistent_subscription(server, self(), group, stream, buffer_size)

Receive & acknowledge events

You must acknowledge receipt, and successful processing, of each received event. The Event Store will remember the last acknowledged event. The subscription will resume from this position should the subscriber process terminate and reconnect. This simplifies the client logic - the code you must write.

Extreme.PersistentSubscription.ack/3 function is used to acknowledge receipt of an event.

receive do
  {:on_event, event, correlation_id} ->
    Logger.debug "New event added to stream 'people': #{inspect event}"
    :ok = Extreme.PersistentSubscription.ack(subscription, event, correlation_id)
end

You must track the subscription PID returned from the Extreme.connect_to_persistent_subscription/5 function as part of the process state when using a GenServer subscriber.

def handle_info({:on_event, event, correlation_id}, %{subscription: subscription} = state) do
  Logger.debug "New event added to stream 'people': #{inspect event}"
  :ok = Extreme.PersistentSubscription.ack(subscription, event, correlation_id)
  {:noreply, state}
end

Events can also be negatively acknowledged (nacked), with a nack_action of :Park, :Retry, :Skip, or :Stop.

def handle_info({:on_event, event, correlation_id}, %{subscription: subscription} = state) do
  Logger.debug "New event added to stream 'people': #{inspect event}"
  if needs_to_retry do
    :ok = Extreme.PersistentSubscription.nack(subscription, event, correlation_id, :Retry)
  else
    :ok = Extreme.PersistentSubscription.ack(subscription, event, correlation_id)
  end
  {:noreply, state}
end
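
One way to decide between ack and the nack actions is to map the outcome of processing to a decision first, and only then call Extreme.PersistentSubscription.ack/3 or nack. The sketch below is purely illustrative: AckDecisionSketch, the outcome tuples, the attempt counter and the limit of 3 attempts are assumptions, not part of Extreme.

```elixir
defmodule AckDecisionSketch do
  # Hypothetical limit on retries before an event is parked.
  @max_attempts 3

  # Processing succeeded: acknowledge the event.
  def decide(:ok, _attempt), do: :ack

  # Temporary failure: ask for a retry while attempts remain,
  # then park the event so it no longer blocks the subscription.
  def decide({:error, :transient}, attempt) when attempt < @max_attempts, do: {:nack, :Retry}
  def decide({:error, :transient}, _attempt), do: {:nack, :Park}

  # The event can never be processed: skip it.
  def decide({:error, :malformed}, _attempt), do: {:nack, :Skip}
end
```

A handle_info clause could then call ack when the decision is :ack and nack with the returned action otherwise, keeping the retry policy in one small, testable function.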

Contributing

  1. Fork it
  2. Create your feature branch (git checkout -b my-new-feature)
  3. Commit your changes (git commit -am 'add some feature')
  4. Push to the branch (git push origin my-new-feature)
  5. Create new Pull Request

Licensed under The MIT License.

extreme's People

Contributors

azuken, bbhoss, burmajam, c-rack, crowdhailer, fahchen, henry-hz, mindreframer, mjaric, pgermishuys, slashdotdash, svrdlans, timbuchwaldt

extreme's Issues

Implement backpressure, acking received events

There is an important PR in commanded, by sharksdontfly, that implements extreme support in the commanded framework. Let me suggest adding to extreme a feature that apparently exists in eventstore-postgres.

from: https://github.com/slashdotdash/eventstore#ack-received-events
"Ack received events Receipt of each event by the subscriber must be acknowledged. This allows the subscription to resume on failure without missing an event.

The subscriber receives an {:events, events, subscription} tuple containing the published events and the subscription to send the ack to. This is achieved by sending an {:ack, last_seen_event_id} tuple to the subscription process. A subscriber can confirm receipt of each event in a batch by sending multiple acks, one per event. Or just confirm receipt of the last event in the batch.

A subscriber will not receive further published events until it has confirmed receipt of all received events. This provides back pressure to the subscription to prevent the subscriber from being overwhelmed with messages if it cannot keep up. The subscription will buffer events until the subscriber is ready to receive, or an overflow occurs. At which point it will move into a catch-up mode and query events and replay them from storage until caught up."

Release deadlock fix

Are you planning to do another hex release any time soon to include the deadlock fix?
Looks like the most recent hex release, 0.10.3, was just a few days before the #47 PR was merged.

If you're not ready to do that release yet, or you would like to vet that fix in other environments, I'd be happy to take the git dependency instead and release that into the environment where we first encountered the deadlock.

Thanks.

read-only client modules

Similar to the Ecto option for read-only repositories, it would be useful to be able to configure an extreme client module as read-only.

I'm imagining that this parameter gets passed in to the config the same way one passes in db_type or hostname (just a read_only: true that defaults to false)

Read-only extreme client modules should support executing these message types:

  • ReadEvent
  • ReadStreamEvents
  • ReadStreamEventsBackward
  • ReadAllEvents
  • ConnectToPersistentSubscription
  • SubscribeToStream

This could help increase write-safety for read models. We occasionally connect local development read model services to production eventstores for the sake of projecting real prod data, and having some extra guarantees around writing would make that less scary.

Am I missing any key message types? Would this need any special considerations for clustered eventstores (e.g. auto-select read mode instead of master mode)? I'd be happy to PR for this 👍

Performance?

I'm trying to write to a local EventStore instance on my windows machine, and it seems like it can't even handle 100 items/second. It takes around 100ms for each write.

Am I doing something wrong?

My eventstore is started as follows:

EventStore.ClusterNode.exe --run-projections=all

and my extreme code looks as follows:

def publish_to_es(stream_id, event_id, event_type, event_data) do
		server = Evstore.EventStore

		event = Extreme.Messages.NewEvent.new(
			event_id: event_id,
			event_type: event_type, # event_type,
			data_content_type: 0, 
			metadata_content_type: 0, 
			data: event_data,
			meta: "")

		write_events = Extreme.Messages.WriteEvents.new(
			event_stream_id: stream_id, events: [event], 
			expected_version: -2, require_master: false)

		{:ok, _response} = Extreme.execute server, write_events
	end

Deadlock while doing concurrent reads and writes

Hi @burmajam,

We have a few apps that use Extreme to maintain a subscription to a stream while also writing to other streams. These apps would spontaneously crash from GenServer timeouts or Extreme would stop with reason :tcp_closed.

We isolated it down to what appears to be a deadlock that can occur while doing basic reads and writes concurrently using the same Extreme worker process. I'll post a PR in a moment with the stress test I've been using to reproduce the deadlock. #46

I got this far a few weeks ago and haven't been able to find time to dig any further, so I thought I'd at least make you aware. We've been able to work-around the problem by using separate Extreme workers for the subscription and the writes.

Retrieve a list of all streams

Hello,

I am trying to get the list of all streams. I read in the documentation that this is possible, but I cannot implement it with extreme.

So far I have tested the following code, which returns an error every time:
{:ok, response} = Extreme.execute server, ExMsg.ReadAllEvents.new(..)

(FunctionClauseError) no function clause matching in Extreme.MessageResolver.encode_cmd/1

and

{:ok, response} = Extreme.execute server, :read_all_events_forward

(UndefinedFunctionError) function :read_all_events_forward.struct/0 is undefined (module :read_all_events_forward is not available)

Does the code currently implement a way to get a list of all the streams? If so, can you provide me with a sample code?

I can already read all events for a stream.

Thank you.

Best regards
Julien.

Metadata not saving

Dears,
I tried to save metadata using both binary and JSON methods, but I can see no sign of metadata in the database.

 ExMsg.NewEvent.new(
        event_id: Extreme.Tools.gen_uuid(),
        event_type: to_string(event.__struct__),
        data_content_type: 0,
        metadata_content_type: 0,
        data: :erlang.term_to_binary(event),
        meta: :erlang.term_to_binary(event)
      )

# or

 ExMsg.NewEvent.new(
        event_id: Extreme.Tools.gen_uuid(),
        event_type: to_string(event.__struct__),
        data_content_type: 0,
        metadata_content_type: 0,
        data: :erlang.term_to_binary(event),
        meta: "{some_meta: 33}"
      )

Memory leak with Extreme.Listener

Extreme.Listener is awesome, it really helps cut down on the boilerplate with genservers.

However, when using it with a large stream (millions of events), seeing a memory leak somewhere.

Basically, the genserver will continue to increase in memory before crashing :(.

When using a genserver as outlined in the README at https://github.com/exponentially/extreme#subscriptions (brilliant docs BTW) I'm not seeing any memory leaks, memory stays ~300MB-320MB.

Has anybody else seen memory issues when using Extreme.Listener?

CommitTimeout?

When using extreme_system sometimes I get this:

 Error saving events {:error, {:error, :CommitTimeout, %Extreme.Msg.WriteEventsCompleted{commit_position: 0, current_version: -1, first_event_number: -2147483648, last_event_number: -2147483648, message: "Commit phase timeout.", prepare_position: -2147483648, result: :CommitTimeout}}}

Do you know why this is happening?

What is DB?

Sorry for the dumb question, but what is DB in the code examples?
E.g.:
DB.ack_event(MyListener, stream_name, event_number)

Is this part of elixir? I'm still learning it.

tcp_close exception when sending messages?

Hey, guys.
I'm trying to send a bunch of messages at once (like 1000). But I'm getting a weird exception sometimes, namely tcp_closed. I think it's happening when I try to write events. Could it be that writing and subscribing are sharing the same process?

crasher:
  initial call: myapp_evstore:init/1
  pid: <0.975.0>
  registered_name: myapp_evstore
  exception exit: {tcp_closed,
                      {'Elixir.GenServer',call,
                          [myapp_extreme,
                           {execute,
                               #{'__struct__' => 'Elixir.Extreme.Messages.WriteEvents',
                                 event_stream_id => <<"agg-1000">>,

I'll keep digging, but maybe you see something I'm doing wrong right away?

Here is how my code for pushing events looks:

      WriteEvents = 'Elixir.Extreme.Messages.WriteEvents':new([
         {event_stream_id, StreamId},
         {events, Events},
         {expected_version, ExpectedVersion},
         {require_master, false}
        ]),
      'Elixir.Extreme':execute(?ES_SERVER, WriteEvents).

Subscriptions recovering from connection failures

Hi @burmajam !!

I'm in the process of improving how our subscriptions deal with EventStore connection failures. I wanted to check in with you to see if you've dealt with similar issues and make sure I'm not off track in any way.

The problem we've had is that when the connection to the EventStore is briefly lost, the extreme client recovers just fine, but our existing subscriptions no longer receive new events from their streams. The solution I'm pursuing is to modify our subscription process so that it also monitors the Extreme.Connection process. That way when the Connection process crashes, our subscription process can also crash and establish a new subscription after restart.

The disconnect I'm concerned with manifests in the application as a :tcp_closed message from :gen_tcp.

Our subscription process is our own wrapper around the extreme client that was originally patterned after the MyApp.StreamSubscriber example in the readme. This subscription process calls read_and_stay_subscribed and monitors the Extreme.Subscription pid that's returned. If I also monitor the Extreme.Connection process here and :stop when the Connection process goes :DOWN, I get the desired behavior. Our subscription process restarts and we re-subscribe.

Now that I look through the extreme client further, I'm a little concerned that this approach will result in orphaned Extreme.Subscription processes supervised by Extreme.SubscriptionsSupervisor. I'll have to dig into the extreme client further to really understand the implications. The question that just popped up is, would it be better for the extreme client to internally monitor the Extreme.Connection process from the Extreme.Subscription processes or from the Extreme.SubscriptionsSupervisor process. If you have any thoughts along those lines, I'd love to hear them.

Thank you much,
~Tony

escript exception error

I am trying to install extreme with rebar3_elixir_compile plugin (which basically allows for elixir packages to be used in erlang), but when I run rebar3 compile I get this rather strange error:

==> poison
Compiling 4 files (.ex)
Generated poison app
escript: exception error: undefined function rebar:main/1
  in function  escript:run/2 (escript.erl, line 757)
  in call from escript:start/1 (escript.erl, line 277)
  in call from init:start_em/1
  in call from init:do_boot/3
** (Mix) Could not compile dependency :ssl_verify_fun, "/home/drozdyuka/.mix/rebar compile skip_deps=true deps_dir="/projects/beehive/_elixir_build/extreme/_build/dev/lib"" command failed. You can recompile this dependency with "mix deps.compile ssl_verify_fun", update it with "mix deps.update ssl_verify_fun" or clean it with "mix deps.clean ssl_verify_fun"
==> extreme
===> Failed to fetch and copy dep: {elixir,"extreme","0.7.1"}

Any tips on what this might be?

httpoison dependency still locked to 0.11 instead of 1.X.X

Hello,

I was updating my project dependencies, and I saw that extreme still requires httpoison 0.11, while version 1.0.0 was released in January and the latest (1.2.0) in June.

Why has this dependency not been upgraded? Is there a technical constraint?

Thank you in advance for your response.

Extreme.handle_info :tcp_closed exception after kill -9

It's on the deleted_stream_subscription_fix branch.

[error] GenServer EventStore terminating
** (FunctionClauseError) no function clause matching in Extreme.handle_info/2
    (extreme) lib/extreme.ex:143: Extreme.handle_info({:tcp_closed, #Port<0.14560>}, %{credentials: %{pass: "changeit", user: "admin"}, pending_responses: %{}, received_data: "", should_receive: nil, socket: #Port<0.14560>, subscriptions: %{}, subscriptions_sup: #PID<0.350.0>})
    (stdlib) gen_server.erl:615: :gen_server.try_dispatch/4
    (stdlib) gen_server.erl:681: :gen_server.handle_msg/5
    (stdlib) proc_lib.erl:240: :proc_lib.init_p_do_apply/3
Last message: {:tcp_closed, #Port<0.14560>}
State: %{credentials: %{pass: "changeit", user: "admin"}, pending_responses: %{}, received_data: "", should_receive: nil, socket: #Port<0.14560>, subscriptions: %{}, subscriptions_sup: #PID<0.350.0>}
[error] GenServer #PID<0.350.0> terminating
** (FunctionClauseError) no function clause matching in Extreme.handle_info/2
    (extreme) lib/extreme.ex:143: Extreme.handle_info({:tcp_closed, #Port<0.14560>}, %{credentials: %{pass: "changeit", user: "admin"}, pending_responses: %{}, received_data: "", should_receive: nil, socket: #Port<0.14560>, subscriptions: %{}, subscriptions_sup: #PID<0.350.0>})
    (stdlib) gen_server.erl:615: :gen_server.try_dispatch/4
    (stdlib) gen_server.erl:681: :gen_server.handle_msg/5
    (stdlib) proc_lib.erl:240: :proc_lib.init_p_do_apply/3
Last message: {:EXIT, #PID<0.349.0>, {:function_clause, [{Extreme, :handle_info, [{:tcp_closed, #Port<0.14560>}, %{credentials: %{pass: "changeit", user: "admin"}, pending_responses: %{}, received_data: "", should_receive: nil, socket: #Port<0.14560>, subscriptions: %{}, subscriptions_sup: #PID<0.350.0>}], [file: 'lib/extreme.ex', line: 143]}, {:gen_server, :try_dispatch, 4, [file: 'gen_server.erl', line: 615]}, {:gen_server, :handle_msg, 5, [file: 'gen_server.erl', line: 681]}, {:proc_lib, :init_p_do_apply, 3, [file: 'proc_lib.erl', line: 240]}]}}
State: {:state, {#PID<0.350.0>, Extreme.SubscriptionsSupervisor}, :simple_one_for_one, [{:child, :undefined, Extreme.Subscription, {Extreme.Subscription, :start_link, [#PID<0.349.0>]}, :temporary, 5000, :worker, [Extreme.Subscription]}], :undefined, 3, 5, [], Extreme.SubscriptionsSupervisor, #PID<0.349.0>}

Restart Extreme connection after TCP failure

When I shut down EventStore, my whole application shuts down, even though the supervision is set up as permanent.
@mindreframer I renamed the app to workflow ;)

Pls, if you can review.

git clone https://github.com/work-capital/workflow.git
iex -S mix
:observer.start

Poor performance when using read_and_stay_subscribed on a large stream

We've attempted to use read_and_stay_subscribed to catch up and subscribe to a large stream. While processing the stream to catch up, we start seeing timeouts in other processes that are accessing the EventStore, or the Extreme TCP session will get closed from a heartbeat timeout.

Our easy alternative is to manage the "catch up" ourselves by doing our own individual reads until we're caught up, and then turn on the subscription. Nonetheless, I thought I'd raise the issue with you to see if we might be missing something when it comes to how we're using Extreme.Subscription.

I haven't dug down to root cause, but I suspect it's related to how the Extreme.Subscription starts out by reading from the stream as fast as it can and buffers up the events. It may be that these back-to-back reads are putting enough stress on the EventStore to cause these other timeouts.

Alternatively, since the :on_event call is asynchronous and our receiver can't keep up with the initial reads, I'm concerned that the mailbox of our receiver is filling up with :on_event messages which can kill process performance, but I haven't confirmed that theory yet.

I was hoping to find some way to control the maximum size of the Extreme.Subscription state.buffered_messages, or somehow apply backpressure to Extreme.Subscription, but I don't see anything.

Any thoughts?

Set maxCount on the stream?

How do I set "$maxCount" on the stream?

I tried "my_stream/metadata", with WriteEvents but it just creates a new stream: my_stream/metadata.

However, when I issue this command from bash, it works:

curl -i "http://127.0.0.1:2113/streams/my_stream/metadata" -H "Content-Type:application/vnd.eventstore.events+json" -d '[
     {
         "eventId":"5f6380f5-c885-46e6-8abb-781a0c8b462e",
         "eventType" : "set_snapshot_count",
         "data" : {
             "$maxCount" : 1
         }
     }
   ]'

Notify subscriber when 'caught up' with buffered messages

Hey there,

First of all: thank you very much for this library, I'm grateful for all of the work you've done so far.

I was making a read model in my application and was thinking that it would be great if the subscription process could notify my subscribing process that it has finished sending all of the buffered messages.

The reason is that I don't really want my reader to respond with what is presumably stale data when it's being spun up as the result of a query.

Does this even make sense? Is there some existing way to do this that I'm oblivious to?

The change would look something like this:

subscription.ex

def handle_cast(:push_buffered_messages, state) do
  state.buffered_messages |> Enum.each(fn(e)-> send(state.subscriber, {:on_event, e}) end)
  send state.subscriber, :caught_up # <-- this
  {:noreply, %{state|status: :subscribed, buffered_messages: []}}
end
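
On the receiving side, a subscriber could use that message to refuse queries until it has caught up. The following is only a minimal sketch under that assumption: the ReadModel module and its events/1 function are hypothetical, and only the {:on_event, e} and :caught_up message shapes come from the snippet above.

```elixir
defmodule ReadModel do
  # Hypothetical subscriber process. It collects events and only
  # answers queries after the proposed :caught_up message arrives.
  use GenServer

  def start_link(opts \\ []), do: GenServer.start_link(__MODULE__, :ok, opts)

  # Returns {:ok, events} once caught up, {:error, :catching_up} before that.
  def events(pid), do: GenServer.call(pid, :events)

  @impl true
  def init(:ok), do: {:ok, %{events: [], caught_up?: false}}

  @impl true
  def handle_info({:on_event, event}, state) do
    # Events are prepended here and reversed when read.
    {:noreply, %{state | events: [event | state.events]}}
  end

  def handle_info(:caught_up, state) do
    {:noreply, %{state | caught_up?: true}}
  end

  @impl true
  def handle_call(:events, _from, %{caught_up?: false} = state) do
    {:reply, {:error, :catching_up}, state}
  end

  def handle_call(:events, _from, state) do
    {:reply, {:ok, Enum.reverse(state.events)}, state}
  end
end
```

The pid of such a process would be passed as the subscriber, so queries made while the buffered events are still being replayed get an explicit "not ready" answer instead of stale data.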

Thanks for your time 😄

Compatibility with Elixir 1.6.x

Hi,

My team and I have upgraded our Elixir project to a new version (1.6.3). After some tests, we do not see any problems with our application, but we get a warning saying that extreme requires Elixir 1.3, 1.4, or 1.5.

Can you tell me if a new release is coming, and if not, why not?
If there is a serious compatibility issue, we want to know, so we can decide whether to deploy it to production.

Thanks in advance

Number of open connections

Which is the better option: one connection for writes and reads plus another only for subscriptions, or a single connection shared by subscriptions and reads/writes? I am asking because I saw that EventStore suggests it is best practice to hold only one connection.

Read model : subscribe to all

Hi all,

Just wanted to ask if there is a way to subscribe to all events or, ideally, to subscribe to all streams by regex or wildcard matching? I had a quick look at the defined messages, but the only thing I found is ReadAllEvent, which I assume does not start a subscription?

Unsubscribe?

How can I unsubscribe from event store?

E.g. I'm replaying an aggregate with Extreme.read_and_stay_subscribed, but when I am caught_up - I don't need to receive the events anymore (since I apply them locally to my aggregate).

Thank you.

GRPC support

Is there any plan to add gRPC support, given that the TCP API is deprecated?
If not, would you accept a PR?

Bad argument in arithmetic expression in Extreme.slice_content/3

While processing many events per second, we randomly run into a bad argument in arithmetic expression error.

** (EXIT) an exception was raised:
        ** (ArithmeticError) bad argument in arithmetic expression
           (extreme) lib/extreme.ex:432: Extreme.slice_content/3
           (extreme) lib/extreme.ex:425: Extreme.process_package/2
           (extreme) lib/extreme.ex:407: Extreme.handle_info/2
           (stdlib) gen_server.erl:616: :gen_server.try_dispatch/4
           (stdlib) gen_server.erl:686: :gen_server.handle_msg/6
           (stdlib) proc_lib.erl:247: :proc_lib.init_p_do_apply/3
            (elixir) lib/gen_server.ex:774: GenServer.call/3

We used the Logger.debug call inside Extreme.slice_content to see the data that is coming in:

2018-03-09 12:09:14.407 [error] We have unfinished message of length nil(2): <<36, 0>>
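
The logged buffer holds only two bytes, so it looks like a TCP chunk ended in the middle of the length prefix. Below is a minimal sketch of framing that tolerates such a split. It assumes each package is prefixed with a 4-byte little-endian length, as in the EventStore TCP protocol; the Framing module is hypothetical and is not Extreme's actual implementation.

```elixir
defmodule Framing do
  # Splits a buffer into complete frames, each prefixed by a 4-byte
  # little-endian length. Returns {frames, rest}, where rest is any
  # incomplete header or body that must be prepended to the next chunk.
  def split(buffer, frames \\ [])

  def split(<<len::32-unsigned-little-integer, body::binary-size(len), rest::binary>>, frames) do
    split(rest, [body | frames])
  end

  # Fewer than 4 header bytes, or fewer body bytes than announced:
  # keep the bytes as they are and wait for more data.
  def split(rest, frames), do: {Enum.reverse(frames), rest}
end
```

With this approach a buffer such as <<36, 0>> is returned untouched as the remainder, so no arithmetic is ever attempted on a length that has not fully arrived.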

Extending to the atompub protocol and http api

I think Extreme only uses the TCP API. Would supporting the HTTP API be a separate project or an extension to this one?

In general our recommendation would be to use AtomPub as your primary protocol unless you have low subscriber SLAs or need higher throughput on reads and writes than Atom can offer. This is largely due to the open nature and ease of use of the Atom protocol. Very often in integration scenarios these are more important than raw performance.

subscribe to deleted stream error

When subscribing with Extreme.read_and_stay_subscribed to a non-existing stream, it seems to hit an internal bug:

[error] GenServer #PID<0.316.0> terminating
** (FunctionClauseError) no function clause matching in Extreme.Subscription.process_response/2
    (extreme) lib/subscription.ex:65: Extreme.Subscription.process_response({:error, :StreamDeleted, %Extreme.Messages.ReadStreamEventsCompleted{error: "", events: [], is_end_of_stream: true, last_commit_position: 45902490, last_event_number: 2147483647, next_event_number: -1, result: :StreamDeleted}}, %{buffered_messages: [], connection: #PID<0.296.0>, read_params: %{from_event_number: 0, per_page: 4096, require_master: false, resolve_link_tos: true, stream: "Domain.Acl.AclAggregate"}, read_until: 2147483648, status: :reading_events, subscriber: Domain.Acl.AclAggregate})
    (extreme) lib/subscription.ex:48: Extreme.Subscription.handle_cast/2
    (stdlib) gen_server.erl:615: :gen_server.try_dispatch/4
    (stdlib) gen_server.erl:681: :gen_server.handle_msg/5
    (stdlib) proc_lib.erl:240: :proc_lib.init_p_do_apply/3
Last message: {:"$gen_cast", :read_events}
State: %{buffered_messages: [], connection: #PID<0.296.0>, read_params: %{from_event_number: 0, per_page: 4096, require_master: false, resolve_link_tos: true, stream: "Domain.Acl.AclAggregate"}, read_until: 2147483648, status: :reading_events, subscriber: Domain.Acl.AclAggregate}

last event

Dear Supporters,
I am trying to get the last event from a stream, since I am saving snapshots into it. The problem is that I have to read all events from the stream before extracting the last one. Is there any way to read only the last one? I saw that the C# API has a ReadStreamEventsBackwardAsync; should I create a PR adding this message to the protobuf include file?
Best, and thanks for sharing this fantastic lib

Compatibility with Elixir 1.9.x

So Elixir 1.9 is out! 🎉

Seeing the following when trying to compile with 1.9 (using the latest github):

==> extreme
Compiling 13 files (.ex)

== Compilation error in file lib/messages.ex ==
** (ArgumentError) argument error
    :erlang.++("event_store", ".proto")
    src/gpb_lib.erl:788: :gpb_lib.copy_filename_ext/2
    src/gpb_parse.yrl:489: anonymous fn/2 in :gpb_parse.shorten_file_paths/1
    (stdlib) lists.erl:1239: :lists.map/2
    src/gpb_parse.yrl:486: :gpb_parse.shorten_file_paths/1
    src/gpb_parse.yrl:457: :gpb_parse.post_process_all_files/2
    lib/exprotobuf/parser.ex:22: Protobuf.Parser.finalize!/2
    lib/exprotobuf.ex:152: Protobuf.parse/2
could not compile dependency :extreme, "mix compile" failed. You can recompile this dependency with "mix deps.compile extreme", update it with "mix deps.update extreme" or clean it with "mix deps.clean extreme"

However, running mix compile in a cloned version of extreme works.
