从 Apache flink 接收到位于远程服务器中的 Ignite
Sink From Apache flink to Ignite which is located in a remote server
我在我的应用程序中使用 flink 作为流处理器,我想将结果转储到位于远程服务器中的 apache ignite 中。在 ignite.xml 的 TcpDiscoverySpi 部分,我指定了 ignite 服务器的 ip 地址;但是,没有连接到服务器。
这是我的 ignite.xml:
<property name="discoverySpi">
<bean class="org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi">
<property name="ipFinder">
<bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder">
<property name="addresses">
<list>
<value>remoteIpAddress:47500</value>
</list>
</property>
</bean>
</property>
</bean>
</property>
我使用下面的 link 作为 igniteSink class:
https://github.com/bpark/flink-ignite-demo/blob/master/src/main/java/com/github/bpark/IgniteSink.java
下面的代码用于点燃下沉流:
private void sinkToCache(DataStream<Map<String,String>> dsRow){
IgniteSink<Map<String,String>> igniteSink = new IgniteSink<>();
//igniteSink.start();
igniteSink.setAllowOverwrite(true);
igniteSink.setAutoFlushFrequency(10);
dsRow.addSink(igniteSink);
}
这里是家属:
<dependency>
<groupId>org.apache.ignite</groupId>
<artifactId>ignite-indexing</artifactId>
<version>2.9.1</version>
</dependency>
<dependency>
<groupId>org.apache.ignite</groupId>
<artifactId>ignite-flink</artifactId>
<version>2.8.1</version>
</dependency>
<dependency>
<groupId>org.apache.ignite</groupId>
<artifactId>ignite-spring</artifactId>
<version>2.9.1</version>
</dependency>
<dependency>
<groupId>org.apache.ignite</groupId>
<artifactId>ignite-log4j</artifactId>
<version>2.9.1</version>
</dependency>
不幸的是,当我看到服务器的日志时,没有建立到服务器的连接。我很确定端口没有问题,防火墙也没有问题,因为我可以远程登录服务器并在日志中查看从我的电脑到服务器的已建立 Tcp 连接的日志。
知道我的代码有什么问题吗?
我终于可以通过更改 sinkToCache 中的代码来解决问题,使其以 clientMode 启动。
这是我的客户端节点配置:
TcpDiscoverySpi spi = new TcpDiscoverySpi();
spi.setAckTimeout(500);
spi.setSocketTimeout(2000);
TcpDiscoveryMulticastIpFinder ipFinder = new TcpDiscoveryMulticastIpFinder();
List<String> address = new java.util.ArrayList<>
(Collections.singletonList("remoteIpAddress:47500..47509"));
ipFinder.setAddresses(address);
//spi.setIpFinder(ipFinder);
spi.setLocalPort(47500);
TcpCommunicationSpi commSpi = new TcpCommunicationSpi();
commSpi.setLocalPort(47100);
commSpi.setSlowClientQueueLimit(3000);
IgniteConfiguration cfg = new IgniteConfiguration();
// Override default discovery SPI.
cfg.setDiscoverySpi(spi);
cfg.setFailureHandler(new StopNodeFailureHandler());
cfg.setCommunicationSpi(commSpi);
cfg.setClientMode(true);
Ignition.start(cfg);
我在我的应用程序中使用 flink 作为流处理器,我想将结果转储到位于远程服务器中的 apache ignite 中。在 ignite.xml 的 TcpDiscoverySpi 部分,我指定了 ignite 服务器的 ip 地址;但是,没有连接到服务器。 这是我的 ignite.xml:
<property name="discoverySpi">
<bean class="org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi">
<property name="ipFinder">
<bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder">
<property name="addresses">
<list>
<value>remoteIpAddress:47500</value>
</list>
</property>
</bean>
</property>
</bean>
</property>
我使用下面的 link 作为 igniteSink class: https://github.com/bpark/flink-ignite-demo/blob/master/src/main/java/com/github/bpark/IgniteSink.java
下面的代码用于点燃下沉流:
private void sinkToCache(DataStream<Map<String,String>> dsRow){
IgniteSink<Map<String,String>> igniteSink = new IgniteSink<>();
//igniteSink.start();
igniteSink.setAllowOverwrite(true);
igniteSink.setAutoFlushFrequency(10);
dsRow.addSink(igniteSink);
}
这里是家属:
<dependency>
<groupId>org.apache.ignite</groupId>
<artifactId>ignite-indexing</artifactId>
<version>2.9.1</version>
</dependency>
<dependency>
<groupId>org.apache.ignite</groupId>
<artifactId>ignite-flink</artifactId>
<version>2.8.1</version>
</dependency>
<dependency>
<groupId>org.apache.ignite</groupId>
<artifactId>ignite-spring</artifactId>
<version>2.9.1</version>
</dependency>
<dependency>
<groupId>org.apache.ignite</groupId>
<artifactId>ignite-log4j</artifactId>
<version>2.9.1</version>
</dependency>
不幸的是,当我看到服务器的日志时,没有建立到服务器的连接。我很确定端口没有问题,防火墙也没有问题,因为我可以远程登录服务器并在日志中查看从我的电脑到服务器的已建立 Tcp 连接的日志。 知道我的代码有什么问题吗?
我终于可以通过更改 sinkToCache 中的代码来解决问题,使其以 clientMode 启动。
这是我的客户端节点配置:
TcpDiscoverySpi spi = new TcpDiscoverySpi();
spi.setAckTimeout(500);
spi.setSocketTimeout(2000);
TcpDiscoveryMulticastIpFinder ipFinder = new TcpDiscoveryMulticastIpFinder();
List<String> address = new java.util.ArrayList<>
(Collections.singletonList("remoteIpAddress:47500..47509"));
ipFinder.setAddresses(address);
//spi.setIpFinder(ipFinder);
spi.setLocalPort(47500);
TcpCommunicationSpi commSpi = new TcpCommunicationSpi();
commSpi.setLocalPort(47100);
commSpi.setSlowClientQueueLimit(3000);
IgniteConfiguration cfg = new IgniteConfiguration();
// Override default discovery SPI.
cfg.setDiscoverySpi(spi);
cfg.setFailureHandler(new StopNodeFailureHandler());
cfg.setCommunicationSpi(commSpi);
cfg.setClientMode(true);
Ignition.start(cfg);