文章目錄
C# 如何快速新增大量資料至 MySQL
最近在重現系統上遇到的狀況,初步懷疑是資料量過大,造成相關處理效能不佳,而連帶影響到系統後續運作,但這是初步懷疑,為了印證這個懷疑,首先需要產生大量資料,透過 C# 搭配 Bogus 產生假資料後再 insert 進 MySql 實在太耗費時間了,所以今天就來紀錄利用 MySqlBulkLoader 來快速 bulk insert 大量資料的方式。
MySqlBulkLoader 是 MySQL Connector/NET 提供的一個類別,可以快速將資料 insert 進 MySql MySqlBulkLoader 可以透過 csv 與 stream 將資料寫入 MySql
基本環境說明
- macOS Sonoma 14.2.1 (Apple M2 Pro)
- OrbStack Version 1.2.0 (16496)
- .NET SDK 8.0.100
- JetBrains Rider 2023.3.2
Container Images
- mysql:8.2
NuGet Library
- Bogus 35.0.1
- MySqlConnector 2.3.1
- BenchmarkDotNet 0.13.11
- CsvHelper 30.0.1
docker-compose.yml
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersversion: '3.1' services: mysql: image: mysql:8.2 command: --default-authentication-plugin=mysql_native_password restart: always ports: - 3306:3306 environment: MYSQL_ROOT_PASSWORD: pass.123 MYSQL_DATABASE: test mysql DDL
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersCREATE TABLE orders ( id BIGINT UNSIGNED NOT NULL AUTO_INCREMENT, order_date DATE NOT NULL, product_id INT NOT NULL, order_type TINYINT NOT NULL, amount DECIMAL(12,6) NOT NULL, PRIMARY KEY (id) );
設定方式
C# 連線字串設定:需加上
AllowLoadLocalInfile=true;
語法:
1Server={ip or host};Port={port};Database={db name};Username={username};Password={password};Allow User Variables=true;AllowLoadLocalInfile=true;範例:
1Server=localhost;Port=3306;Database=test;Username=root;Password=pass.123;Allow User Variables=true;AllowLoadLocalInfile=true;"未設定錯誤
錯誤訊息
1234567Unhandled exception. System.NotSupportedException: To use MySqlBulkLoader.Local=true, set AllowLoadLocalInfile=true in the connection string. See https://fl.vu/mysql-load-dataat MySqlConnector.MySqlBulkLoader.LoadAsync(IOBehavior ioBehavior, CancellationToken cancellationToken) in /_/src/MySqlConnector/MySqlBulkLoader.cs:line 205at MySqlConnector.MySqlBulkLoader.Load() in /_/src/MySqlConnector/MySqlBulkLoader.cs:line 146at Program.<<Main>$>g__StoreToMysql|0_1(Order[] orders) in /Users/yowko.tsai/POCs/ClickHouse/DataGenerator/Program.cs:line 190at Program.<<Main>$>g__GenTestRecords|0_0(Int32 batchcount, Int32 batchsize) in /Users/yowko.tsai/POCs/ClickHouse/DataGenerator/Program.cs:line 127at Program.<Main>$(String[] args) in /Users/yowko.tsai/POCs/ClickHouse/DataGenerator/Program.cs:line 52at Program.<Main>(String[] args)錯誤截圖
MySql 設定啟用
local_infile
語法:
1SET GLOBAL local_infile = 1;未設定錯誤
錯誤訊息
12345678910111213Unhandled exception. MySqlConnector.MySqlException (0x80004005): Loading local data is disabled; this must be enabled on both the client and server sidesat MySqlConnector.Core.ServerSession.ReceiveReplyAsync(IOBehavior ioBehavior, CancellationToken cancellationToken) in /_/src/MySqlConnector/Core/ServerSession.cs:line 936at MySqlConnector.Core.ResultSet.ReadResultSetHeaderAsync(IOBehavior ioBehavior) in /_/src/MySqlConnector/Core/ResultSet.cs:line 37at MySqlConnector.MySqlDataReader.ActivateResultSet(CancellationToken cancellationToken) in /_/src/MySqlConnector/MySqlDataReader.cs:line 130at MySqlConnector.MySqlDataReader.InitAsync(CommandListPosition commandListPosition, ICommandPayloadCreator payloadCreator, IDictionary`2 cachedProcedures, IMySqlCommand command, CommandBehavior behavior, Activity activity, IOBehavior ioBehavior, CancellationToken cancellationToken) in /_/src/MySqlConnector/MySqlDataReader.cs:line 483at MySqlConnector.Core.CommandExecutor.ExecuteReaderAsync(CommandListPosition commandListPosition, ICommandPayloadCreator payloadCreator, CommandBehavior behavior, Activity activity, IOBehavior ioBehavior, CancellationToken cancellationToken) in /_/src/MySqlConnector/Core/CommandExecutor.cs:line 56at MySqlConnector.MySqlCommand.ExecuteNonQueryAsync(IOBehavior ioBehavior, CancellationToken cancellationToken) in /_/src/MySqlConnector/MySqlCommand.cs:line 309at MySqlConnector.MySqlBulkLoader.LoadAsync(IOBehavior ioBehavior, CancellationToken cancellationToken) in /_/src/MySqlConnector/MySqlBulkLoader.cs:line 212at MySqlConnector.MySqlBulkLoader.Load() in /_/src/MySqlConnector/MySqlBulkLoader.cs:line 146at Program.<<Main>$>g__StoreToMysql|0_1(Order[] orders) in /Users/yowko.tsai/POCs/ClickHouse/DataGenerator/Program.cs:line 190at Program.<<Main>$>g__GenTestRecords|0_0(Int32 batchcount, Int32 batchsize) in /Users/yowko.tsai/POCs/ClickHouse/DataGenerator/Program.cs:line 127at Program.<Main>$(String[] args) in /Users/yowko.tsai/POCs/ClickHouse/DataGenerator/Program.cs:line 52at Program.<Main>(String[] args)錯誤截圖
C# 程式碼
使用 csv
使用 csv 需要將資料寫入暫存檔案,再將暫存檔案路徑傳入
MySqlBulkLoader
,這邊使用CsvHelper
來產生 csv 檔案This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersvar filename = "./orders.csv"; using var writer = new StreamWriter(filename); { using var csv = new CsvWriter(writer, CultureInfo.InvariantCulture); csv.WriteRecords(GetOrders(BatchSize)); } var connectionString = "Server=localhost;Port=3306;Database=test;Username=root;Password=pass.123;Allow User Variables=true;AllowLoadLocalInfile=true;";// using var connection = new MySqlConnection(connectionString); { connection.Open(); var bl = new MySqlBulkLoader(connection) { TableName = "orders", FileName = filename, FieldTerminator = ",", LineTerminator = "\n", NumberOfLinesToSkip = 1, FieldQuotationOptional = true, Columns = { "order_date", "product_id", "order_type", "amount" } }; var inserted = bl.Load(); //Console.WriteLine(inserted + " rows inserted."); } 使用 stream
使用 stream 需要將資料寫入
MemoryStream
,再將MemoryStream
傳入MySqlBulkLoader
,可能 memory 佔用量會比較大,但不需要產生暫存檔案This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersusing var memoryStream = new MemoryStream(); using var streamWriter = new StreamWriter(memoryStream); using var csvWriter = new CsvWriter(streamWriter, CultureInfo.InvariantCulture); csvWriter.WriteRecords(GetOrders(BatchSize)); streamWriter.Flush(); memoryStream.Position = 0; var connectionString = "Server=localhost;Port=3306;Database=test;Username=root;Password=pass.123;Allow User Variables=true;AllowLoadLocalInfile=true;";// using var connection = new MySqlConnection(connectionString); { connection.Open(); var bl = new MySqlBulkLoader(connection) { TableName = "orders", SourceStream = memoryStream, FieldTerminator = ",", LineTerminator = "\n", NumberOfLinesToSkip = 1, FieldQuotationOptional = true, Columns = { "order_date", "product_id", "order_type", "amount" } }; var inserted = bl.Load(); //Console.WriteLine(inserted + " rows inserted."); }
完整程式碼
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode characters// See https://aka.ms/new-console-template for more information using System.Globalization; using BenchmarkDotNet.Attributes; using BenchmarkDotNet.Running; using Bogus; using CsvHelper; using CsvHelper.Configuration.Attributes; using MySqlConnector; Console.WriteLine("Hello, World!"); var summary = BenchmarkRunner.Run<MySqlService>(); [MemoryDiagnoser(true)] public class MySqlService { [Params(1000, 10000, 100000)] public int BatchSize { get; set; } [Benchmark] public void MySqlBulkLoaderCsv() { var filename = "./orders.csv"; using var writer = new StreamWriter(filename); { using var csv = new CsvWriter(writer, CultureInfo.InvariantCulture); csv.WriteRecords(GetOrders(BatchSize)); } var connectionString = "Server=localhost;Port=3306;Database=test;Username=root;Password=pass.123;Allow User Variables=true;AllowLoadLocalInfile=true;";// using var connection = new MySqlConnection(connectionString); { connection.Open(); var bl = new MySqlBulkLoader(connection) { TableName = "orders", FileName = filename, FieldTerminator = ",", LineTerminator = "\n", NumberOfLinesToSkip = 1, FieldQuotationOptional = true, Columns = { "order_date", "product_id", "order_type", "amount" } }; var inserted = bl.Load(); //Console.WriteLine(inserted + " rows inserted."); } } [Benchmark] public void MySqlBulkLoaderStream() { using var memoryStream = new MemoryStream(); using var streamWriter = new StreamWriter(memoryStream); using var csvWriter = new CsvWriter(streamWriter, CultureInfo.InvariantCulture); csvWriter.WriteRecords(GetOrders(BatchSize)); streamWriter.Flush(); memoryStream.Position = 0; var connectionString = "Server=localhost;Port=3306;Database=test;Username=root;Password=pass.123;Allow User Variables=true;AllowLoadLocalInfile=true;";// using var connection = new MySqlConnection(connectionString); { connection.Open(); var bl = new MySqlBulkLoader(connection) { TableName = "orders", SourceStream = memoryStream, FieldTerminator = ",", LineTerminator = "\n", NumberOfLinesToSkip = 1, FieldQuotationOptional = true, Columns = { "order_date", "product_id", "order_type", "amount" } }; var inserted = bl.Load(); //Console.WriteLine(inserted + " rows inserted."); } } [Benchmark] public void MySqlInsert() { var connectionString = "Server=localhost;Port=3306;Database=test;Username=root;Password=pass.123;Allow User Variables=true;"; using var connection = new MySqlConnection(connectionString); connection.Open(); try { var insertStatement = new MySqlCommand( "INSERT INTO orders (id, order_date, product_id, order_type, amount) VALUES (@id, @order_date, @product_id, @order_type, @amount)", connection); foreach (var order in GetOrders(BatchSize)) { insertStatement.Parameters.Clear(); insertStatement.Parameters.AddWithValue("@order_date", order.OrderDate); insertStatement.Parameters.AddWithValue("@product_id", order.ProductId); insertStatement.Parameters.AddWithValue("@order_type", order.OrderType); insertStatement.Parameters.AddWithValue("@amount", order.Amount); insertStatement.ExecuteNonQuery(); } } catch (Exception e) { Console.WriteLine(e); throw; } finally { // 關閉資料庫連線 connection.Close(); } } Order[] GetOrders(int BatchSize) { var startDate = new DateTime(2021, 01, 01, 00, 00, 00, DateTimeKind.Utc); var order = new Faker<Order>() .RuleFor(a => a.Id, f => f.Random.ULong()) .RuleFor(a => a.OrderDate, f => startDate.AddDays(f.Random.Number(0, 365 * 3))) .RuleFor(a => a.ProductId, f => f.Random.Number(1, 10000)) .RuleFor(a => a.OrderType, f => f.Random.SByte(1, 10)) .RuleFor(a => a.Amount, f => f.Random.Decimal(0M, 100000M)); return order.Generate(BatchSize).ToArray(); } } public class Order { [Ignore] public ulong Id { get; set; } public DateTime OrderDate { get; set; } public int ProductId { get; set; } public sbyte OrderType { get; set; } public decimal Amount { get; set; } }
心得
- client 的連線字串與 MySql server 都要額外設定才能使用
MySqlBulkLoader
- 相較於 MySql .NET library 沒有 bulk insert 速度提升非常多 (以我的測試來看從 1000 筆資料的 80 倍差距,到 100000 筆資料的 205 倍差距)
- csv 與 stream 兩種方式都可以使用,但 stream 在資料量較多時會快一些也不需要額外處理暫存檔,但會佔用較多記憶體
完整程式碼:GitHub:yowko/MySqlBulkInsert
參考資訊
文章作者 Yowko Tsai
上次更新 2023-12-22
授權合約
本部落格 (Yowko's Notes) 所有的文章內容(包含圖片),任何轉載行為,必須通知並獲本部落格作者 (Yowko Tsai) 的同意始得轉載,且轉載皆須註明出處與作者。
Yowko's Notes 由 Yowko Tsai 製作,以創用CC 姓名標示-非商業性-相同方式分享 3.0 台灣 授權條款 釋出。