你的位置:首页 > 数据库

[数据库][Oracle] Bulk Insert Data


命名空间:Oracle.DataAccess.Client

组件:Oracle.DataAccess.dll(2.112.1.0)

ODP.NET 版本:ODP.NET for .NET Framework 2.0 或 ODP.NET for .NET Framework 4

工具:Microsoft Visual Studio Ultimate 2013 + Oracle SQL Developer 1.5.5 + Oracle Database 11g Enterprise Edition 11.2.0.1.0(32位) + TNS for 32-bit Windows 11.2.0.1.0

 

方式一:ArrayBind

当插入一条数据时,SQL语句如下:

INSERT INTO table_name VALUES (:col1, :col2, :col3, :col4, :col5)

 1 public void InsertDataRow(Dictionary<string, object> dataRow) 2 { 3 	StringBuilder sbCmdText = new StringBuilder(); 4 	sbCmdText.AppendFormat("INSERT INTO {0}(", m_TableName); 5 	sbCmdText.Append(string.Join(",", dataRow.Keys.ToArray())); 6 	sbCmdText.Append(") VALUES ("); 7 	sbCmdText.Append(":" + string.Join(",:", dataRow.Keys.ToArray())); 8 	sbCmdText.Append(")"); 9  10 	using (OracleConnection conn = new OracleConnection()) 11 	{ 12 		using (OracleCommand cmd = conn.CreateCommand()) 13 		{ 14 			cmd.CommandType = CommandType.Text; 15 			cmd.CommandText = sbCmdText.ToString(); 16 			OracleParameter parameter = null; 17 			OracleDbType dbType = OracleDbType.Object; 18 			foreach (string colName in dataRow.Keys) 19 			{ 20 				dbType = GetOracleDbType(dataRow[colName]); 21 				parameter = new OracleParameter(colName, dbType); 22 				parameter.Direction = ParameterDirection.Input; 23 				parameter.OracleDbTypeEx = dbType; 24 				parameter.Value = dataRow[colName]; 25 				cmd.Parameters.Add(parameter); 26 			} 27 			conn.Open(); 28 			int result = cmd.ExecuteNonQuery(); 29 		} 30 	} 31 }

此时,每一个 OracleParameter 的 Value 值都赋予单个字段的 一个具体值,这种也是最为传统的插入数据的方法。

Oracle V6 中 OCI 编程接口加入了数组接口特性。

当采用 ArrayBind 时,OraleParameter 的 Value 值则是赋予单个字段的 一个数组,即多条数据的该字段组合成的一个数组。此时 Oracle 仅需要执行一次 SQL 语句,即可在内存中批量解析并导入数据,减少程序与数据库之间来回的操作,其优点就是数据导入的总体时间明显减少,尤其是进程占用CPU的时间。

如果数据源是 DataTable 类型,首先把 DataTable 数据源,转换成 object[][] 类型,然后绑定 OracleParameter 的 Value 值为对应字段的一个 Object[] 数组即可;参考代码如下:

 1 /// <summary> 2 /// 批量插入大数据量 3 /// </summary> 4 /// <param name="columnData">列名-列数据字典</param> 5 /// <param name="dataCount">数据量</param> 6 /// <returns>插入数据量</returns> 7 public int InsertBigData(Dictionary<string, object> columnData, int dataCount) 8 { 9 	int result = 0; 10 	if (columnData == null || columnData.Count < 1) 11 	{ 12 		return result; 13 	} 14 	string[] colHeaders = columnData.Keys.ToArray(); 15 	StringBuilder sbCmdText = new StringBuilder(); 16 	if (columnData.Count > 0) 17 	{ 18 		// 拼接INSERT的SQL语句 19 		sbCmdText.AppendFormat("INSERT INTO {0}(", m_TableName); 20 		sbCmdText.Append(string.Join(",", colHeaders)); 21 		sbCmdText.Append(") VALUES ("); 22 		sbCmdText.Append(m_ParameterPrefix + string.Join("," + m_ParameterPrefix, colHeaders)); 23 		sbCmdText.Append(")"); 24 		OracleConnection connection = null; 25 		try 26 		{ 27 			connection = new OracleConnection(GetConnectionString()); 28 			using (OracleCommand command = connection.CreateCommand()) 29 			{ 30 				command.ArrayBindCount = dataCount; 31 				command.BindByName = true; 32 				command.CommandType = CommandType.Text; 33 				command.CommandText = sbCmdText.ToString(); 34 				command.CommandTimeout = 1800; 35 				OracleParameter parameter; 36 				OracleDbType dbType = OracleDbType.Object; 37 				foreach (string colName in colHeaders) 38 				{ 39 					dbType = GetOracleDbType(columnData[colName]); 40 					parameter = new OracleParameter(colName, dbType); 41 					parameter.Direction = ParameterDirection.Input; 42 					parameter.OracleDbTypeEx = dbType; 43 					parameter.Value = columnData[colName]; 44 					command.Parameters.Add(parameter); 45 				} 46 				connection.Open(); 47 				OracleTransaction trans = connection.BeginTransaction(); 48 				try 49 				{ 50 					command.Transaction = trans; 51 					result = command.ExecuteNonQuery(); 52 					trans.Commit(); 53 				} 54 				catch (Exception ex) 55 				{ 56 					trans.Rollback(); 57 					throw ex; 58 				} 59 			} 60 		} 61 		finally 62 		{ 63 			if (connection != null) 64 			{ 65 				connection.Close(); 66 				connection.Dispose(); 67 			} 68 			GC.Collect(); 69 			GC.WaitForFullGCComplete(); 70 		} 71 	} 72 	return result; 73 }

 1 /// <summary> 2 /// 根据数据类型获取OracleDbType 3 /// </summary> 4 /// <param name="value">数据</param> 5 /// <returns>数据的Oracle类型</returns> 6 private static OracleDbType GetOracleDbType(object value) 7 { 8 	OracleDbType dataType = OracleDbType.Object; 9 	if (value is string[]) 10 	{ 11 		dataType = OracleDbType.Varchar2; 12 	} 13 	else if (value is DateTime[]) 14 	{ 15 		dataType = OracleDbType.TimeStamp; 16 	} 17 	else if (value is int[] || value is short[]) 18 	{ 19 		dataType = OracleDbType.Int32; 20 	} 21 	else if (value is long[]) 22 	{ 23 		dataType = OracleDbType.Int64; 24 	} 25 	else if (value is decimal[] || value is double[] || value is float[]) 26 	{ 27 		dataType = OracleDbType.Decimal; 28 	} 29 	else if (value is Guid[]) 30 	{ 31 		dataType = OracleDbType.Varchar2; 32 	} 33 	else if (value is bool[] || value is Boolean[]) 34 	{ 35 		dataType = OracleDbType.Byte; 36 	} 37 	else if (value is byte[]) 38 	{ 39 		dataType = OracleDbType.Blob; 40 	} 41 	else if (value is char[]) 42 	{ 43 		dataType = OracleDbType.Char; 44 	} 45 	return dataType; 46 }

GetOracleDbType
说明:如果采用分次(每次1万数据)执行 InsertBigData 方法,速度反而比一次性执行 InsertBigData 方法慢,详见下面测试结果;

测试结果:

无索引,数据类型:4列NVARCHAR2,2列NUMBER

30+万(7.36M):一次性导入用时 15:623,每次10000导入用时 

60+万(14.6M):一次性导入用时 28:207,每次10000导入用时 1:2:300

100+万(24.9M):一次性导入报如下异常

image

此时实际上从资源监视器上可以得知仍有可用内存,但是仍旧报 OutOfMemoryException,所以猜测应该是一个 bug;

如果每次10000导入用时 2:9:252

如果每次50000导入用时 58:101

附加 InsertBigData 方法使用示例:

 1 // 每10000数据导入一次 2 Dictionary<string, object> columnsData = new Dictionary<string, object>(); 3 int dataCount = m_SourceDataTable.Rows.Count; 4 int times = dataCount / 10000 + (dataCount % 10000 == 0 ? 0 : 1); 5 for (int i = 0; i < times; i++) 6 { 7 	int startIndex = i * 10000; 8 	int endIndex = (i + 1) * 10000; 9 	endIndex = endIndex > dataCount ? dataCount : endIndex; 10 	int currDataCount = endIndex - startIndex; 11 	columnsData.Add("COL1", new string[currDataCount]); 12 	columnsData.Add("COL2", new string[currDataCount]); 13 	columnsData.Add("COL3", new decimal[currDataCount]); 14 	columnsData.Add("COL4", new string[currDataCount]); 15 	columnsData.Add("COL5", new decimal[currDataCount]); 16 	columnsData.Add("COL6", new string[currDataCount]); 17 	for (int rowIndex = startIndex; rowIndex < endIndex; rowIndex++) 18 	{ 19 		int dicRowIndex = rowIndex - startIndex;// 列数据行索引 20 		foreach (string colName in columnsData.Keys) 21 		{ 22 			object cell = m_SourceDataTable.Rows[rowIndex][colName]; 23 			string cellStr = (cell + "").TrimEnd(new char[] { '\0', ' ' }); 24 			if (colName == "COL3" || colName == "COL5") 25 			{ 26 				decimal value = 0; 27 				decimal.TryParse(cellStr, out value); 28 				((decimal[])columnsData[colName])[dicRowIndex] = value; 29 			} 30 			else 31 			{ 32 				((string[])columnsData[colName])[dicRowIndex] = cellStr; 33 			} 34 		} 35 	} 36 	m_DAL.InsertBigData(columnsData, currDataCount); 37  38 	columnsData.Clear(); 39 	GC.Collect(); 40 	GC.WaitForFullGCComplete(); 41 }

View Code

方式二:OracleBulkCopy

说明:

1. OracleBulkCopy 采用 direct path 方式导入;

2. 不支持 transaction,无法 Rollback;

3. 如果该表存在触发器时,无法使用 OracleBulkCopy(报异常信息 Oracle Error: ORA-26086),除非先禁用该表的所有触发器;

4. 过程中会自动启用 NOT NULL、UNIQUE 和 PRIMARY KEY 三种约束,其中 NOT NULL 约束在列数组绑定时验证,任何违反 NOT NULL 约束条件的行数据都会舍弃;UNIQUE 约束是在导入完成后重建索引时验证,但是在 bulk copy 时,允许违反索引约束,并在完成后将索引设置成禁用(UNUSABLE)状态;而且,如果索引一开始状态就是禁用(UNUSABLE)状态时,OracleBulkCopy 是会报错的。

参考代码如下:

 1 /// <summary> 2 /// 批量插入数据 3 /// 该方法需要禁用该表所有触发器,并且插入的数据如果为空,是不会采用默认值 4 /// </summary> 5 /// <param name="table">数据表</param> 6 /// <param name="targetTableName">数据库目标表名</param> 7 /// <returns></returns> 8 public bool InsertBulkData(DataTable table, string targetTableName) 9 { 10 	bool result = false; 11 	string connStr = GetConnectionString(); 12 	using (OracleConnection connection = new OracleConnection(connStr)) 13 	{ 14 		using (OracleBulkCopy bulkCopy = new OracleBulkCopy(connStr, OracleBulkCopyOptions.Default)) 15 		{ 16 			if (table != null && table.Rows.Count > 0) 17 			{ 18 				bulkCopy.DestinationTableName = targetTableName; 19 				for (int i = 0; i < table.Columns.Count; i++) 20 				{ 21 					string col = table.Columns[i].ColumnName; 22 					bulkCopy.ColumnMappings.Add(col, col); 23 				} 24 				connection.Open(); 25 				bulkCopy.WriteToServer(table); 26 				result = true; 27 			} 28 			bulkCopy.Close(); 29 			bulkCopy.Dispose(); 30 		} 31 	} 32  33 	return result; 34 }

测试结果:

数据类型:4列NVARCHAR2,2列NUMBER

30+万(7.36M):用时 14:590

60+万(14.6M):用时 28:28

1048576(24.9M):用时 52:971

附加,禁用表的所有外键SQL:

ALTER TABLE table_name DISABLE ALL TRIGGERS

总结

1、在30+万和60+万数据时,ArrayBind一次性导入和OracleBulkCopy时间相差不是很大,但是ArrayBind方式一般都需要转换数据形式,占用了一些时间,而 OracleBulkCopy 则只需要简单处理一下 DataTable 数据源即可导入;

2、当数据量达到100+万时,ArrayBind很容易出现内存不足异常,此时只能采用分批次执行导入,根据测试结果可知,次数越少,速度越快;而采用 OracleBulkCopy 方式则很少出现内存不足现象,由此可见 OracleBulkCopy 占用内存比 ArrayBind 方式少;

参考资料:

1、ArrayBind http://www.oracle.com/technetwork/issue-archive/2009/09-sep/o59odpnet-085168.html

2、ArrayBind http://www.soaspx.com/dotnet/csharp/csharp_20130911_10501.html

3、Oracle数据导入方法 http://dbanotes.net/Oracle/All_About_Oracle_Data_Loading.htm

4、介绍OracleBulkCopy类 https://docs.oracle.com/cd/E11882_01/win.112/e23174/OracleBulkCopyClass.htm#ODPNT7446

5、http://dba.stackexchange.com/questions/7287/what-specifically-does-oraclebulkcopy-do-and-how-can-i-optimize-its-performance