Apache NiFi MergeContent output data inconsistent?
Just getting started with NiFi and could use some help with the design.
I'm trying to build a simple flow that picks up dummy CSV files (for now) from an HDFS directory and adds some text data to every record in each flow file.
Incoming files:
dummy1.csv
dummy2.csv
dummy3.csv
Content:
"Eldon Base for stackable storage shelf, platinum",Muhammed MacIntyre,3,-213.25,38.94,35,Nunavut,Storage & Organization,0.8
"1.7 Cubic Foot Compact ""Cube"" Office Refrigerators",BarryFrench,293,457.81,208.16,68.02,Nunavut,Appliances,0.58
"Cardinal Slant-D Ring Binder, Heavy Gauge Vinyl",Barry French,293,46.71,8.69,2.99,Nunavut,Binders and Binder Accessories,0.39
...
Desired output:
d17a3259-0718-4c7b-bee8-924266aebcc7,Mon Jun 04 16:36:56 EDT 2018,Fellowes Recycled Storage Drawers,Allen Rosenblatt,11137,395.12,111.03,8.64,Northwest Territories,Storage & Organization,0.78
25f17667-9216-4f1d-b69c-23403cd13464,Mon Jun 04 16:36:56 EDT 2018,Satellite Sectional Post Binders,Barry Weirich,11202,79.59,43.41,2.99,Northwest Territories,Binders and Binder Accessories,0.39
ce0b569f-5d93-4a54-b55e-09c18705f973,Mon Jun 04 16:36:56 EDT 2018,Deflect-o DuraMat Antistatic Studded Beveled Mat for Medium Pile Carpeting,Doug Bickford,11456,399.37,105.34,24.49,Northwest Territories,Office Furnishings,0.61
Flow:
SplitText -
ReplaceText -
MergeContent -
(This is probably a poor way to achieve what I'm after, but I read somewhere that a UUID is the best option for generating a unique session ID, so I figured I would split each line of the incoming data into its own flow file and generate a UUID for it.)
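Roughly, the intent of those steps is the following. Property names are from the stock SplitText and ReplaceText processors; the exact values shown are an illustrative sketch rather than a verified configuration:

SplitText
    Line Split Count: 1

ReplaceText
    Replacement Strategy: Prepend
    Evaluation Mode:      Entire text
    Replacement Value:    ${UUID()},${now()},

Since each flow file holds exactly one CSV line after SplitText, prepending ${UUID()},${now()}, gives every record its own id and timestamp before MergeContent joins the lines back together.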
But somehow, as you can see, the data order gets jumbled: the first 3 rows of the output don't match the input; instead the output appears to contain rows from elsewhere in my test data (50,000 entries). Repeated runs usually show the row order changing somewhere after line 2001.
And yes, I did search for similar questions here and tried the Defragment merge strategy, but it didn't work. I'd appreciate it if someone could explain what is going on and how I can get the data out in the same order, with a unique session_id and timestamp on every record. Do I need to change or tune some parameters to get the correct output? If there is a better approach, I'm open to suggestions.
First of all, thank you for such a thorough reply. I think you've cleared up a lot of my doubts about how the processors work!
The ordering of the merge is only guaranteed in defragment mode because it will put the flow files in order according to their fragment index. I'm not sure why that wouldn't be working, but if you could create a template of a flow with sample data that showed the problem it would be helpful to debug.
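For reference, SplitText writes the fragment.identifier, fragment.index and fragment.count attributes that Defragment mode relies on (ReplaceText only touches content, so it leaves them intact). A minimal MergeContent setup for this kind of re-merge might look like the sketch below; this is illustrative rather than a verified template, and the newline demarcator in particular is an assumption, since SplitText strips trailing newlines by default:

MergeContent
    Merge Strategy:         Defragment
    Merge Format:           Binary Concatenation
    Delimiter Strategy:     Text
    Demarcator:             a single newline
    Maximum number of Bins: at least the number of source CSVs in flight at once

If more distinct fragment groups are in flight than there are bins, MergeContent has to evict the oldest bin before it is complete, so it is worth checking that value against the number of files being processed concurrently.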
I'll try to reproduce this again with a clean template. It may have been a parameter issue, or the HDFS writer failing to write.
I'm not sure if the intent of your flow is to just re-merge the original CSV that was split, or to merge together several different CSVs. Defragment mode will only re-merge the original CSV, so if ListHDFS picked up 10 CSVs, after splitting and re-merging, you should again have 10 CSVs.
Yes, that is exactly what I need: split the data and join it back into the corresponding files. I don't have an explicit need (yet) to join the outputs together again.
The approach of splitting a CSV down to 1 line per flow file to manipulate each line is a common approach, however it won't perform very well if you have many large CSV files. A more efficient approach would be to try and manipulate the data in place without splitting. This can generally be done with the record-oriented processors.
- I used this approach purely on intuition and didn't realize it was a common one. The data files can sometimes be very large, meaning more than a million records in a single file. Wouldn't that become an I/O problem in the cluster, since it means one flow file (and one unique UUID) per record? How many flow files can NiFi handle? (I know this depends on the cluster configuration, and I'll try to get more details about the cluster from the HDP admins.)
- "try and manipulate the data in place without splitting": what would you suggest here? Could you give an example, a template, or the processors to use?
In this case you would need to define a schema for your CSV which included all the columns in your data, plus the session id and timestamp. Then using an UpdateRecord processor you would use record path expressions like /session_id = ${UUID()} and /timestamp = ${now()}. This would stream the content line by line and update each record and write it back out, keeping it all as one flow file.
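To make that concrete, the record-oriented version might be configured roughly as follows. The field names in the schemas are invented for illustration (the sample CSV has no header row), and this is a sketch of the stock CSVReader / CSVRecordSetWriter / UpdateRecord components rather than a tested template:

CSVReader (controller service)
    Schema Access Strategy:     Use 'Schema Text' Property
    Treat First Line as Header: false
    Schema Text (only the nine data columns, names illustrative):
        {"type":"record","name":"order","fields":[
          {"name":"product","type":"string"},
          {"name":"customer","type":"string"},
          {"name":"order_id","type":"string"},
          {"name":"profit","type":"string"},
          {"name":"unit_price","type":"string"},
          {"name":"shipping_cost","type":"string"},
          {"name":"province","type":"string"},
          {"name":"category","type":"string"},
          {"name":"margin","type":"string"}]}

CSVRecordSetWriter (controller service)
    Schema Access Strategy: Use 'Schema Text' Property
    Include Header Line:    false
    Schema Text:            the same schema with two extra fields in front,
        {"name":"session_id","type":["null","string"]} and
        {"name":"timestamp","type":["null","string"]}

UpdateRecord
    Record Reader:              CSVReader
    Record Writer:              CSVRecordSetWriter
    Replacement Value Strategy: Literal Value
    /session_id:                ${UUID()}
    /timestamp:                 ${now()}

With something like this, each CSV streams through as a single flow file and every record picks up its own UUID and timestamp, which avoids the one-flow-file-per-row overhead entirely.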
This looks promising. Could you share a simple template that pulls a file from HDFS > processes it > writes it back to HDFS, without splitting?
I'm hesitant to share a template due to restrictions, but let me see if I can put together a generic one and I'll share it.
Thanks for the wisdom! :)