Reboon Reboon - 2 years ago 329
SQL Question

C# fastest way to insert data into SQL database

I am receiving (streamed) data from an external source (over Lightstreamer) into my C# application.
My C# application receives data from the listener. The data from the listener are stored in a queue (ConcurrentQueue).
The queue is getting cleaned every 0.5 seconds with TryDequeue into a DataTable. The DataTable will then be copy into a SQL database using SqlBulkCopy.
The SQL database processes the newly data arrived from the staging table into the final table.
I currently receive around 300'000 rows per day (can increae within the next weeks strongly) and my goal is to stay under 1 second from the time I receive the data until they are available in the final SQL table.
Currently the maximum rows per seconds I have to process is around 50 rows.

Unfortunately, since receiving more and more data, my logic is getting slower in performance (still far under 1 second, but I wanna keep improving). The main bottleneck (so far) is the processing of the staging data (on the SQL database) into the final table.
In order to improve the performance, I would like to switch the staging table into a memory-optimized table. The final table is already a memory-optimized table so they will work together fine for sure.

My questions:

  1. Is there a way to use SqlBulkCopy (out of C#) with memory-optimized tables? (as far as I know there is no way yet)

  2. Any suggestions for the fastest way to write the received data from my C# application into the memory-optimized staging table?

EDIT (with solution):

After the comments/answers and performance evaluations I decided to give up the bulk insert and use SQLCommand to handover a IEnumerable with my data as table-valued parameter into a native compiled stored procedure to store the data directly in my memory-optimized final table (as well as a copy into the "staging" table which now serves as archive). Performance increased significantly (even I did not consider parallelizing the inserts yet (will be at a later stage)).

Here is part of the code:

Memory-optimized user-defined table type (to handover the data from C# into SQL (stored procedure):

CREATE TYPE [Staging].[CityIndexIntradayLivePrices] AS TABLE(
[CityIndexInstrumentID] [int] NOT NULL,
[CityIndexTimeStamp] [bigint] NOT NULL,
[BidPrice] [numeric](18, 8) NOT NULL,
[AskPrice] [numeric](18, 8) NOT NULL,
INDEX [IndexCityIndexIntradayLivePrices] NONCLUSTERED
[CityIndexInstrumentID] ASC,
[CityIndexTimeStamp] ASC,
[BidPrice] ASC,
[AskPrice] ASC

Native compiled stored procedures to insert the data into final table and staging (which serves as archive in this case):

create procedure [Staging].[spProcessCityIndexIntradayLivePricesStaging]
@ProcessingID int,
@CityIndexIntradayLivePrices Staging.CityIndexIntradayLivePrices readonly
with native_compilation, schemabinding, execute as owner
begin atomic
with (transaction isolation level=snapshot, language=N'us_english')

-- store prices

insert into TimeSeries.CityIndexIntradayLivePrices
select Objects.ObjectID,
from @CityIndexIntradayLivePrices CityIndexIntradayLivePricesStaging,
where Objects.CityIndexInstrumentID = CityIndexIntradayLivePricesStaging.CityIndexInstrumentID

-- store data in staging table

insert into Staging.CityIndexIntradayLivePricesStaging
select @ProcessingID,
from @CityIndexIntradayLivePrices


IEnumerable filled with the from the queue:

private static IEnumerable<SqlDataRecord> CreateSqlDataRecords()

// set columns (the sequence is important as the sequence will be accordingly to the sequence of columns in the table-value parameter)

SqlMetaData MetaDataCol1;
SqlMetaData MetaDataCol2;
SqlMetaData MetaDataCol3;
SqlMetaData MetaDataCol4;

MetaDataCol1 = new SqlMetaData("CityIndexInstrumentID", SqlDbType.Int);
MetaDataCol2 = new SqlMetaData("CityIndexTimeStamp", SqlDbType.BigInt);
MetaDataCol3 = new SqlMetaData("BidPrice", SqlDbType.Decimal, 18, 8); // precision 18, 8 scale
MetaDataCol4 = new SqlMetaData("AskPrice", SqlDbType.Decimal, 18, 8); // precision 18, 8 scale

// define sql data record with the columns

SqlDataRecord DataRecord = new SqlDataRecord(new SqlMetaData[] { MetaDataCol1, MetaDataCol2, MetaDataCol3, MetaDataCol4 });

// remove each price row from queue and add it to the sql data record

LightstreamerAPI.PriceDTO PriceDTO = new LightstreamerAPI.PriceDTO();

while (IntradayQuotesQueue.TryDequeue(out PriceDTO))

DataRecord.SetInt32(0, PriceDTO.MarketID); // city index market id
DataRecord.SetInt64(1, Convert.ToInt64((PriceDTO.TickDate.Replace(@"\/Date(", "")).Replace(@")\/", ""))); // @ is used to avoid problem with / as escape sequence)
DataRecord.SetDecimal(2, PriceDTO.Bid); // bid price
DataRecord.SetDecimal(3, PriceDTO.Offer); // ask price

yield return DataRecord;



Handling the data every 0.5 seconds:

public static void ChildThreadIntradayQuotesHandler(Int32 CityIndexInterfaceProcessingID)


// open new sql connection

using (SqlConnection TimeSeriesDatabaseSQLConnection = new SqlConnection("Data Source=WINDOWS-BQFV5V4\\DREAMSTEPS;Initial Catalog=TimeSeries;Integrated Security=SSPI;MultipleActiveResultSets=false"))

// open connection


// endless loop to keep thread alive


// ensure queue has rows to process (otherwise no need to continue)

if(IntradayQuotesQueue.Count > 0)

// define stored procedure for sql command

SqlCommand InsertCommand = new SqlCommand("Staging.spProcessCityIndexIntradayLivePricesStaging", TimeSeriesDatabaseSQLConnection);

// set command type to stored procedure

InsertCommand.CommandType = CommandType.StoredProcedure;

// define sql parameters (table-value parameter gets data from CreateSqlDataRecords())

SqlParameter ParameterCityIndexIntradayLivePrices = InsertCommand.Parameters.AddWithValue("@CityIndexIntradayLivePrices", CreateSqlDataRecords()); // table-valued parameter
SqlParameter ParameterProcessingID = InsertCommand.Parameters.AddWithValue("@ProcessingID", CityIndexInterfaceProcessingID); // processing id parameter

// set sql db type to structured for table-value paramter (structured = special data type for specifying structured data contained in table-valued parameters)

ParameterCityIndexIntradayLivePrices.SqlDbType = SqlDbType.Structured;

// execute stored procedure



// wait 0.5 seconds




catch (Exception e)

// handle error (standard error messages and update processing)

ThreadErrorHandling(CityIndexInterfaceProcessingID, "ChildThreadIntradayQuotesHandler (handler stopped now)", e);



Answer Source

Use SQL Server 2016 (it's not RTM yet, but it's already much better than 2014 when it comes to memory-optimized tables). Then use either a memory-optimized table variable or just blast a whole lot of native stored procedure calls in a transaction, each doing one insert, depending on what's faster in your scenario (this varies). A few things to watch out for:

  • Doing multiple inserts in one transaction is vital to save on network roundtrips. While in-memory operations are very fast, SQL Server still needs to confirm every operation.
  • Depending on how you're producing data, you may find that parallelizing the inserts can speed things up (don't overdo it; you'll quickly hit the saturation point). Don't try to be very clever yourself here; leverage async/await and/or Parallel.ForEach.
  • If you're passing a table-valued parameter, the easiest way of doing it is to pass a DataTable as the parameter value, but this is not the most efficient way of doing it -- that would be passing an IEnumerable<SqlDataRecord>. You can use an iterator method to generate the values, so only a constant amount of memory is allocated.

You'll have to experiment a bit to find the optimal way of passing through data; this depends a lot on the size of your data and how you're getting it.

Recommended from our users: Dynamic Network Monitoring from WhatsUp Gold from IPSwitch. Free Download