NiFi 流式传输到 EMR 上的 Spark

NiFi Streaming to Spark on EMR

Apache and Hortonworks 上使用博客 posts 当两者位于同一台机器上时,我已经能够从 NiFi 流式传输到 Spark。现在我正在尝试从一个 EC2 实例上的 NiFi 流式传输到同一子网和安全组中的 EMR 集群,但我 运行 遇到了问题。 EMR Core 机器报告的具体错误是

Failed to receive data from NiFi
java.net.ConnectException: Connection refused
    at sun.nio.ch.Net.connect0(Native Method)
    at sun.nio.ch.Net.connect(Net.java:454)
    at sun.nio.ch.Net.connect(Net.java:446)
    at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:648)
    at java.nio.channels.SocketChannel.open(SocketChannel.java:189)
    at org.apache.nifi.remote.client.socket.EndpointConnectionPool.establishSiteToSiteConnection(EndpointConnectionPool.java:708)
    at org.apache.nifi.remote.client.socket.EndpointConnectionPool.establishSiteToSiteConnection(EndpointConnectionPool.java:682)
    at org.apache.nifi.remote.client.socket.EndpointConnectionPool.getEndpointConnection(EndpointConnectionPool.java:300)
    at org.apache.nifi.remote.client.socket.SocketClient.createTransaction(SocketClient.java:129)
    at org.apache.nifi.spark.NiFiReceiver$ReceiveRunnable.run(NiFiReceiver.java:149)

在核心机器上使用 netstat 我看到它在站点到站点端口(在我的例子中是 8090)上确实有一个到 NiFi 盒子的开放 TCP 连接。在 NiFi 机器上,在 nifi-app.log 文件中,我看到来自 "Site-to-Site Worker Thread" 的关于我的核心机器建立连接的日志(没有关于任何错误的信息)。所以最初的连接似乎是成功的,但之后就没那么成功了。

当我在本地 运行 我的 Spark 代码时,我在 NiFi EC2 实例上,所以我知道它通常可以正常工作。一旦客户端是 EMR 集群,我就遇到了一些问题,可能与安全相关。

作为解决方法,我可以 post 将文件发送到 S3,然后从 NiFi 启动一个 Spark 步骤(使用 Python 脚本),但我更愿意流式传输数据(并且使用 Kafka 不是一种选择)。还有其他人从 NiFi 流式传输到 EMR 吗?

这 post 类似:Getting data from Nifi to spark streaming 不同之处在于我关闭了安全功能并且我使用的是 http,而不是 https(我遇到的连接被拒绝而不是 401) .

编辑:

nifi.properties:

# Site to Site properties
nifi.remote.input.host=
nifi.remote.input.secure=false
nifi.remote.input.socket.host=
nifi.remote.input.socket.port=8090
nifi.remote.input.http.enabled=true
nifi.remote.input.http.transaction.ttl=30 sec

Bryan Bende 在上面的评论中有解决方案:一旦我将 nifi.remote.input.host 设置为当前机器的 IP 地址,流式传输就开始工作了。