如何在 kiba etl 脚本 (kiba gem) 中进行聚合转换?
How to do a aggregation transformation in a kiba etl script (kiba gem)?
我想写一个 Kiba Etl 脚本,它有一个从 CSV 到 Destination CSV 的源和一个转换规则列表,其中第二个转换器是一个聚合,其中操作如 select name,sum (欧元)按名称分组
Kiba ETL 脚本文件
source CsvSource, 'users.csv', col_sep: ';', headers: true, header_converters: :symbol
transform VerifyFieldsPresence, [:name, :euro]
transform AggregateFields, { sum: :euro, group_by: :name}
transform RenameField,from: :euro, to: :total_amount
destination CsvDestination, 'result.csv', [:name, :total_amount]
users.csv
date;euro;name
7/3/2015;10;Jack
7/3/2015;85;Jill
8/3/2015;6;Jack
8/3/2015;12;Jill
9/3/2015;99;Mack
result.csv(预期结果)
total_amount;name
16;Jack
97;Jill
99;Mack
由于 etl 转换器一次在一行上一个接一个地执行,但是我的第二个转换器行为取决于整个行集合,我无法在传递给转换的 class 中访问它方法。
transform AggregateFields, { sum: :euro, group_by: :name }
是否可以使用 kiba 实现此行为 gem
提前致谢
编辑:现在是 2020 年,Kiba ETL v3 包含一个更好的方法来做到这一点。查看这篇文章 https://thibautbarrere.com/2020/03/05/new-in-kiba-etl-v3 了解所有相关信息。
Kiba 作者来了!您可以通过多种不同的方式实现这一点,主要取决于数据大小和您的实际需求。这里有几种可能性。
在您的 Kiba 脚本中使用变量进行聚合
require 'awesome_print'
transform do |r|
r[:amount] = BigDecimal.new(r[:amount])
r
end
total_amounts = Hash.new(0)
transform do |r|
total_amounts[r[:name]] += r[:amount]
r
end
post_process do
# pretty print here, but you could save to a CSV too
ap total_amounts
end
这是最简单的方法,但非常灵活。
虽然它会将您的聚合保留在内存中,因此这可能足够好,也可能不够好,具体取决于您的情况。请注意,目前 Kiba 是单线程的(但 "Kiba Pro" 将是多线程的),因此暂时不需要为聚合添加锁或使用线程安全结构。
从 post_process 块调用 TextQL
另一种快速简便的聚合方法是先生成一个非聚合的 CSV 文件,然后利用 TextQl 实际进行聚合,如下所示:
destination CsvSource, 'non-aggregated-output.csv', [:name, :amount]
post_process do
query = <<SQL
select
name,
/* apparently sqlite has reduced precision, round to 2 for now */
round(sum(amount), 2) as total_amount
from tbl group by name
SQL
textql('non-aggregated-output.csv', query, 'aggregated-output.csv')
end
定义了以下助手:
def system!(cmd)
raise "Failed to run command #{command}" unless system(command)
end
def textql(source_file, query, output_file)
system! "cat #{source_file} | textql -header -output-header=true -sql \"#{query}\" > #{output_file}"
# this one uses csvfix to pretty print the table
system! "cat #{output_file} | csvfix ascii_table"
end
计算时要注意精度。
正在写入内存中聚合目标
在这里可以使用的一个有用的技巧是用 class 包装给定的目的地来进行聚合。它可能是这样的:
class InMemoryAggregate
def initialize(sum:, group_by:, destination:)
@aggregate = Hash.new(0)
@sum = sum
@group_by = group_by
# this relies a bit on the internals of Kiba, but not too much
@destination = destination.shift.new(*destination)
end
def write(row)
# do not write, but count here instead
@aggregate[row[@group_by]] += row[@sum]
end
def close
# use close to actually do the writing
@aggregate.each do |k,v|
# reformat BigDecimal additions here
value = '%0.2f' % v
@destination.write(@group_by => k, @sum => value)
end
@destination.close
end
end
你可以这样使用:
# convert your string into an actual number
transform do |r|
r[:amount] = BigDecimal.new(r[:amount])
r
end
destination CsvDestination, 'non-aggregated.csv', [:name, :amount]
destination InMemoryAggregate,
sum: :amount, group_by: :name,
destination: [
CsvDestination, 'aggregated.csv', [:name, :amount]
]
post_process do
system!("cat aggregated.csv | csvfix ascii_table")
end
此版本的优点在于您可以将聚合器重复用于不同的目的地(如数据库或其他任何目的地)。
请注意,这会将所有聚合保留在内存中,就像第一个版本一样。
插入具有聚合功能的商店
另一种方法(如果您的数据量很大,则特别有用)是将生成的数据发送到能够为您聚合数据的对象中。它可以是常规 SQL 数据库、Redis 或任何更奇特的东西,然后您可以根据需要查询它们。
正如我所说,实施将在很大程度上取决于您的实际需求。希望您能在这里找到适合您的东西!
我想写一个 Kiba Etl 脚本,它有一个从 CSV 到 Destination CSV 的源和一个转换规则列表,其中第二个转换器是一个聚合,其中操作如 select name,sum (欧元)按名称分组
Kiba ETL 脚本文件
source CsvSource, 'users.csv', col_sep: ';', headers: true, header_converters: :symbol
transform VerifyFieldsPresence, [:name, :euro]
transform AggregateFields, { sum: :euro, group_by: :name}
transform RenameField,from: :euro, to: :total_amount
destination CsvDestination, 'result.csv', [:name, :total_amount]
users.csv
date;euro;name
7/3/2015;10;Jack
7/3/2015;85;Jill
8/3/2015;6;Jack
8/3/2015;12;Jill
9/3/2015;99;Mack
result.csv(预期结果)
total_amount;name
16;Jack
97;Jill
99;Mack
由于 etl 转换器一次在一行上一个接一个地执行,但是我的第二个转换器行为取决于整个行集合,我无法在传递给转换的 class 中访问它方法。
transform AggregateFields, { sum: :euro, group_by: :name }
是否可以使用 kiba 实现此行为 gem
提前致谢
编辑:现在是 2020 年,Kiba ETL v3 包含一个更好的方法来做到这一点。查看这篇文章 https://thibautbarrere.com/2020/03/05/new-in-kiba-etl-v3 了解所有相关信息。
Kiba 作者来了!您可以通过多种不同的方式实现这一点,主要取决于数据大小和您的实际需求。这里有几种可能性。
在您的 Kiba 脚本中使用变量进行聚合
require 'awesome_print'
transform do |r|
r[:amount] = BigDecimal.new(r[:amount])
r
end
total_amounts = Hash.new(0)
transform do |r|
total_amounts[r[:name]] += r[:amount]
r
end
post_process do
# pretty print here, but you could save to a CSV too
ap total_amounts
end
这是最简单的方法,但非常灵活。
虽然它会将您的聚合保留在内存中,因此这可能足够好,也可能不够好,具体取决于您的情况。请注意,目前 Kiba 是单线程的(但 "Kiba Pro" 将是多线程的),因此暂时不需要为聚合添加锁或使用线程安全结构。
从 post_process 块调用 TextQL
另一种快速简便的聚合方法是先生成一个非聚合的 CSV 文件,然后利用 TextQl 实际进行聚合,如下所示:
destination CsvSource, 'non-aggregated-output.csv', [:name, :amount]
post_process do
query = <<SQL
select
name,
/* apparently sqlite has reduced precision, round to 2 for now */
round(sum(amount), 2) as total_amount
from tbl group by name
SQL
textql('non-aggregated-output.csv', query, 'aggregated-output.csv')
end
定义了以下助手:
def system!(cmd)
raise "Failed to run command #{command}" unless system(command)
end
def textql(source_file, query, output_file)
system! "cat #{source_file} | textql -header -output-header=true -sql \"#{query}\" > #{output_file}"
# this one uses csvfix to pretty print the table
system! "cat #{output_file} | csvfix ascii_table"
end
计算时要注意精度。
正在写入内存中聚合目标
在这里可以使用的一个有用的技巧是用 class 包装给定的目的地来进行聚合。它可能是这样的:
class InMemoryAggregate
def initialize(sum:, group_by:, destination:)
@aggregate = Hash.new(0)
@sum = sum
@group_by = group_by
# this relies a bit on the internals of Kiba, but not too much
@destination = destination.shift.new(*destination)
end
def write(row)
# do not write, but count here instead
@aggregate[row[@group_by]] += row[@sum]
end
def close
# use close to actually do the writing
@aggregate.each do |k,v|
# reformat BigDecimal additions here
value = '%0.2f' % v
@destination.write(@group_by => k, @sum => value)
end
@destination.close
end
end
你可以这样使用:
# convert your string into an actual number
transform do |r|
r[:amount] = BigDecimal.new(r[:amount])
r
end
destination CsvDestination, 'non-aggregated.csv', [:name, :amount]
destination InMemoryAggregate,
sum: :amount, group_by: :name,
destination: [
CsvDestination, 'aggregated.csv', [:name, :amount]
]
post_process do
system!("cat aggregated.csv | csvfix ascii_table")
end
此版本的优点在于您可以将聚合器重复用于不同的目的地(如数据库或其他任何目的地)。
请注意,这会将所有聚合保留在内存中,就像第一个版本一样。
插入具有聚合功能的商店
另一种方法(如果您的数据量很大,则特别有用)是将生成的数据发送到能够为您聚合数据的对象中。它可以是常规 SQL 数据库、Redis 或任何更奇特的东西,然后您可以根据需要查询它们。
正如我所说,实施将在很大程度上取决于您的实际需求。希望您能在这里找到适合您的东西!