使用 Apache Nifi 修改 csv

Modify csv with Apache Nifi

我从 FetchFTP 处理器收到一个有点奇怪的 .csv 文件。看起来像:

Agency Name:IKEA,,,,,,,,,,,,,,
Advertiser Name: Ingka,,,,,,,,,,,,,,
Campaign Name:All,,,,,,,,,,,,,,
Date Resolution:Days,,,,,,,,,,,,,,
Campaign Dates:N/A,,,,,,,,,,,,,,
Report Date Range:Last X Days (25.06.2020 - 01.07.2020),,,,,,,,,,,,,,
Report Creation Date: 02.07.2020 5:26:18 (GMT -5 Eastern Standard Time),,,,,,,,,,,,,,

You must save the report locally to create a pivot table based on the report data.,,,,,,,,,,,,,,

,,,,,,,,,,,,,,,

Advertiser Name,Advertiser ID,Campaign Name,Campaign ID,Date,Site Name,Site ID,Device Type,Placement Name,Placement ID,Clickthrough URL,* Clicks,* Served Impressions,* Total Recordable Impressions (IAB),* Total Viewable Impressions (IAB)
Ingka,190530,1_flight_0119,947535,25.06.2020,Auditorius SE,101304,Smart Phone,Flight_EK_Auditorius_Video_mobile,27353235,https://www.ikea.com/promo/wifi?utm_source=Auditorius&utm_medium=Video_mobile,0,0,0,0
Ingka,190530,1_flight_0119,947535,28.06.2020,Between Exchange SE,124598,PC,Flight_IQP_Between_Exchange_Banner_728x90_DCO,27359134,,0,0,0,0
Data was updated last on 7/2/2020 12:00:00 AM (GMT -5 Eastern Standard Time),,,,,,,,,,,,,,
Viewability mode is set per individual campaign,,,,,,,,,,,,,,

我无法以这种格式将其保存到数据库中。 我想要的:

  1. 删除这个无用的块:
Agency Name:IKEA,,,,,,,,,,,,,,
Advertiser Name: Ingka,,,,,,,,,,,,,,
Campaign Name:All,,,,,,,,,,,,,,
Date Resolution:Days,,,,,,,,,,,,,,
Campaign Dates:N/A,,,,,,,,,,,,,,
Report Date Range:Last X Days (25.06.2020 - 01.07.2020),,,,,,,,,,,,,,
Report Creation Date: 02.07.2020 5:26:18 (GMT -5 Eastern Standard Time),,,,,,,,,,,,,,

You must save the report locally to create a pivot table based on the report data.,,,,,,,,,,,,,,

,,,,,,,,,,,,,,,

  1. 删除这个无用的页脚:
Data was updated last on 7/2/2020 12:00:00 AM (GMT -5 Eastern Standard Time),,,,,,,,,,,,,,
Viewability mode is set per individual campaign,,,,,,,,,,,,,,
  1. 更改 headers 名称。

发件人:Advertiser Name,Advertiser ID,Campaign Name,Campaign ID,Date,Site Name,Site ID,Device Type,Placement Name,Placement ID,Clickthrough URL,* Clicks,* Served Impressions,* Total Recordable Impressions (IAB),* Total Viewable Impressions (IAB)

收件人:advertiser_name,advertiser_id,campaign_name,campaign_id,report_date,site,site_id,device,placement_name,placement_id,url,clicks,imps,total_record_imps,total_view_imps

有什么工具可以用 Apache Nifi 达到它?

需要对您的数据进行清理以生成有效的 CSV 格式。您可以使用 ExecuteScriptExecuteStreamCommand 处理器来执行数据清理脚本,比如 python,它将把传入的数据清理成您想要的格式。

下面的代码片段(header 标准化和数据清理)将让您了解如何使用为 Python 配置的 ExecuteScript 处理器访问流文件内容作为脚本引擎 -

from org.apache.commons.io import IOUtils
from java.nio.charset import StandardCharsets
from org.apache.nifi.processor.io import StreamCallback
from org.apache.nifi.processors.script import ExecuteScript
from org.python.core.util.FileUtil import wrap
from io import StringIO
import re


# Define a subclass of StreamCallback for use in session.write()
class PyStreamCallback(StreamCallback):
    def __init__(self):
        pass

    def process(self, inputStream, outputStream):
        with wrap(inputStream) as f:
            lines = f.readlines()

        outer_new_value_list = []
        is_csv_data = False
        for csv_row in lines:
            if not is_csv_data:
                if csv_row.startswith("Advertiser Name,Advertiser ID,"):
                    is_csv_data = True
                else:
                    continue
            if is_csv_data:
                if csv_row.startswith("Data was updated last on"):
                    break
                else:
                    outer_new_value_list.append(csv_row)

        outer_new_value_list[0] = outer_new_value_list[0].replace(' ', '_').replace('*', '').replace('-', '_').lower()
        with wrap(outputStream, 'w') as filehandle:
            filehandle.writelines("%s" % line for line in outer_new_value_list)


# end class
flowFile = session.get()
if (flowFile != None):
    flowFile = session.write(flowFile, PyStreamCallback())
    session.transfer(flowFile, ExecuteScript.REL_SUCCESS)
# implicit return at the end