NiFi ConvertAvroToORC processor can't convert an Avro file containing an array whose items are floats and arrays of floats
- I have an Avro schema (note the nested array structure):
{
"namespace":"nifi",
"name":"scheme_name",
"type":"record",
"fields":[
{
"name":"values",
"type":{
"type":"array",
"items":{
"type":"array",
"items": ["float", {"type": "array", "items": ["float", "string", "null"]}]
}
}
}
]
}
- I have a JSON file:
{"values": [[1, 1.1, 1.2, 1.3, [-1, -1.1, -1.2, -1.3], -2, 3], [2, 2.1, 2.2, 2.3, [-2, -2.1, -2.2, -2.3], -3, 4]]}
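- I have the following NiFi processor group (see picture). GetFile just picks up the JSON shown above. ConvertRecord just converts the JSON, read by JsonTreeReader, into Avro, written by AvroRecordSetWriter. Both JsonTreeReader and AvroRecordSetWriter use a schema registry: AvroSchemaRegistry (which contains the Avro schema shown above).
At the step that converts Avro to ORC, NiFi throws an exception: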
2018-10-17 13:51:56,809 ERROR [Timer-Driven Process Thread-8] o.a.n.processors.hive.ConvertAvroToORC ConvertAvroToORC[id=814f08dc-0166-1000-a46c-f69042e8ae94] ConvertAvroToORC[id=814f08dc-0166-1000-a46c-f69042e8ae94] failed to process session due to java.lang.IllegalArgumentException: Object Type for class org.apache.avro.generic.GenericData$Array not in Union declaration; Processor Administratively Yielded for 1 sec: java.lang.IllegalArgumentException: Object Type for class org.apache.avro.generic.GenericData$Array not in Union declaration
java.lang.IllegalArgumentException: Object Type for class org.apache.avro.generic.GenericData$Array not in Union declaration
at org.apache.hadoop.hive.ql.io.orc.NiFiOrcUtils.convertToORCObject(NiFiOrcUtils.java:88)
at org.apache.hadoop.hive.ql.io.orc.NiFiOrcUtils.lambda$convertToORCObject(NiFiOrcUtils.java:149)
at java.util.stream.ReferencePipeline.accept(ReferencePipeline.java:193)
at java.util.Iterator.forEachRemaining(Iterator.java:116)
at java.util.Spliterators$IteratorSpliterator.forEachRemaining(Spliterators.java:1801)
at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499)
at org.apache.hadoop.hive.ql.io.orc.NiFiOrcUtils.convertToORCObject(NiFiOrcUtils.java:149)
at org.apache.hadoop.hive.ql.io.orc.NiFiOrcUtils.lambda$convertToORCObject(NiFiOrcUtils.java:149)
at java.util.stream.ReferencePipeline.accept(ReferencePipeline.java:193)
at java.util.Iterator.forEachRemaining(Iterator.java:116)
at java.util.Spliterators$IteratorSpliterator.forEachRemaining(Spliterators.java:1801)
at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499)
at org.apache.hadoop.hive.ql.io.orc.NiFiOrcUtils.convertToORCObject(NiFiOrcUtils.java:149)
at org.apache.nifi.processors.hive.ConvertAvroToORC.lambda$onTrigger$0(ConvertAvroToORC.java:245)
at org.apache.nifi.controller.repository.StandardProcessSession.write(StandardProcessSession.java:2885)
at org.apache.nifi.processors.hive.ConvertAvroToORC.onTrigger(ConvertAvroToORC.java:209)
at org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27)
at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1165)
at org.apache.nifi.controller.tasks.ConnectableTask.invoke(ConnectableTask.java:203)
at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent.run(TimerDrivenSchedulingAgent.java:117)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
2018-10-17 13:51:56,812 WARN [Timer-Driven Process Thread-8] o.a.n.controller.tasks.ConnectableTask Administratively Yielding ConvertAvroToORC[id=814f08dc-0166-1000-a46c-f69042e8ae94] due to uncaught Exception: java.lang.IllegalArgumentException: Object Type for class org.apache.avro.generic.GenericData$Array not in Union declaration
java.lang.IllegalArgumentException: Object Type for class org.apache.avro.generic.GenericData$Array not in Union declaration
at org.apache.hadoop.hive.ql.io.orc.NiFiOrcUtils.convertToORCObject(NiFiOrcUtils.java:88)
at org.apache.hadoop.hive.ql.io.orc.NiFiOrcUtils.lambda$convertToORCObject(NiFiOrcUtils.java:149)
at java.util.stream.ReferencePipeline.accept(ReferencePipeline.java:193)
at java.util.Iterator.forEachRemaining(Iterator.java:116)
at java.util.Spliterators$IteratorSpliterator.forEachRemaining(Spliterators.java:1801)
at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499)
at org.apache.hadoop.hive.ql.io.orc.NiFiOrcUtils.convertToORCObject(NiFiOrcUtils.java:149)
at org.apache.hadoop.hive.ql.io.orc.NiFiOrcUtils.lambda$convertToORCObject(NiFiOrcUtils.java:149)
at java.util.stream.ReferencePipeline.accept(ReferencePipeline.java:193)
at java.util.Iterator.forEachRemaining(Iterator.java:116)
at java.util.Spliterators$IteratorSpliterator.forEachRemaining(Spliterators.java:1801)
at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499)
at org.apache.hadoop.hive.ql.io.orc.NiFiOrcUtils.convertToORCObject(NiFiOrcUtils.java:149)
at org.apache.nifi.processors.hive.ConvertAvroToORC.lambda$onTrigger$0(ConvertAvroToORC.java:245)
at org.apache.nifi.controller.repository.StandardProcessSession.write(StandardProcessSession.java:2885)
at org.apache.nifi.processors.hive.ConvertAvroToORC.onTrigger(ConvertAvroToORC.java:209)
at org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27)
at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1165)
at org.apache.nifi.controller.tasks.ConnectableTask.invoke(ConnectableTask.java:203)
at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent.run(TimerDrivenSchedulingAgent.java:117)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
2018-10-17 13:52:01,247 INFO [NiFi Web Server-214] o.a.n.c.s.StandardProcessScheduler Stopping ConvertRecord[id=814a4abe-0166-1000-0755-4d43aef3dc4a]
2018-10-17 13:52:01,247 INFO [NiFi Web Server-214] o.a.n.controller.StandardProcessorNode Stopping processor: class org.apache.nifi.processors.standard.ConvertRecord
2018-10-17 13:52:01,247 INFO [Timer-Driven Process Thread-8] o.a.n.c.s.TimerDrivenSchedulingAgent Stopped scheduling ConvertRecord[id=814a4abe-0166-1000-0755-4d43aef3dc4a] to run
So, please tell me, where am I going wrong?
Environment:
OS: SUSE Linux Enterprise Server 12 SP3 (release 12.3)
or Windows 7 Enterprise SP1
NiFi version: 1.7.1
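To narrow down which values are involved, here is a rough sketch in plain Python (standard library only, not NiFi or Avro code). It checks the JSON above against the schema above and lists the positions where an array value has to be resolved against a union, which is the situation the exception message describes (`GenericData$Array` is the class Avro's generic API uses for array values). The helper names `matches` and `arrays_in_unions` are made up for this illustration, and accepting JSON integers where `float` is declared is an assumption of the sketch.

```python
import json

# Schema and data copied from the question.
SCHEMA = json.loads("""
{"namespace": "nifi", "name": "scheme_name", "type": "record",
 "fields": [{"name": "values", "type": {"type": "array", "items": {"type": "array",
   "items": ["float", {"type": "array", "items": ["float", "string", "null"]}]}}}]}
""")
DATA = json.loads(
    '{"values": [[1, 1.1, 1.2, 1.3, [-1, -1.1, -1.2, -1.3], -2, 3], '
    '[2, 2.1, 2.2, 2.3, [-2, -2.1, -2.2, -2.3], -3, 4]]}'
)


def matches(schema, value):
    """Return True if value fits schema (simplified: only the types used above)."""
    if isinstance(schema, list):  # a union: any branch may match
        return any(matches(branch, value) for branch in schema)
    if isinstance(schema, dict) and schema["type"] == "array":
        return isinstance(value, list) and all(
            matches(schema["items"], item) for item in value
        )
    if schema == "float":
        # Assumption: JSON integers are accepted where a float is declared.
        return isinstance(value, (int, float)) and not isinstance(value, bool)
    if schema == "string":
        return isinstance(value, str)
    if schema == "null":
        return value is None
    return False


def arrays_in_unions(schema, value, path="values"):
    """Yield the paths where a list value must be resolved against a union."""
    if isinstance(schema, list):
        if isinstance(value, list):
            yield path
        # Descend into the first union branch that accepts the value.
        for branch in schema:
            if matches(branch, value):
                yield from arrays_in_unions(branch, value, path)
                break
    elif isinstance(schema, dict) and schema["type"] == "array" and isinstance(value, list):
        for i, item in enumerate(value):
            yield from arrays_in_unions(schema["items"], item, f"{path}[{i}]")


field_schema = SCHEMA["fields"][0]["type"]
print(matches(field_schema, DATA["values"]))
print(list(arrays_in_unions(field_schema, DATA["values"])))
```

Under these assumptions the data fits the schema, and the only arrays that sit inside a union are the two inner ones at `values[0][4]` and `values[1][4]`.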