Transform CSV to JSON with Apache NiFi ExecuteStreamCommand - Python

I'm currently having trouble running a Python script with the Apache NiFi processor ExecuteStreamCommand.

I wrote the code below to convert 50 different CSV files to JSON. Afterwards I want to write those JSON files to HDFS.

import json
import pandas as pd

df = pd.read_csv(r'***.csv',
                 sep='\t',
                 skiprows=2)

df = df.dropna(axis=1, how='all')
df = df.drop_duplicates(keep='first')
del df['Charge']
df = df.rename(columns={df.columns[0]: "Zeitstempel", df.columns[1]: "Maschine"})
df.columns = [str(c).lower() for c in df.columns]  # lower-case all headers

df['zeitstempel'] = pd.to_datetime(df['zeitstempel'], format='%d.%m.%y, %X')
df['zeitstempel'] = df['zeitstempel'].astype(str)
# Strip a single leading underscore from the column names
df.columns = [c[1:] if str(c).startswith('_') else c for c in df.columns]

# After astype(str) the timestamp reads 'YYYY-MM-DD HH:MM:SS'
machine = df["maschine"][0]
year = str(df["zeitstempel"][0])[0:4]
month = str(df["zeitstempel"][0])[5:7]
day = str(df["zeitstempel"][0])[8:10]
fileName = machine + "_" + year + "_" + month + "_" + day + ".json"
filePath = "***" + fileName

df.to_json(filePath, orient='records', date_format='iso', date_unit='s', lines=True)
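In the string produced by `astype(str)`, characters 5:7 hold the month and 8:10 the day, so hand-slicing is easy to get backwards. A minimal sketch, assuming the column names used above, that parses the first timestamp back into a `pd.Timestamp` and formats it instead; `build_file_name` is a hypothetical helper, not part of the original script.

```python
import pandas as pd


def build_file_name(df: pd.DataFrame) -> str:
    """Return '<machine>_<YYYY>_<MM>_<DD>.json' for the first row of df.

    Hypothetical helper. Assumes 'zeitstempel' holds strings such as
    '2019-03-07 08:15:00' (what astype(str) gives for datetime64 values).
    """
    ts = pd.Timestamp(df["zeitstempel"].iloc[0])
    machine = str(df["maschine"].iloc[0])
    # strftime picks year, month and day by name, so they cannot be swapped
    return f"{machine}_{ts.strftime('%Y_%m_%d')}.json"
```

For a first row with machine `M1` and timestamp `2019-03-07 08:15:00`, this yields `M1_2019_03_07.json`.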

The script works fine on files in my local directory, but how do I need to change the input and output for NiFi?

The NiFi flow looks like this: ListFile > FetchFile > ExecuteStreamCommand > PutHDFS.
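One common way to structure such a script for ExecuteStreamCommand is as a filter: read the FlowFile content from stdin and write the result to stdout. The sketch below wraps a subset of the cleaning steps in a text-in, text-out function so it can be tested without NiFi; the names `csv_to_json_lines` and `main` are hypothetical, and the remaining steps (renaming, date parsing, dropping `Charge`) would be added in the same place.

```python
import io
import sys

import pandas as pd


def csv_to_json_lines(csv_text: str) -> str:
    """Convert tab-separated CSV text into JSON Lines (one record per line).

    Hypothetical helper showing only part of the original cleaning steps.
    """
    df = pd.read_csv(io.StringIO(csv_text), sep="\t", skiprows=2)
    # Drop columns that are completely empty and exact duplicate rows
    df = df.dropna(axis=1, how="all").drop_duplicates(keep="first")
    # Lower-case headers and strip one leading underscore
    names = [str(c).lower() for c in df.columns]
    df.columns = [n[1:] if n.startswith("_") else n for n in names]
    # With no path or buffer given, to_json returns the JSON as a string
    return df.to_json(orient="records", lines=True)


def main() -> None:
    # The FlowFile content arrives on stdin; stdout becomes the new content
    data = sys.stdin.read()
    if data.strip():  # nothing to convert for an empty FlowFile
        sys.stdout.write(csv_to_json_lines(data))


if __name__ == "__main__":
    main()
```

Keeping the conversion in a pure function means the same code can be checked against a sample file locally before it is wired into the processor.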

I tried the following code:

#!/usr/bin/env python2

import json
import sys  # needed for sys.stdin / sys.stdout
import pandas as pd

df = pd.read_csv(sys.stdin,
                 sep='\t',
                 skiprows=2)

df = df.dropna(axis=1, how='all')
df = df.drop_duplicates(keep='first')
del df['Charge']
df = df.rename(columns={df.columns[0]: "Zeitstempel", df.columns[1]: "Maschine"})
df.columns = [str(c).lower() for c in df.columns]  # lower-case all headers

df['zeitstempel'] = pd.to_datetime(df['zeitstempel'], format='%d.%m.%y, %X')
df['zeitstempel'] = df['zeitstempel'].astype(str)
# Strip a single leading underscore from the column names
df.columns = [c[1:] if str(c).startswith('_') else c for c in df.columns]

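# After astype(str) the timestamp reads 'YYYY-MM-DD HH:MM:SS'
machine = df["maschine"][0]
year = str(df["zeitstempel"][0])[0:4]
month = str(df["zeitstempel"][0])[5:7]
day = str(df["zeitstempel"][0])[8:10]
# Not used below: only what is written to stdout becomes the FlowFile content
fileName = machine + "_" + year + "_" + month + "_" + day + ".json"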

df.to_json(sys.stdout, orient='records', date_format='iso', date_unit='s', lines=True)

and configured the processor as follows:

Thanks in advance from Germany!

Nick

Configure your ExecuteStreamCommand processor like this:

Also see the official documentation: ExecuteStreamCommand
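ExecuteStreamCommand runs the command set in Command Path (for a Python script, typically the interpreter, with the script path in Command Arguments), writes the incoming FlowFile content to that process's stdin and uses its stdout as the outgoing content. To check a script against that contract before deploying it, a small local harness can help. This is a sketch under that assumption; `run_like_nifi` is a hypothetical helper that only mimics the stdin/stdout exchange, not NiFi itself.

```python
import subprocess
import sys


def run_like_nifi(script_path: str, content: bytes) -> bytes:
    """Pipe `content` into `python script_path` and return its stdout.

    Hypothetical helper: reproduces only the stdin -> stdout exchange.
    """
    result = subprocess.run(
        [sys.executable, script_path],
        input=content,           # plays the role of the FlowFile content
        stdout=subprocess.PIPE,  # captured output = new FlowFile content
        stderr=subprocess.PIPE,
        check=True,              # a non-zero exit raises CalledProcessError
    )
    return result.stdout
```

For example, `run_like_nifi("convert.py", open("sample.csv", "rb").read())` (both file names hypothetical) returns the bytes the processor would pass on to PutHDFS, and a traceback from the script surfaces as a `CalledProcessError` instead of silently empty output.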