Comments (3)
Currently, each bulk operation runs as its own single transaction, which is committed automatically.
This is explained in README:
Each of these operations are separate transactions.
So when using multiple operations in single procedure and if one would break because of some Db constraint, previous would stay executed.
So you cannot additionally use Transaction or Commit, as you have already figured out.
Just call directly Context.BulkInsert(assets);
So it is not a bug; however, I will add the label: enhancement.
I have tried once to make it work with Transactions but had some errors so I didn't implement it at that moment.
I am curious: what changes exactly did you make?
If they do not conflict with other use-case scenarios, then they could be integrated into the library, which would enable you to use it directly from NuGet.
from efcore.bulkextensions.
Sorry about missing that explanation in the README. To make BulkInsert work with my project, I only changed SqlBulkOperation.cs. I added four methods to the class (GetSqlTransaction, OpenAndGetSqlConnection, OpenAndGetSqlConnectionAsync and GetSqlBulkCopy), which can be found at the bottom. These are then used in the Insert and InsertAsync methods. The project's current unit tests ran fine after the modifications.
You can find the full modified SqlBulkOperation.cs class below.
/// <summary>
/// Bulk insert and merge helpers built on <see cref="SqlBulkCopy"/>.
/// When the context has a current transaction, the bulk copy is enlisted in it
/// so the caller can commit or roll back the work together with other operations.
/// </summary>
internal static class SqlBulkOperation
{
    /// <summary>
    /// Writes <paramref name="entities"/> to the server with <see cref="SqlBulkCopy"/>,
    /// using the context's connection and, if present, its current transaction.
    /// </summary>
    /// <param name="context">Context whose connection and current transaction are used.</param>
    /// <param name="entities">Entities to write.</param>
    /// <param name="tableInfo">Table metadata; also configures the bulk copy via SetSqlBulkCopyConfig.</param>
    /// <param name="progress">Progress callback handed to SetSqlBulkCopyConfig.</param>
    /// <exception cref="InvalidOperationException">The context's connection or transaction is not a SQL Server one.</exception>
    public static void Insert<T>(DbContext context, IList<T> entities, TableInfo tableInfo, Action<decimal> progress)
    {
        // Remember whether the caller already had the connection open, so that
        // we only close a connection this method opened itself.
        var openedHere = context.Database.GetDbConnection().State != ConnectionState.Open;
        var sqlConnection = OpenAndGetSqlConnection(context);
        var transaction = context.Database.CurrentTransaction;
        try
        {
            using (var sqlBulkCopy = GetSqlBulkCopy(sqlConnection, transaction))
            {
                tableInfo.SetSqlBulkCopyConfig(sqlBulkCopy, entities, progress);
                using (var reader = ObjectReaderEx.Create(entities, tableInfo.ShadowProperties, context, tableInfo.PropertyColumnNamesDict.Keys.ToArray()))
                {
                    sqlBulkCopy.WriteToServer(reader);
                }
            }
        }
        finally
        {
            // Never close a connection that carries the caller's transaction
            // or that the caller opened before calling us.
            if (openedHere && transaction == null)
            {
                sqlConnection.Close();
            }
        }
    }

    /// <summary>
    /// Asynchronous counterpart of <see cref="Insert{T}"/>.
    /// </summary>
    /// <param name="context">Context whose connection and current transaction are used.</param>
    /// <param name="entities">Entities to write.</param>
    /// <param name="tableInfo">Table metadata; also configures the bulk copy via SetSqlBulkCopyConfig.</param>
    /// <param name="progress">Progress callback handed to SetSqlBulkCopyConfig.</param>
    /// <exception cref="InvalidOperationException">The context's connection or transaction is not a SQL Server one.</exception>
    public static async Task InsertAsync<T>(DbContext context, IList<T> entities, TableInfo tableInfo, Action<decimal> progress)
    {
        // Remember whether the caller already had the connection open, so that
        // we only close a connection this method opened itself.
        var openedHere = context.Database.GetDbConnection().State != ConnectionState.Open;
        var sqlConnection = await OpenAndGetSqlConnectionAsync(context).ConfigureAwait(false);
        var transaction = context.Database.CurrentTransaction;
        try
        {
            using (var sqlBulkCopy = GetSqlBulkCopy(sqlConnection, transaction))
            {
                tableInfo.SetSqlBulkCopyConfig(sqlBulkCopy, entities, progress);
                using (var reader = ObjectReaderEx.Create(entities, tableInfo.ShadowProperties, context, tableInfo.PropertyColumnNamesDict.Keys.ToArray()))
                {
                    await sqlBulkCopy.WriteToServerAsync(reader).ConfigureAwait(false);
                }
            }
        }
        finally
        {
            // Never close a connection that carries the caller's transaction
            // or that the caller opened before calling us.
            if (openedHere && transaction == null)
            {
                sqlConnection.Close();
            }
        }
    }

    /// <summary>
    /// Bulk-inserts the entities into a temporary copy of the target table, runs the
    /// merge statement built by SqlQueryBuilder.MergeTable, and drops the temporary
    /// table(s) afterwards. On failure the temporary tables are dropped and the
    /// original exception is rethrown with its stack trace intact.
    /// </summary>
    /// <param name="context">Context used to execute the SQL commands.</param>
    /// <param name="entities">Entities to merge.</param>
    /// <param name="tableInfo">Table metadata and bulk configuration.</param>
    /// <param name="operationType">Operation passed through to SqlQueryBuilder.MergeTable.</param>
    /// <param name="progress">Progress callback forwarded to <see cref="Insert{T}"/>.</param>
    public static void Merge<T>(DbContext context, IList<T> entities, TableInfo tableInfo, OperationType operationType, Action<decimal> progress) where T : class
    {
        tableInfo.InsertToTempTable = true;
        tableInfo.CheckHasIdentity(context);
        context.Database.ExecuteSqlCommand(SqlQueryBuilder.CreateTableCopy(tableInfo.FullTableName, tableInfo.FullTempTableName));

        // Track whether the output table was created so it is always dropped again,
        // even when the identity values are not read back (no single primary key).
        var outputTableCreated = tableInfo.BulkConfig.SetOutputIdentity;
        if (outputTableCreated)
        {
            context.Database.ExecuteSqlCommand(SqlQueryBuilder.CreateTableCopy(tableInfo.FullTableName, tableInfo.FullTempOutputTableName));
        }

        try
        {
            SqlBulkOperation.Insert<T>(context, entities, tableInfo, progress);
            context.Database.ExecuteSqlCommand(SqlQueryBuilder.MergeTable(tableInfo, operationType));
            context.Database.ExecuteSqlCommand(SqlQueryBuilder.DropTable(tableInfo.FullTempTableName));

            if (tableInfo.BulkConfig.SetOutputIdentity && tableInfo.HasSinglePrimaryKey)
            {
                tableInfo.UpdateOutputIdentity(context, entities);
            }
            if (outputTableCreated)
            {
                context.Database.ExecuteSqlCommand(SqlQueryBuilder.DropTable(tableInfo.FullTempOutputTableName));
            }
        }
        catch (Exception)
        {
            if (outputTableCreated)
            {
                context.Database.ExecuteSqlCommand(SqlQueryBuilder.DropTable(tableInfo.FullTempOutputTableName));
            }
            context.Database.ExecuteSqlCommand(SqlQueryBuilder.DropTable(tableInfo.FullTempTableName));

            // Plain rethrow keeps the original stack trace ("throw ex;" would reset it).
            throw;
        }
    }

    /// <summary>
    /// Asynchronous counterpart of <see cref="Merge{T}"/>. The output table is used
    /// when SetOutputIdentity is configured and the table has an identity column.
    /// </summary>
    /// <param name="context">Context used to execute the SQL commands.</param>
    /// <param name="entities">Entities to merge.</param>
    /// <param name="tableInfo">Table metadata and bulk configuration.</param>
    /// <param name="operationType">Operation passed through to SqlQueryBuilder.MergeTable.</param>
    /// <param name="progress">Progress callback forwarded to <see cref="InsertAsync{T}"/>.</param>
    public static async Task MergeAsync<T>(DbContext context, IList<T> entities, TableInfo tableInfo, OperationType operationType, Action<decimal> progress) where T : class
    {
        tableInfo.InsertToTempTable = true;
        await tableInfo.CheckHasIdentityAsync(context).ConfigureAwait(false);
        await context.Database.ExecuteSqlCommandAsync(SqlQueryBuilder.CreateTableCopy(tableInfo.FullTableName, tableInfo.FullTempTableName)).ConfigureAwait(false);

        // Evaluated once so that the create and drop of the output table always match.
        var useOutputTable = tableInfo.BulkConfig.SetOutputIdentity && tableInfo.HasIdentity;
        if (useOutputTable)
        {
            await context.Database.ExecuteSqlCommandAsync(SqlQueryBuilder.CreateTableCopy(tableInfo.FullTableName, tableInfo.FullTempOutputTableName)).ConfigureAwait(false);
        }

        try
        {
            await SqlBulkOperation.InsertAsync<T>(context, entities, tableInfo, progress).ConfigureAwait(false);
            await context.Database.ExecuteSqlCommandAsync(SqlQueryBuilder.MergeTable(tableInfo, operationType)).ConfigureAwait(false);
            await context.Database.ExecuteSqlCommandAsync(SqlQueryBuilder.DropTable(tableInfo.FullTempTableName)).ConfigureAwait(false);

            if (useOutputTable)
            {
                await tableInfo.UpdateOutputIdentityAsync(context, entities).ConfigureAwait(false);
                await context.Database.ExecuteSqlCommandAsync(SqlQueryBuilder.DropTable(tableInfo.FullTempOutputTableName)).ConfigureAwait(false);
            }
        }
        catch (Exception)
        {
            if (useOutputTable)
            {
                await context.Database.ExecuteSqlCommandAsync(SqlQueryBuilder.DropTable(tableInfo.FullTempOutputTableName)).ConfigureAwait(false);
            }
            await context.Database.ExecuteSqlCommandAsync(SqlQueryBuilder.DropTable(tableInfo.FullTempTableName)).ConfigureAwait(false);

            // Plain rethrow keeps the original stack trace ("throw ex;" would reset it).
            throw;
        }
    }

    /// <summary>
    /// Unwraps the EF Core transaction into the underlying <see cref="SqlTransaction"/>.
    /// </summary>
    /// <exception cref="InvalidOperationException">The underlying transaction is not a <see cref="SqlTransaction"/>.</exception>
    private static SqlTransaction GetSqlTransaction(IDbContextTransaction transaction)
    {
        var sqlTransaction = transaction.GetDbTransaction() as SqlTransaction;
        if (sqlTransaction == null)
        {
            throw new InvalidOperationException("The current transaction is not a SqlTransaction; bulk copy requires a SQL Server transaction.");
        }
        return sqlTransaction;
    }

    /// <summary>
    /// Returns the context's connection as a <see cref="SqlConnection"/>, without opening it.
    /// Validating first means an unsupported provider fails before any connection is opened.
    /// </summary>
    /// <exception cref="InvalidOperationException">The context's connection is not a <see cref="SqlConnection"/>.</exception>
    private static SqlConnection GetSqlConnection(DbContext context)
    {
        var sqlConnection = context.Database.GetDbConnection() as SqlConnection;
        if (sqlConnection == null)
        {
            throw new InvalidOperationException("The context's connection is not a SqlConnection; bulk copy requires a SQL Server connection.");
        }
        return sqlConnection;
    }

    /// <summary>
    /// Returns the context's <see cref="SqlConnection"/>, opening it if it is not open yet.
    /// </summary>
    private static SqlConnection OpenAndGetSqlConnection(DbContext context)
    {
        var sqlConnection = GetSqlConnection(context);
        if (sqlConnection.State != ConnectionState.Open)
        {
            sqlConnection.Open();
        }
        return sqlConnection;
    }

    /// <summary>
    /// Asynchronous counterpart of <see cref="OpenAndGetSqlConnection"/>.
    /// </summary>
    private static async Task<SqlConnection> OpenAndGetSqlConnectionAsync(DbContext context)
    {
        var sqlConnection = GetSqlConnection(context);
        if (sqlConnection.State != ConnectionState.Open)
        {
            await sqlConnection.OpenAsync().ConfigureAwait(false);
        }
        return sqlConnection;
    }

    /// <summary>
    /// Creates the <see cref="SqlBulkCopy"/>, enlisting it in <paramref name="transaction"/>
    /// when one is supplied; otherwise the bulk copy runs without an external transaction.
    /// </summary>
    private static SqlBulkCopy GetSqlBulkCopy(SqlConnection sqlConnection, IDbContextTransaction transaction)
    {
        if (transaction == null)
        {
            return new SqlBulkCopy(sqlConnection);
        }
        return new SqlBulkCopy(sqlConnection, SqlBulkCopyOptions.Default, GetSqlTransaction(transaction));
    }
}
from efcore.bulkextensions.
I have added this logic to the library and published a new version to NuGet.
You can test if it works for you, and if all is fine we can close this issue.
from efcore.bulkextensions.
Related Issues (20)
- BulkInsertOrUpdateAsync performance questions HOT 3
- Using EF Core 8 Complex Property results in error "Sequence contains no matching element"
- When using PostgreSQL to batch insert DateTime.Now data, there might be a time offset.
- BulkExtensions determines target DbContext incorrectly when calls to different database servers in parallel HOT 1
- VULN: CVE-2024-32655 in Npgsql dependency HOT 3
- BulkInsertOrUpdate doesn't return Identity when no insert or update happen
- not present in the dictionary.
- can BulkInsert support my custom ValueGenerator for Id column HOT 1
- BulkSaveChangesAsync leaves tracked entities in state of "Added" even after successful save. HOT 2
- Spatial support in SQLite for Point
- BulkRead: System.NotSupportedException: 'Specified method is not supported.'
- BulkInsert produces sytanx error on PostgreSQL (version 6.x) HOT 1
- Bulk operation fails when owned type has enum member on PostgreSQL
- `SharedTypeEntity` don't work with bulk operations HOT 1
- [MySql/MariaDb] Insert an boolean type has invalid result (Always insert true in a bit column) HOT 2
- Question : Set value on shadow properties "on save" HOT 1
- Unable to cast object of type 'System.Int64' to type 'System.Nullable`1[System.Int32]' HOT 1
- PostgreSQL Table-Per-Hierarchy Inheritance Is Not Working HOT 1
- Truncate method with custom table name HOT 1
- BulkInsertAsync is assigning negative values to inserted entries' primary keys HOT 9
Recommend Projects
-
React
A declarative, efficient, and flexible JavaScript library for building user interfaces.
-
Vue.js
🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.
-
Typescript
TypeScript is a superset of JavaScript that compiles to clean JavaScript output.
-
TensorFlow
An Open Source Machine Learning Framework for Everyone
-
Django
The Web framework for perfectionists with deadlines.
-
Laravel
A PHP framework for web artisans
-
D3
Bring data to life with SVG, Canvas and HTML. 📊📈🎉
-
Recommend Topics
-
javascript
JavaScript (JS) is a lightweight interpreted programming language with first-class functions.
-
web
Some thing interesting about web. New door for the world.
-
server
A server is a program made to process requests and deliver data to clients.
-
Machine learning
Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.
-
Visualization
Some thing interesting about visualization, use data art
-
Game
Some thing interesting about game, make everyone happy.
Recommend Org
-
Facebook
We are working to build community through open source technology. NB: members must have two-factor auth.
-
Microsoft
Open source projects and samples from Microsoft.
-
Google
Google ❤️ Open Source for everyone.
-
Alibaba
Alibaba Open Source for everyone
-
D3
Data-Driven Documents codes.
-
Tencent
China tencent open source team.
from efcore.bulkextensions.