Linq2Db implementation of Akka.Persistence.Sql. Common implementation for SQL Server, Sqlite, Postgres, Oracle, and MySql.
License: Apache License 2.0
On the JVM, akka-persistence-jdbc (upon which this is based) is planning to add a WriterUUID column. From what I've gleaned, the intent is to track which incarnation of a Journal plugin wrote to the store.
We will need migration scripts for users. Possibly use FluentMigrator to reduce the need to write individual scripts for every database?
The current implementation of Persistence.Query may not scale well on certain queries. The biggest problem is that we are using .ToList() in places that, in large systems, may return large results.
We have two options. The first is a 'universal' fix and can easily be implemented as an opt-in feature; the second will require a little more work but will be better for users long term.
Option 1: change .ToList() to .ToListAsync().
Option 2: instead stream off an IAsyncEnumerable, via the MessagesWithBatch approach in https://gist.github.com/to11mtm/dc9a350080fcbcb14098c14509d70e7f
Option 1 is the 'safest' and should work with all DBs, with the possible exception of SQLite (because of its tendency for readers and writers to block each other). Option 2 will perform better.
I'm expecting that the end state will be that most queries will be better under 'batched' reads, but there will be one or two (I'm thinking mainly CurrentPersistenceIds()) that will still benefit from the option of switching.
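A minimal, self-contained sketch of the Option 2 idea. FetchPageAsync here is a hypothetical stand-in for a real Linq2Db page query ordered by the ordering column; it is not the actual MessagesWithBatch implementation:

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public static class BatchedReads
{
    // Stands in for a real Linq2Db query that reads one page of rows.
    // Hypothetical helper, not part of the plugin.
    public static Task<List<int>> FetchPageAsync(IReadOnlyList<int> table, int offset, int batchSize) =>
        Task.FromResult(table.Skip(offset).Take(batchSize).ToList());

    // Instead of materializing the whole result set with .ToList(),
    // stream it to the caller one batch at a time.
    public static async IAsyncEnumerable<int> StreamAll(IReadOnlyList<int> table, int batchSize)
    {
        var offset = 0;
        while (true)
        {
            var page = await FetchPageAsync(table, offset, batchSize);
            if (page.Count == 0) yield break;
            foreach (var row in page) yield return row;
            offset += page.Count;
        }
    }
}
```

The consumer never holds more than one batch in memory at a time, which is the property the 'universal' .ToListAsync() fix alone does not give us.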
Version Information
Version of Akka.Persistence.Sql? dev
Describe the bug
The CurrentEventsByTag query on PostgreSql using the new TagTable returns unordered events.
To Reproduce
Steps to reproduce the behavior:
[Fact]
public override void ReadJournal_query_CurrentEventsByTag_should_see_all_150_events()
{
    var queries = ReadJournal as ICurrentEventsByTagQuery;
    var a = Sys.ActorOf(SpecTestActor.Props("a"));
    for (var i = 0; i < 150; ++i)
    {
        a.Tell("a green apple");
        ExpectMsg("a green apple-done");
    }

    var greenSrc = queries.CurrentEventsByTag("green", offset: Offset.NoOffset());
    var probe = greenSrc.RunWith(this.SinkProbe<EventEnvelope>(), Materializer);
    probe.Request(150);

    var list = probe.ExpectNextN(150).ToList();
    Log.Info($"Order: {string.Join(",", list.Select(e => e.SequenceNr))}");
    list.Count.Should().Be(150);

    var expectedSeqNr = 0;
    foreach (var env in list)
    {
        env.PersistenceId.Should().Be("a");
        env.Event.Should().Be("a green apple");
        expectedSeqNr++;
        env.SequenceNr.Should().Be(expectedSeqNr);
    }

    probe.ExpectComplete();
    probe.ExpectNoMsg(TimeSpan.FromMilliseconds(500));
}
Expected behavior
SequenceNr should be an ordered sequence of numbers, from 1 to 150.
Actual behavior
Order:
1,2,3,4,5,6,7,8,9,10,
11,12,13,14,15,16,17,18,19,20,
21,22,23,24,25,26,27,28,29,30,
31,32,33,34,35,36,37,38,39,40,
41,42,43,44,45,46,47,48,49,50,
51,52,53,54,55,56,57,58,59,60,
61,62,63,64,65,66,67,68,69,70,
71,72,73,74,75,76,77,78,79,80,
81,82,83,84,85,86,87,88,89,90,
91,92,93,94,95,96,97,98,100,102, <== SequenceNr goes out of order here
104,106,108,110,112,114,116,118,120,122,
124,126,128,130,132,134,136,138,140,142,
144,146,148,150,99,101,103,105,107,109,
111,113,115,117,119,121,123,125,127,129,
131,133,135,137,139,141,143,145,147,149
We do not have a configuration or a compatibility spec to work with Akka.Persistence.Oracle.
In theory, a user would be able to manually configure the tables with delete-compatibility-mode, but we should provide easy configuration and ideally provide unit tests.
Potential challenges: I know nothing about the legal/licensing implications of running Oracle in a CI for this.
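Once supported, the configuration might look roughly like the following. Every key name here is illustrative and would need to be checked against the plugin's reference HOCON:

```hocon
akka.persistence.journal.sql {
  # all keys illustrative -- verify against the reference configuration
  provider-name = "Oracle.Managed"
  delete-compatibility-mode = true
}
```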
It would be nice to add naming to the Stream stages to make logs etc. more clear. This is just a matter of adding .Named(string name) calls on the various stages.
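A sketch of what that could look like. This assumes the Akka.Streams package; the stage names and the transformations themselves are illustrative, not taken from the plugin:

```csharp
// Sketch only: naming each stage makes materializer/log output point at
// "read-journal-rows" etc. instead of anonymous stage names.
using Akka.Streams.Dsl;

public static class NamedStages
{
    public static Source<long, TMat> WithNames<TMat>(Source<long, TMat> orderingIds) =>
        orderingIds
            .Named("read-journal-rows")
            .Where(id => id > 0).Named("filter-invalid")
            .Select(id => id + 1).Named("next-ordering-id");
}
```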
With other parts of Akka.NET having ActorSystemSetup and similar, it would be nice to have the same capabilities here so that users can easily set up plugin configurations without having to write a bunch of HOCON.
This class should allow for configuration of all the 'important bits' that are exposed in HOCON configs, such as Linq2Db.ProviderName.
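A hypothetical shape for such a class, mirroring the ActorSystemSetup-style classes elsewhere in Akka.NET. None of these names are the real API; they just echo the 'important bits' from the HOCON config:

```csharp
// Hypothetical setup class -- all names illustrative, not the real API.
public enum TagMode { Csv, TagTable, Both }

public sealed class SqlJournalSetup
{
    public string ConnectionString { get; set; }
    public string ProviderName { get; set; }          // e.g. LinqToDB's "SQLite.MS"
    public string TableName { get; set; } = "journal";
    public bool AutoInitialize { get; set; }
    public TagMode TagMode { get; set; } = TagMode.TagTable;
}
```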
There are a few places in our read journal queries where we are using .ToList() instead of .ToListAsync() for fetching from the DB. These should probably be refactored to use the async methods and/or a more lazy form of evaluation.
It would be nice to add some Metrics measurement capabilities.
We should look for:
Questions:
- ReplayMessagesAsync has to make multiple round trips; do we report that as well as a separate metric?
- Behavior when a RetryPolicy is defined.

Because Linq2Db does not have the reactive stream capabilities that Slick has on the JDBC side, we had to write some of our data access patterns differently. In the interest of getting a working product out, we did not in all cases go with a 'consolidated' design where database access patterns are held in their own file; instead, right now the various Journals and SnapshotStores are using Linq2Db directly.
It would not be a bad idea to consolidate this database logic into another class. Linq2Db thankfully has a lot of capabilities around syntactic sugar that make it easy to write composable functor-like classes.
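The 'composable functor-like' idea can be sketched without Linq2Db: each database access pattern becomes a small, reusable transform over IQueryable that the journals and snapshot stores compose, instead of repeating inline LINQ. All names here are illustrative:

```csharp
using System;
using System.Linq;

public sealed record JournalRow(string PersistenceId, long SequenceNr, bool Deleted);

// Each access pattern is a reusable query transform; callers compose them
// rather than duplicating inline LINQ in every journal/snapshot store.
public static class JournalQueries
{
    public static Func<IQueryable<JournalRow>, IQueryable<JournalRow>> ForPersistenceId(string pid) =>
        q => q.Where(r => r.PersistenceId == pid);

    public static Func<IQueryable<JournalRow>, IQueryable<JournalRow>> NotDeleted() =>
        q => q.Where(r => !r.Deleted);

    // Applies the given transforms left-to-right.
    public static IQueryable<T> Apply<T>(this IQueryable<T> q,
        params Func<IQueryable<T>, IQueryable<T>>[] transforms) =>
        transforms.Aggregate(q, (acc, t) => t(acc));
}
```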
Version Information
I'm using Akka.Cluster.Hosting v1.5.15 and the associated libraries like Akka 1.5.15.
Describe the performance issue
I'm using an ASP.NET Core background service that uses a PersistenceQuery (CurrentEventsByTag) that should be run at application start to retrieve some data and cache it in-memory. I want my app to scale using PostgreSQL but also scale down using Sqlite which works great by using Akka.Persistence.Sql and making the provider itself configurable.
The background service I'm using looks like this and is added to the ASP.NET Core app via AddHostedService.
public class TestBackgroundService : BackgroundService
{
    private readonly ActorSystem _actorSystem;

    public TestBackgroundService(ActorSystem actorSystem)
    {
        _actorSystem = actorSystem;
    }

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        Log.Information("TestBackgroundService: Starting query...");
        var query = PersistenceQuery
            .Get(_actorSystem)
            .ReadJournalFor<SqlReadJournal>(SqlReadJournal.Identifier)
            .CurrentEventsByTag("t:wfd", Offset.NoOffset());

        await foreach (var ee in query.RunAsAsyncEnumerable(_actorSystem).WithCancellation(stoppingToken))
        {
            Log.Information("TestBackgoundService: Got event {Event}", ee.Event);
        }
    }
}
When using the Sqlite provider (with Microsoft.Data.Sqlite v8.0.1), I get my first event after 11 seconds:
[17:25:24 INF] TestBackgroundService: Starting query...
[17:25:24 INF] Now listening on: http://localhost:12341
[17:25:24 INF] Application started. Press Ctrl+C to shut down.
[17:25:24 INF] Hosting environment: Development
[17:25:24 INF] Content root path: /Users/kp/Projects/test/src/test
[17:25:35 INF] TestBackgoundService: Got event Incremented { Tag = t:wfd }
[17:25:35 INF] TestBackgoundService: Got event Incremented { Tag = t:wfd }
This delay doesn't happen with PostgreSQL or with other query types. Is there a reason the CurrentEventsByTag query runs after a 10-11s delay? The Sqlite database itself is empty except for the two events in the journal. I'm using the tag table instead of csv tags.
Could you give me a tip on how to get more information of the persistence query performance?
Also, I expect the Sqlite provider to be slower than the PostgreSQL provider, and a 10s delay is not the end of the world for an ASP.NET Core web app - but this doesn't feel like just "bad performance"; it feels more like there's an intentional 10s delay somewhere, and I would like to understand why.
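For what it's worth, if the defaults were ported from akka-persistence-jdbc, its journal-sequence-retrieval settings (roughly ten retries at a one-second delay) would add up to exactly this kind of ~10s stall before tag queries start. An illustrative HOCON fragment I would expect to tweak; key names and defaults are unverified and must be checked against the plugin's reference configuration:

```hocon
# illustrative only -- verify key names against the reference HOCON
akka.persistence.query.journal.sql {
  journal-sequence-retrieval {
    query-delay = 1s
    max-tries = 10
  }
}
```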
Environment
I'm using a 16-inch 2021 MacBook Pro (M1) with 16 GB RAM.
.NET SDK:
Version: 8.0.101
Commit: 6eceda187b
Workload version: 8.0.100-manifests.8a1da915
Runtime Environment:
OS Name: Mac OS X
OS Version: 14.0
OS Platform: Darwin
RID: osx-arm64
Base Path: /usr/local/share/dotnet/sdk/8.0.101/
See the actual problem here: akkadotnet/akka.net#3811
Example implementation in MongoDb: akkadotnet/Akka.Persistence.MongoDB#71
Creating this issue to follow up on the problems with sqlite when upgrading linq2db.
All information is in the comments from #163 (comment)
We want to upgrade linq2db, but can't get past 3.5.0 because then sqlite explodes with some DateTime related issues.
This is a discussion thread over how journal event deletion should behave in Linq2Db.
In the current code as of November 2022, Linq2Db still supports the event journal logical delete operation and offers queries over deleted events.
Logical deletion support has been removed from our other SQL plugins and from the latest JVM code; I strongly believe that we should also do the same with Linq2Db, as it will make the plugin settings less confusing for users.
In the current code as of November 2022, journal event deletion behavior in Persistence.Linq2Db is inconsistent with the rest of the Persistence.Sql ecosystem.
This difference needs to be rectified in order to make Persistence.Linq2Db consistent with the rest of the Sql ecosystem.
Adding support for a mechanism similar to ITimestampProvider in Akka.Persistence.Sql.Common will be useful to users who wish to have timestamps in a format other than .NET DateTime ticks (e.g. Unix epoch instead).
Ideally, we should try to use a base/abstract class instead of an interface, or at least benchmark the difference between the two. The reason is that on high-velocity systems, the cost of interface calls on reading/writing events may have an impact compared to a simple virtual call.
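A minimal sketch of the base-class approach (all names illustrative, not the real API): the default implementation keeps .NET DateTime ticks, and a user overrides a single virtual method to emit Unix epoch milliseconds instead:

```csharp
using System;

// Illustrative base class -- the virtual call here is what we would
// benchmark against an interface-based equivalent.
public abstract class TimestampProviderBase
{
    // Default: .NET DateTime ticks, the current on-disk format.
    public virtual long Now() => DateTime.UtcNow.Ticks;
}

public sealed class DefaultTicksProvider : TimestampProviderBase
{
}

// User override: Unix epoch milliseconds instead of ticks.
public sealed class UnixEpochMillisProvider : TimestampProviderBase
{
    public override long Now() => DateTimeOffset.UtcNow.ToUnixTimeMilliseconds();
}
```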
Is your feature request related to a problem? Please describe.
Users would like the ability to use a separate table for tags with SQL Persistence, as described in akkadotnet/akka.net#5296 .
Additionally, existing users of Akka.Persistence.Linq2Db would like to ensure they continue to get fast read and write performance from their existing journals.
It would be extra great if we could provide users a way to migrate with zero downtime (even if it requires more effort).
Describe the solution you'd like
We can leverage Linq2Db and Akka.Persistence.Linq2Db's pipeline to provide a solution.
By providing 3 read and write modes, we can allow for a 'seamless' deployment; the ideal state would be:
The JVM akka-persistence-jdbc module (upon which this is based) is planning on adding event_manifest and ser_manifest as columns to their database.
We already have a manifest field that is used to hold serializer manifest information; however, we do not have anything similar to event_manifest.
We should get a better understanding of how this new field will be used and implement it alongside any migrations necessary.
We don't currently have ICurrentAllEventsQuery implemented for the Linq2Db Journal. We should add this so that users who require this functionality can migrate.
We do not have a configuration or a compatibility spec to work with Akka.Persistence.MySql.
In theory, a user would be able to manually configure the tables with delete-compatibility-mode, but we should provide easy configuration and ideally provide unit tests.
We should make sure such an integration uses MySql connector and implementation packages that are non-encumbering (i.e. we should steer users away from adopting the Oracle MySqlConnector in favor of an MIT- or Apache-licensed one, and consider MariaDB or similar instead of vanilla MySql).
In order to avoid breaking changes for existing Linq2Db users when we upgrade.
Creating this based on akkadotnet/Akka.Persistence.PostgreSql#69
We can do this once Linq2Db has a Postgres 10 ProviderName available, similar to how we handle SQLite's identity column rules today.
Is your feature request related to a problem? Please describe.
The use of a Journal_Metadata table in Akka.Persistence.Sql.Common is a legacy of its design based on LevelDB.
akka-persistence-jdbc (upon which this is based) does not use such a table; journal sequence numbers are instead read off the last deleted row, which is kept in the table regardless of the soft-delete setting, with IsDeleted set to true/1.
Akka.Persistence.Linq2Db currently has a compatibility mode for journal_metadata, to allow users to still achieve most of the benefits of Akka.Persistence.Linq2Db's architecture without losing their existing journal table (and to move back if needed).
However, in this mode there are still drawbacks: deletes must do 2 writes to journal (update IsDeleted, delete all but the last soft-deleted record) as well as a write to journal_metadata, all in one transaction. Under heavy load in prod where a lot of deletions are happening (think at-least-once-delivery type buffers that are transient and have lots of churn), you can still have some issues around deletions getting in the way of other writes.
And as noted in #67 and #68 , if a tag table is to be used, an additional table will be involved in the transaction.
For best results, if users are going to use tag tables, they should not use the Sql.Common compatibility mode.
Once the journal deletes records (thus applying its delete style to both journal and journal_metadata), you can turn off compatibility mode. However, there is no 'automated' or 'semi-automated' migration to do this.
Describe the solution you'd like
I think it would be nice to create a script utility to 'migrate' the journal metadata state over to the new format, similar to the index helper app proof of concept.
We may need a 'blank' record to put back into the journal that, if accidentally 'undeleted' (IsDeleted set back to false), will not cause an undue recovery failure (or, perhaps, a recovery failure that makes it clear to users that they should not have done that).
Version Information
Version of Akka.NET?
1.5.14
Which Akka.NET Modules?
Akka.Persistence.Sql
Describe the bug
I am using the new LINQ-to-db persistence with HOCON config. I am initializing my tables with auto-initialize = true.
But this only happens when an actor attempts to persist something. In my case, I also start a read journal, and the read journal then complains that the tables don't exist yet, since no events have been persisted. Surprisingly, though, the tables are created after the read journal throws.
I have configured auto-initialize for the read journal as below
akka {
  persistence {
    journal {
      plugin = "akka.persistence.journal.sql"
      sql {
        class = "Akka.Persistence.Sql.Journal.SqlWriteJournal, Akka.Persistence.Sql"
        connection-string = ${config.connection-string}
        provider-name = "SQLite.MS"
        auto-initialize = true
      }
    }
    query.journal.sql {
      class = "Akka.Persistence.Sql.Query.SqlReadJournalProvider, Akka.Persistence.Sql"
      connection-string = ${config.connection-string}
      provider-name = "SQLite.MS"
      auto-initialize = true
    }
    snapshot-store {
      plugin = "akka.persistence.snapshot-store.sql"
      sql {
        class = "Akka.Persistence.Sql.Snapshot.SqlSnapshotStore, Akka.Persistence.Sql"
        connection-string = ${config.connection-string}
        provider-name = "SQLite.MS"
        auto-initialize = true
      }
    }
    ...
But I saw that auto-initialize isn't a supported config option for query.journal.sql. From the code I saw that initializing is done by SqlWriteJournal. So either provide a programmatic way to force generating the tables or, better, make sure the read journal also has the ability to generate the tables in case it is started first.
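As a workaround, I'm considering running a throwaway persistent actor to completion before starting any read journal queries, which forces SqlWriteJournal to create the tables. This is a sketch only; the actor and message names are made up, though the Akka.Persistence APIs used are real:

```csharp
// Workaround sketch: persisting any event makes SqlWriteJournal run its
// auto-initialize step, after which read journal queries find the tables.
using Akka.Actor;
using Akka.Persistence;

public sealed class SchemaWarmupActor : ReceivePersistentActor
{
    public override string PersistenceId => "schema-warmup";

    public SchemaWarmupActor()
    {
        Recover<string>(_ => { });
        Command<string>(msg => Persist(msg, _ =>
        {
            Sender.Tell("done");
            Context.Stop(Self);
        }));
    }
}

// Usage before starting any queries:
// await system.ActorOf(Props.Create<SchemaWarmupActor>()).Ask<string>("init");
```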
To Reproduce
Just try to use a read journal without persisting any events.
Links to working reproductions on Github / Gitlab are very much appreciated
Expected behavior
I would like the read journal not to fail with a "table doesn't exist" error when no dummy event has been persisted.
Actual behavior
Read journal throws an exception when reading events. But somehow tables are created then.
[18:05:38 ERR] An exception occured inside SelectAsync while executing Task. Supervision strategy: Stop
System.AggregateException: One or more errors occurred. (SQLite Error 1: 'no such table: journal'.)
---> Microsoft.Data.Sqlite.SqliteException (0x80004005): SQLite Error 1: 'no such table: journal'.
at Microsoft.Data.Sqlite.SqliteException.ThrowExceptionForRC(Int32 rc, sqlite3 db)
at Microsoft.Data.Sqlite.SqliteCommand.PrepareAndEnumerateStatements()+MoveNext()
at Microsoft.Data.Sqlite.SqliteCommand.GetStatements()+MoveNext()
at Microsoft.Data.Sqlite.SqliteDataReader.NextResult()
at Microsoft.Data.Sqlite.SqliteCommand.ExecuteReader(CommandBehavior behavior)
at Microsoft.Data.Sqlite.SqliteCommand.ExecuteReaderAsync(CommandBehavior behavior, CancellationToken cancellationToken)
at Microsoft.Data.Sqlite.SqliteCommand.ExecuteDbDataReaderAsync(CommandBehavior behavior, CancellationToken cancellationToken)
at LinqToDB.Data.DataConnection.ExecuteReaderAsync(CommandBehavior commandBehavior, CancellationToken cancellationToken)
at LinqToDB.Data.DataConnection.ExecuteDataReaderAsync(CommandBehavior commandBehavior, CancellationToken cancellationToken)
at LinqToDB.Data.DataConnection.ExecuteDataReaderAsync(CommandBehavior commandBehavior, CancellationToken cancellationToken)
at LinqToDB.Data.DataConnection.QueryRunner.ExecuteReaderAsync(CancellationToken cancellationToken)
at LinqToDB.Linq.QueryRunner.ExecuteQueryAsync[T](Query query, IDataContext dataContext, Mapper`1 mapper, Expression expression, Object[] ps, Object[] preambles, Int32 queryNumber, Func`2 func, TakeSkipDelegate skipAction, TakeSkipDelegate takeAction, CancellationToken cancellationToken)
at LinqToDB.Linq.QueryRunner.ExecuteQueryAsync[T](Query query, IDataContext dataContext, Mapper`1 mapper, Expression expression, Object[] ps, Object[] preambles, Int32 queryNumber, Func`2 func, TakeSkipDelegate skipAction, TakeSkipDelegate takeAction, CancellationToken cancellationToken)
at LinqToDB.Linq.ExpressionQuery`1.GetForEachAsync(Action`1 action, CancellationToken cancellationToken)
at LinqToDB.Linq.ExpressionQuery`1.GetForEachAsync(Action`1 action, CancellationToken cancellationToken)
at LinqToDB.AsyncExtensions.ToListAsync[TSource](IQueryable`1 source, CancellationToken token)
at Akka.Persistence.Sql.Query.Dao.BaseByteReadArrayJournalDao.<>c__DisplayClass6_0.<<JournalSequence>b__1>d.MoveNext()
--- End of stack trace from previous location ---
at Akka.Persistence.Sql.Extensions.ConnectionFactoryExtensions.ExecuteWithTransactionAsync[T](AkkaPersistenceDataConnectionFactory factory, IsolationLevel level, CancellationToken token, Func`3 handler)
at Akka.Persistence.Sql.Extensions.ConnectionFactoryExtensions.ExecuteWithTransactionAsync[T](AkkaPersistenceDataConnectionFactory factory, IsolationLevel level, CancellationToken token, Func`3 handler)
at Akka.Persistence.Sql.Extensions.ConnectionFactoryExtensions.ExecuteWithTransactionAsync[T](AkkaPersistenceDataConnectionFactory factory, IsolationLevel level, CancellationToken token, Func`3 handler)
at Akka.Persistence.Sql.Extensions.ConnectionFactoryExtensions.ExecuteWithTransactionAsync[T](AkkaPersistenceDataConnectionFactory factory, IsolationLevel level, CancellationToken token, Func`3 handler)
at Akka.Persistence.Sql.Query.Dao.BaseByteReadArrayJournalDao.<JournalSequence>b__6_0(<>f__AnonymousType4`3 input)
--- End of inner exception stack trace ---
at Akka.Actor.PipeToSupport.PipeTo[T](T
Environment
Linux, docker .NET 8
Are you running on Linux? Windows? Docker? Which version of .NET?
Describe the bug
When the tag write and read mode is set to TagTable, some code paths misread the configuration and still think they are reading from a csv database row instead of the tag table.
So, the purpose of ConsumeSequenceForTagInsert was as follows:
Because we are batching all of our writes, and bulk copy is much faster than row-by-row inserts, the idea was that in cases of:
ItemWithTags ItemWithTags ItemNoTags ItemNoTags ItemNoTags ItemNoTags ItemWithTags ItemNoTags
etc., we would still be able to take advantage of bulk copy for the no-tag cases, and at least lessen the write-throughput penalty.
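The intent can be sketched as a simple partitioning step (pure C#; the names are illustrative, not the plugin's actual types): split the incoming write batch into contiguous runs, so tag-free runs can go through bulk copy while tagged runs fall back to row-by-row inserts.

```csharp
using System.Collections.Generic;

public sealed record WriteItem(string Payload, bool HasTags);

public static class WriteBatcher
{
    // Splits a batch into contiguous runs of same-kind items so the
    // no-tag runs can use bulk copy and tagged runs go row-by-row.
    public static List<List<WriteItem>> SplitIntoRuns(IEnumerable<WriteItem> batch)
    {
        var runs = new List<List<WriteItem>>();
        foreach (var item in batch)
        {
            if (runs.Count == 0 || runs[^1][0].HasTags != item.HasTags)
                runs.Add(new List<WriteItem>());
            runs[^1].Add(item);
        }
        return runs;
    }
}
```

For the example sequence above, this yields four runs; the middle run of four no-tag items keeps its bulk-copy eligibility even though tagged items surround it.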
Originally posted by @to11mtm in #138 (comment)
We don't currently have IAllEventsQuery implemented on the Linq2Db journal. Some Persistence.Query users may require this functionality to migrate over.
Version Information
AKKA 1.5.16
Modules:
AKKA.Persistence.SQL 1.5.13,
AKKA Streams 1.5.16,
AKKA Serilog 1.5.12.1
Describe the performance issue
After migrating to AKKA 1.5, .NET 8 and the new AKKA.Persistence.SQL, the memory usage has increased to 1 GB (sometimes 2 GB between separate runs, with no changes).
Before the upgrade, the project was running AKKA 1.4.49, .NET 6 and AKKA.Persistence.SqlServer 1.4.35, and the memory allocation was flattening at 400 MB (mainly due to asp.net core hosting).
I did the upgrade performing the complete database schema upgrade, with the migration of Tags to the separate table and enabling the UUID in the EventJournal.
Data and Specs
Memory dump, sample persistence configuration and a screenshot with the diagnostic events available here
Expected behavior
Similar memory allocation as before
Actual behavior
Memory allocation has increased to 1 GB and doesn't increase further (as opposed to 400 MB before the upgrade), and sometimes, for unexplained reasons, it caps at 2 GB.
Environment
Running on Windows 11, .NET 8, SQL Server 2022, hosting the ActorSystem in an asp.net core environment.
Not using the new AKKA Hosting, for historical reasons I create the ActorSystem through a delegate when the asp.net server starts.
No Cluster, no Remoting.
Additional context
Since the migration, I keep seeing this line in the debug console log, every 1-2 seconds:
[22:54:19 DBG] Start timer [Akka.Persistence.Sql.Query.InternalProtocol.QueryOrderingIdsTimerKey] with generation [9]
[22:54:21 DBG] Start timer [Akka.Persistence.Sql.Query.InternalProtocol.QueryOrderingIdsTimerKey] with generation [10]
[22:54:22 DBG] Start timer [Akka.Persistence.Sql.Query.InternalProtocol.QueryOrderingIdsTimerKey] with generation [11]
[22:54:27 DBG] Start timer [Akka.Persistence.Sql.Query.InternalProtocol.QueryOrderingIdsTimerKey] with generation [12]
[22:54:29 DBG] Start timer [Akka.Persistence.Sql.Query.InternalProtocol.QueryOrderingIdsTimerKey] with generation [13]
[22:54:30 DBG] Start timer [Akka.Persistence.Sql.Query.InternalProtocol.QueryOrderingIdsTimerKey] with generation [14]
I don't know if it is relevant, the Serilog also has a sink to a Loki instance, in addition to the Console sink.