如何使用 C# 更快地将 400 万条记录从 Oracle 插入到 Elasticsearch table?
How to insert 4 million records from Oracle to Elasticsearch table faster using C#?
我有以下用 C# 编写的代码,但据此,将数据从 Oracle 数据库迁移到 Elasticsearch 需要 4-5 天。我正在以 100 条为一组插入记录。有没有其他方法可以更快地迁移 400 万条记录(如果可能,可能不到一天)?
public static void Selection()
{
for(int i = 1; i < 4000000; i += 1000)
{
for(int j = i; j < (i+1000); j += 100)
{
OracleCommand cmd = new OracleCommand(BuildQuery(j),
oracle_connection);
OracleDataReader reader = cmd.ExecuteReader();
List<Record> list=CreateRecordList(reader);
insert(list);
}
}
}
private static List<Record> CreateRecordList(OracleDataReader reader)
{
List<Record> l = new List<Record>();
string[] str = new string[7];
try
{
while (reader.Read())
{
for (int i = 0; i < 7; i++)
{
str[i] = reader[i].ToString();
}
Record r = new Record(str[0], str[1], str[2], str[3],
str[4], str[5], str[6]);
l.Add(r);
}
}
catch (Exception er)
{
string msg = er.Message;
}
return l;
}
private static string BuildQuery(int from)
{
int to = from + change - 1;
StringBuilder builder = new StringBuilder();
builder.AppendLine(@"select * from");
builder.AppendLine("(");
builder.AppendLine("select FIELD_1, FIELD_2,
FIELD_3, FIELD_4, FIELD_5, FIELD_6,
FIELD_7, ");
builder.Append(" row_number() over(order by FIELD_1)
rn");
builder.AppendLine(" from tablename");
builder.AppendLine(")");
builder.AppendLine(string.Format("where rn between {0} and {1}",
from, to));
builder.AppendLine("order by rn");
return builder.ToString();
}
public static void insert(List<Record> l)
{
try
{
foreach(Record r in l)
client.Index<Record>(r, "index", "type");
}
catch (Exception er)
{
string msg = er.Message;
}
}
ROW_NUMBER()
函数会对性能产生负面影响,而您 运行 它已经上千次了。您已经在使用 OracleDataReader
-- 它不会一次将所有四百万行拉到您的机器上,它基本上是一次流式传输一两行。
这必须在几分钟或几小时内完成,而不是几天 -- 我们有几个进程以类似的方式在 Sybase 和 SQL 服务器之间移动数百万条记录,并且只需要不到五分钟的时间。
也许试一试:
OracleCommand cmd = new OracleCommand("SELECT ... FROM TableName", oracle_connection);
int batchSize = 500;
using (OracleDataReader reader = cmd.ExecuteReader())
{
List<Record> l = new List<Record>(batchSize);
string[] str = new string[7];
int currentRow = 0;
while (reader.Read())
{
for (int i = 0; i < 7; i++)
{
str[i] = reader[i].ToString();
}
l.Add(new Record(str[0], str[1], str[2], str[3], str[4], str[5], str[6]));
// Commit every time batchSize records have been read
if (++currentRow == batchSize)
{
Commit(l);
l.Clear();
currentRow = 0;
}
}
// commit remaining records
Commit(l);
}
下面是 Commit
的样子:
public void Commit(IEnumerable<Record> records)
{
// TODO: Use ES's BULK features, I don't know the exact syntax
client.IndexMany<Record>(records, "index", "type");
// client.Bulk(b => b.IndexMany(records))... something like this
}
但是您没有以 100 个为单位插入
最后你一次插入一个
(这甚至可能不是插入一个的正确代码)
foreach(Record r in l)
client.Index<Record>(r, "index", "type");
如果一次插入一行,那么所有那些读取时的 girations 什么都不做
你只是在获得下一个 batch
时引入了延迟
读取(几乎)总是比写入快
OracleCommand cmd = new OracleCommand(BuildQuery(all), oracle_connection);
OracleDataReader reader = cmd.ExecuteReader();
while (reader.Read())
{
client.Index<Record>(new Record(reader.GetSting(0),
reader.GetSting(1), reader.GetSting(2), reader.GetSting(3),
reader.GetSting(4), reader.GetSting(5), reader.GetSting(6),
"index", "type");
}
reader.Close();
如果您想并行读写,可以使用 BlockingCollection
但是使用最大大小读取不会比写入提前太多
我有以下用 C# 编写的代码,但据此,将数据从 Oracle 数据库迁移到 Elasticsearch 需要 4-5 天。我正在以 100 条为一组插入记录。有没有其他方法可以更快地迁移 400 万条记录(如果可能,可能不到一天)?
public static void Selection()
{
for(int i = 1; i < 4000000; i += 1000)
{
for(int j = i; j < (i+1000); j += 100)
{
OracleCommand cmd = new OracleCommand(BuildQuery(j),
oracle_connection);
OracleDataReader reader = cmd.ExecuteReader();
List<Record> list=CreateRecordList(reader);
insert(list);
}
}
}
private static List<Record> CreateRecordList(OracleDataReader reader)
{
List<Record> l = new List<Record>();
string[] str = new string[7];
try
{
while (reader.Read())
{
for (int i = 0; i < 7; i++)
{
str[i] = reader[i].ToString();
}
Record r = new Record(str[0], str[1], str[2], str[3],
str[4], str[5], str[6]);
l.Add(r);
}
}
catch (Exception er)
{
string msg = er.Message;
}
return l;
}
private static string BuildQuery(int from)
{
int to = from + change - 1;
StringBuilder builder = new StringBuilder();
builder.AppendLine(@"select * from");
builder.AppendLine("(");
builder.AppendLine("select FIELD_1, FIELD_2,
FIELD_3, FIELD_4, FIELD_5, FIELD_6,
FIELD_7, ");
builder.Append(" row_number() over(order by FIELD_1)
rn");
builder.AppendLine(" from tablename");
builder.AppendLine(")");
builder.AppendLine(string.Format("where rn between {0} and {1}",
from, to));
builder.AppendLine("order by rn");
return builder.ToString();
}
public static void insert(List<Record> l)
{
try
{
foreach(Record r in l)
client.Index<Record>(r, "index", "type");
}
catch (Exception er)
{
string msg = er.Message;
}
}
ROW_NUMBER()
函数会对性能产生负面影响,而您 运行 它已经上千次了。您已经在使用 OracleDataReader
-- 它不会一次将所有四百万行拉到您的机器上,它基本上是一次流式传输一两行。
这必须在几分钟或几小时内完成,而不是几天 -- 我们有几个进程以类似的方式在 Sybase 和 SQL 服务器之间移动数百万条记录,并且只需要不到五分钟的时间。
也许试一试:
OracleCommand cmd = new OracleCommand("SELECT ... FROM TableName", oracle_connection);
int batchSize = 500;
using (OracleDataReader reader = cmd.ExecuteReader())
{
List<Record> l = new List<Record>(batchSize);
string[] str = new string[7];
int currentRow = 0;
while (reader.Read())
{
for (int i = 0; i < 7; i++)
{
str[i] = reader[i].ToString();
}
l.Add(new Record(str[0], str[1], str[2], str[3], str[4], str[5], str[6]));
// Commit every time batchSize records have been read
if (++currentRow == batchSize)
{
Commit(l);
l.Clear();
currentRow = 0;
}
}
// commit remaining records
Commit(l);
}
下面是 Commit
的样子:
public void Commit(IEnumerable<Record> records)
{
// TODO: Use ES's BULK features, I don't know the exact syntax
client.IndexMany<Record>(records, "index", "type");
// client.Bulk(b => b.IndexMany(records))... something like this
}
但是您没有以 100 个为单位插入
最后你一次插入一个
(这甚至可能不是插入一个的正确代码)
foreach(Record r in l)
client.Index<Record>(r, "index", "type");
如果一次插入一行,那么所有那些读取时的 girations 什么都不做
你只是在获得下一个 batch
时引入了延迟
读取(几乎)总是比写入快
OracleCommand cmd = new OracleCommand(BuildQuery(all), oracle_connection);
OracleDataReader reader = cmd.ExecuteReader();
while (reader.Read())
{
client.Index<Record>(new Record(reader.GetSting(0),
reader.GetSting(1), reader.GetSting(2), reader.GetSting(3),
reader.GetSting(4), reader.GetSting(5), reader.GetSting(6),
"index", "type");
}
reader.Close();
如果您想并行读写,可以使用 BlockingCollection
但是使用最大大小读取不会比写入提前太多