Apache NiFi:ExecuteStreamCommand 生成两个流文件

Apache NiFi: ExecuteStreamCommand generating two flow files

我目前 运行 使用 PYthon 遇到 Apache NiFi ExecuteStreamCommand 的问题。我有一个脚本可以读取 csv 并将其转换为 pandas-Dataframes,然后再转换为 JSON。由于列的命名不一致,该脚本将 csv 文件拆分为多个 DataFrame。我当前的脚本如下所示:

import pandas as pd
import sys

input = sys.stdin.readlines()

#searching for subfiles and saving them to a list with files ...

appendDataFrames = []

for dataFrames in range(len(files)):
   df = pd.DataFrame(files[dataFrame])
   #several improvements of DataFrame ...
   appendDataFrames.append(df)

output = pd.concat(appendDataFrames)
JSONOutPut = output.to_json(orient='records', date_format='iso', date_unit='s')
sys.stdout.write(JSONOutPut)

在我的下一个处理器的队列中,我现在可以看到一个 FlowFile 作为 JSON(如预期的那样)。 我的问题是,是否可以将每个 JSON 写入单独的 FlowFiles,以便我的下一个处理器能够分开处理它们?我需要这样做,因为下一个处理器是 InferAvroSchema,并且由于所有 JSON 都有不同的模式,所以这是没有机会的。我错了吗?或者有什么办法可以解决这个问题?

下面的代码将无法工作,因为它无论如何都在同一个流文件中,而我的 InferAvroSchema 无法处理这个分离。

import pandas as pd
import sys

input = sys.stdin.readlines()

#searching for subfiles and saving them to a list with files ...

appendDataFrames = []

for dataFrames in range(len(files)):
   df = pd.DataFrame(files[dataFrame])
   #several improvements of DataFrame ...
   JSONOutPut = df.to_json(orient='records', date_format='iso', date_unit='s')
   sys.stdout.write(JSONOutPut)

提前致谢!

使用 ExecuteStreamCommand 时,您无法拆分输出,因为您必须写入标准输出。

但是您可以在输出中写入一些定界符并使用 SplitContent 与下一个处理器相同的定界符。

我刚刚修改了我的代码如下:

import pandas as pd
import sys

input = sys.stdin.readlines()

#searching for subfiles and saving them to a list with files ...

appendDataFrames = []

for dataFrames in range(len(files)):
   df = pd.DataFrame(files[dataFrame])
   #several improvements of DataFrame ...
   JSONOutPut = df.to_json(orient='records', date_format='iso', date_unit='s')
   sys.stdout.write(JSONOutPut)
   sys.stdout.write("#;#")

并添加了一个 SplitContent 处理器,例如: