Transform CSV to JSON with Apache NiFi ExecuteStreamCommand - Python
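I'm currently having trouble getting a Python script to work with the Apache NiFi processor ExecuteStreamCommand.
I wrote the code below to convert 50 different CSV files to JSON. Afterwards I want to write those JSON files to HDFS.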
import json
import pandas as pd
df = pd.read_csv(r'***.csv',
sep='\t',
skiprows=2)
df = df.dropna(axis=1, how='all')
df = df.drop_duplicates(keep='first')
del df['Charge']
df = df.rename(columns={df.columns[0]: "Zeitstempel", df.columns[1]: "Maschine"})
df.columns = map(str.lower, df.columns)
df['zeitstempel'] = pd.to_datetime(df['zeitstempel'], format='%d.%m.%y, %X')
df['zeitstempel'] = df['zeitstempel'].astype(str)
columns = list(df.columns)
for column in range(len(columns)):
    if str(columns[column]).startswith('_'):
        columns[column] = columns[column][1:]
df.columns = columns
machine = df["maschine"][0]
day = str(df["zeitstempel"][0])[5:7]
month = str(df["zeitstempel"][0])[8:10]
year = str(df["zeitstempel"][0])[0:4]
fileName = machine + "_" + year + "_" + month + "_" + day + ".json"
filePath = "***" + fileName
df.to_json(filePath, orient='records', date_format='iso', date_unit='s', lines=True)
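The script works fine on my local directory, but how do I need to change the input and output for NiFi?
The NiFi flow looks like this: ListFile > FetchFile > ExecuteStreamCommand > PutHDFS.
I tried the following code: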
#!/usr/bin/env python2
import json
import pandas as pd
df = pd.read_csv(sys.stdin,
sep='\t',
skiprows=2)
df = df.dropna(axis=1, how='all')
df = df.drop_duplicates(keep='first')
del df['Charge']
df = df.rename(columns={df.columns[0]: "Zeitstempel", df.columns[1]: "Maschine"})
df.columns = map(str.lower, df.columns)
df['zeitstempel'] = pd.to_datetime(df['zeitstempel'], format='%d.%m.%y, %X')
df['zeitstempel'] = df['zeitstempel'].astype(str)
columns = list(df.columns)
for column in range(len(columns)):
    if str(columns[column]).startswith('_'):
        columns[column] = columns[column][1:]
df.columns = columns
machine = df["maschine"][0]
day = str(df["zeitstempel"][0])[5:7]
month = str(df["zeitstempel"][0])[8:10]
year = str(df["zeitstempel"][0])[0:4]
fileName = machine + "_" + year + "_" + month + "_" + day + ".json"
df.to_json(sys.stdout, orient='records', date_format='iso', date_unit='s', lines=True)
and configured the processor as follows:
Thanks in advance, and greetings from Germany!
Nick
Configure your ExecuteStreamCommand processor like this -
Also take a look at the official documentation - ExecuteStreamCommand
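One thing that stands out in the attempt from the question: it uses sys.stdin and sys.stdout but never imports sys, so it would fail with a NameError before reading anything. Below is a minimal sketch of the stdin-to-stdout pattern that ExecuteStreamCommand relies on (the FlowFile content arrives on stdin, and whatever the script prints to stdout becomes the new content). It is not the asker's pandas script: to keep the plumbing visible it swaps in the standard library's csv and json modules, assumes Python 3, assumes %X in the question means HH:MM:SS, leaves out the dropna step, and keeps all values as strings. The names convert, main and TIMESTAMP_FORMAT are made up for this example.

```python
import csv
import json
import sys
from datetime import datetime

# Assumption: "%X" in the question's format string means HH:MM:SS.
TIMESTAMP_FORMAT = "%d.%m.%y, %H:%M:%S"


def convert(in_stream, out_stream, skiprows=2, drop_column="charge"):
    """Read tab-separated text from in_stream, write one JSON object per line."""
    for _ in range(skiprows):
        next(in_stream, None)  # discard the lines before the header row

    rows = list(csv.reader(in_stream, delimiter="\t"))
    if not rows:
        return 0
    header, data = rows[0], rows[1:]

    # Same renaming as in the question: fixed names for the first two columns,
    # lower-case the rest, strip one leading underscore.
    names = ["zeitstempel", "maschine"] + [h.lower() for h in header[2:]]
    names = [n[1:] if n.startswith("_") else n for n in names]

    written, seen = 0, set()
    for row in data:
        key = tuple(row)
        if not row or key in seen:
            continue  # skip blank lines and exact duplicate rows
        seen.add(key)
        record = dict(zip(names, row))
        record.pop(drop_column, None)  # drop the "Charge" column if present
        stamp = datetime.strptime(record["zeitstempel"], TIMESTAMP_FORMAT)
        record["zeitstempel"] = str(stamp)
        out_stream.write(json.dumps(record) + "\n")
        written += 1
    return written


def main():
    # NiFi pipes the FlowFile content to stdin and captures stdout.
    convert(sys.stdin, sys.stdout)

# In the script that NiFi runs, finish with a call to main().
```

With a script like this saved at some path on the NiFi host (say a hypothetical /path/to/convert.py), Command Path would point at the Python interpreter, Command Arguments at the script path, and Ignore STDIN would stay false. Since stdout only carries the content, the file name built in the question cannot be passed along this way; in NiFi the name is the filename attribute of the FlowFile, so it would have to be set in the flow itself before PutHDFS.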