NiFi 无法为 HTTP 和特定的 Elasticsearch 处理器写入 elasticsearch

NiFi fails to write to elasticsearch for both HTTP and specific Elasticsearch processor

我正在尝试学习 NiFi 写入 Elastic 的介绍性教程。

https://linkbynet.github.io/elasticsearch/tuning/2017/02/07/Bitcoin-ELK-NiFi.html

我的 NiFi 版本是 1.11.4(当前)和 Elastic 7.7.1(当前),分别作为 dockerfile:https://gist.github.com/geoHeil/7e7426a4d2b2fd363c036d9b12069598.

完整版的 nifi 流 xml 也可在 https://github.com/geoHeil/streaming-reference/blob/master/for_nifi/conf/flow.xml.gz

获得

我遇到以下问题:

Query: ${timestamp:multiply(1000):format(“yyyy-MM-dd”)}, transferring to failure: org.apache.nifi.attribute.expression.language.exception.AttributeExpressionLanguageException: Invalid Expression: bitstamp-${timestamp:multiply(1000):format(“yyyy-MM-dd”)} due to Unexpected token '“yyyy-MM-dd”' at line 1, column 34.

但是elastic的日志显示:["java.lang.IllegalStateException: Received message from unsupported version: [5.0.0] minimal compatible version is: [6.8.0]",

2. when simplifying the indexing to: `bitstamp-${timestamp}`, fails with:

```
PutElasticsearch5[id=97ed752a-0172-1000-0000-000024edfeac] Failed to insert into Elasticsearch due to None of the configured nodes are available: [{#transport#-1}{9Tgt3qGwSAm5hNsRIEiS-Q}{172.18.0.6}{172.18.0.6:9300}]. More detailed information may be available in the NiFi logs.: NoNodeAvailableException[None of the configured nodes are available: [{#transport#-1}{9Tgt3qGwSAm5hNsRIEiS-Q}{172.18.0.6}{172.18.0.6:9300}]]
```
And the logs keep complaining about an incompatible API. Still, I do not understand where 1) is a parsing failure of the expression langugage initially.

尝试仅使用 HTTP API 来解决二进制不兼容问题(并坚持使用 (2) 来解决问题):

PutElasticsearchHttp[id=9801caaa-0172-1000-ffff-ffffe063c238] Routing to failure due to exception: unexpected end of stream on http://elasticsearch:9300/...: java.io.IOException: unexpected end of stream on http://elasticsearch:9300/...

nifi_1            | 2020-06-10 19:59:34,717 ERROR [Timer-Driven Process Thread-9] o.a.n.p.e.PutElasticsearchHttp PutElasticsearchHttp[id=9801caaa-0172-1000-ffff-ffffe063c238] Routing to failure due to exception: java.io.IOException: unexpected end of stream on http://elasticsearch:9300/...: java.io.IOException: unexpected end of stream on http://elasticsearch:9300/...
nifi_1            | java.io.IOException: unexpected end of stream on http://elasticsearch:9300/...
nifi_1            |     at okhttp3.internal.http1.Http1ExchangeCodec.readResponseHeaders(Http1ExchangeCodec.java:236)
nifi_1            |     at okhttp3.internal.connection.Exchange.readResponseHeaders(Exchange.java:115)
nifi_1            |     at okhttp3.internal.http.CallServerInterceptor.intercept(CallServerInterceptor.java:94)
nifi_1            |     at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:142)
nifi_1            |     at okhttp3.internal.connection.ConnectInterceptor.intercept(ConnectInterceptor.java:43)
nifi_1            |     at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:142)
nifi_1            |     at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:117)
nifi_1            |     at okhttp3.internal.cache.CacheInterceptor.intercept(CacheInterceptor.java:94)
nifi_1            |     at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:142)
nifi_1            |     at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:117)
nifi_1            |     at okhttp3.internal.http.BridgeInterceptor.intercept(BridgeInterceptor.java:93)
nifi_1            |     at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:142)
nifi_1            |     at okhttp3.internal.http.RetryAndFollowUpInterceptor.intercept(RetryAndFollowUpInterceptor.java:88)
nifi_1            |     at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:142)
nifi_1            |     at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:117)
nifi_1            |     at okhttp3.RealCall.getResponseWithInterceptorChain(RealCall.java:229)
nifi_1            |     at okhttp3.RealCall.execute(RealCall.java:81)
nifi_1            |     at org.apache.nifi.processors.elasticsearch.AbstractElasticsearchHttpProcessor.sendRequestToElasticsearch(AbstractElasticsearchHttpProcessor.java:296)
nifi_1            |     at org.apache.nifi.processors.elasticsearch.PutElasticsearchHttp.onTrigger(PutElasticsearchHttp.java:315)
nifi_1            |     at org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27)
nifi_1            |     at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1176)
nifi_1            |     at org.apache.nifi.controller.tasks.ConnectableTask.invoke(ConnectableTask.java:213)
nifi_1            |     at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent.run(TimerDrivenSchedulingAgent.java:117)
nifi_1            |     at org.apache.nifi.engine.FlowEngine.run(FlowEngine.java:110)
nifi_1            |     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
nifi_1            |     at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
nifi_1            |     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access1(ScheduledThreadPoolExecutor.java:180)
nifi_1            |     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
nifi_1            |     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
nifi_1            |     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
nifi_1            |     at java.lang.Thread.run(Thread.java:748)
nifi_1            | Caused by: java.io.EOFException: \n not found: limit=24 content=54686973206973206e6f7420616e204854545020706f7274…
nifi_1            |     at okio.RealBufferedSource.readUtf8LineStrict(RealBufferedSource.java:240)
nifi_1            |     at okhttp3.internal.http1.Http1ExchangeCodec.readHeaderLine(Http1ExchangeCodec.java:242)
nifi_1            |     at okhttp3.internal.http1.Http1ExchangeCodec.readResponseHeaders(Http1ExchangeCodec.java:213)
nifi_1            |     ... 30 common frames omitted

URI 设置为:http://elasticsearch:9300

所以我有两个问题:

NiFi应该如何与elastic对话?最新支持的版本是什么?通常更推荐使用 HTTP(较慢)的处理方式吗? 如何修复损坏的表达式语言字符串:bitstamp-${timestamp:multiply(1000):format(“yyyy-MM-dd”)}

1) 这是一个复制粘贴错误。重新输入引号时,它会很好地解析表达式。

2) 二进制协议确实存在不兼容问题,人们正在努力解决这个问题。 https://issues.apache.org/jira/browse/NIFI-6403?jql=project%20%3D%20NIFI%20AND%20text%20~%20%22elasticsearch%207%22;

事实上,在 HTTP NiFi 插件支持方面甚至似乎存在问题。因此,在某些极端情况下,可能需要手动 HTTP 处理器。

3)端口9200是二进制端口。需要切换到 9300。