Modify CSV with Apache NiFi
I receive a somewhat odd .csv file from the FetchFTP processor. It looks like this:
Agency Name:IKEA,,,,,,,,,,,,,,
Advertiser Name: Ingka,,,,,,,,,,,,,,
Campaign Name:All,,,,,,,,,,,,,,
Date Resolution:Days,,,,,,,,,,,,,,
Campaign Dates:N/A,,,,,,,,,,,,,,
Report Date Range:Last X Days (25.06.2020 - 01.07.2020),,,,,,,,,,,,,,
Report Creation Date: 02.07.2020 5:26:18 (GMT -5 Eastern Standard Time),,,,,,,,,,,,,,
You must save the report locally to create a pivot table based on the report data.,,,,,,,,,,,,,,
,,,,,,,,,,,,,,,
Advertiser Name,Advertiser ID,Campaign Name,Campaign ID,Date,Site Name,Site ID,Device Type,Placement Name,Placement ID,Clickthrough URL,* Clicks,* Served Impressions,* Total Recordable Impressions (IAB),* Total Viewable Impressions (IAB)
Ingka,190530,1_flight_0119,947535,25.06.2020,Auditorius SE,101304,Smart Phone,Flight_EK_Auditorius_Video_mobile,27353235,https://www.ikea.com/promo/wifi?utm_source=Auditorius&utm_medium=Video_mobile,0,0,0,0
Ingka,190530,1_flight_0119,947535,28.06.2020,Between Exchange SE,124598,PC,Flight_IQP_Between_Exchange_Banner_728x90_DCO,27359134,,0,0,0,0
Data was updated last on 7/2/2020 12:00:00 AM (GMT -5 Eastern Standard Time),,,,,,,,,,,,,,
Viewability mode is set per individual campaign,,,,,,,,,,,,,,
I can't save it to a database in this format.
What I want:
- Remove this useless block:
Agency Name:IKEA,,,,,,,,,,,,,,
Advertiser Name: Ingka,,,,,,,,,,,,,,
Campaign Name:All,,,,,,,,,,,,,,
Date Resolution:Days,,,,,,,,,,,,,,
Campaign Dates:N/A,,,,,,,,,,,,,,
Report Date Range:Last X Days (25.06.2020 - 01.07.2020),,,,,,,,,,,,,,
Report Creation Date: 02.07.2020 5:26:18 (GMT -5 Eastern Standard Time),,,,,,,,,,,,,,
You must save the report locally to create a pivot table based on the report data.,,,,,,,,,,,,,,
,,,,,,,,,,,,,,,
- Remove this useless footer:
Data was updated last on 7/2/2020 12:00:00 AM (GMT -5 Eastern Standard Time),,,,,,,,,,,,,,
Viewability mode is set per individual campaign,,,,,,,,,,,,,,
- Rename the headers.
From: Advertiser Name,Advertiser ID,Campaign Name,Campaign ID,Date,Site Name,Site ID,Device Type,Placement Name,Placement ID,Clickthrough URL,* Clicks,* Served Impressions,* Total Recordable Impressions (IAB),* Total Viewable Impressions (IAB)
To: advertiser_name,advertiser_id,campaign_name,campaign_id,report_date,site,site_id,device,placement_name,placement_id,url,clicks,imps,total_record_imps,total_view_imps
Which tools in Apache NiFi can I use to achieve this?
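Your data needs to be cleaned up before it is valid CSV. You can use an ExecuteScript or ExecuteStreamCommand processor to run a cleaning script, for example in Python, that turns the incoming data into the format you want.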
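For the ExecuteStreamCommand route, the processor pipes the flowfile content to the command's standard input and takes its standard output as the new content, so an ordinary Python script is enough. Below is a minimal sketch in plain Python 3. It assumes the header row always starts with `Advertiser Name,Advertiser ID,` and the footer with `Data was updated last on`, as in the sample report. The names `strip_report_wrapper` and `main` are made up for illustration.

```python
import io
import sys

HEADER_PREFIX = "Advertiser Name,Advertiser ID,"  # assumed from the sample report
FOOTER_PREFIX = "Data was updated last on"         # assumed from the sample report


def strip_report_wrapper(lines):
    """Return only the header row and the data rows of the report."""
    kept = []
    in_table = False
    for line in lines:
        if not in_table:
            if not line.startswith(HEADER_PREFIX):
                continue  # still inside the preamble
            in_table = True
        elif line.startswith(FOOTER_PREFIX):
            break  # footer reached, ignore everything after it
        kept.append(line)
    return kept


def main(stdin=sys.stdin, stdout=sys.stdout):
    """Filter a report read from stdin and write the result to stdout."""
    stdout.writelines(strip_report_wrapper(stdin))


# Small in-memory demonstration (no real stdin needed)
demo_in = io.StringIO(
    "Agency Name:IKEA,,\n"
    "Advertiser Name,Advertiser ID,Date\n"
    "Ingka,190530,25.06.2020\n"
    "Data was updated last on 7/2/2020,,\n"
)
demo_out = io.StringIO()
main(demo_in, demo_out)
```

Saved as a script that ends with a plain `main()` call instead of the demonstration, this could be what ExecuteStreamCommand runs. The header renaming is left out here to keep the stream handling visible.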
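The snippet below (header normalization and data cleanup) should give you an idea of how to access the flowfile content from an ExecuteScript processor configured with Python as the script engine: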
from org.apache.nifi.processor.io import StreamCallback
from org.apache.nifi.processors.script import ExecuteScript
from org.python.core.util.FileUtil import wrap

# Define a subclass of StreamCallback for use in session.write()
class PyStreamCallback(StreamCallback):
    def __init__(self):
        pass

    def process(self, inputStream, outputStream):
        # Read the whole flowfile content as a list of lines
        with wrap(inputStream) as f:
            lines = f.readlines()

        outer_new_value_list = []
        is_csv_data = False
        for csv_row in lines:
            if not is_csv_data:
                # Skip the preamble until the real header row shows up
                if csv_row.startswith("Advertiser Name,Advertiser ID,"):
                    is_csv_data = True
                else:
                    continue
            # Stop at the footer; everything before it is header or data
            if csv_row.startswith("Data was updated last on"):
                break
            outer_new_value_list.append(csv_row)

        # Normalize the header row: drop the "* " prefix first (otherwise the
        # names end up with a leading underscore), then convert to snake_case
        if outer_new_value_list:
            outer_new_value_list[0] = outer_new_value_list[0].replace('* ', '').replace(' ', '_').replace('-', '_').lower()

        with wrap(outputStream, 'w') as filehandle:
            filehandle.writelines(outer_new_value_list)
# end class

flowFile = session.get()
if flowFile is not None:
    flowFile = session.write(flowFile, PyStreamCallback())
    session.transfer(flowFile, ExecuteScript.REL_SUCCESS)
# implicit return at the end
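Note that the chained `replace` calls only normalize the original names; they do not produce the exact column names asked for in the question (for example, `Date` becomes `date`, not `report_date`). One way to get those names is an explicit lookup table, sketched here in plain Python 3. The mapping is copied from the From/To lists in the question. The name `rename_header` and the choice to raise on unknown columns are assumptions for illustration.

```python
import csv
import io

# Source column -> target column, taken from the From/To lists in the question.
COLUMN_MAP = {
    "Advertiser Name": "advertiser_name",
    "Advertiser ID": "advertiser_id",
    "Campaign Name": "campaign_name",
    "Campaign ID": "campaign_id",
    "Date": "report_date",
    "Site Name": "site",
    "Site ID": "site_id",
    "Device Type": "device",
    "Placement Name": "placement_name",
    "Placement ID": "placement_id",
    "Clickthrough URL": "url",
    "* Clicks": "clicks",
    "* Served Impressions": "imps",
    "* Total Recordable Impressions (IAB)": "total_record_imps",
    "* Total Viewable Impressions (IAB)": "total_view_imps",
}


def rename_header(header_line):
    """Translate one CSV header line using COLUMN_MAP."""
    names = next(csv.reader([header_line]))
    unknown = [name for name in names if name not in COLUMN_MAP]
    if unknown:
        # Fail loudly instead of passing an unexpected column to the database
        raise ValueError("unmapped columns: %s" % ", ".join(unknown))
    out = io.StringIO()
    csv.writer(out, lineterminator="\n").writerow([COLUMN_MAP[name] for name in names])
    return out.getvalue()
```

Applied to the first kept line in place of the `replace` chain, this would yield the target header from the question. Inside the Jython engine used by ExecuteScript, the string handling may need adjusting.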